made tests pass

This commit is contained in:
Carl Fredrik Samson
2020-02-01 16:43:25 +01:00
parent 451aca80b8
commit b509e4d32b

View File

@@ -42,7 +42,7 @@ a `Future` has resolved and should be polled again.
**Our Executor will look like this:** **Our Executor will look like this:**
```rust, noplaypen ```rust, noplaypen, ignore
// Our executor takes any object which implements the `Future` trait // Our executor takes any object which implements the `Future` trait
fn block_on<F: Future>(mut future: F) -> F::Output { fn block_on<F: Future>(mut future: F) -> F::Output {
// the first thing we do is to construct a `Waker` which we'll pass on to // the first thing we do is to construct a `Waker` which we'll pass on to
@@ -95,7 +95,7 @@ allow `Futures` to have self references.
## The `Future` implementation ## The `Future` implementation
```rust, noplaypen ```rust, noplaypen, ignore
// This is the definition of our `Waker`. We use a regular thread-handle here. // This is the definition of our `Waker`. We use a regular thread-handle here.
// It works but it's not a good solution. It's easy to fix though, I'll explain // It works but it's not a good solution. It's easy to fix though, I'll explain
// after this code snippet. // after this code snippet.
@@ -265,7 +265,7 @@ once the leaf future is ready.
Our Reactor will look like this: Our Reactor will look like this:
```rust, noplaypen ```rust, noplaypen, ignore
// This is a "fake" reactor. It does no real I/O, but that also makes our // This is a "fake" reactor. It does no real I/O, but that also makes our
// code possible to run in the book and in the playground // code possible to run in the book and in the playground
struct Reactor { struct Reactor {
@@ -432,10 +432,10 @@ fn main() {
reactor.lock().map(|mut r| r.close()).unwrap(); reactor.lock().map(|mut r| r.close()).unwrap();
} }
#//// ============================ EXECUTOR ==================================== # // ============================ EXECUTOR ====================================
# #
#// Our executor takes any object which implements the `Future` trait # // Our executor takes any object which implements the `Future` trait
#fn block_on<F: Future>(mut future: F) -> F::Output { # fn block_on<F: Future>(mut future: F) -> F::Output {
# // the first thing we do is to construct a `Waker` which we'll pass on to # // the first thing we do is to construct a `Waker` which we'll pass on to
# // the `reactor` so it can wake us up when an event is ready. # // the `reactor` so it can wake us up when an event is ready.
# let mywaker = Arc::new(MyWaker{ thread: thread::current() }); # let mywaker = Arc::new(MyWaker{ thread: thread::current() });
@@ -461,119 +461,119 @@ fn main() {
# }; # };
# }; # };
# val # val
#} # }
# #
#// ====================== FUTURE IMPLEMENTATION ============================== # // ====================== FUTURE IMPLEMENTATION ==============================
# #
#// This is the definition of our `Waker`. We use a regular thread-handle here. # // This is the definition of our `Waker`. We use a regular thread-handle here.
#// It works but it's not a good solution. If one of our `Futures` holds a handle # // It works but it's not a good solution. If one of our `Futures` holds a handle
#// to our thread and takes it with it to a different thread the followinc could # // to our thread and takes it with it to a different thread the followinc could
#// happen: # // happen:
#// 1. Our future calls `unpark` from a different thread # // 1. Our future calls `unpark` from a different thread
#// 2. Our `executor` thinks that data is ready and wakes up and polls the future # // 2. Our `executor` thinks that data is ready and wakes up and polls the future
#// 3. The future is not ready yet but one nanosecond later the `Reactor` gets # // 3. The future is not ready yet but one nanosecond later the `Reactor` gets
#// an event and calles `wake()` which also unparks our thread. # // an event and calles `wake()` which also unparks our thread.
#// 4. This could all happen before we go to sleep again since these processes # // 4. This could all happen before we go to sleep again since these processes
#// run in parallel. # // run in parallel.
#// 5. Our reactor has called `wake` but our thread is still sleeping since it was # // 5. Our reactor has called `wake` but our thread is still sleeping since it was
#// awake alredy at that point. # // awake alredy at that point.
#// 6. We're deadlocked and our program stops working # // 6. We're deadlocked and our program stops working
#// There are many better soloutions, here are some: # // There are many better soloutions, here are some:
#// - Use `std::sync::CondVar` # // - Use `std::sync::CondVar`
#// - Use [crossbeam::sync::Parker](https://docs.rs/crossbeam/0.7.3/crossbeam/sync/#struct.Parker.html) # // - Use [crossbeam::sync::Parker](https://docs.rs/crossbeam/0.7.3/crossbeam/sync/# struct.Parker.html)
##[derive(Clone)] # #[derive(Clone)]
#struct MyWaker { # struct MyWaker {
# thread: thread::Thread, # thread: thread::Thread,
#} # }
# #
#// This is the definition of our `Future`. It keeps all the information we # // This is the definition of our `Future`. It keeps all the information we
#// need. This one holds a reference to our `reactor`, that's just to make # // need. This one holds a reference to our `reactor`, that's just to make
#// this example as easy as possible. It doesn't need to hold a reference to # // this example as easy as possible. It doesn't need to hold a reference to
#// the whole reactor, but it needs to be able to register itself with the # // the whole reactor, but it needs to be able to register itself with the
#// reactor. # // reactor.
##[derive(Clone)] # #[derive(Clone)]
#pub struct Task { # pub struct Task {
# id: usize, # id: usize,
# reactor: Arc<Mutex<Reactor>>, # reactor: Arc<Mutex<Reactor>>,
# data: u64, # data: u64,
# is_registered: bool, # is_registered: bool,
#} # }
# #
#// These are function definitions we'll use for our waker. Remember the # // These are function definitions we'll use for our waker. Remember the
#// "Trait Objects" chapter from the book. # // "Trait Objects" chapter from the book.
#fn mywaker_wake(s: &MyWaker) { # fn mywaker_wake(s: &MyWaker) {
# let waker_ptr: *const MyWaker = s; # let waker_ptr: *const MyWaker = s;
# let waker_arc = unsafe {Arc::from_raw(waker_ptr)}; # let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
# waker_arc.thread.unpark(); # waker_arc.thread.unpark();
#} # }
# #
#// Since we use an `Arc` cloning is just increasing the refcount on the smart # // Since we use an `Arc` cloning is just increasing the refcount on the smart
#// pointer. # // pointer.
#fn mywaker_clone(s: &MyWaker) -> RawWaker { # fn mywaker_clone(s: &MyWaker) -> RawWaker {
# let arc = unsafe { Arc::from_raw(s).clone() }; # let arc = unsafe { Arc::from_raw(s).clone() };
# std::mem::forget(arc.clone()); // increase ref count # std::mem::forget(arc.clone()); // increase ref count
# RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE) # RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
#} # }
# #
#// This is actually a "helper funtcion" to create a `Waker` vtable. In contrast # // This is actually a "helper funtcion" to create a `Waker` vtable. In contrast
#// to when we created a `Trait Object` from scratch we don't need to concern # // to when we created a `Trait Object` from scratch we don't need to concern
#// ourselves with the actual layout of the `vtable` and only provide a fixed # // ourselves with the actual layout of the `vtable` and only provide a fixed
#// set of functions # // set of functions
#const VTABLE: RawWakerVTable = unsafe { # const VTABLE: RawWakerVTable = unsafe {
# RawWakerVTable::new( # RawWakerVTable::new(
# |s| mywaker_clone(&*(s as *const MyWaker)), // clone # |s| mywaker_clone(&*(s as *const MyWaker)), // clone
# |s| mywaker_wake(&*(s as *const MyWaker)), // wake # |s| mywaker_wake(&*(s as *const MyWaker)), // wake
# |s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref # |s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount # |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
# ) # )
#}; # };
# #
#// Instead of implementing this on the `MyWaker` oject in `impl Mywaker...` we # // Instead of implementing this on the `MyWaker` oject in `impl Mywaker...` we
#// just use this pattern instead since it saves us some lines of code. # // just use this pattern instead since it saves us some lines of code.
#fn waker_into_waker(s: *const MyWaker) -> Waker { # fn waker_into_waker(s: *const MyWaker) -> Waker {
# let raw_waker = RawWaker::new(s as *const (), &VTABLE); # let raw_waker = RawWaker::new(s as *const (), &VTABLE);
# unsafe { Waker::from_raw(raw_waker) } # unsafe { Waker::from_raw(raw_waker) }
#} # }
# #
#impl Task { # impl Task {
# fn new(reactor: Arc<Mutex<Reactor>>, data: u64, id: usize) -> Self { # fn new(reactor: Arc<Mutex<Reactor>>, data: u64, id: usize) -> Self {
# Task { # Task {
# id, # id,
# reactor, # reactor,
# data, # data,
# is_registered: false, # is_registered: false,
# } # }
# } # }
#} # }
# #
#// This is our `Future` implementation # // This is our `Future` implementation
#impl Future for Task { # impl Future for Task {
# // The output for this kind of `leaf future` is just an `usize`. For other # // The output for this kind of `leaf future` is just an `usize`. For other
# // futures this could be something more interesting like a byte stream. # // futures this could be something more interesting like a byte stream.
# type Output = usize; # type Output = usize;
# fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
# let mut r = self.reactor.lock().unwrap(); # let mut r = self.reactor.lock().unwrap();
# // we check with the `Reactor` if this future is in its "readylist" # // we check with the `Reactor` if this future is in its "readylist"
# if r.is_ready(self.id) { # if r.is_ready(self.id) {
# // if it is, we return the data. In this case it's just the ID of # // if it is, we return the data. In this case it's just the ID of
# // the task. # // the task.
# Poll::Ready(self.id) # Poll::Ready(self.id)
# } else if self.is_registered { # } else if self.is_registered {
# // If the future is registered alredy, we just return `Pending` # // If the future is registered alredy, we just return `Pending`
# Poll::Pending # Poll::Pending
# } else { # } else {
# // If we get here, it must be the first time this `Future` is polled # // If we get here, it must be the first time this `Future` is polled
# // so we register a task with our `reactor` # // so we register a task with our `reactor`
# r.register(self.data, cx.waker().clone(), self.id); # r.register(self.data, cx.waker().clone(), self.id);
# // oh, we have to drop the lock on our `Mutex` here because we can't # // oh, we have to drop the lock on our `Mutex` here because we can't
# // have a shared and exclusive borrow at the same time # // have a shared and exclusive borrow at the same time
# drop(r); # drop(r);
# self.is_registered = true; # self.is_registered = true;
# Poll::Pending # Poll::Pending
# } # }
# } # }
#} # }
# #
# // =============================== REACTOR =================================== # // =============================== REACTOR ===================================
# // This is a "fake" reactor. It does no real I/O, but that also makes our # // This is a "fake" reactor. It does no real I/O, but that also makes our
# // code possible to run in the book and in the playground # // code possible to run in the book and in the playground