From 3ba348601a83aeb9ff339147049b7da0e5dfa992 Mon Sep 17 00:00:00 2001 From: Carl Fredrik Samson Date: Sat, 5 Dec 2020 23:01:35 +0100 Subject: [PATCH] prevent wake_by_ref from decreasing refcount. Fixes #22 --- src/6_future_example.md | 44 +++++++++++++++++++-------------------- src/8_finished_example.md | 8 +++---- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/6_future_example.md b/src/6_future_example.md index 895028c..5bb0925 100644 --- a/src/6_future_example.md +++ b/src/6_future_example.md @@ -71,9 +71,9 @@ fn block_on(mut future: F) -> F::Output { // We poll in a loop, but it's not a busy loop. It will only run when // an event occurs, or a thread has a "spurious wakeup" (an unexpected wakeup // that can happen for no good reason). - let val = loop { + let val = loop { match Future::poll(future.as_mut(), &mut cx) { - + // when the Future is ready we're finished Poll::Ready(val) => break val, @@ -166,10 +166,10 @@ fn mywaker_clone(s: &MyWaker) -> RawWaker { // set of functions const VTABLE: RawWakerVTable = unsafe { RawWakerVTable::new( - |s| mywaker_clone(&*(s as *const MyWaker)), // clone - |s| mywaker_wake(&*(s as *const MyWaker)), // wake - |s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref - |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount + |s| mywaker_clone(&*(s as *const MyWaker)), // clone + |s| mywaker_wake(&*(s as *const MyWaker)), // wake + |s| (*(s as *const MyWaker)).parker.unpark(), // wake by ref (don't decrease refcount) + |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount ) }; @@ -204,7 +204,7 @@ impl Future for Task { // If it's ready we set its state to `Finished` *r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished; Poll::Ready(self.id) - + // If it isn't finished we check the map we have stored in our Reactor // over id's we have registered and see if it's there } else if r.tasks.contains_key(&self.id) { @@ -347,7 +347,7 @@ impl Reactor { handle: None, tasks: HashMap::new(), }))); - + // Notice that we'll need to use `weak` reference here. If we don't, // our `Reactor` will not get `dropped` when our main thread is finished // since we're holding internal references to it. @@ -409,7 +409,7 @@ impl Reactor { } // Register a new task with the reactor. In this particular example - // we panic if a task with the same id get's registered twice + // we panic if a task with the same id get's registered twice fn register(&mut self, duration: u64, waker: Waker, id: usize) { if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() { panic!("Tried to insert a task with id: '{}', twice!", id); @@ -458,7 +458,7 @@ fn main() { // Many runtimes create a global `reactor` we pass it as an argument let reactor = Reactor::new(); - + // We create two tasks: // - first parameter is the `reactor` // - the second is a timeout in seconds @@ -496,7 +496,7 @@ fn main() { # }); # let waker = waker_into_waker(Arc::into_raw(mywaker)); # let mut cx = Context::from_waker(&waker); -# +# # // SAFETY: we shadow `future` so it can't be accessed again. # let mut future = unsafe { Pin::new_unchecked(&mut future) }; # let val = loop { @@ -537,7 +537,7 @@ fn main() { # RawWakerVTable::new( # |s| mywaker_clone(&*(s as *const MyWaker)), // clone # |s| mywaker_wake(&*(s as *const MyWaker)), // wake -# |s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref +# |s| (*(s as *const MyWaker)).thread.unpark(), // wake by ref (don't decrease refcount) # |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount # ) # }; @@ -584,7 +584,7 @@ fn main() { # handle: Option>, # tasks: HashMap, # } -# +# # #[derive(Debug)] # enum Event { # Close, @@ -599,7 +599,7 @@ fn main() { # handle: None, # tasks: HashMap::new(), # }))); -# +# # let reactor_clone = Arc::downgrade(&reactor); # let handle = thread::spawn(move || { # let mut handles = vec![]; @@ -624,7 +624,7 @@ fn main() { # reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap(); # reactor # } -# +# # fn wake(&mut self, id: usize) { # self.tasks.get_mut(&id).map(|state| { # match mem::replace(state, TaskState::Ready) { @@ -634,14 +634,14 @@ fn main() { # } # }).unwrap(); # } -# +# # fn register(&mut self, duration: u64, waker: Waker, id: usize) { # if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() { # panic!("Tried to insert a task with id: '{}', twice!", id); # } # self.dispatcher.send(Event::Timeout(duration, id)).unwrap(); # } -# +# # fn is_ready(&self, id: usize) -> bool { # self.tasks.get(&id).map(|state| match state { # TaskState::Ready => true, @@ -667,12 +667,12 @@ The last point is relevant when we move on the the last paragraph. > There is one subtle thing to note about our example. What happens if we pass > in the same `id` for both events? -> +> > ```rust, ignore > let future1 = Task::new(reactor.clone(), 1, 1); > let future2 = Task::new(reactor.clone(), 2, 1); > ``` -> +> > We'll discuss this a bit more under exercises in the last chapter where we > also look at ways to fix it. For now, just make a note of it so you're aware > of the problem. @@ -796,9 +796,9 @@ fn block_on(mut future: F) -> F::Output { let mywaker = Arc::new(MyWaker { parker: parker.clone() }); <--- NB! let waker = mywaker_into_waker(Arc::into_raw(mywaker)); let mut cx = Context::from_waker(&waker); - + // SAFETY: we shadow `future` so it can't be accessed again. - let mut future = unsafe { Pin::new_unchecked(&mut future) }; + let mut future = unsafe { Pin::new_unchecked(&mut future) }; loop { match Future::poll(future.as_mut(), &mut cx) { Poll::Ready(val) => break val, @@ -822,7 +822,7 @@ fn mywaker_wake(s: &MyWaker) { } ``` -And that's really all there is to it. +And that's really all there is to it. > If you checked out the playground link that showcased how park/unpark could [cause subtle > problems](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=b2343661fe3d271c91c6977ab8e681d0) diff --git a/src/8_finished_example.md b/src/8_finished_example.md index d21f959..ba6c800 100644 --- a/src/8_finished_example.md +++ b/src/8_finished_example.md @@ -56,9 +56,9 @@ fn block_on(mut future: F) -> F::Output { let mywaker = Arc::new(MyWaker { parker: parker.clone() }); let waker = mywaker_into_waker(Arc::into_raw(mywaker)); let mut cx = Context::from_waker(&waker); - + // SAFETY: we shadow `future` so it can't be accessed again. - let mut future = unsafe { Pin::new_unchecked(&mut future) }; + let mut future = unsafe { Pin::new_unchecked(&mut future) }; loop { match Future::poll(future.as_mut(), &mut cx) { Poll::Ready(val) => break val, @@ -93,7 +93,7 @@ const VTABLE: RawWakerVTable = unsafe { RawWakerVTable::new( |s| mywaker_clone(&*(s as *const MyWaker)), // clone |s| mywaker_wake(&*(s as *const MyWaker)), // wake - |s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref + |s| (*(s as *const MyWaker)).parker.unpark(), // wake by ref (don't decrease refcount) |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount ) }; @@ -151,7 +151,7 @@ impl Reactor { handle: None, tasks: HashMap::new(), }))); - + let reactor_clone = Arc::downgrade(&reactor); let handle = thread::spawn(move || { let mut handles = vec![];