Added Bonus Section implementing a proper Parker

The problems addressed in the earlier version led to an "incorrect"
example, which is bad to pass along to someone who has read a whole book.
After getting some feedback in #2, I decided to show how we can create a
proper `Parker`.

The main example (which I assume most interested readers will copy) now
uses a proper parking technique, so there should be no more data races
left.

I also removed the "Reader Exercise" paragraph suggesting that readers
explore a way to implement proper parking, since we now show that in
our main example.
Carl Fredrik Samson
2020-04-13 14:16:32 +02:00
parent f4b2029788
commit d9eb756ef7
10 changed files with 441 additions and 240 deletions


@@ -44,6 +44,8 @@ The first thing an `executor` does when it gets a `Future` is polling it.
Rust provides a way for the Reactor and Executor to communicate through the `Waker`. The reactor stores this `Waker` and calls `Waker::wake()` on it once
a `Future` has resolved and should be polled again.
> Notice that this chapter has a bonus section called [A Proper Way to Park our Thread](./6_future_example.md#bonus-section---a-proper-way-to-park-our-thread) which shows how to avoid `thread::park`.
**Our Executor will look like this:**
```rust, noplaypen, ignore
@@ -87,6 +89,14 @@ In all the examples you'll see in this chapter I've chosen to comment the code
extensively. I find it easier to follow along that way so I'll not repeat myself
here and focus only on some important aspects that might need further explanation.
It's worth noting that simply calling `thread::park` as we do here can lead to
both deadlocks and errors. We'll explain a bit more later and fix this if you
read all the way to the [Bonus Section](./6_future_example.md#bonus-section---a-proper-way-to-park-our-thread) at
the end of this chapter.
For now, we keep it as simple and easy to understand as we can by just going
to sleep.
Now that you've read so much about `Generator`s and `Pin` already this should
be rather easy to understand. `Future` is a state machine, every `await` point
is a `yield` point. We could borrow data across `await` points and we meet the
@@ -254,26 +264,9 @@ without passing around a reference.
> ### Why using thread park/unpark is a bad idea for a library
>
> It could deadlock easily since anyone could get a handle to the `executor thread`
> and call park/unpark on it.
>
> 1. A future could call `unpark` on the executor thread from a different thread
> 2. Our `executor` thinks that data is ready and wakes up and polls the future
> 3. The future is not ready yet when polled, but at that exact same time the
> `Reactor` gets an event and calls `wake()` which also unparks our thread.
> 4. This could happen before we go to sleep again since these processes
> run in parallel.
> 5. Our reactor has called `wake` but our thread is still sleeping since it was
> awake already at that point.
> 6. We're deadlocked and our program stops working
> There is also the case that our thread could have what's called a
> `spurious wakeup` ([which can happen unexpectedly][spurious_wakeup]), which
> could cause the same deadlock if we're unlucky.
There are several better solutions, here are some:
- [std::sync::CondVar][condvar]
- [crossbeam::sync::Parker][crossbeam_parker]
> and call park/unpark on our thread, or we could have a race condition where the
> future resolves and calls `wake` before we have time to go to sleep in our
> executor. We'll see how we can fix this at the end of this chapter.
## The Reactor
@@ -481,14 +474,12 @@ fn main() {
// our code into a state machine, `yielding` at every `await` point.
let fut1 = async {
let val = future1.await;
let dur = (Instant::now() - start).as_secs_f32();
println!("Future got {} at time: {:.2}.", val, dur);
println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
};
let fut2 = async {
let val = future2.await;
let dur = (Instant::now() - start).as_secs_f32();
println!("Future got {} at time: {:.2}.", val, dur);
println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
};
// Our executor can only run one future at a time, this is pretty normal
@@ -741,6 +732,98 @@ do really hope that you do continue to explore further.
Don't forget the exercises in the last chapter 😊.
## Bonus Section - a Proper Way to Park our Thread
As we explained earlier in our chapter, simply calling `thread::park` is not really
sufficient to implement a proper executor. You can also reach for a tool like the `Parker`
in crossbeam: [crossbeam::sync::Parker][crossbeam_parker].
Since it doesn't require many lines of code to create a working solution ourselves, we'll show how
we can solve that by using a `Condvar` and a `Mutex` instead.
Start by implementing our own `Parker` like this:
```rust, ignore
#[derive(Default)]
struct Parker(Mutex<bool>, Condvar);

impl Parker {
    fn park(&self) {
        // We acquire a lock to the Mutex which protects our flag indicating if we
        // should resume execution or not.
        let mut resumable = self.0.lock().unwrap();

        // We put this in a loop since there is a chance we'll get woken, but
        // our flag hasn't changed. If that happens, we simply go back to sleep.
        while !*resumable {
            // We sleep until someone notifies us
            resumable = self.1.wait(resumable).unwrap();
        }

        // We immediately set the condition to false, so that next time we call `park` we'll
        // go right to sleep.
        *resumable = false;
    }

    fn unpark(&self) {
        // We simply acquire a lock to our flag and set the condition to `true` (resumable)
        // when we get it.
        *self.0.lock().unwrap() = true;

        // We notify our `Condvar` so the parked thread wakes up and resumes.
        self.1.notify_one();
    }
}
```
The `Condvar` in Rust is designed to work together with a `Mutex`. At first glance, you might think that we
never release the mutex lock we acquire in `self.0.lock().unwrap();` before we go to sleep, which would mean
that our `unpark` function could never acquire the lock to our flag and we would deadlock.
Using `Condvar` we avoid this, since `Condvar::wait` consumes our lock guard and releases the lock at the
moment we go to sleep.
When we resume again, `wait` re-acquires the lock and returns the guard so we can continue to operate on it.
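To make the lock hand-off concrete, here is a minimal, self-contained sketch that exercises the `Parker` from above. The two helper functions, `park_until_unparked` and `unpark_before_park`, are hypothetical names invented for this illustration and are not part of the book's example. The first shows that another thread can take the lock while we are parked, and the second shows that a wakeup arriving before `park` is remembered by the flag rather than lost.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Default)]
struct Parker(Mutex<bool>, Condvar);

impl Parker {
    fn park(&self) {
        let mut resumable = self.0.lock().unwrap();
        while !*resumable {
            // `wait` unlocks the mutex while we sleep and re-locks it
            // before it returns the guard to us.
            resumable = self.1.wait(resumable).unwrap();
        }
        *resumable = false;
    }

    fn unpark(&self) {
        *self.0.lock().unwrap() = true;
        self.1.notify_one();
    }
}

/// Hypothetical helper: parks the calling thread and lets a second thread
/// unpark it after `delay_ms`. Returns `true` if the flag was reset by `park`.
fn park_until_unparked(delay_ms: u64) -> bool {
    let parker = Arc::new(Parker::default());
    let remote = Arc::clone(&parker);

    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(delay_ms));
        // If the main thread is already parked, this lock only succeeds
        // because `Condvar::wait` released the mutex when it went to sleep.
        remote.unpark();
    });

    parker.park();
    handle.join().unwrap();

    let flag_is_reset = !*parker.0.lock().unwrap();
    flag_is_reset
}

/// Hypothetical helper: calls `unpark` first. The flag is already `true`
/// when `park` runs, so `park` returns without ever waiting.
fn unpark_before_park() -> bool {
    let parker = Parker::default();
    parker.unpark();
    parker.park();
    true
}
```

Calling both helpers from `main` should return promptly. If `wait` did not release the lock, `park_until_unparked` would hang in `remote.unpark()`.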
This means we need to make some very slight changes to our executor like this:
```rust, ignore
fn block_on<F: Future>(mut future: F) -> F::Output {
    let parker = Arc::new(Parker::default()); // <--- NB!
    let mywaker = Arc::new(MyWaker { parker: parker.clone() }); // <--- NB!
    let waker = mywaker_into_waker(Arc::into_raw(mywaker));
    let mut cx = Context::from_waker(&waker);

    // SAFETY: we shadow `future` so it can't be accessed again.
    let mut future = unsafe { Pin::new_unchecked(&mut future) };
    loop {
        match Future::poll(future.as_mut(), &mut cx) {
            Poll::Ready(val) => break val,
            Poll::Pending => parker.park(), // <--- NB!
        };
    }
}
```
And we need to change our `Waker` like this:
```rust, ignore
#[derive(Clone)]
struct MyWaker {
    parker: Arc<Parker>,
}

fn mywaker_wake(s: &MyWaker) {
    let waker_arc = unsafe { Arc::from_raw(s) };
    waker_arc.parker.unpark();
}
```
And that's really all there is to it. The next chapter shows our finished code with this
improvement which you can explore further if you wish.
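If you want to try the `Parker` in isolation, the following is a rough, self-contained sketch of the same idea. Note that it swaps in a different technique for building the `Waker`: instead of the hand-written `RawWakerVTable` used in this chapter, it uses the standard library's `std::task::Wake` trait (available since Rust 1.51), and it pins the future with `Box::pin` to avoid `unsafe`. `ParkerWaker` and `Timer` are hypothetical types made up for this illustration. `Timer` stands in for the chapter's `Task` and `Reactor` by spawning one thread per timer.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

#[derive(Default)]
struct Parker(Mutex<bool>, Condvar);

impl Parker {
    fn park(&self) {
        let mut resumable = self.0.lock().unwrap();
        while !*resumable {
            resumable = self.1.wait(resumable).unwrap();
        }
        *resumable = false;
    }

    fn unpark(&self) {
        *self.0.lock().unwrap() = true;
        self.1.notify_one();
    }
}

// Hypothetical waker type: waking simply unparks the executor.
struct ParkerWaker(Arc<Parker>);

impl Wake for ParkerWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(future: F) -> F::Output {
    let parker = Arc::new(Parker::default());
    let waker = Waker::from(Arc::new(ParkerWaker(parker.clone())));
    let mut cx = Context::from_waker(&waker);
    // Pinning on the heap keeps this sketch free of `unsafe`.
    let mut future = Box::pin(future);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(val) => break val,
            Poll::Pending => parker.park(),
        }
    }
}

// Hypothetical future: resolves to `ms` once a helper thread has slept
// for `ms` milliseconds. The shared state is (done, stored waker).
struct Timer {
    ms: u64,
    state: Arc<Mutex<(bool, Option<Waker>)>>,
}

impl Timer {
    fn new(ms: u64) -> Self {
        let state = Arc::new(Mutex::new((false, None::<Waker>)));
        let shared = state.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(ms));
            let mut s = shared.lock().unwrap();
            s.0 = true;
            // Wake the executor if it has already polled us.
            if let Some(waker) = s.1.take() {
                waker.wake();
            }
        });
        Timer { ms, state }
    }
}

impl Future for Timer {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let mut s = self.state.lock().unwrap();
        if s.0 {
            Poll::Ready(self.ms)
        } else {
            s.1 = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
```

With these definitions, `block_on(Timer::new(20))` blocks for roughly 20 milliseconds and then returns `20`. If the timer thread calls `wake` after `poll` returns `Pending` but before the executor reaches `park`, the flag is already set and `park` returns right away, which is exactly the race the `Parker` is meant to close.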
[mio]: https://github.com/tokio-rs/mio
[arc_wake]: https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.13/futures/task/trait.ArcWake.html
[example_repo]: https://github.com/cfsamson/examples-futures


@@ -8,19 +8,15 @@ run it yourself. Have fun!
fn main() {
let start = Instant::now();
let reactor = Reactor::new();
let future1 = Task::new(reactor.clone(), 1, 1);
let future2 = Task::new(reactor.clone(), 2, 2);
let fut1 = async {
let val = future1.await;
let dur = (Instant::now() - start).as_secs_f32();
println!("Future got {} at time: {:.2}.", val, dur);
let val = Task::new(reactor.clone(), 1, 1).await;
println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
};
let fut2 = async {
let val = future2.await;
let dur = (Instant::now() - start).as_secs_f32();
println!("Future got {} at time: {:.2}.", val, dur);
let val = Task::new(reactor.clone(), 2, 2).await;
println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
};
let mainfut = async {
@@ -31,35 +27,50 @@ fn main() {
block_on(mainfut);
reactor.lock().map(|mut r| r.close()).unwrap();
}
use std::{
future::Future, pin::Pin, sync::{ mpsc::{channel, Sender}, Arc, Mutex,},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem,
future::Future, sync::{ mpsc::{channel, Sender}, Arc, Mutex, Condvar},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem, pin::Pin,
thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
};
// ============================= EXECUTOR ====================================
fn block_on<F: Future>(mut future: F) -> F::Output {
let mywaker = Arc::new(MyWaker {
thread: thread::current(),
});
let waker = waker_into_waker(Arc::into_raw(mywaker));
let mut cx = Context::from_waker(&waker);
#[derive(Default)]
struct Parker(Mutex<bool>, Condvar);
// SAFETY: we shadow `future` so it can't be accessed again.
let mut future = unsafe { Pin::new_unchecked(&mut future) };
let val = loop {
match Future::poll(future.as_mut(), &mut cx) {
Poll::Ready(val) => break val,
Poll::Pending => thread::park(),
};
};
val
impl Parker {
fn park(&self) {
let mut resumable = self.0.lock().unwrap();
while !*resumable {
resumable = self.1.wait(resumable).unwrap();
}
*resumable = false;
}
fn unpark(&self) {
*self.0.lock().unwrap() = true;
self.1.notify_one();
}
}
fn block_on<F: Future>(mut future: F) -> F::Output {
let parker = Arc::new(Parker::default());
let mywaker = Arc::new(MyWaker { parker: parker.clone() });
let waker = mywaker_into_waker(Arc::into_raw(mywaker));
let mut cx = Context::from_waker(&waker);
// SAFETY: we shadow `future` so it can't be accessed again.
let mut future = unsafe { Pin::new_unchecked(&mut future) };
loop {
match Future::poll(future.as_mut(), &mut cx) {
Poll::Ready(val) => break val,
Poll::Pending => parker.park(),
};
}
}
// ====================== FUTURE IMPLEMENTATION ==============================
#[derive(Clone)]
struct MyWaker {
thread: thread::Thread,
parker: Arc<Parker>,
}
#[derive(Clone)]
@@ -70,9 +81,8 @@ pub struct Task {
}
fn mywaker_wake(s: &MyWaker) {
let waker_ptr: *const MyWaker = s;
let waker_arc = unsafe { Arc::from_raw(waker_ptr) };
waker_arc.thread.unpark();
let waker_arc = unsafe { Arc::from_raw(s) };
waker_arc.parker.unpark();
}
fn mywaker_clone(s: &MyWaker) -> RawWaker {
@@ -90,7 +100,7 @@ const VTABLE: RawWakerVTable = unsafe {
)
};
fn waker_into_waker(s: *const MyWaker) -> Waker {
fn mywaker_into_waker(s: *const MyWaker) -> Waker {
let raw_waker = RawWaker::new(s as *const (), &VTABLE);
unsafe { Waker::from_raw(raw_waker) }
}
@@ -106,21 +116,17 @@ impl Future for Task {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut r = self.reactor.lock().unwrap();
if r.is_ready(self.id) {
println!("POLL: TASK {} IS READY", self.id);
*r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
Poll::Ready(self.id)
} else if r.tasks.contains_key(&self.id) {
println!("POLL: REPLACED WAKER FOR TASK: {}", self.id);
r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
Poll::Pending
} else {
println!("POLL: REGISTERED TASK: {}, WAKER: {:?}", self.id, cx.waker());
r.register(self.data, cx.waker().clone(), self.id);
Poll::Pending
}
}
}
// =============================== REACTOR ===================================
enum TaskState {
Ready,
@@ -172,13 +178,12 @@ impl Reactor {
}
fn wake(&mut self, id: usize) {
self.tasks.get_mut(&id).map(|state| {
match mem::replace(state, TaskState::Ready) {
TaskState::NotReady(waker) => waker.wake(),
TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
_ => unreachable!()
}
}).unwrap();
let state = self.tasks.get_mut(&id).unwrap();
match mem::replace(state, TaskState::Ready) {
TaskState::NotReady(waker) => waker.wake(),
TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
_ => unreachable!()
}
}
fn register(&mut self, duration: u64, waker: Waker, id: usize) {


@@ -17,16 +17,6 @@ So our implementation has taken some obvious shortcuts and could use some improv
Actually digging into the code and trying things yourself is a good way to learn. Here are
some good exercises if you want to explore more:
### Avoid `thread::park`
The big problem using `Thread::park` and `Thread::unpark` is that the user can access these
same methods from their own code. Try to use another method to suspend our thread and wake
it up again on our command. Some hints:
* Check out `CondVars`, here are two sources [Wikipedia][condvar_wiki] and the
docs for [`CondVar`][condvar_std]
* Take a look at crates that help you with this exact problem like [Crossbeam ](https://github.com/crossbeam-rs/crossbeam)\(specifically the [`Parker`](https://docs.rs/crossbeam/0.7.3/crossbeam/sync/struct.Parker.html)\)
### Avoid wrapping the whole `Reactor` in a mutex and passing it around
First of all, protecting the whole `Reactor` and passing it around is overkill. We're only