diff --git a/book/.nojekyll b/book/.nojekyll
new file mode 100644
index 0000000..8631215
--- /dev/null
+++ b/book/.nojekyll
@@ -0,0 +1 @@
+This file makes sure that GitHub Pages doesn't process mdBook's output.
\ No newline at end of file
diff --git a/book/0_0_introduction.html b/book/0_0_introduction.html
new file mode 100644
index 0000000..d2c374a
--- /dev/null
+++ b/book/0_0_introduction.html
@@ -0,0 +1,233 @@
Before we start implementing our Futures, we'll go through some background information that will help demystify some of the concepts we encounter.
If you find the concepts of concurrency and async programming confusing in general, I know where you're coming from, and I have written some resources to try to give a high-level overview that will make it easier to learn Rust's Futures afterwards:
The single most confusing topic we encounter when implementing our own Futures is how we implement a Waker. Creating a Waker involves creating a vtable, which allows us to use dynamic dispatch to call methods on a type-erased trait object we construct ourselves.
If you want to know more about dynamic dispatch in Rust I can recommend this article:
+https://alschwalm.com/blog/static/2017/03/07/exploring-dynamic-dispatch-in-rust/
Let's explain this in a bit more detail.

Let's take a look at the size of some different pointer types in Rust. If we run the following code:
```rust
use std::mem::size_of;
trait SomeTrait { }

fn main() {
    println!("Size of Box<i32>: {}", size_of::<Box<i32>>());
    println!("Size of &i32: {}", size_of::<&i32>());
    println!("Size of &Box<i32>: {}", size_of::<&Box<i32>>());
    println!("Size of Box<dyn Trait>: {}", size_of::<Box<dyn SomeTrait>>());
    println!("Size of &dyn Trait: {}", size_of::<&dyn SomeTrait>());
    println!("Size of &[i32]: {}", size_of::<&[i32]>());
    println!("Size of &[&dyn Trait]: {}", size_of::<&[&dyn SomeTrait]>());
    println!("Size of [i32; 10]: {}", size_of::<[i32; 10]>());
    println!("Size of [&dyn Trait; 10]: {}", size_of::<[&dyn SomeTrait; 10]>());
}
```
As you see from the output after running this, the sizes of the references vary. Most are 8 bytes (which is the pointer size on 64-bit systems), but some are 16 bytes.

The 16-byte pointers are called "fat pointers" since they carry extra information.
In the case of `&[i32]`:

- The first 8 bytes is the pointer to the first element of the array (or part of an array the slice refers to)
- The second 8 bytes is the length of the slice
The one we'll concern ourselves with is references to traits, or trait objects as they're called in Rust.

`&dyn SomeTrait` is an example of a trait object.
The layout for a pointer to a trait object looks like this:

- The first 8 bytes points to the `data` for the trait object
- The second 8 bytes points to the `vtable` for the trait object

The reason for this is to allow us to refer to an object we know nothing about except that it implements the methods defined by our trait. To achieve this we use dynamic dispatch.
Let's explain this in code instead of words by implementing our own trait object from these parts:

```rust
// A reference to a trait object is a fat pointer: (data_ptr, vtable_ptr)
trait Test {
    fn add(&self) -> i32;
    fn sub(&self) -> i32;
    fn mul(&self) -> i32;
}

// This will represent our home-brewed fat pointer to a trait object
#[repr(C)]
struct FatPointer<'a> {
    /// A reference is a pointer to an instantiated `Data` instance
    data: &'a mut Data,
    /// Since we need to pass in literal values like length and alignment it's
    /// easiest for us to convert pointers to usize-integers instead of the other way around.
    vtable: *const usize,
}

// This is the data in our trait object. It's just two numbers we want to operate on.
struct Data {
    a: i32,
    b: i32,
}

// ====== function definitions ======
fn add(s: &Data) -> i32 {
    s.a + s.b
}
fn sub(s: &Data) -> i32 {
    s.a - s.b
}
fn mul(s: &Data) -> i32 {
    s.a * s.b
}

fn main() {
    let mut data = Data { a: 3, b: 2 };
    // The vtable is a special-purpose array of pointer-sized values with a
    // fixed format, where the first three values have a special meaning, e.g.
    // the length of the vtable is encoded in the array itself as the second value.
    let vtable = vec![
        0,            // pointer to `Drop` (which we're not implementing here)
        6,            // length of vtable
        8,            // alignment
        // we need to make sure we add these in the same order as they are defined in the trait.
        // Try changing the order of add and sub and see what happens.
        add as usize, // function pointer
        sub as usize, // function pointer
        mul as usize, // function pointer
    ];

    let fat_pointer = FatPointer { data: &mut data, vtable: vtable.as_ptr() };
    let test = unsafe { std::mem::transmute::<FatPointer, &dyn Test>(fat_pointer) };

    // And voilà, it's now a trait object we can call methods on
    println!("Add: 3 + 2 = {}", test.add());
    println!("Sub: 3 - 2 = {}", test.sub());
    println!("Mul: 3 * 2 = {}", test.mul());
}
```
If you run this code by pressing the "play" button at the top, you'll see it outputs just what we expect.

