Files
books-futures-explained/book/6_future_example.html
2020-04-14 00:37:03 +02:00

1020 lines
53 KiB
HTML

<!DOCTYPE HTML>
<html lang="en" class="sidebar-visible no-js light">
<head>
<!-- Book generated using mdBook -->
<meta charset="UTF-8">
<title>Implementing Futures - Futures Explained in 200 Lines of Rust</title>
<meta content="text/html; charset=utf-8" http-equiv="Content-Type">
<meta name="description" content="This book aims to explain Futures in Rust using an example driven approach.">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="theme-color" content="#ffffff" />
<link rel="shortcut icon" href="favicon.png">
<link rel="stylesheet" href="css/variables.css">
<link rel="stylesheet" href="css/general.css">
<link rel="stylesheet" href="css/chrome.css">
<link rel="stylesheet" href="css/print.css" media="print">
<!-- Fonts -->
<link rel="stylesheet" href="FontAwesome/css/font-awesome.css">
<link href="https://fonts.googleapis.com/css?family=Open+Sans:300italic,400italic,600italic,700italic,800italic,400,300,600,700,800" rel="stylesheet" type="text/css">
<link href="https://fonts.googleapis.com/css?family=Source+Code+Pro:500" rel="stylesheet" type="text/css">
<!-- Highlight.js Stylesheets -->
<link rel="stylesheet" href="highlight.css">
<link rel="stylesheet" href="tomorrow-night.css">
<link rel="stylesheet" href="ayu-highlight.css">
<!-- Custom theme stylesheets -->
</head>
<body>
<!-- Provide site root to javascript -->
<script type="text/javascript">
var path_to_root = "";
var default_theme = window.matchMedia("(prefers-color-scheme: dark)").matches ? "light" : "light";
</script>
<!-- Work around some values being stored in localStorage wrapped in quotes -->
<script type="text/javascript">
try {
var theme = localStorage.getItem('mdbook-theme');
var sidebar = localStorage.getItem('mdbook-sidebar');
if (theme.startsWith('"') && theme.endsWith('"')) {
localStorage.setItem('mdbook-theme', theme.slice(1, theme.length - 1));
}
if (sidebar.startsWith('"') && sidebar.endsWith('"')) {
localStorage.setItem('mdbook-sidebar', sidebar.slice(1, sidebar.length - 1));
}
} catch (e) { }
</script>
<!-- Set the theme before any content is loaded, prevents flash -->
<script type="text/javascript">
var theme;
try { theme = localStorage.getItem('mdbook-theme'); } catch(e) { }
if (theme === null || theme === undefined) { theme = default_theme; }
var html = document.querySelector('html');
html.classList.remove('no-js')
html.classList.remove('light')
html.classList.add(theme);
html.classList.add('js');
</script>
<!-- Hide / unhide sidebar before it is displayed -->
<script type="text/javascript">
var html = document.querySelector('html');
var sidebar = 'hidden';
if (document.body.clientWidth >= 1080) {
try { sidebar = localStorage.getItem('mdbook-sidebar'); } catch(e) { }
sidebar = sidebar || 'visible';
}
html.classList.remove('sidebar-visible');
html.classList.add("sidebar-" + sidebar);
</script>
<nav id="sidebar" class="sidebar" aria-label="Table of contents">
<div id="sidebar-scrollbox" class="sidebar-scrollbox">
<ol class="chapter"><li class="expanded affix "><a href="introduction.html">Introduction</a></li><li class="expanded "><a href="0_background_information.html"><strong aria-hidden="true">1.</strong> Background information</a></li><li class="expanded "><a href="1_futures_in_rust.html"><strong aria-hidden="true">2.</strong> Futures in Rust</a></li><li class="expanded "><a href="2_waker_context.html"><strong aria-hidden="true">3.</strong> Waker and Context</a></li><li class="expanded "><a href="3_generators_async_await.html"><strong aria-hidden="true">4.</strong> Generators and async/await</a></li><li class="expanded "><a href="4_pin.html"><strong aria-hidden="true">5.</strong> Pin</a></li><li class="expanded "><a href="6_future_example.html" class="active"><strong aria-hidden="true">6.</strong> Implementing Futures</a></li><li class="expanded "><a href="8_finished_example.html"><strong aria-hidden="true">7.</strong> Finished example (editable)</a></li><li class="expanded affix "><a href="conclusion.html">Conclusion and exercises</a></li></ol>
</div>
<div id="sidebar-resize-handle" class="sidebar-resize-handle"></div>
</nav>
<div id="page-wrapper" class="page-wrapper">
<div class="page">
<div id="menu-bar" class="menu-bar">
<div id="menu-bar-sticky-container">
<div class="left-buttons">
<button id="sidebar-toggle" class="icon-button" type="button" title="Toggle Table of Contents" aria-label="Toggle Table of Contents" aria-controls="sidebar">
<i class="fa fa-bars"></i>
</button>
<button id="theme-toggle" class="icon-button" type="button" title="Change theme" aria-label="Change theme" aria-haspopup="true" aria-expanded="false" aria-controls="theme-list">
<i class="fa fa-paint-brush"></i>
</button>
<ul id="theme-list" class="theme-popup" aria-label="Themes" role="menu">
<li role="none"><button role="menuitem" class="theme" id="light">Light (default)</button></li>
<li role="none"><button role="menuitem" class="theme" id="rust">Rust</button></li>
<li role="none"><button role="menuitem" class="theme" id="coal">Coal</button></li>
<li role="none"><button role="menuitem" class="theme" id="navy">Navy</button></li>
<li role="none"><button role="menuitem" class="theme" id="ayu">Ayu</button></li>
</ul>
<button id="search-toggle" class="icon-button" type="button" title="Search. (Shortkey: s)" aria-label="Toggle Searchbar" aria-expanded="false" aria-keyshortcuts="S" aria-controls="searchbar">
<i class="fa fa-search"></i>
</button>
</div>
<h1 class="menu-title">Futures Explained in 200 Lines of Rust</h1>
<div class="right-buttons">
<a href="print.html" title="Print this book" aria-label="Print this book">
<i id="print-button" class="fa fa-print"></i>
</a>
<a href="https://github.com/cfsamson/books-futures-explained" title="Git repository" aria-label="Git repository">
<i id="git-repository-button" class="fa fa-github"></i>
</a>
</div>
</div>
</div>
<div id="search-wrapper" class="hidden">
<form id="searchbar-outer" class="searchbar-outer">
<input type="search" name="search" id="searchbar" name="searchbar" placeholder="Search this book ..." aria-controls="searchresults-outer" aria-describedby="searchresults-header">
</form>
<div id="searchresults-outer" class="searchresults-outer hidden">
<div id="searchresults-header" class="searchresults-header"></div>
<ul id="searchresults">
</ul>
</div>
</div>
<!-- Apply ARIA attributes after the sidebar and the sidebar toggle button are added to the DOM -->
<script type="text/javascript">
document.getElementById('sidebar-toggle').setAttribute('aria-expanded', sidebar === 'visible');
document.getElementById('sidebar').setAttribute('aria-hidden', sidebar !== 'visible');
Array.from(document.querySelectorAll('#sidebar a')).forEach(function(link) {
link.setAttribute('tabIndex', sidebar === 'visible' ? 0 : -1);
});
</script>
<div id="content" class="content">
<main>
<h1><a class="header" href="#implementing-futures---main-example" id="implementing-futures---main-example">Implementing Futures - main example</a></h1>
<p>We'll create our own Futures together with a fake reactor and a simple
executor which allows you to edit, run an play around with the code right here
in your browser.</p>
<p>I'll walk you through the example, but if you want to check it out closer, you
can always <a href="https://github.com/cfsamson/examples-futures">clone the repository</a> and play around with the code
yourself or just copy it from the next chapter.</p>
<p>There are several branches explained in the readme, but two are
relevant for this chapter. The <code>main</code> branch is the example we go through here,
and the <code>basic_example_commented</code> branch is this example with extensive
comments.</p>
<blockquote>
<p>If you want to follow along as we go through, initialize a new cargo project
by creating a new folder and run <code>cargo init</code> inside it. Everything we write
here will be in <code>main.rs</code></p>
</blockquote>
<h2><a class="header" href="#implementing-our-own-futures" id="implementing-our-own-futures">Implementing our own Futures</a></h2>
<p>Let's start off by getting all our imports right away so you can follow along</p>
<pre><code class="language-rust noplaypen ignore">use std::{
future::Future, pin::Pin, sync::{ mpsc::{channel, Sender}, Arc, Mutex,},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem,
thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
};
</code></pre>
<h2><a class="header" href="#the-executor" id="the-executor">The Executor</a></h2>
<p>The executors responsibility is to take one or more futures and run them to completion.</p>
<p>The first thing an <code>executor</code> does when it gets a <code>Future</code> is polling it.</p>
<p><strong>When polled one of three things can happen:</strong></p>
<ul>
<li>The future returns <code>Ready</code> and we schedule whatever chained operations to run</li>
<li>The future hasn't been polled before so we pass it a <code>Waker</code> and suspend it</li>
<li>The futures has been polled before but is not ready and returns <code>Pending</code></li>
</ul>
<p>Rust provides a way for the Reactor and Executor to communicate through the <code>Waker</code>. The reactor stores this <code>Waker</code> and calls <code>Waker::wake()</code> on it once
a <code>Future</code> has resolved and should be polled again.</p>
<blockquote>
<p>Notice that this chapter has a bonus section called <a href="./6_future_example.html#bonus-section---a-proper-way-to-park-our-thread">A Proper Way to Park our Thread</a> which shows how to avoid <code>thread::park</code>.</p>
</blockquote>
<p><strong>Our Executor will look like this:</strong></p>
<pre><code class="language-rust noplaypen ignore">// Our executor takes any object which implements the `Future` trait
fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
// the first thing we do is to construct a `Waker` which we'll pass on to
// the `reactor` so it can wake us up when an event is ready.
let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker));
// The context struct is just a wrapper for a `Waker` object. Maybe in the
// future this will do more, but right now it's just a wrapper.
let mut cx = Context::from_waker(&amp;waker);
// So, since we run this on one thread and run one future to completion
// we can pin the `Future` to the stack. This is unsafe, but saves an
// allocation. We could `Box::pin` it too if we wanted. This is however
// safe since we shadow `future` so it can't be accessed again and will
// not move until it's dropped.
let mut future = unsafe { Pin::new_unchecked(&amp;mut future) };
// We poll in a loop, but it's not a busy loop. It will only run when
// an event occurs, or a thread has a &quot;spurious wakeup&quot; (an unexpected wakeup
// that can happen for no good reason).
let val = loop {
match Future::poll(future, &amp;mut cx) {
// when the Future is ready we're finished
Poll::Ready(val) =&gt; break val,
// If we get a `pending` future we just go to sleep...
Poll::Pending =&gt; thread::park(),
};
};
val
}
</code></pre>
<p>In all the examples you'll see in this chapter I've chosen to comment the code
extensively. I find it easier to follow along that way so I'll not repeat myself
here and focus only on some important aspects that might need further explanation.</p>
<p>It's worth noting that simply calling <code>thread::sleep</code> as we do here can lead to
both deadlocks and errors. We'll explain a bit more later and fix this if you
read all the way to the <a href="./6_future_example.html##bonus-section---a-proper-way-to-park-our-thread">Bonus Section</a> at
the end of this chapter.</p>
<p>For now, we keep it as simple and easy to understand as we can by just going
to sleep.</p>
<p>Now that you've read so much about <code>Generator</code>s and <code>Pin</code> already this should
be rather easy to understand. <code>Future</code> is a state machine, every <code>await</code> point
is a <code>yield</code> point. We could borrow data across <code>await</code> points and we meet the
exact same challenges as we do when borrowing across <code>yield</code> points.</p>
<blockquote>
<p><code>Context</code> is just a wrapper around the <code>Waker</code>. At the time of writing this
book it's nothing more. In the future it might be possible that the <code>Context</code>
object will do more than just wrapping a <code>Future</code> so having this extra
abstraction gives some flexibility.</p>
</blockquote>
<p>As explained in the <a href="./3_generators_pin.html">chapter about generators</a>, we use
<code>Pin</code> and the guarantees that give us to allow <code>Future</code>s to have self
references.</p>
<h2><a class="header" href="#the-future-implementation" id="the-future-implementation">The <code>Future</code> implementation</a></h2>
<p>Futures has a well defined interface, which means they can be used across the
entire ecosystem.</p>
<p>We can chain these <code>Future</code>s so that once a <strong>leaf-future</strong> is
ready we'll perform a set of operations until either the task is finished or we
reach yet another <strong>leaf-future</strong> which we'll wait for and yield control to the
scheduler.</p>
<p><strong>Our Future implementation looks like this:</strong></p>
<pre><code class="language-rust noplaypen ignore">// This is the definition of our `Waker`. We use a regular thread-handle here.
// It works but it's not a good solution. It's easy to fix though, I'll explain
// after this code snippet.
#[derive(Clone)]
struct MyWaker {
thread: thread::Thread,
}
// This is the definition of our `Future`. It keeps all the information we
// need. This one holds a reference to our `reactor`, that's just to make
// this example as easy as possible. It doesn't need to hold a reference to
// the whole reactor, but it needs to be able to register itself with the
// reactor.
#[derive(Clone)]
pub struct Task {
id: usize,
reactor: Arc&lt;Mutex&lt;Box&lt;Reactor&gt;&gt;&gt;,
data: u64,
}
// These are function definitions we'll use for our waker. Remember the
// &quot;Trait Objects&quot; chapter earlier.
fn mywaker_wake(s: &amp;MyWaker) {
let waker_ptr: *const MyWaker = s;
let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
waker_arc.thread.unpark();
}
// Since we use an `Arc` cloning is just increasing the refcount on the smart
// pointer.
fn mywaker_clone(s: &amp;MyWaker) -&gt; RawWaker {
let arc = unsafe { Arc::from_raw(s) };
std::mem::forget(arc.clone()); // increase ref count
RawWaker::new(Arc::into_raw(arc) as *const (), &amp;VTABLE)
}
// This is actually a &quot;helper funtcion&quot; to create a `Waker` vtable. In contrast
// to when we created a `Trait Object` from scratch we don't need to concern
// ourselves with the actual layout of the `vtable` and only provide a fixed
// set of functions
const VTABLE: RawWakerVTable = unsafe {
RawWakerVTable::new(
|s| mywaker_clone(&amp;*(s as *const MyWaker)), // clone
|s| mywaker_wake(&amp;*(s as *const MyWaker)), // wake
|s| mywaker_wake(*(s as *const &amp;MyWaker)), // wake by ref
|s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
)
};
// Instead of implementing this on the `MyWaker` object in `impl Mywaker...` we
// just use this pattern instead since it saves us some lines of code.
fn waker_into_waker(s: *const MyWaker) -&gt; Waker {
let raw_waker = RawWaker::new(s as *const (), &amp;VTABLE);
unsafe { Waker::from_raw(raw_waker) }
}
impl Task {
fn new(reactor: Arc&lt;Mutex&lt;Box&lt;Reactor&gt;&gt;&gt;, data: u64, id: usize) -&gt; Self {
Task { id, reactor, data }
}
}
// This is our `Future` implementation
impl Future for Task {
type Output = usize;
// Poll is the what drives the state machine forward and it's the only
// method we'll need to call to drive futures to completion.
fn poll(self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;'_&gt;) -&gt; Poll&lt;Self::Output&gt; {
// We need to get access the reactor in our `poll` method so we acquire
// a lock on that.
let mut r = self.reactor.lock().unwrap();
// First we check if the task is marked as ready
if r.is_ready(self.id) {
// If it's ready we set its state to `Finished`
*r.tasks.get_mut(&amp;self.id).unwrap() = TaskState::Finished;
Poll::Ready(self.id)
// If it isn't finished we check the map we have stored in our Reactor
// over id's we have registered and see if it's there
} else if r.tasks.contains_key(&amp;self.id) {
// This is important. The docs says that on multiple calls to poll,
// only the Waker from the Context passed to the most recent call
// should be scheduled to receive a wakeup. That's why we insert
// this waker into the map (which will return the old one which will
// get dropped) before we return `Pending`.
r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
Poll::Pending
} else {
// If it's not ready, and not in the map it's a new task so we
// register that with the Reactor and return `Pending`
r.register(self.data, cx.waker().clone(), self.id);
Poll::Pending
}
// Note that we're holding a lock on the `Mutex` which protects the
// Reactor all the way until the end of this scope. This means that
// even if our task were to complete immidiately, it will not be
// able to call `wake` while we're in our `Poll` method.
// Since we can make this guarantee, it's now the Executors job to
// handle this possible race condition where `Wake` is called after
// `poll` but before our thread goes to sleep.
}
}
</code></pre>
<p>This is mostly pretty straight forward. The confusing part is the strange way
we need to construct the <code>Waker</code>, but since we've already created our own
<em>trait objects</em> from raw parts, this looks pretty familiar. Actually, it's
even a bit easier.</p>
<p>We use an <code>Arc</code> here to pass out a ref-counted borrow of our <code>MyWaker</code>. This
is pretty normal, and makes this easy and safe to work with. Cloning a <code>Waker</code>
is just increasing the refcount in this case.</p>
<p>Dropping a <code>Waker</code> is as easy as decreasing the refcount. Now, in special
cases we could choose to not use an <code>Arc</code>. So this low-level method is there
to allow such cases.</p>
<p>Indeed, if we only used <code>Arc</code> there is no reason for us to go through all the
trouble of creating our own <code>vtable</code> and a <code>RawWaker</code>. We could just implement
a normal trait.</p>
<p>Fortunately, in the future this will probably be possible in the standard
library as well. For now, <a href="https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.13/futures/task/trait.ArcWake.html">this trait lives in the nursery</a>, but my
guess is that this will be a part of the standard library after som maturing.</p>
<p>We choose to pass in a reference to the whole <code>Reactor</code> here. This isn't normal.
The reactor will often be a global resource which let's us register interests
without passing around a reference.</p>
<blockquote>
<h3><a class="header" href="#why-using-thread-parkunpark-is-a-bad-idea-for-a-library" id="why-using-thread-parkunpark-is-a-bad-idea-for-a-library">Why using thread park/unpark is a bad idea for a library</a></h3>
<p>It could deadlock easily since anyone could get a handle to the <code>executor thread</code>
and call park/unpark on our thread. I've made <a href="https://play.rust-lang.org/?version=stable&amp;mode=debug&amp;edition=2018&amp;gist=b2343661fe3d271c91c6977ab8e681d0">an example with comments on the
playground</a> that showcases how such an error could occur. You can also read a bit more about this in <a href="https://github.com/rust-lang/futures-rs/pull/2010">issue 2010</a>
in the futures crate.</p>
</blockquote>
<h2><a class="header" href="#the-reactor" id="the-reactor">The Reactor</a></h2>
<p>This is the home stretch, and not strictly <code>Future</code> related, but we need one
to have an example to run.</p>
<p>Since concurrency mostly makes sense when interacting with the outside world (or
at least some peripheral), we need something to actually abstract over this
interaction in an asynchronous way.</p>
<p>This is the Reactors job. Most often you'll see reactors in Rust use a library
called <a href="https://github.com/tokio-rs/mio">Mio</a>, which provides non blocking APIs and event notification for
several platforms.</p>
<p>The reactor will typically give you something like a <code>TcpStream</code> (or any other
resource) which you'll use to create an I/O request. What you get in return is a
<code>Future</code>.</p>
<blockquote>
<p>If our reactor did some real I/O work our <code>Task</code> in would instead be represent
a non-blocking <code>TcpStream</code> which registers interest with the global <code>Reactor</code>.
Passing around a reference to the Reactor itself is pretty uncommon but I find
it makes reasoning about what's happening easier.</p>
</blockquote>
<p>Our example task is a timer that only spawns a thread and puts it to sleep for
the number of seconds we specify. The reactor we create here will create a
<strong>leaf-future</strong> representing each timer. In return the Reactor receives a waker
which it will call once the task is finished.</p>
<p>To be able to run the code here in the browser there is not much real I/O we
can do so just pretend that this is actually represents some useful I/O operation
for the sake of this example.</p>
<p><strong>Our Reactor will look like this:</strong></p>
<pre><code class="language-rust noplaypen ignore">// This is a &quot;fake&quot; reactor. It does no real I/O, but that also makes our
// code possible to run in the book and in the playground
// The different states a task can have in this Reactor
enum TaskState {
Ready,
NotReady(Waker),
Finished,
}
// This is a &quot;fake&quot; reactor. It does no real I/O, but that also makes our
// code possible to run in the book and in the playground
struct Reactor {
// we need some way of registering a Task with the reactor. Normally this
// would be an &quot;interest&quot; in an I/O event
dispatcher: Sender&lt;Event&gt;,
handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
// This is a list of tasks
tasks: HashMap&lt;usize, TaskState&gt;,
}
// This represents the Events we can send to our reactor thread. In this
// example it's only a Timeout or a Close event.
#[derive(Debug)]
enum Event {
Close,
Timeout(u64, usize),
}
impl Reactor {
// We choose to return an atomic reference counted, mutex protected, heap
// allocated `Reactor`. Just to make it easy to explain... No, the reason
// we do this is:
//
// 1. We know that only thread-safe reactors will be created.
// 2. By heap allocating it we can obtain a reference to a stable address
// that's not dependent on the stack frame of the function that called `new`
fn new() -&gt; Arc&lt;Mutex&lt;Box&lt;Self&gt;&gt;&gt; {
let (tx, rx) = channel::&lt;Event&gt;();
let reactor = Arc::new(Mutex::new(Box::new(Reactor {
dispatcher: tx,
handle: None,
tasks: HashMap::new(),
})));
// Notice that we'll need to use `weak` reference here. If we don't,
// our `Reactor` will not get `dropped` when our main thread is finished
// since we're holding internal references to it.
// Since we're collecting all `JoinHandles` from the threads we spawn
// and make sure to join them we know that `Reactor` will be alive
// longer than any reference held by the threads we spawn here.
let reactor_clone = Arc::downgrade(&amp;reactor);
// This will be our Reactor-thread. The Reactor-thread will in our case
// just spawn new threads which will serve as timers for us.
let handle = thread::spawn(move || {
let mut handles = vec![];
// This simulates some I/O resource
for event in rx {
println!(&quot;REACTOR: {:?}&quot;, event);
let reactor = reactor_clone.clone();
match event {
Event::Close =&gt; break,
Event::Timeout(duration, id) =&gt; {
// We spawn a new thread that will serve as a timer
// and will call `wake` on the correct `Waker` once
// it's done.
let event_handle = thread::spawn(move || {
thread::sleep(Duration::from_secs(duration));
let reactor = reactor.upgrade().unwrap();
reactor.lock().map(|mut r| r.wake(id)).unwrap();
});
handles.push(event_handle);
}
}
}
// This is important for us since we need to know that these
// threads don't live longer than our Reactor-thread. Our
// Reactor-thread will be joined when `Reactor` gets dropped.
handles.into_iter().for_each(|handle| handle.join().unwrap());
});
reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
reactor
}
// The wake function will call wake on the waker for the task with the
// corresponding id.
fn wake(&amp;mut self, id: usize) {
self.tasks.get_mut(&amp;id).map(|state| {
// No matter what state the task was in we can safely set it
// to ready at this point. This lets us get ownership over the
// the data that was there before we replaced it.
match mem::replace(state, TaskState::Ready) {
TaskState::NotReady(waker) =&gt; waker.wake(),
TaskState::Finished =&gt; panic!(&quot;Called 'wake' twice on task: {}&quot;, id),
_ =&gt; unreachable!()
}
}).unwrap();
}
// Register a new task with the reactor. In this particular example
// we panic if a task with the same id get's registered twice
fn register(&amp;mut self, duration: u64, waker: Waker, id: usize) {
if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
panic!(&quot;Tried to insert a task with id: '{}', twice!&quot;, id);
}
self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
}
// We send a close event to the reactor so it closes down our reactor-thread
fn close(&amp;mut self) {
self.dispatcher.send(Event::Close).unwrap();
}
// We simply checks if a task with this id is in the state `TaskState::Ready`
fn is_ready(&amp;self, id: usize) -&gt; bool {
self.tasks.get(&amp;id).map(|state| match state {
TaskState::Ready =&gt; true,
_ =&gt; false,
}).unwrap_or(false)
}
}
impl Drop for Reactor {
fn drop(&amp;mut self) {
self.handle.take().map(|h| h.join().unwrap()).unwrap();
}
}
</code></pre>
<p>It's a lot of code though, but essentially we just spawn off a new thread
and make it sleep for some time which we specify when we create a <code>Task</code>.</p>
<p>Now, let's test our code and see if it works. Since we're sleeping for a couple
of seconds here, just give it some time to run.</p>
<p>In the last chapter we have the <a href="./8_finished_example.html">whole 200 lines in an editable window</a>
which you can edit and change the way you like.</p>
<pre><pre class="playpen"><code class="language-rust edition2018"><span class="boring">use std::{
</span><span class="boring"> future::Future, pin::Pin, sync::{ mpsc::{channel, Sender}, Arc, Mutex,},
</span><span class="boring"> task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem,
</span><span class="boring"> thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
</span><span class="boring">};
</span><span class="boring">
</span>fn main() {
// This is just to make it easier for us to see when our Future was resolved
let start = Instant::now();
// Many runtimes create a glocal `reactor` we pass it as an argument
let reactor = Reactor::new();
// We create two tasks:
// - first parameter is the `reactor`
// - the second is a timeout in seconds
// - the third is an `id` to identify the task
let future1 = Task::new(reactor.clone(), 1, 1);
let future2 = Task::new(reactor.clone(), 2, 2);
// an `async` block works the same way as an `async fn` in that it compiles
// our code into a state machine, `yielding` at every `await` point.
let fut1 = async {
let val = future1.await;
println!(&quot;Got {} at time: {:.2}.&quot;, val, start.elapsed().as_secs_f32());
};
let fut2 = async {
let val = future2.await;
println!(&quot;Got {} at time: {:.2}.&quot;, val, start.elapsed().as_secs_f32());
};
// Our executor can only run one and one future, this is pretty normal
// though. You have a set of operations containing many futures that
// ends up as a single future that drives them all to completion.
let mainfut = async {
fut1.await;
fut2.await;
};
// This executor will block the main thread until the futures is resolved
block_on(mainfut);
// When we're done, we want to shut down our reactor thread so our program
// ends nicely.
reactor.lock().map(|mut r| r.close()).unwrap();
}
<span class="boring">// ============================= EXECUTOR ====================================
</span><span class="boring">fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
</span><span class="boring"> let mywaker = Arc::new(MyWaker {
</span><span class="boring"> thread: thread::current(),
</span><span class="boring"> });
</span><span class="boring"> let waker = waker_into_waker(Arc::into_raw(mywaker));
</span><span class="boring"> let mut cx = Context::from_waker(&amp;waker);
</span><span class="boring">
</span><span class="boring"> // SAFETY: we shadow `future` so it can't be accessed again.
</span><span class="boring"> let mut future = unsafe { Pin::new_unchecked(&amp;mut future) };
</span><span class="boring"> let val = loop {
</span><span class="boring"> match Future::poll(future.as_mut(), &amp;mut cx) {
</span><span class="boring"> Poll::Ready(val) =&gt; break val,
</span><span class="boring"> Poll::Pending =&gt; thread::park(),
</span><span class="boring"> };
</span><span class="boring"> };
</span><span class="boring"> val
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">// ====================== FUTURE IMPLEMENTATION ==============================
</span><span class="boring">#[derive(Clone)]
</span><span class="boring">struct MyWaker {
</span><span class="boring"> thread: thread::Thread,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">#[derive(Clone)]
</span><span class="boring">pub struct Task {
</span><span class="boring"> id: usize,
</span><span class="boring"> reactor: Arc&lt;Mutex&lt;Box&lt;Reactor&gt;&gt;&gt;,
</span><span class="boring"> data: u64,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">fn mywaker_wake(s: &amp;MyWaker) {
</span><span class="boring"> let waker_ptr: *const MyWaker = s;
</span><span class="boring"> let waker_arc = unsafe { Arc::from_raw(waker_ptr) };
</span><span class="boring"> waker_arc.thread.unpark();
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">fn mywaker_clone(s: &amp;MyWaker) -&gt; RawWaker {
</span><span class="boring"> let arc = unsafe { Arc::from_raw(s) };
</span><span class="boring"> std::mem::forget(arc.clone()); // increase ref count
</span><span class="boring"> RawWaker::new(Arc::into_raw(arc) as *const (), &amp;VTABLE)
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">const VTABLE: RawWakerVTable = unsafe {
</span><span class="boring"> RawWakerVTable::new(
</span><span class="boring"> |s| mywaker_clone(&amp;*(s as *const MyWaker)), // clone
</span><span class="boring"> |s| mywaker_wake(&amp;*(s as *const MyWaker)), // wake
</span><span class="boring"> |s| mywaker_wake(*(s as *const &amp;MyWaker)), // wake by ref
</span><span class="boring"> |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
</span><span class="boring"> )
</span><span class="boring">};
</span><span class="boring">
</span><span class="boring">fn waker_into_waker(s: *const MyWaker) -&gt; Waker {
</span><span class="boring"> let raw_waker = RawWaker::new(s as *const (), &amp;VTABLE);
</span><span class="boring"> unsafe { Waker::from_raw(raw_waker) }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Task {
</span><span class="boring"> fn new(reactor: Arc&lt;Mutex&lt;Box&lt;Reactor&gt;&gt;&gt;, data: u64, id: usize) -&gt; Self {
</span><span class="boring"> Task { id, reactor, data }
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Future for Task {
</span><span class="boring"> type Output = usize;
</span><span class="boring"> fn poll(self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;'_&gt;) -&gt; Poll&lt;Self::Output&gt; {
</span><span class="boring"> let mut r = self.reactor.lock().unwrap();
</span><span class="boring"> if r.is_ready(self.id) {
</span><span class="boring"> println!(&quot;POLL: TASK {} IS READY&quot;, self.id);
</span><span class="boring"> *r.tasks.get_mut(&amp;self.id).unwrap() = TaskState::Finished;
</span><span class="boring"> Poll::Ready(self.id)
</span><span class="boring"> } else if r.tasks.contains_key(&amp;self.id) {
</span><span class="boring"> println!(&quot;POLL: REPLACED WAKER FOR TASK: {}&quot;, self.id);
</span><span class="boring"> r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
</span><span class="boring"> Poll::Pending
</span><span class="boring"> } else {
</span><span class="boring"> println!(&quot;POLL: REGISTERED TASK: {}, WAKER: {:?}&quot;, self.id, cx.waker());
</span><span class="boring"> r.register(self.data, cx.waker().clone(), self.id);
</span><span class="boring"> Poll::Pending
</span><span class="boring"> }
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">// =============================== REACTOR ===================================
</span><span class="boring">enum TaskState {
</span><span class="boring"> Ready,
</span><span class="boring"> NotReady(Waker),
</span><span class="boring"> Finished,
</span><span class="boring">}
</span><span class="boring">struct Reactor {
</span><span class="boring"> dispatcher: Sender&lt;Event&gt;,
</span><span class="boring"> handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
</span><span class="boring"> tasks: HashMap&lt;usize, TaskState&gt;,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">#[derive(Debug)]
</span><span class="boring">enum Event {
</span><span class="boring"> Close,
</span><span class="boring"> Timeout(u64, usize),
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Reactor {
</span><span class="boring"> fn new() -&gt; Arc&lt;Mutex&lt;Box&lt;Self&gt;&gt;&gt; {
</span><span class="boring"> let (tx, rx) = channel::&lt;Event&gt;();
</span><span class="boring"> let reactor = Arc::new(Mutex::new(Box::new(Reactor {
</span><span class="boring"> dispatcher: tx,
</span><span class="boring"> handle: None,
</span><span class="boring"> tasks: HashMap::new(),
</span><span class="boring"> })));
</span><span class="boring">
</span><span class="boring"> let reactor_clone = Arc::downgrade(&amp;reactor);
</span><span class="boring"> let handle = thread::spawn(move || {
</span><span class="boring"> let mut handles = vec![];
</span><span class="boring"> // This simulates some I/O resource
</span><span class="boring"> for event in rx {
</span><span class="boring"> println!(&quot;REACTOR: {:?}&quot;, event);
</span><span class="boring"> let reactor = reactor_clone.clone();
</span><span class="boring"> match event {
</span><span class="boring"> Event::Close =&gt; break,
</span><span class="boring"> Event::Timeout(duration, id) =&gt; {
</span><span class="boring"> let event_handle = thread::spawn(move || {
</span><span class="boring"> thread::sleep(Duration::from_secs(duration));
</span><span class="boring"> let reactor = reactor.upgrade().unwrap();
</span><span class="boring"> reactor.lock().map(|mut r| r.wake(id)).unwrap();
</span><span class="boring"> });
</span><span class="boring"> handles.push(event_handle);
</span><span class="boring"> }
</span><span class="boring"> }
</span><span class="boring"> }
</span><span class="boring"> handles.into_iter().for_each(|handle| handle.join().unwrap());
</span><span class="boring"> });
</span><span class="boring"> reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
</span><span class="boring"> reactor
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn wake(&amp;mut self, id: usize) {
</span><span class="boring"> self.tasks.get_mut(&amp;id).map(|state| {
</span><span class="boring"> match mem::replace(state, TaskState::Ready) {
</span><span class="boring"> TaskState::NotReady(waker) =&gt; waker.wake(),
</span><span class="boring"> TaskState::Finished =&gt; panic!(&quot;Called 'wake' twice on task: {}&quot;, id),
</span><span class="boring"> _ =&gt; unreachable!()
</span><span class="boring"> }
</span><span class="boring"> }).unwrap();
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn register(&amp;mut self, duration: u64, waker: Waker, id: usize) {
</span><span class="boring"> if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
</span><span class="boring"> panic!(&quot;Tried to insert a task with id: '{}', twice!&quot;, id);
</span><span class="boring"> }
</span><span class="boring"> self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn close(&amp;mut self) {
</span><span class="boring"> self.dispatcher.send(Event::Close).unwrap();
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn is_ready(&amp;self, id: usize) -&gt; bool {
</span><span class="boring"> self.tasks.get(&amp;id).map(|state| match state {
</span><span class="boring"> TaskState::Ready =&gt; true,
</span><span class="boring"> _ =&gt; false,
</span><span class="boring"> }).unwrap_or(false)
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Drop for Reactor {
</span><span class="boring"> fn drop(&amp;mut self) {
</span><span class="boring"> self.handle.take().map(|h| h.join().unwrap()).unwrap();
</span><span class="boring"> }
</span><span class="boring">}
</span></code></pre></pre>
<p>I added a some debug printouts so we can observe a couple of things:</p>
<ol>
<li>How the <code>Waker</code> object looks just like the <em>trait object</em> we talked about in an earlier chapter</li>
<li>The program flow from start to finish</li>
</ol>
<p>The last point is relevant when we move on the the last paragraph.</p>
<h2><a class="header" href="#asyncawait-and-concurrecy" id="asyncawait-and-concurrecy">Async/Await and concurrecy</a></h2>
<p>The <code>async</code> keyword can be used on functions as in <code>async fn(...)</code> or on a
block as in <code>async { ... }</code>. Both will turn your function, or block, into a
<code>Future</code>.</p>
<p>These Futures are rather simple. Imagine our generator from a few chapters
back. Every <code>await</code> point is like a <code>yield</code> point.</p>
<p>Instead of <code>yielding</code> a value we pass in, we yield the result of calling <code>poll</code> on
the next <code>Future</code> we're awaiting.</p>
<p>Our <code>mainfut</code> contains two non-leaf futures which it will call <code>poll</code> on. <strong>Non-leaf-futures</strong>
has a <code>poll</code> method that simply polls their inner futures and these state machines
are polled until some &quot;leaf future&quot; in the end either returns <code>Ready</code> or <code>Pending</code>.</p>
<p>The way our example is right now, it's not much better than regular synchronous
code. For us to actually await multiple futures at the same time we somehow need
to <code>spawn</code> them so the executor starts running them concurrently.</p>
<p>Our example as it stands now returns this:</p>
<pre><code class="language-ignore">Future got 1 at time: 1.00.
Future got 2 at time: 3.00.
</code></pre>
<p>If these Futures were executed asynchronously we would expect to see:</p>
<pre><code class="language-ignore">Future got 1 at time: 1.00.
Future got 2 at time: 2.00.
</code></pre>
<blockquote>
<p>Note that this doesn't mean they need to run in parallel. They <em>can</em> run in
parallel but there is no requirement. Remember that we're waiting for some
external resource so we can fire off many such calls on a single thread and
handle each event as it resolves.</p>
</blockquote>
<p>Now, this is the point where I'll refer you to some better resources for
implementing a better executor. You should have a pretty good understanding of
the concept of Futures by now helping you along the way.</p>
<p>The next step should be getting to know how more advanced runtimes work and
how they implement different ways of running Futures to completion.</p>
<p><a href="./conclusion.html#building-a-better-exectuor">If I were you I would read this next, and try to implement it for our example.</a>.</p>
<p>That's actually it for now. There as probably much more to learn, this is enough
for today.</p>
<p>I hope exploring Futures and async in general gets easier after this read and I
do really hope that you do continue to explore further.</p>
<p>Don't forget the exercises in the last chapter 😊.</p>
<h2><a class="header" href="#bonus-section---a-proper-way-to-park-our-thread" id="bonus-section---a-proper-way-to-park-our-thread">Bonus Section - a Proper Way to Park our Thread</a></h2>
<p>As we explained earlier in our chapter, simply calling <code>thread::sleep</code> is not really
sufficient to implement a proper reactor. You can also reach a tool like the <code>Parker</code>
in crossbeam: <a href="https://docs.rs/crossbeam/0.7.3/crossbeam/sync/struct.Parker.html">crossbeam::sync::Parker</a></p>
<p>Since it doesn't require many lines of code to create a working solution ourselves we'll show how
we can solve that by using a <code>Condvar</code> and a <code>Mutex</code> instead.</p>
<p>Start by implementing our own <code>Parker</code> like this:</p>
<pre><code class="language-rust ignore">#[derive(Default)]
struct Parker(Mutex&lt;bool&gt;, Condvar);
impl Parker {
fn park(&amp;self) {
// We aquire a lock to the Mutex which protects our flag indicating if we
// should resume execution or not.
let mut resumable = self.0.lock().unwrap();
// We put this in a loop since there is a chance we'll get woken, but
// our flag hasn't changed. If that happens, we simply go back to sleep.
while !*resumable {
// We sleep until someone notifies us
resumable = self.1.wait(resumable).unwrap();
}
// We immidiately set the condition to false, so that next time we call `park` we'll
// go right to sleep.
*resumable = false;
}
fn unpark(&amp;self) {
// We simply acquire a lock to our flag and sets the condition to `runnable` when we
// get it.
*self.0.lock().unwrap() = true;
// We notify our `Condvar` so it wakes up and resumes.
self.1.notify_one();
}
}
</code></pre>
<p>The <code>Condvar</code> in Rust is designed to work together with a Mutex. Usually, you'd think that we don't
release the mutex-lock we acquire in <code>self.0.lock().unwrap();</code> before we go to sleep. Which means
that our <code>unpark</code> function never will acquire a lock to our flag and we deadlock.</p>
<p>Using <code>Condvar</code> we avoid this since the <code>Condvar</code> will consume our lock so it's released at the
moment we go to sleep.</p>
<p>When we resume again, our <code>Condvar</code> returns our lock so we can continue to operate on it.</p>
<p>This means we need to make some very slight changes to our executor like this:</p>
<pre><code class="language-rust ignore">fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
let parker = Arc::new(Parker::default()); // &lt;--- NB!
let mywaker = Arc::new(MyWaker { parker: parker.clone() }); &lt;--- NB!
let waker = mywaker_into_waker(Arc::into_raw(mywaker));
let mut cx = Context::from_waker(&amp;waker);
// SAFETY: we shadow `future` so it can't be accessed again.
let mut future = unsafe { Pin::new_unchecked(&amp;mut future) };
loop {
match Future::poll(future.as_mut(), &amp;mut cx) {
Poll::Ready(val) =&gt; break val,
Poll::Pending =&gt; parker.park(), // &lt;--- NB!
};
}
}
</code></pre>
<p>And we need to change our <code>Waker</code> like this:</p>
<pre><code class="language-rust ignore">#[derive(Clone)]
struct MyWaker {
parker: Arc&lt;Parker&gt;,
}
fn mywaker_wake(s: &amp;MyWaker) {
let waker_arc = unsafe { Arc::from_raw(s) };
waker_arc.parker.unpark();
}
</code></pre>
<p>And that's really all there is to it. </p>
<blockquote>
<p>If you checked out the playground link that showcased how park/unpark could <a href="https://play.rust-lang.org/?version=stable&amp;mode=debug&amp;edition=2018&amp;gist=b2343661fe3d271c91c6977ab8e681d0">cause subtle
problems</a>
you can <a href="https://play.rust-lang.org/?version=stable&amp;mode=debug&amp;edition=2018&amp;gist=bebef0f8a8ce6a9d0d32442cc8381595">check out this example</a> which shows how our final version avoids this problem.</p>
</blockquote>
<p>The next chapter shows our finished code with this
improvement which you can explore further if you wish.</p>
</main>
<nav class="nav-wrapper" aria-label="Page navigation">
<!-- Mobile navigation buttons -->
<a rel="prev" href="4_pin.html" class="mobile-nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
<i class="fa fa-angle-left"></i>
</a>
<a rel="next" href="8_finished_example.html" class="mobile-nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
<i class="fa fa-angle-right"></i>
</a>
<div style="clear: both"></div>
</nav>
</div>
</div>
<nav class="nav-wide-wrapper" aria-label="Page navigation">
<a href="4_pin.html" class="nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
<i class="fa fa-angle-left"></i>
</a>
<a href="8_finished_example.html" class="nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
<i class="fa fa-angle-right"></i>
</a>
</nav>
</div>
<!-- Livereload script (if served using the cli tool) -->
<script type="text/javascript">
var socket = new WebSocket("ws://localhost:3001");
socket.onmessage = function (event) {
if (event.data === "reload") {
socket.close();
location.reload(true); // force reload from server (not from cache)
}
};
window.onbeforeunload = function() {
socket.close();
}
</script>
<!-- Google Analytics Tag -->
<script type="text/javascript">
var localAddrs = ["localhost", "127.0.0.1", ""];
// make sure we don't activate google analytics if the developer is
// inspecting the book locally...
if (localAddrs.indexOf(document.location.hostname) === -1) {
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-157536992-1', 'auto');
ga('send', 'pageview');
}
</script>
<script type="text/javascript">
window.playpen_line_numbers = true;
</script>
<script type="text/javascript">
window.playpen_copyable = true;
</script>
<script src="ace.js" type="text/javascript" charset="utf-8"></script>
<script src="editor.js" type="text/javascript" charset="utf-8"></script>
<script src="mode-rust.js" type="text/javascript" charset="utf-8"></script>
<script src="theme-dawn.js" type="text/javascript" charset="utf-8"></script>
<script src="theme-tomorrow_night.js" type="text/javascript" charset="utf-8"></script>
<script src="elasticlunr.min.js" type="text/javascript" charset="utf-8"></script>
<script src="mark.min.js" type="text/javascript" charset="utf-8"></script>
<script src="searcher.js" type="text/javascript" charset="utf-8"></script>
<script src="clipboard.min.js" type="text/javascript" charset="utf-8"></script>
<script src="highlight.js" type="text/javascript" charset="utf-8"></script>
<script src="book.js" type="text/javascript" charset="utf-8"></script>
<!-- Custom JS scripts -->
</body>
</html>