finished book!!!!!!

This commit is contained in:
Carl Fredrik Samson
2020-04-06 01:51:18 +02:00
parent 3a3ad1eeea
commit 15d7c726f8
18 changed files with 720 additions and 1172 deletions

View File

@@ -48,26 +48,33 @@ a `Future` has resolved and should be polled again.
```rust, noplaypen, ignore
// Our executor takes any object which implements the `Future` trait
fn block_on<F: Future>(mut future: F) -> F::Output {
// the first thing we do is to construct a `Waker` which we'll pass on to
// the `reactor` so it can wake us up when an event is ready.
let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker));
// The context struct is just a wrapper for a `Waker` object. Maybe in the
// future this will do more, but right now it's just a wrapper.
let mut cx = Context::from_waker(&waker);
// So, since we run this on one thread and run one future to completion
// we can pin the `Future` to the stack. This is unsafe, but saves an
// allocation. We could `Box::pin` it too if we wanted. This is however
// safe since we shadow `future` so it can't be accessed again and will
// not move until it's dropped.
let mut future = unsafe { Pin::new_unchecked(&mut future) };
// We poll in a loop, but it's not a busy loop. It will only run when
// an event occurs, or a thread has a "spurious wakeup" (an unexpected wakeup
// that can happen for no good reason).
let val = loop {
// So, since we run this on one thread and run one future to completion
// we can pin the `Future` to the stack. This is unsafe, but saves an
// allocation. We could `Box::pin` it too if we wanted. This is however
// safe since we don't move the `Future` here.
let pinned = unsafe { Pin::new_unchecked(&mut future) };
match Future::poll(pinned, &mut cx) {
// when the Future is ready we're finished
Poll::Ready(val) => break val,
// If we get a `pending` future we just go to sleep...
Poll::Pending => thread::park(),
};
@@ -141,7 +148,7 @@ fn mywaker_wake(s: &MyWaker) {
// Since we use an `Arc` cloning is just increasing the refcount on the smart
// pointer.
fn mywaker_clone(s: &MyWaker) -> RawWaker {
let arc = unsafe { Arc::from_raw(s).clone() };
let arc = unsafe { Arc::from_raw(s) };
std::mem::forget(arc.clone()); // increase ref count
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
}
@@ -179,24 +186,30 @@ impl Task {
// This is our `Future` implementation
impl Future for Task {
// The output for our kind of `leaf future` is just a `usize`. For other
// futures this could be something more interesting like a byte array.
type Output = usize;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut r = self.reactor.lock().unwrap();
// we check with the `Reactor` if this future is in its "readylist"
// i.e. if it's `Ready`
if r.is_ready(self.id) {
// if it is, we return the data. In this case it's just the ID of
// the task since this is just a very simple example.
Poll::Ready(self.id)
} else if self.is_registered {
// If the future is registered already, we just return `Pending`
Poll::Pending
} else {
// If we get here, it must be the first time this `Future` is polled
// so we register a task with our `reactor`
r.register(self.data, cx.waker().clone(), self.id);
// oh, we have to drop the lock on our `Mutex` here because we can't
// have a shared and exclusive borrow at the same time
drop(r);
@@ -232,29 +245,26 @@ We choose to pass in a reference to the whole `Reactor` here. This isn't normal.
The reactor will often be a global resource which lets us register interests
without passing around a reference.
### Why using thread park/unpark is a bad idea for a library
It could deadlock easily since anyone could get a handle to the `executor thread`
and call park/unpark on it.
If one of our `Futures` holds a handle to our thread, or any unrelated code
calls `unpark` on our thread, the following could happen:
1. A future could call `unpark` on the executor thread from a different thread
2. Our `executor` thinks that data is ready and wakes up and polls the future
3. The future is not ready yet when polled, but at that exact same time the
`Reactor` gets an event and calls `wake()` which also unparks our thread.
4. This could happen before we go to sleep again since these processes
run in parallel.
5. Our reactor has called `wake` but our thread is still sleeping since it was
awake already at that point.
6. We're deadlocked and our program stops working
> ### Why using thread park/unpark is a bad idea for a library
>
> It could deadlock easily since anyone could get a handle to the `executor thread`
> and call park/unpark on it.
>
> 1. A future could call `unpark` on the executor thread from a different thread
> 2. Our `executor` thinks that data is ready and wakes up and polls the future
> 3. The future is not ready yet when polled, but at that exact same time the
> `Reactor` gets an event and calls `wake()` which also unparks our thread.
> 4. This could happen before we go to sleep again since these processes
> run in parallel.
> 5. Our reactor has called `wake` but our thread is still sleeping since it was
> awake already at that point.
> 6. We're deadlocked and our program stops working
> There is also the case that our thread could have what's called a
`spurious wakeup` ([which can happen unexpectedly][spurious_wakeup]), which
could cause the same deadlock if we're unlucky.
> `spurious wakeup` ([which can happen unexpectedly][spurious_wakeup]), which
> could cause the same deadlock if we're unlucky.
There are many better solutions, here are some:
There are several better solutions, here are some:
- Use [std::sync::CondVar][condvar]
- Use [crossbeam::sync::Parker][crossbeam_parker]
@@ -279,8 +289,19 @@ is a `Future`.
>registers interest with the global `Reactor` and no reference is needed.
We can call this kind of `Future` a "leaf Future", since it's some operation
we'll actually wait on and that we can chain operations on which are performed
once the leaf future is ready.
we'll actually wait on and which we can chain operations on which are performed
once the leaf future is ready.
The reactor we create here will also create **leaf-futures**, accept a waker and
call it once the task is finished.
The task we're implementing is the simplest I could find. It's a timer that
only spawns a thread and puts it to sleep for a number of seconds we specify
when acquiring the leaf-future.
To be able to run the code here in the browser there is not much real I/O we
can do, so just pretend that this actually represents some useful I/O operation
for the sake of this example.
**Our Reactor will look like this:**
@@ -288,10 +309,12 @@ once the leaf future is ready.
// This is a "fake" reactor. It does no real I/O, but that also makes our
// code possible to run in the book and in the playground
struct Reactor {
// we need some way of registering a Task with the reactor. Normally this
// would be an "interest" in an I/O event
dispatcher: Sender<Event>,
handle: Option<JoinHandle<()>>,
// This is a list of tasks that are ready, which means they should be polled
// for data.
readylist: Arc<Mutex<Vec<usize>>>,
@@ -316,11 +339,13 @@ impl Reactor {
// This `Vec` will hold handles to all threads we spawn so we can
// join them later on and finish our program in a good manner
let mut handles = vec![];
// This will be the "Reactor thread"
let handle = thread::spawn(move || {
for event in rx {
let rl_clone = rl_clone.clone();
match event {
// If we get a close event we break out of the loop we're in
Event::Close => break,
Event::Timeout(waker, duration, id) => {
@@ -328,12 +353,15 @@ impl Reactor {
// When we get an event we simply spawn a new thread
// which will simulate some I/O resource...
let event_handle = thread::spawn(move || {
//... by sleeping for the number of seconds
// we provided when creating the `Task`.
thread::sleep(Duration::from_secs(duration));
// When it's done sleeping we put the ID of this task
// on the "readylist"
rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
// Then we call `wake` which will wake up our
// executor and start polling the futures
waker.wake();
@@ -360,6 +388,7 @@ impl Reactor {
}
fn register(&mut self, duration: u64, waker: Waker, data: usize) {
// registering an event is as simple as sending an `Event` through
// the channel.
self.dispatcher
@@ -416,6 +445,7 @@ fn main() {
// Many runtimes create a global `reactor`; we pass it as an argument
let reactor = Reactor::new();
// Since we'll share this between threads we wrap it in an
// atomically-refcounted mutex.
let reactor = Arc::new(Mutex::new(reactor));
@@ -451,6 +481,7 @@ fn main() {
// This executor will block the main thread until the futures is resolved
block_on(mainfut);
// When we're done, we want to shut down our reactor thread so our program
// ends nicely.
reactor.lock().map(|mut r| r.close()).unwrap();
@@ -471,15 +502,6 @@ fn main() {
# val
# }
#
# fn spawn<F: Future>(future: F) -> Pin<Box<F>> {
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&waker);
# let mut boxed = Box::pin(future);
# let _ = Future::poll(boxed.as_mut(), &mut cx);
# boxed
# }
#
# // ====================== FUTURE IMPLEMENTATION ==============================
# #[derive(Clone)]
# struct MyWaker {
@@ -632,12 +654,6 @@ The last point is relevant when we move on the the last paragraph.
## Async/Await and concurrent Futures
This is the first time we actually see the `async/await` syntax so let's
finish this book by explaining them briefly.
Hopefully, the `await` syntax looks pretty familiar. It has a lot in common
with `yield` and indeed, it works in much the same way.
The `async` keyword can be used on functions as in `async fn(...)` or on a
block as in `async { ... }`. Both will turn your function, or block, into a
`Future`.
@@ -645,13 +661,14 @@ block as in `async { ... }`. Both will turn your function, or block, into a
These `Futures` are rather simple. Imagine our generator from a few chapters
back. Every `await` point is like a `yield` point.
Instead of `yielding` a value we pass in, it yields the `Future` we're awaiting.
In turn this `Future` is polled.
Instead of `yielding` a value we pass in, it yields the `Future` we're awaiting,
so when we poll a future the first time we run the code up until the first
`await` point where it yields a new Future we poll and so on until we reach
a **leaf-future**.
Now, as is the case in our code, our `mainfut` contains two non-leaf futures
which it awaits, and all that happens is that these state machines are polled
as well until some "leaf future" in the end is finally polled and either
returns `Ready` or `Pending`.
until some "leaf future" in the end either returns `Ready` or `Pending`.
The way our example is right now, it's not much better than regular synchronous
code. For us to actually await multiple futures at the same time we somehow need
@@ -672,254 +689,14 @@ Future got 1 at time: 1.00.
Future got 2 at time: 2.00.
```
To accomplish this we can create the simplest possible `spawn` function I could
come up with:
Now, this is the point where I'll refer you to some better resources for
implementing just that. You should have a pretty good understanding of the
concept of Futures by now.
```rust, ignore, noplaypen
fn spawn<F: Future>(future: F) -> Pin<Box<F>> {
// We start off the same way as we did before
let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker));
let mut cx = Context::from_waker(&waker);
// But we need to Box this Future. We can't pin it to this stack frame
// since we'll return before the `Future` is resolved so it must be pinned
// to the heap.
let mut boxed = Box::pin(future);
// Now we poll and just discard the result. This way, we register a `Waker`
// with our `Reactor` and kick off whatever operation we're expecting.
let _ = Future::poll(boxed.as_mut(), &mut cx);
// We still need this `Future` since we'll await it later so we return it...
boxed
}
```
The next step should be getting to know how more advanced runtimes work and
how they implement different ways of running Futures to completion.
Now if we change our code in `main` to look like this instead.
```rust, edition2018
# use std::{
# future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
# task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
# thread::{self, JoinHandle}, time::{Duration, Instant}
# };
fn main() {
let start = Instant::now();
let reactor = Reactor::new();
let reactor = Arc::new(Mutex::new(reactor));
let future1 = Task::new(reactor.clone(), 1, 1);
let future2 = Task::new(reactor.clone(), 2, 2);
let fut1 = async {
let val = future1.await;
let dur = (Instant::now() - start).as_secs_f32();
println!("Future got {} at time: {:.2}.", val, dur);
};
let fut2 = async {
let val = future2.await;
let dur = (Instant::now() - start).as_secs_f32();
println!("Future got {} at time: {:.2}.", val, dur);
};
// You'll notice everything stays the same until this point
let mainfut = async {
// Here we "kick off" our first `Future`
let handle1 = spawn(fut1);
// And the second one
let handle2 = spawn(fut2);
// Now, they're already started, and when they get polled in our
// executor now they will just return `Pending`, or if we somehow used
// so much time that they're already resolved, they will return `Ready`.
handle1.await;
handle2.await;
};
block_on(mainfut);
reactor.lock().map(|mut r| r.close()).unwrap();
}
# // ============================= EXECUTOR ====================================
# fn block_on<F: Future>(mut future: F) -> F::Output {
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&waker);
# let val = loop {
# let pinned = unsafe { Pin::new_unchecked(&mut future) };
# match Future::poll(pinned, &mut cx) {
# Poll::Ready(val) => break val,
# Poll::Pending => thread::park(),
# };
# };
# val
# }
#
# fn spawn<F: Future>(future: F) -> Pin<Box<F>> {
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&waker);
# let mut boxed = Box::pin(future);
# let _ = Future::poll(boxed.as_mut(), &mut cx);
# boxed
# }
#
# // ====================== FUTURE IMPLEMENTATION ==============================
# #[derive(Clone)]
# struct MyWaker {
# thread: thread::Thread,
# }
#
# #[derive(Clone)]
# pub struct Task {
# id: usize,
# reactor: Arc<Mutex<Reactor>>,
# data: u64,
# is_registered: bool,
# }
#
# fn mywaker_wake(s: &MyWaker) {
# let waker_ptr: *const MyWaker = s;
# let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
# waker_arc.thread.unpark();
# }
#
# fn mywaker_clone(s: &MyWaker) -> RawWaker {
# let arc = unsafe { Arc::from_raw(s).clone() };
# std::mem::forget(arc.clone()); // increase ref count
# RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
# }
#
# const VTABLE: RawWakerVTable = unsafe {
# RawWakerVTable::new(
# |s| mywaker_clone(&*(s as *const MyWaker)), // clone
# |s| mywaker_wake(&*(s as *const MyWaker)), // wake
# |s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
# )
# };
#
# fn waker_into_waker(s: *const MyWaker) -> Waker {
# let raw_waker = RawWaker::new(s as *const (), &VTABLE);
# unsafe { Waker::from_raw(raw_waker) }
# }
#
# impl Task {
# fn new(reactor: Arc<Mutex<Reactor>>, data: u64, id: usize) -> Self {
# Task {
# id,
# reactor,
# data,
# is_registered: false,
# }
# }
# }
#
# impl Future for Task {
# type Output = usize;
# fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
# let mut r = self.reactor.lock().unwrap();
# if r.is_ready(self.id) {
# Poll::Ready(self.id)
# } else if self.is_registered {
# Poll::Pending
# } else {
# r.register(self.data, cx.waker().clone(), self.id);
# drop(r);
# self.is_registered = true;
# Poll::Pending
# }
# }
# }
#
# // =============================== REACTOR ===================================
# struct Reactor {
# dispatcher: Sender<Event>,
# handle: Option<JoinHandle<()>>,
# readylist: Arc<Mutex<Vec<usize>>>,
# }
# #[derive(Debug)]
# enum Event {
# Close,
# Timeout(Waker, u64, usize),
# }
#
# impl Reactor {
# fn new() -> Self {
# let (tx, rx) = channel::<Event>();
# let readylist = Arc::new(Mutex::new(vec![]));
# let rl_clone = readylist.clone();
# let mut handles = vec![];
# let handle = thread::spawn(move || {
# // This simulates some I/O resource
# for event in rx {
# println!("REACTOR: {:?}", event);
# let rl_clone = rl_clone.clone();
# match event {
# Event::Close => break,
# Event::Timeout(waker, duration, id) => {
# let event_handle = thread::spawn(move || {
# thread::sleep(Duration::from_secs(duration));
# rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
# waker.wake();
# });
#
# handles.push(event_handle);
# }
# }
# }
#
# for handle in handles {
# handle.join().unwrap();
# }
# });
#
# Reactor {
# readylist,
# dispatcher: tx,
# handle: Some(handle),
# }
# }
#
# fn register(&mut self, duration: u64, waker: Waker, data: usize) {
# self.dispatcher
# .send(Event::Timeout(waker, duration, data))
# .unwrap();
# }
#
# fn close(&mut self) {
# self.dispatcher.send(Event::Close).unwrap();
# }
#
# fn is_ready(&self, id_to_check: usize) -> bool {
# self.readylist
# .lock()
# .map(|rl| rl.iter().any(|id| *id == id_to_check))
# .unwrap()
# }
# }
#
# impl Drop for Reactor {
# fn drop(&mut self) {
# self.handle.take().map(|h| h.join().unwrap()).unwrap();
# }
# }
```
Now, if we try to run our example again
If you add this code to our example and run it, you'll see:
```ignore
Future got 1 at time: 1.00.
Future got 2 at time: 2.00.
```
Exactly as we expected.
Now this `spawn` method is not very sophisticated but it explains the concept.
I've [challenged you to create a better version](./conclusion.md#building-a-better-exectuor) and pointed you at a better resource
in the next chapter under [reader exercises](./conclusion.md#reader-exercises).
I [challenge you to create a better version](./conclusion.md#building-a-better-exectuor).
That's actually it for now. There is probably much more to learn, but I think it
will be easier once the fundamental concepts are there and that further