This code example is editable, so you can change it and run it to see what happens.
The reason we go through this will become clear later on: when we implement our own Waker we'll actually set up a vtable just like we do here, and knowing what it is will make the process much less mysterious.
If you don't know what this is, you should take a few minutes and read about it. You will encounter the terms Reactor and Executor a lot when working with async code in Rust.
I have written a quick introduction explaining this pattern before, which you can take a look at here:

I'll reiterate the most important parts here.
This pattern consists of at least two parts:

- A Reactor, which queues and responds to I/O events
- An Executor, which handles and schedules tasks

This pattern is not only used in Rust, but it's especially popular in Rust due to how well it separates the concerns of handling and scheduling tasks from queuing and responding to I/O events.
The only thing Rust as a language defines is the task. In Rust we call an interruptible task a Future. Futures have a well-defined interface, which means they can be used across the entire ecosystem.
In addition, Rust provides a way for the Reactor and Executor to communicate
+through the Waker. We'll get to know these in the following chapters.
Providing these pieces lets Rust take care of much of the ergonomic "friction" programmers meet when faced with async code, while still not dictating any preferred runtime to actually do the scheduling and manage the I/O queues.

It's important to know that Rust doesn't provide a runtime, so you have to choose one. `async_std` and `tokio` are two popular ones.
+With that out of the way, let's move on to our main example.
```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

fn main() {
    let readylist = Arc::new(Mutex::new(vec![]));
    let mut reactor = Reactor::new();

    let mywaker = MyWaker::new(1, thread::current(), readylist.clone());
    reactor.register(2, mywaker);

    let mywaker = MyWaker::new(2, thread::current(), readylist.clone());
    reactor.register(2, mywaker);

    executor_run(reactor, readylist);
}

// ====== EXECUTOR ======
fn executor_run(mut reactor: Reactor, rl: Arc<Mutex<Vec<usize>>>) {
    let start = Instant::now();
    loop {
        let mut rl_locked = rl.lock().unwrap();
        while let Some(event) = rl_locked.pop() {
            let dur = (Instant::now() - start).as_secs_f32();
            println!("Event {} just happened at time: {:.2}.", event, dur);
            reactor.outstanding.fetch_sub(1, Ordering::Relaxed);
        }
        drop(rl_locked);

        if reactor.outstanding.load(Ordering::Relaxed) == 0 {
            reactor.close();
            break;
        }

        thread::park();
    }
}

// ====== "FUTURE" IMPL ======
#[derive(Debug)]
struct MyWaker {
    id: usize,
    thread: thread::Thread,
    readylist: Arc<Mutex<Vec<usize>>>,
}

impl MyWaker {
    fn new(id: usize, thread: thread::Thread, readylist: Arc<Mutex<Vec<usize>>>) -> Self {
        MyWaker {
            id,
            thread,
            readylist,
        }
    }

    fn wake(&self) {
        self.readylist.lock().map(|mut rl| rl.push(self.id)).unwrap();
        self.thread.unpark();
    }
}

#[derive(Debug, Clone)]
pub struct Task {
    id: usize,
    pending: bool,
}

// ===== REACTOR =====
struct Reactor {
    dispatcher: Sender<Event>,
    handle: Option<JoinHandle<()>>,
    outstanding: AtomicUsize,
}

#[derive(Debug)]
enum Event {
    Close,
    Simple(MyWaker, u64),
}

impl Reactor {
    fn new() -> Self {
        let (tx, rx) = channel::<Event>();
        let mut handles = vec![];
        let handle = thread::spawn(move || {
            // This simulates some I/O resource
            for event in rx {
                match event {
                    Event::Close => break,
                    Event::Simple(mywaker, duration) => {
                        let event_handle = thread::spawn(move || {
                            thread::sleep(Duration::from_secs(duration));
                            mywaker.wake();
                        });
                        handles.push(event_handle);
                    }
                }
            }

            for handle in handles {
                handle.join().unwrap();
            }
        });

        Reactor {
            dispatcher: tx,
            handle: Some(handle),
            outstanding: AtomicUsize::new(0),
        }
    }

    fn register(&mut self, duration: u64, mywaker: MyWaker) {
        self.dispatcher
            .send(Event::Simple(mywaker, duration))
            .unwrap();
        self.outstanding.fetch_add(1, Ordering::Relaxed);
    }

    fn close(&mut self) {
        self.dispatcher.send(Event::Close).unwrap();
    }
}

impl Drop for Reactor {
    fn drop(&mut self) {
        self.handle.take().map(|h| h.join().unwrap()).unwrap();
    }
}
```
diff --git a/src/0_0_Introduction.md b/src/0_0_Introduction.md
index e10b99d..d3a8a6d 100644
--- a/src/0_0_Introduction.md
+++ b/src/0_0_Introduction.md
@@ -1 +1,83 @@
-# Introduction
+# Futures Explained in 200 Lines of Rust

