prevent wake_by_ref from decreasing refcount. Fixes #22
This commit is contained in:
@@ -71,9 +71,9 @@ fn block_on<F: Future>(mut future: F) -> F::Output {
|
|||||||
// We poll in a loop, but it's not a busy loop. It will only run when
|
// We poll in a loop, but it's not a busy loop. It will only run when
|
||||||
// an event occurs, or a thread has a "spurious wakeup" (an unexpected wakeup
|
// an event occurs, or a thread has a "spurious wakeup" (an unexpected wakeup
|
||||||
// that can happen for no good reason).
|
// that can happen for no good reason).
|
||||||
let val = loop {
|
let val = loop {
|
||||||
match Future::poll(future.as_mut(), &mut cx) {
|
match Future::poll(future.as_mut(), &mut cx) {
|
||||||
|
|
||||||
// when the Future is ready we're finished
|
// when the Future is ready we're finished
|
||||||
Poll::Ready(val) => break val,
|
Poll::Ready(val) => break val,
|
||||||
|
|
||||||
@@ -166,10 +166,10 @@ fn mywaker_clone(s: &MyWaker) -> RawWaker {
|
|||||||
// set of functions
|
// set of functions
|
||||||
const VTABLE: RawWakerVTable = unsafe {
|
const VTABLE: RawWakerVTable = unsafe {
|
||||||
RawWakerVTable::new(
|
RawWakerVTable::new(
|
||||||
|s| mywaker_clone(&*(s as *const MyWaker)), // clone
|
|s| mywaker_clone(&*(s as *const MyWaker)), // clone
|
||||||
|s| mywaker_wake(&*(s as *const MyWaker)), // wake
|
|s| mywaker_wake(&*(s as *const MyWaker)), // wake
|
||||||
|s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
|
|s| (*(s as *const MyWaker)).parker.unpark(), // wake by ref (don't decrease refcount)
|
||||||
|s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
|
|s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -204,7 +204,7 @@ impl Future for Task {
|
|||||||
// If it's ready we set its state to `Finished`
|
// If it's ready we set its state to `Finished`
|
||||||
*r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
|
*r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
|
||||||
Poll::Ready(self.id)
|
Poll::Ready(self.id)
|
||||||
|
|
||||||
// If it isn't finished we check the map we have stored in our Reactor
|
// If it isn't finished we check the map we have stored in our Reactor
|
||||||
// over id's we have registered and see if it's there
|
// over id's we have registered and see if it's there
|
||||||
} else if r.tasks.contains_key(&self.id) {
|
} else if r.tasks.contains_key(&self.id) {
|
||||||
@@ -347,7 +347,7 @@ impl Reactor {
|
|||||||
handle: None,
|
handle: None,
|
||||||
tasks: HashMap::new(),
|
tasks: HashMap::new(),
|
||||||
})));
|
})));
|
||||||
|
|
||||||
// Notice that we'll need to use `weak` reference here. If we don't,
|
// Notice that we'll need to use `weak` reference here. If we don't,
|
||||||
// our `Reactor` will not get `dropped` when our main thread is finished
|
// our `Reactor` will not get `dropped` when our main thread is finished
|
||||||
// since we're holding internal references to it.
|
// since we're holding internal references to it.
|
||||||
@@ -409,7 +409,7 @@ impl Reactor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Register a new task with the reactor. In this particular example
|
// Register a new task with the reactor. In this particular example
|
||||||
// we panic if a task with the same id get's registered twice
|
// we panic if a task with the same id get's registered twice
|
||||||
fn register(&mut self, duration: u64, waker: Waker, id: usize) {
|
fn register(&mut self, duration: u64, waker: Waker, id: usize) {
|
||||||
if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
|
if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
|
||||||
panic!("Tried to insert a task with id: '{}', twice!", id);
|
panic!("Tried to insert a task with id: '{}', twice!", id);
|
||||||
@@ -458,7 +458,7 @@ fn main() {
|
|||||||
|
|
||||||
// Many runtimes create a global `reactor` we pass it as an argument
|
// Many runtimes create a global `reactor` we pass it as an argument
|
||||||
let reactor = Reactor::new();
|
let reactor = Reactor::new();
|
||||||
|
|
||||||
// We create two tasks:
|
// We create two tasks:
|
||||||
// - first parameter is the `reactor`
|
// - first parameter is the `reactor`
|
||||||
// - the second is a timeout in seconds
|
// - the second is a timeout in seconds
|
||||||
@@ -496,7 +496,7 @@ fn main() {
|
|||||||
# });
|
# });
|
||||||
# let waker = waker_into_waker(Arc::into_raw(mywaker));
|
# let waker = waker_into_waker(Arc::into_raw(mywaker));
|
||||||
# let mut cx = Context::from_waker(&waker);
|
# let mut cx = Context::from_waker(&waker);
|
||||||
#
|
#
|
||||||
# // SAFETY: we shadow `future` so it can't be accessed again.
|
# // SAFETY: we shadow `future` so it can't be accessed again.
|
||||||
# let mut future = unsafe { Pin::new_unchecked(&mut future) };
|
# let mut future = unsafe { Pin::new_unchecked(&mut future) };
|
||||||
# let val = loop {
|
# let val = loop {
|
||||||
@@ -537,7 +537,7 @@ fn main() {
|
|||||||
# RawWakerVTable::new(
|
# RawWakerVTable::new(
|
||||||
# |s| mywaker_clone(&*(s as *const MyWaker)), // clone
|
# |s| mywaker_clone(&*(s as *const MyWaker)), // clone
|
||||||
# |s| mywaker_wake(&*(s as *const MyWaker)), // wake
|
# |s| mywaker_wake(&*(s as *const MyWaker)), // wake
|
||||||
# |s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
|
# |s| (*(s as *const MyWaker)).thread.unpark(), // wake by ref (don't decrease refcount)
|
||||||
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
|
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
|
||||||
# )
|
# )
|
||||||
# };
|
# };
|
||||||
@@ -584,7 +584,7 @@ fn main() {
|
|||||||
# handle: Option<JoinHandle<()>>,
|
# handle: Option<JoinHandle<()>>,
|
||||||
# tasks: HashMap<usize, TaskState>,
|
# tasks: HashMap<usize, TaskState>,
|
||||||
# }
|
# }
|
||||||
#
|
#
|
||||||
# #[derive(Debug)]
|
# #[derive(Debug)]
|
||||||
# enum Event {
|
# enum Event {
|
||||||
# Close,
|
# Close,
|
||||||
@@ -599,7 +599,7 @@ fn main() {
|
|||||||
# handle: None,
|
# handle: None,
|
||||||
# tasks: HashMap::new(),
|
# tasks: HashMap::new(),
|
||||||
# })));
|
# })));
|
||||||
#
|
#
|
||||||
# let reactor_clone = Arc::downgrade(&reactor);
|
# let reactor_clone = Arc::downgrade(&reactor);
|
||||||
# let handle = thread::spawn(move || {
|
# let handle = thread::spawn(move || {
|
||||||
# let mut handles = vec![];
|
# let mut handles = vec![];
|
||||||
@@ -624,7 +624,7 @@ fn main() {
|
|||||||
# reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
|
# reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
|
||||||
# reactor
|
# reactor
|
||||||
# }
|
# }
|
||||||
#
|
#
|
||||||
# fn wake(&mut self, id: usize) {
|
# fn wake(&mut self, id: usize) {
|
||||||
# self.tasks.get_mut(&id).map(|state| {
|
# self.tasks.get_mut(&id).map(|state| {
|
||||||
# match mem::replace(state, TaskState::Ready) {
|
# match mem::replace(state, TaskState::Ready) {
|
||||||
@@ -634,14 +634,14 @@ fn main() {
|
|||||||
# }
|
# }
|
||||||
# }).unwrap();
|
# }).unwrap();
|
||||||
# }
|
# }
|
||||||
#
|
#
|
||||||
# fn register(&mut self, duration: u64, waker: Waker, id: usize) {
|
# fn register(&mut self, duration: u64, waker: Waker, id: usize) {
|
||||||
# if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
|
# if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
|
||||||
# panic!("Tried to insert a task with id: '{}', twice!", id);
|
# panic!("Tried to insert a task with id: '{}', twice!", id);
|
||||||
# }
|
# }
|
||||||
# self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
|
# self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
|
||||||
# }
|
# }
|
||||||
#
|
#
|
||||||
# fn is_ready(&self, id: usize) -> bool {
|
# fn is_ready(&self, id: usize) -> bool {
|
||||||
# self.tasks.get(&id).map(|state| match state {
|
# self.tasks.get(&id).map(|state| match state {
|
||||||
# TaskState::Ready => true,
|
# TaskState::Ready => true,
|
||||||
@@ -667,12 +667,12 @@ The last point is relevant when we move on the the last paragraph.
|
|||||||
|
|
||||||
> There is one subtle thing to note about our example. What happens if we pass
|
> There is one subtle thing to note about our example. What happens if we pass
|
||||||
> in the same `id` for both events?
|
> in the same `id` for both events?
|
||||||
>
|
>
|
||||||
> ```rust, ignore
|
> ```rust, ignore
|
||||||
> let future1 = Task::new(reactor.clone(), 1, 1);
|
> let future1 = Task::new(reactor.clone(), 1, 1);
|
||||||
> let future2 = Task::new(reactor.clone(), 2, 1);
|
> let future2 = Task::new(reactor.clone(), 2, 1);
|
||||||
> ```
|
> ```
|
||||||
>
|
>
|
||||||
> We'll discuss this a bit more under exercises in the last chapter where we
|
> We'll discuss this a bit more under exercises in the last chapter where we
|
||||||
> also look at ways to fix it. For now, just make a note of it so you're aware
|
> also look at ways to fix it. For now, just make a note of it so you're aware
|
||||||
> of the problem.
|
> of the problem.
|
||||||
@@ -796,9 +796,9 @@ fn block_on<F: Future>(mut future: F) -> F::Output {
|
|||||||
let mywaker = Arc::new(MyWaker { parker: parker.clone() }); <--- NB!
|
let mywaker = Arc::new(MyWaker { parker: parker.clone() }); <--- NB!
|
||||||
let waker = mywaker_into_waker(Arc::into_raw(mywaker));
|
let waker = mywaker_into_waker(Arc::into_raw(mywaker));
|
||||||
let mut cx = Context::from_waker(&waker);
|
let mut cx = Context::from_waker(&waker);
|
||||||
|
|
||||||
// SAFETY: we shadow `future` so it can't be accessed again.
|
// SAFETY: we shadow `future` so it can't be accessed again.
|
||||||
let mut future = unsafe { Pin::new_unchecked(&mut future) };
|
let mut future = unsafe { Pin::new_unchecked(&mut future) };
|
||||||
loop {
|
loop {
|
||||||
match Future::poll(future.as_mut(), &mut cx) {
|
match Future::poll(future.as_mut(), &mut cx) {
|
||||||
Poll::Ready(val) => break val,
|
Poll::Ready(val) => break val,
|
||||||
@@ -822,7 +822,7 @@ fn mywaker_wake(s: &MyWaker) {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
And that's really all there is to it.
|
And that's really all there is to it.
|
||||||
|
|
||||||
> If you checked out the playground link that showcased how park/unpark could [cause subtle
|
> If you checked out the playground link that showcased how park/unpark could [cause subtle
|
||||||
> problems](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=b2343661fe3d271c91c6977ab8e681d0)
|
> problems](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=b2343661fe3d271c91c6977ab8e681d0)
|
||||||
|
|||||||
@@ -56,9 +56,9 @@ fn block_on<F: Future>(mut future: F) -> F::Output {
|
|||||||
let mywaker = Arc::new(MyWaker { parker: parker.clone() });
|
let mywaker = Arc::new(MyWaker { parker: parker.clone() });
|
||||||
let waker = mywaker_into_waker(Arc::into_raw(mywaker));
|
let waker = mywaker_into_waker(Arc::into_raw(mywaker));
|
||||||
let mut cx = Context::from_waker(&waker);
|
let mut cx = Context::from_waker(&waker);
|
||||||
|
|
||||||
// SAFETY: we shadow `future` so it can't be accessed again.
|
// SAFETY: we shadow `future` so it can't be accessed again.
|
||||||
let mut future = unsafe { Pin::new_unchecked(&mut future) };
|
let mut future = unsafe { Pin::new_unchecked(&mut future) };
|
||||||
loop {
|
loop {
|
||||||
match Future::poll(future.as_mut(), &mut cx) {
|
match Future::poll(future.as_mut(), &mut cx) {
|
||||||
Poll::Ready(val) => break val,
|
Poll::Ready(val) => break val,
|
||||||
@@ -93,7 +93,7 @@ const VTABLE: RawWakerVTable = unsafe {
|
|||||||
RawWakerVTable::new(
|
RawWakerVTable::new(
|
||||||
|s| mywaker_clone(&*(s as *const MyWaker)), // clone
|
|s| mywaker_clone(&*(s as *const MyWaker)), // clone
|
||||||
|s| mywaker_wake(&*(s as *const MyWaker)), // wake
|
|s| mywaker_wake(&*(s as *const MyWaker)), // wake
|
||||||
|s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
|
|s| (*(s as *const MyWaker)).parker.unpark(), // wake by ref (don't decrease refcount)
|
||||||
|s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
|
|s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
@@ -151,7 +151,7 @@ impl Reactor {
|
|||||||
handle: None,
|
handle: None,
|
||||||
tasks: HashMap::new(),
|
tasks: HashMap::new(),
|
||||||
})));
|
})));
|
||||||
|
|
||||||
let reactor_clone = Arc::downgrade(&reactor);
|
let reactor_clone = Arc::downgrade(&reactor);
|
||||||
let handle = thread::spawn(move || {
|
let handle = thread::spawn(move || {
|
||||||
let mut handles = vec![];
|
let mut handles = vec![];
|
||||||
|
|||||||
Reference in New Issue
Block a user