Futures Explained in 200 Lines of Rust

This book aims to explain Futures in Rust using an example driven approach.

The goal is to get a better understanding of Futures by implementing a toy Reactor, a very simple Executor and our own Futures.

We'll start off solving a small problem without Futures, Wakers or async/await and then gradually adapt our example so it implements all these concepts, and can be solved using the executors provided by both tokio and async_std.

At the end I've included some reader exercises you can do if you want to fix some of the most glaring omissions and shortcuts we took and create a slightly better example yourself.

What does this book give you that isn't covered elsewhere?

That's a valid question. There are many good resources and examples already. First of all, this book will point you to some background information that I have found very valuable to get an understanding of concurrent programming in general.

I find that many discussions arise, not because Futures are a hard concept to grasp, but because concurrent programming is hard in general.

Secondly, I've always found small runnable examples very exciting to learn from. It's all code that you can download, play with and learn from.

What we'll do and not

We'll:

  • Implement our own Futures and get to know the Reactor/Executor pattern
  • Implement our own waker and learn why it's a bit foreign compared to other types
  • Talk a bit about runtime complexity and what to keep in mind when writing async Rust.
  • Make sure all examples can be run on the playground
  • Not rely on any helpers or libraries, but try to face the complexity and learn

We'll not:

  • Talk about how futures are implemented in Rust the language, the state machine and so on
  • Explain how the different runtimes differ. However, you'll hopefully be a bit better off if you read this before you go research them
  • Explain concurrent programming, but I will supply sources

I do want to explore Rust's internal implementation, but that will be for a later book.

Credits and thanks

I'd like to take the chance to thank the people behind mio, tokio, async_std, Futures, libc, crossbeam and many other libraries which so much is built upon. Reading and exploring some of this code has been nothing less than impressive.

Why are Futures in Rust hard to understand?

Well, I think it has to do with several things:

  1. Futures have a very interesting implementation, compiling down to a state machine using generators to suspend and resume execution. In a language such as Rust this is pretty hard to do ergonomically and safely. You are exposed to some of this complexity when you work with futures and want to understand them, not only learn how to use them.

  2. Rust doesn't provide a runtime. That means you'll have to choose one yourself and know what a Reactor and an Executor are. While not too difficult, you need to make more choices than you do in Go and other languages that are designed with concurrent programming in mind and ship with a runtime.

  3. Futures exist in two versions, Futures 1.0 and Futures 3.0 (versions 0.1 and 0.3 of the futures crate). Futures 1.0 was known to have some issues regarding ergonomics. It turns out that modelling async code after Promises in JavaScript can lead to extremely long errors and type signatures with a type system like Rust's. Futures 3.0 are not compatible with Futures 1.0 without performing some work.

  4. Async/await syntax was recently stabilized.

What we'll really do is stub out a Reactor and an Executor and implement our own Futures.

Some background information

Before we start implementing our Futures, we'll go through some background information that will help demystify some of the concepts we encounter.

Concurrency in general

If you find the concepts of concurrency and async programming confusing in general, I know where you're coming from and I have written some resources to try to give a high level overview that will make it easier to learn Rust's Futures afterwards:

Trait objects and fat pointers

Trait objects and dynamic dispatch

The single most confusing topic we encounter when implementing our own Futures is how we implement a Waker. Creating a Waker involves creating a vtable which allows using dynamic dispatch to call methods on a type-erased trait object we construct ourselves.

If you want to know more about dynamic dispatch in Rust I can recommend this article:

https://alschwalm.com/blog/static/2017/03/07/exploring-dynamic-dispatch-in-rust/

Let's explain this in a bit more detail.

Fat pointers in Rust

Let's take a look at the size of some different pointer types in Rust. If we run the following code:

use std::mem::size_of;
trait SomeTrait { }

fn main() {
    println!("Size of Box<i32>: {}", size_of::<Box<i32>>());
    println!("Size of &i32: {}", size_of::<&i32>());
    println!("Size of &Box<i32>: {}", size_of::<&Box<i32>>());
    println!("Size of Box<dyn Trait>: {}", size_of::<Box<dyn SomeTrait>>());
    println!("Size of &dyn Trait: {}", size_of::<&dyn SomeTrait>());
    println!("Size of &[i32]: {}", size_of::<&[i32]>());
    println!("Size of &[&dyn Trait]: {}", size_of::<&[&dyn SomeTrait]>());
    println!("Size of [i32; 10]: {}", size_of::<[i32; 10]>());
    println!("Size of [&dyn Trait; 10]: {}", size_of::<[&dyn SomeTrait; 10]>());
}

As you can see from the output, the sizes of the pointers vary. Some are 8 bytes (the size of a pointer on 64-bit systems), but others are 16 bytes. The two arrays at the end are not pointers; they are simply ten times the size of their element type.

The 16-byte pointers are called "fat pointers" since they carry extra information.

In the case of &[i32] :

  • The first 8 bytes is the actual pointer to the first element in the array (or part of an array the slice refers to)
  • The second 8 bytes is the length of the slice.
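To see those two halves for yourself, here is a minimal sketch that splits a slice reference into its pointer and its length. The helper `slice_parts` is a name I made up for this illustration; `as_ptr` and `len` are ordinary slice methods from the standard library.

```rust
use std::mem::size_of;

/// Hypothetical helper: returns the two parts a `&[i32]` fat pointer carries.
fn slice_parts(s: &[i32]) -> (*const i32, usize) {
    (s.as_ptr(), s.len())
}

fn main() {
    let array = [10, 20, 30, 40, 50];
    let whole: &[i32] = &array;
    let part: &[i32] = &array[1..4];

    let (whole_ptr, whole_len) = slice_parts(whole);
    let (part_ptr, part_len) = slice_parts(part);

    // A slice reference is twice the size of a thin reference.
    assert_eq!(size_of::<&[i32]>(), 2 * size_of::<&i32>());

    println!("whole: ptr = {:?}, len = {}", whole_ptr, whole_len);
    println!("part:  ptr = {:?}, len = {}", part_ptr, part_len);

    // `part` starts one `i32` (4 bytes) into `array`.
    println!("offset in bytes: {}", part_ptr as usize - whole_ptr as usize);
}
```

Both slices refer to the same array; only the pointer and the length stored in the reference differ.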

The ones we'll concern ourselves with are the references to traits, or trait objects as they're called in Rust.

&dyn SomeTrait is an example of a trait object.

The layout for a pointer to a trait object looks like this:

  • The first 8 bytes points to the data for the trait object
  • The second 8 bytes points to the vtable for the trait object

The reason for this is to allow us to refer to an object we know nothing about except that it implements the methods defined by our trait. To allow this we use dynamic dispatch.
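To make dynamic dispatch a bit more concrete, here is a minimal sketch using the trait objects the compiler builds for us. The `Shape` trait, the `Square` and `Rect` types and the `describe` function are all made up for this illustration.

```rust
/// Hypothetical trait used only for this illustration.
trait Shape {
    fn area(&self) -> f64;
}

struct Square(f64);
struct Rect(f64, f64);

impl Shape for Square {
    fn area(&self) -> f64 {
        self.0 * self.0
    }
}

impl Shape for Rect {
    fn area(&self) -> f64 {
        self.0 * self.1
    }
}

/// This function knows nothing about the concrete type behind `shape`.
/// The call to `area` is looked up through the vtable at runtime.
fn describe(shape: &dyn Shape) -> String {
    format!("area = {}", shape.area())
}

fn main() {
    let square = Square(2.0);
    let rect = Rect(2.0, 3.0);

    // Two different concrete types behind the same kind of fat pointer.
    let shapes: [&dyn Shape; 2] = [&square, &rect];
    for shape in shapes.iter() {
        println!("{}", describe(*shape));
    }
}
```

The same `describe` function ends up calling two different `area` implementations, depending on which vtable the fat pointer carries.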

Let's explain this in code instead of words by implementing our own trait object from these parts:

// A reference to a trait object is a fat pointer: (data_ptr, vtable_ptr)
trait Test {
    fn add(&self) -> i32;
    fn sub(&self) -> i32;
    fn mul(&self) -> i32;
}

// This will represent our home-brewed fat pointer to a trait object
#[repr(C)]
struct FatPointer<'a> {
    /// A reference is a pointer to an instantiated `Data` instance
    data: &'a mut Data,
    /// Since we need to pass in literal values like size and alignment it's
    /// easiest for us to convert pointers to usize-integers instead of the other way around.
    vtable: *const usize,
}

// This is the data in our trait object. It's just two numbers we want to operate on.
struct Data {
    a: i32,
    b: i32,
}

// ====== function definitions ======
fn add(s: &Data) -> i32 {
    s.a + s.b
}
fn sub(s: &Data) -> i32 {
    s.a - s.b
}
fn mul(s: &Data) -> i32 {
    s.a * s.b
}

fn main() {
    let mut data = Data {a: 3, b: 2};
    // A vtable is like a special-purpose array of pointer-sized values with a fixed
    // format, where the first three values have a special meaning. This layout is
    // an implementation detail of the compiler and is not guaranteed to stay the same.
    let vtable = vec![
        0,            // pointer to `Drop` (which we're not implementing here)
        8,            // size of the data (`Data` holds two `i32`s)
        4,            // alignment of the data
        // we need to make sure we add these in the same order as defined in the Trait.
        // Try changing the order of add and sub and see what happens.
        add as usize, // function pointer
        sub as usize, // function pointer 
        mul as usize, // function pointer
    ];

    let fat_pointer = FatPointer { data: &mut data, vtable: vtable.as_ptr()};
    let test = unsafe { std::mem::transmute::<FatPointer, &dyn Test>(fat_pointer) };

    // And voilà, it's now a trait object we can call methods on
    println!("Add: 3 + 2 = {}", test.add());
    println!("Sub: 3 - 2 = {}", test.sub());
    println!("Mul: 3 * 2 = {}", test.mul());
}

If you run this code by pressing the "play" button at the top, you'll see it outputs just what we expect.

This code example is editable so you can change it and run it to see what happens.

The reason we go through this will become clear later on: when we implement our own Waker, we'll actually set up a vtable like we do here, and knowing what it is will make this much less mysterious.
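As a small preview of where this is heading, here is a minimal sketch of a Waker built from a hand-written vtable. `RawWaker`, `RawWakerVTable` and `Waker` come from `std::task`; everything else (`WAKE_COUNT`, `counting_waker` and the four functions) is made up for this illustration and is not the Waker we build later in the book.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{RawWaker, RawWakerVTable, Waker};

// Hypothetical counter so we can observe that our functions get called.
static WAKE_COUNT: AtomicUsize = AtomicUsize::new(0);

// The four functions the vtable needs. The data pointer is unused in this
// sketch; a real waker would store its state behind it.
unsafe fn clone(data: *const ()) -> RawWaker {
    RawWaker::new(data, &VTABLE)
}
unsafe fn wake(_data: *const ()) {
    WAKE_COUNT.fetch_add(1, Ordering::SeqCst);
}
unsafe fn wake_by_ref(_data: *const ()) {
    WAKE_COUNT.fetch_add(1, Ordering::SeqCst);
}
unsafe fn drop_waker(_data: *const ()) {}

// The vtable itself: a fixed set of function pointers, in a fixed order.
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker);

fn counting_waker() -> Waker {
    // Safety: the functions above never dereference the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = counting_waker();
    waker.wake_by_ref(); // dispatches to `wake_by_ref` through the vtable
    let cloned = waker.clone(); // dispatches to `clone`
    cloned.wake(); // dispatches to `wake`
    println!("woken {} times", WAKE_COUNT.load(Ordering::SeqCst));
}
```

The shape is the same as in our home-made trait object: a data pointer paired with a table of function pointers.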

Reactor/Executor pattern

If you don't know what this is, you should take a few minutes and read about it. You will encounter the terms Reactor and Executor a lot when working with async code in Rust.

I have written a quick introduction explaining this pattern before which you can take a look at here:

Epoll, Kqueue and IOCP Explained - The Reactor-Executor Pattern

I'll reiterate the most important parts here.

This pattern consists of at least three parts:

  1. A reactor
    • Handles some kind of event queue
    • Has the responsibility of responding to events
  2. An executor
    • Often has a scheduler
    • Holds a set of suspended tasks, and has the responsibility of resuming them when an event has occurred
  3. The concept of a task
    • A set of operations that can be stopped half way and resumed later on

This pattern is not only used in Rust, but it's very popular in Rust due to how well it separates concerns between handling and scheduling tasks, and queuing and responding to I/O events.
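The three parts above can be sketched in a few lines. This is a single-threaded toy under heavy assumptions, not how a real runtime works: `Task`, `Reactor`, `Executor` and `demo` are names made up for this illustration, and the "events" are just task ids we put in a queue by hand.

```rust
use std::collections::{HashMap, VecDeque};

/// A task: a set of operations that can be stopped half way and resumed later.
struct Task {
    name: &'static str,
    steps_done: u32,
    steps_total: u32,
}

impl Task {
    /// Runs one more step and reports whether the task has finished.
    fn resume(&mut self) -> bool {
        self.steps_done += 1;
        self.steps_done >= self.steps_total
    }
}

/// Toy reactor: an event queue holding the ids of tasks that can make progress.
struct Reactor {
    events: VecDeque<usize>,
}

/// Toy executor: holds suspended tasks and resumes them when an event arrives.
struct Executor {
    suspended: HashMap<usize, Task>,
}

impl Executor {
    fn run(&mut self, reactor: &mut Reactor) -> Vec<String> {
        let mut log = Vec::new();
        while let Some(id) = reactor.events.pop_front() {
            // Events for unknown or already finished tasks are ignored.
            if let Some(mut task) = self.suspended.remove(&id) {
                if task.resume() {
                    log.push(format!("{} finished", task.name));
                } else {
                    log.push(format!("{} suspended after step {}", task.name, task.steps_done));
                    self.suspended.insert(id, task);
                }
            }
        }
        log
    }
}

fn demo() -> Vec<String> {
    let mut executor = Executor { suspended: HashMap::new() };
    executor.suspended.insert(1, Task { name: "a", steps_done: 0, steps_total: 2 });
    executor.suspended.insert(2, Task { name: "b", steps_done: 0, steps_total: 1 });
    // Pretend these events arrived from the outside world in this order.
    let mut reactor = Reactor { events: VecDeque::from(vec![1, 2, 1]) };
    executor.run(&mut reactor)
}

fn main() {
    for line in demo() {
        println!("{}", line);
    }
}
```

Note how the reactor knows nothing about what a task does, and the executor knows nothing about where events come from.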

The only thing Rust as a language defines is the task. In Rust we call an interruptible task a Future. Futures have a well-defined interface, which means they can be used across the entire ecosystem.

In addition, Rust provides a way for the Reactor and Executor to communicate through the Waker. We'll get to know these in the following chapters.
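To give a first impression of that interface, here is a minimal sketch that polls a future by hand. `Future`, `Context`, `Poll`, `Wake` and `Waker` are standard library items; `ReadyOnSecondPoll`, `Flag` and `poll_twice` are made up for this illustration, and the future wakes itself immediately instead of registering with a reactor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: pending on the first poll, ready on the second.
struct ReadyOnSecondPoll {
    polled_before: bool,
}

impl Future for ReadyOnSecondPoll {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled_before {
            Poll::Ready("done")
        } else {
            self.polled_before = true;
            // A real future would hand the waker to a reactor. In this sketch
            // we simply signal right away that we want to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical waker that only records that it was called.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Plays the role of a very naive executor: polls, checks the flag, polls again.
fn poll_twice() -> (bool, bool, &'static str) {
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = ReadyOnSecondPoll { polled_before: false };

    let first_was_pending = Pin::new(&mut fut).poll(&mut cx).is_pending();
    let was_woken = flag.0.load(Ordering::SeqCst);
    let output = match Pin::new(&mut fut).poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => "still pending",
    };
    (first_was_pending, was_woken, output)
}

fn main() {
    let (first_was_pending, was_woken, output) = poll_twice();
    println!("first poll pending: {}", first_was_pending);
    println!("waker was called:   {}", was_woken);
    println!("second poll output: {}", output);
}
```

The future never talks to the executor directly. The only link between them is the Waker passed in through the Context.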

Providing these pieces lets Rust take care of a lot of the ergonomic "friction" programmers meet when faced with async code, while still not dictating any preferred runtime to actually do the scheduling and handle the I/O queues.

It's important to know that Rust doesn't provide a runtime, so you have to choose one. async_std and tokio are two popular ones.

With that out of the way, let's move on to our main example.

Generators and Pin

The second difficult part, which there seem to be a lot of questions about, is Generators and the Pin type.

Generators

Relevant for:

  • Understanding how the async/await syntax works
  • Why we need Pin
  • Why Rust's async model is extremely efficient

The motivation for Generators can be found in RFC#2033. It's very well written and I can recommend reading through it (it talks as much about async/await as it does about generators).

Basically, there were three main options that were discussed when designing how the language would handle concurrency:

  1. Stackful coroutines, better known as green threads.
  2. Using combinators.
  3. Stackless coroutines, better known as generators.

Stackful coroutines/green threads

I've written about green threads before. Go check out Green Threads Explained in 200 lines of Rust if you're interested.

Green threads use the same mechanisms as an OS does by creating a thread for each task, setting up a stack and forcing the CPU to save its state and jump from one task (thread) to another. We yield control to the scheduler which then continues running a different task.

Rust had green threads once, but they were removed before it hit 1.0. The state of execution is stored in each stack so in such a solution there would be no need for async, await, Futures or Pin. All this would be implementation details for the library.

Combinators

Futures 1.0 used combinators. If you've worked with Promises in JavaScript, you already know combinators. In Rust they look like this:


// `Connection`, `SomeStruct`, `SomeLibraryError` and `conn_str` are
// placeholders for illustration; they are not defined here.
let future = Connection::connect(conn_str).and_then(|conn| {
    conn.query("somerequest").map(|row| {
        SomeStruct::from(row)
    }).collect::<Vec<SomeStruct>>()
});

let rows: Result<Vec<SomeStruct>, SomeLibraryError> = block_on(future);

While this is an effective solution, there are mainly two downsides I'll focus on:

  1. The error messages produced could be extremely long and arcane
  2. Not optimal memory usage

The reason for the higher than optimal memory usage is that this is basically a callback-based approach, where each closure stores all the data it needs for computation. This means that as we chain these, the memory required to store the needed state increases with each added step.
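The growth is easy to observe with plain closures. The sketch below is only an analogy and does not use Futures 1.0: `and_then` is a made-up function that wraps the previous step and stores 16 bytes of state of its own, and `chain_sizes` measures the result with `std::mem::size_of_val`.

```rust
use std::mem::size_of_val;

/// Hypothetical combinator: wraps the previous step and captures 16 bytes
/// of state that this step needs for its own computation.
fn and_then<F: Fn() -> usize>(prev: F, state: [u8; 16]) -> impl Fn() -> usize {
    move || prev() + state.len()
}

/// Builds a chain of three steps and records the size after each one.
fn chain_sizes() -> [usize; 3] {
    let one = and_then(|| 0, [0; 16]);
    let size_one = size_of_val(&one);

    let two = and_then(one, [0; 16]);
    let size_two = size_of_val(&two);

    let three = and_then(two, [0; 16]);
    let size_three = size_of_val(&three);

    // Run the chain once so it is clear the steps really are linked.
    assert_eq!(three(), 48);

    [size_one, size_two, size_three]
}

fn main() {
    for (step, size) in chain_sizes().iter().enumerate() {
        println!("chain with {} step(s): {} bytes", step + 1, size);
    }
}
```

Each added step has to carry everything the earlier steps captured, so the chain only ever gets bigger.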

Stackless coroutines/generators

This is the model used in Async/Await today. It has two advantages:

  1. It's easy to convert normal Rust code to a stackless coroutine using async/await as keywords (it can even be done using a macro).
  2. It uses memory very efficiently

The second point is in contrast to Futures 1.0 (well, both are efficient in practice but that's beside the point). Generators are implemented as state machines. The memory footprint of a chain of computations is only defined by the largest footprint any single step requires. That means that adding steps to a chain of computations might not require any added memory at all.
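A Rust enum behaves the same way, which makes it a handy way to picture this. The `StateMachine` enum below is made up for this illustration and is not what the compiler generates; it only shows that the size is driven by the largest state rather than by the sum of all states.

```rust
use std::mem::size_of;

/// Hypothetical state machine with two suspension points. Each variant
/// holds the data that must survive while the computation is suspended there.
#[allow(dead_code)]
enum StateMachine {
    Start,
    WaitingOnFirst([u8; 64]),
    WaitingOnSecond([u8; 16]),
    Done,
}

fn state_machine_size() -> usize {
    size_of::<StateMachine>()
}

fn main() {
    let size = state_machine_size();
    println!("size of StateMachine: {} bytes", size);

    // Big enough for the largest state...
    assert!(size >= 64);
    // ...but smaller than the two states added together.
    assert!(size < 64 + 16);
}
```

Adding another variant that needs 64 bytes or less would not make the type any bigger.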

How generators work

In Nightly Rust today you can use the yield keyword. Basically, using this keyword in a closure converts it to a generator. A closure looking like this (I'm going to use the terminology that's currently in Rust):


let a = 4;
let mut gen = move || {
    println!("Hello");
    yield a * 2;
    println!("world!");
};

if let GeneratorState::Yielded(n) = gen.resume() {
    println!("Got value {}", n);
}

if let GeneratorState::Complete(()) = gen.resume() {
    ()
};

Early on, before there was a consensus about the design of Pin, this compiled to something looking similar to this:

fn main() {
    let mut gen = GeneratorA::start(4);

    if let GeneratorState::Yielded(n) = gen.resume() {
        println!("Got value {}", n);
    }

    if let GeneratorState::Complete(()) = gen.resume() {
        ()
    };
}

// If you've ever wondered why the parameters are called Y and R, the naming from
// the original RFC most likely holds the answer
enum GeneratorState<Y, R> {
    // originally called `CoResult`
    Yielded(Y),  // originally called `Yield(Y)`
    Complete(R), // originally called `Return(R)`
}

trait Generator {
    type Yield;
    type Return;
    fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return>;
}

enum GeneratorA {
    Enter(i32),
    Yield1(i32),
    Exit,
}

impl GeneratorA {
    fn start(a1: i32) -> Self {
        GeneratorA::Enter(a1)
    }
}

impl Generator for GeneratorA {
    type Yield = i32;
    type Return = ();
    fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return> {
        // lets us get ownership over current state
        match std::mem::replace(&mut *self, GeneratorA::Exit) {
            GeneratorA::Enter(a1) => {

          /*|---code before yield1---|*/
          /*|*/ println!("Hello"); /*|*/ 
          /*|*/ let a = a1 * 2;    /*|*/
          /*|------------------------|*/

                *self = GeneratorA::Yield1(a);
                GeneratorState::Yielded(a)
            }
            GeneratorA::Yield1(_) => {

          /*|----code after yield1----|*/
          /*|*/ println!("world!"); /*|*/ 
          /*|-------------------------|*/

                *self = GeneratorA::Exit;
                GeneratorState::Complete(())
            }
            GeneratorA::Exit => panic!("Can't advance an exited generator!"),
        }
    }
}

The yield keyword was discussed first in RFC#1823 and in RFC#1832.

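// `a` is assumed to be an `i32` defined in the enclosing scope.
|| {
    let arr: Vec<i32> = (0..a).collect();
    // Index into `arr` so it isn't moved and can still be used after the loop.
    for i in 0..arr.len() {
        yield arr[i];
    }
    println!("The sum is: {}", arr.iter().sum::<i32>());
}

|| {
    yield a * 2;
    println!("Hello!");
}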

Naive example

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

fn main() {
    let readylist = Arc::new(Mutex::new(vec![]));
    let mut reactor = Reactor::new();

    let mywaker = MyWaker::new(1, thread::current(), readylist.clone());
    reactor.register(2, mywaker);

    let mywaker = MyWaker::new(2, thread::current(), readylist.clone());
    reactor.register(2, mywaker);
    
    executor_run(reactor, readylist);
}
// ====== EXECUTOR ======
fn executor_run(mut reactor: Reactor, rl: Arc<Mutex<Vec<usize>>>) {
    let start = Instant::now();
    loop {
        let mut rl_locked = rl.lock().unwrap();
        while let Some(event) = rl_locked.pop() {
            let dur = (Instant::now() - start).as_secs_f32(); 
            println!("Event {} just happened at time: {:.2}.", event, dur);
            reactor.outstanding.fetch_sub(1, Ordering::Relaxed);
        }
        drop(rl_locked);

        if reactor.outstanding.load(Ordering::Relaxed) == 0 {
            reactor.close();
            break;
        }

        thread::park();
    }
}

// ====== "FUTURE" IMPL ======
#[derive(Debug)]
struct MyWaker {
    id: usize,
    thread: thread::Thread,
    readylist: Arc<Mutex<Vec<usize>>>,
}

impl MyWaker {
    fn new(id: usize, thread: thread::Thread, readylist: Arc<Mutex<Vec<usize>>>) -> Self {
        MyWaker {
            id,
            thread,
            readylist,
        }
    }

    fn wake(&self) {
        self.readylist.lock().map(|mut rl| rl.push(self.id)).unwrap();
        self.thread.unpark();
    }
}


#[derive(Debug, Clone)]
pub struct Task {
    id: usize,
    pending: bool, 
}

// ===== REACTOR =====
struct Reactor {
    dispatcher: Sender<Event>,
    handle: Option<JoinHandle<()>>,
    outstanding: AtomicUsize,
}
#[derive(Debug)]
enum Event {
    Close,
    Simple(MyWaker, u64),
}

impl Reactor {
    fn new() -> Self {
        let (tx, rx) = channel::<Event>();
        let mut handles = vec![];
        let handle = thread::spawn(move || {
            // This simulates some I/O resource
            for event in rx {
                match event {
                    Event::Close => break,
                    Event::Simple(mywaker, duration) => {
                        let event_handle = thread::spawn(move || {
                            thread::sleep(Duration::from_secs(duration));
                            mywaker.wake();
                        });
                        handles.push(event_handle);
                    }
                }
            }

            for handle in handles {
                handle.join().unwrap();
            }
        });

        Reactor {
            dispatcher: tx,
            handle: Some(handle),
            outstanding: AtomicUsize::new(0),
        }
    }

    fn register(&mut self, duration: u64, mywaker: MyWaker) {
        self.dispatcher
            .send(Event::Simple(mywaker, duration))
            .unwrap();
        self.outstanding.fetch_add(1, Ordering::Relaxed);
    }

    fn close(&mut self) {
        self.dispatcher.send(Event::Close).unwrap();
    }
}

impl Drop for Reactor {
    fn drop(&mut self) {
        self.handle.take().map(|h| h.join().unwrap()).unwrap();
    }
}

Proper Waker

Proper Future

Supporting async/await

Bonus: concurrent futures