This book aims to explain `Futures` in Rust using an example-driven approach.

The goal is to get a better understanding of `Futures` by implementing a toy `Reactor`, a very simple `Executor` and our own `Futures`.

We'll start off solving a small problem without `Futures`, `Wakers` or async/await, and then gradually adapt our example so it implements all these concepts and can be solved using the executor provided by both `tokio` and `async_std`.

In the end I've made some reader exercises you can do if you want to fix some of the most glaring omissions and shortcuts we took, and create a slightly better example yourself.

## What does this book give you that isn't covered elsewhere?

That's a valid question. There are many good resources and examples already. First of all, this book will point you to some background information that I have found very valuable for getting an understanding of concurrent programming in general.

I find that many discussions arise not because `Futures` are a hard concept to grasp, but because concurrent programming is a hard concept in general.

Secondly, I've always found small runnable examples very exciting to learn from. It's all code that you can download, play with and learn from.

## What we'll do and not

**We'll:**

- Implement our own `Futures` and get to know the `Reactor/Executor` pattern
- Implement our own waker and learn why it's a bit foreign compared to other types
- Talk a bit about runtime complexity and what to keep in mind when writing async Rust
- Make sure all examples can be run on the playground
- Not rely on any helpers or libraries, but try to face the complexity and learn

**We'll not:**

- Talk about how futures are implemented in Rust the language, the state machine and so on
- Explain how the different runtimes differ; however, you'll hopefully be a bit better off if you read this before you go research them
- Explain concurrent programming, but I will supply sources

I do want to explore Rust's internal implementation, but that will be for a later book.

## Credits and thanks

I'd like to take the chance to thank the people behind `mio`, `tokio`, `async_std`, `Futures`, `libc`, `crossbeam` and many other libraries which so much is built upon. Reading and exploring some of this code is nothing less than impressive.

## Why are `Futures` in Rust hard to understand?

Well, I think it has to do with several things:

1. Futures have a very interesting implementation, compiling down to a state machine that uses generators to suspend and resume execution. In a language such as Rust this is pretty hard to do ergonomically and safely. You are exposed to some of this complexity when you work with futures and want to understand them, not only learn how to use them.

2. Rust doesn't provide a runtime. That means you'll actually have to choose one yourself, and actually know what a `Reactor` and an `Executor` is. While not too difficult, you need to make more choices than you do in Go and other languages that are designed with concurrent programming in mind and ship with a runtime.

3. Futures exist in two versions, Futures 1.0 and Futures 3.0. Futures 1.0 was known to have some issues regarding ergonomics. It turns out that modelling async code after `Promises` in JavaScript can turn into extremely long errors and type signatures with a type system like Rust's.

Futures 3.0 is not compatible with Futures 1.0 without performing some work.

4.
Async/await syntax was recently stabilized.

What we'll really do is stub out a `Reactor` and an `Executor` and implement

diff --git a/src/0_2_naive_implementation.md b/src/0_2_naive_implementation.md
index 80a7a0e..4200058 100644
--- a/src/0_2_naive_implementation.md
+++ b/src/0_2_naive_implementation.md
@@ -133,7 +133,7 @@ impl Drop for Reactor {
 }
 ```
-```rust,editable
+```rust
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::{channel, Sender};
 use std::sync::{Arc, Mutex};