Files
books-futures-explained/book/6_future_example.html
Carl Fredrik Samson 92680313ea finished book
2020-02-01 17:23:49 +01:00

892 lines
43 KiB
HTML

<!DOCTYPE HTML>
<html lang="en" class="sidebar-visible no-js">
<head>
<!-- Book generated using mdBook -->
<meta charset="UTF-8">
<title>The main example - Futures Explained in 200 Lines of Rust</title>
<meta content="text/html; charset=utf-8" http-equiv="Content-Type">
<meta name="description" content="This book aims to explain Futures in Rust using an example driven approach.">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="theme-color" content="#ffffff" />
<link rel="shortcut icon" href="favicon.png">
<link rel="stylesheet" href="css/variables.css">
<link rel="stylesheet" href="css/general.css">
<link rel="stylesheet" href="css/chrome.css">
<link rel="stylesheet" href="css/print.css" media="print">
<!-- Fonts -->
<link rel="stylesheet" href="FontAwesome/css/font-awesome.css">
<link href="https://fonts.googleapis.com/css?family=Open+Sans:300italic,400italic,600italic,700italic,800italic,400,300,600,700,800" rel="stylesheet" type="text/css">
<link href="https://fonts.googleapis.com/css?family=Source+Code+Pro:500" rel="stylesheet" type="text/css">
<!-- Highlight.js Stylesheets -->
<link rel="stylesheet" href="highlight.css">
<link rel="stylesheet" href="tomorrow-night.css">
<link rel="stylesheet" href="ayu-highlight.css">
<!-- Custom theme stylesheets -->
</head>
<body class="light">
<!-- Provide site root to javascript -->
<script type="text/javascript">
var path_to_root = "";
var default_theme = "light";
</script>
<!-- Work around some values being stored in localStorage wrapped in quotes -->
<script type="text/javascript">
try {
var theme = localStorage.getItem('mdbook-theme');
var sidebar = localStorage.getItem('mdbook-sidebar');
if (theme.startsWith('"') && theme.endsWith('"')) {
localStorage.setItem('mdbook-theme', theme.slice(1, theme.length - 1));
}
if (sidebar.startsWith('"') && sidebar.endsWith('"')) {
localStorage.setItem('mdbook-sidebar', sidebar.slice(1, sidebar.length - 1));
}
} catch (e) { }
</script>
<!-- Set the theme before any content is loaded, prevents flash -->
<script type="text/javascript">
var theme;
try { theme = localStorage.getItem('mdbook-theme'); } catch(e) { }
if (theme === null || theme === undefined) { theme = default_theme; }
document.body.className = theme;
document.querySelector('html').className = theme + ' js';
</script>
<!-- Hide / unhide sidebar before it is displayed -->
<script type="text/javascript">
var html = document.querySelector('html');
var sidebar = 'hidden';
if (document.body.clientWidth >= 1080) {
try { sidebar = localStorage.getItem('mdbook-sidebar'); } catch(e) { }
sidebar = sidebar || 'visible';
}
html.classList.remove('sidebar-visible');
html.classList.add("sidebar-" + sidebar);
</script>
<nav id="sidebar" class="sidebar" aria-label="Table of contents">
<div class="sidebar-scrollbox">
<ol class="chapter"><li><a href="0_introduction.html"><strong aria-hidden="true">1.</strong> Introduction</a></li><li><a href="1_background_information.html"><strong aria-hidden="true">2.</strong> Some background information</a></li><li><a href="2_trait_objects.html"><strong aria-hidden="true">3.</strong> Trait objects and fat pointers</a></li><li><a href="3_generators_pin.html"><strong aria-hidden="true">4.</strong> Generators and Pin</a></li><li><a href="4_pin.html"><strong aria-hidden="true">5.</strong> Pin</a></li><li><a href="6_future_example.html" class="active"><strong aria-hidden="true">6.</strong> The main example</a></li><li><a href="7_conclusion.html"><strong aria-hidden="true">7.</strong> Conclusion and exercises</a></li><li><a href="8_finished_example.html"><strong aria-hidden="true">8.</strong> Finished example (editable)</a></li></ol>
</div>
<div id="sidebar-resize-handle" class="sidebar-resize-handle"></div>
</nav>
<div id="page-wrapper" class="page-wrapper">
<div class="page">
<div id="menu-bar" class="menu-bar">
<div id="menu-bar-sticky-container">
<div class="left-buttons">
<button id="sidebar-toggle" class="icon-button" type="button" title="Toggle Table of Contents" aria-label="Toggle Table of Contents" aria-controls="sidebar">
<i class="fa fa-bars"></i>
</button>
<button id="theme-toggle" class="icon-button" type="button" title="Change theme" aria-label="Change theme" aria-haspopup="true" aria-expanded="false" aria-controls="theme-list">
<i class="fa fa-paint-brush"></i>
</button>
<ul id="theme-list" class="theme-popup" aria-label="Themes" role="menu">
<li role="none"><button role="menuitem" class="theme" id="light">Light (default)</button></li>
<li role="none"><button role="menuitem" class="theme" id="rust">Rust</button></li>
<li role="none"><button role="menuitem" class="theme" id="coal">Coal</button></li>
<li role="none"><button role="menuitem" class="theme" id="navy">Navy</button></li>
<li role="none"><button role="menuitem" class="theme" id="ayu">Ayu</button></li>
</ul>
<button id="search-toggle" class="icon-button" type="button" title="Search. (Shortkey: s)" aria-label="Toggle Searchbar" aria-expanded="false" aria-keyshortcuts="S" aria-controls="searchbar">
<i class="fa fa-search"></i>
</button>
</div>
<h1 class="menu-title">Futures Explained in 200 Lines of Rust</h1>
<div class="right-buttons">
<a href="print.html" title="Print this book" aria-label="Print this book">
<i id="print-button" class="fa fa-print"></i>
</a>
<a href="https://github.com/cfsamson/books-futures-explained" title="Git repository" aria-label="Git repository">
<i id="git-repository-button" class="fa fa-github"></i>
</a>
</div>
</div>
</div>
<div id="search-wrapper" class="hidden">
<form id="searchbar-outer" class="searchbar-outer">
<input type="search" name="search" id="searchbar" name="searchbar" placeholder="Search this book ..." aria-controls="searchresults-outer" aria-describedby="searchresults-header">
</form>
<div id="searchresults-outer" class="searchresults-outer hidden">
<div id="searchresults-header" class="searchresults-header"></div>
<ul id="searchresults">
</ul>
</div>
</div>
<!-- Apply ARIA attributes after the sidebar and the sidebar toggle button are added to the DOM -->
<script type="text/javascript">
document.getElementById('sidebar-toggle').setAttribute('aria-expanded', sidebar === 'visible');
document.getElementById('sidebar').setAttribute('aria-hidden', sidebar !== 'visible');
Array.from(document.querySelectorAll('#sidebar a')).forEach(function(link) {
link.setAttribute('tabIndex', sidebar === 'visible' ? 0 : -1);
});
</script>
<div id="content" class="content">
<main>
<h1><a class="header" href="#futures-in-rust" id="futures-in-rust">Futures in Rust</a></h1>
<p>We'll create our own <code>Futures</code> together with a fake reactor and a simple
executor which allows you to edit, run an play around with the code right here
in your browser.</p>
<p>I'll walk you through the example, but if you want to check it out closer, you
can always <a href="https://github.com/cfsamson/examples-futures">clone the repository</a> and play around with the code yourself. There
are two branches. The <code>basic_example</code> is this code, and the <code>basic_example_commented</code>
is this example with extensive comments.</p>
<blockquote>
<p>If you want to follow along as we go through, initalize a new cargo project
by creating a new folder and run <code>cargo init</code> inside it. Everything we write
here will be in <code>main.rs</code></p>
</blockquote>
<h2><a class="header" href="#implementing-our-own-futures" id="implementing-our-own-futures">Implementing our own Futures</a></h2>
<p>Let's start off by getting all our imports right away so you can follow along</p>
<pre><code class="language-rust noplaypen ignore">use std::{
future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
thread::{self, JoinHandle}, time::{Duration, Instant}
};
</code></pre>
<h2><a class="header" href="#the-executor" id="the-executor">The Executor</a></h2>
<p>The executors task is to take one or more futures and run them to completion.</p>
<p>The first thing an <code>executor</code> does when it gets a <code>Future</code> is polling it.</p>
<p><strong>When polled one of three things can happen:</strong></p>
<ul>
<li>The future returns <code>Ready</code> and we schedule whatever chained operations to run</li>
<li>The future hasn't been polled before so we pass it a <code>Waker</code> and suspend it</li>
<li>The futures has been polled before but is not ready and returns <code>Pending</code></li>
</ul>
<p>Rust provides a way for the Reactor and Executor to communicate through the <code>Waker</code>. The reactor stores this <code>Waker</code> and calls <code>Waker::wake()</code> on it once
a <code>Future</code> has resolved and should be polled again.</p>
<p><strong>Our Executor will look like this:</strong></p>
<pre><code class="language-rust noplaypen ignore">// Our executor takes any object which implements the `Future` trait
fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
// the first thing we do is to construct a `Waker` which we'll pass on to
// the `reactor` so it can wake us up when an event is ready.
let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker));
// The context struct is just a wrapper for a `Waker` object. Maybe in the
// future this will do more, but right now it's just a wrapper.
let mut cx = Context::from_waker(&amp;waker);
// We poll in a loop, but it's not a busy loop. It will only run when
// an event occurs, or a thread has a &quot;spurious wakeup&quot; (an unexpected wakeup
// that can happen for no good reason).
let val = loop {
// So, since we run this on one thread and run one future to completion
// we can pin the `Future` to the stack. This is unsafe, but saves an
// allocation. We could `Box::pin` it too if we wanted. This is however
// safe since we don't move the `Future` here.
let pinned = unsafe { Pin::new_unchecked(&amp;mut future) };
match Future::poll(pinned, &amp;mut cx) {
// when the Future is ready we're finished
Poll::Ready(val) =&gt; break val,
// If we get a `pending` future we just go to sleep...
Poll::Pending =&gt; thread::park(),
};
};
val
}
</code></pre>
<p>Inn all the examples here I've chose to comment the code extensively. I find it
easier to follow that way than dividing if up into many paragraphs.</p>
<p>We'll see more about the <code>Waker</code> in the next paragraph, but just look at it like
a <em>trait object</em> like the one we constructed in the first chapter.</p>
<blockquote>
<p><code>Context</code> is just a wrapper around the <code>Waker</code>. At the time of writing this
book it's nothing more. In the future it might be possible that the <code>Context</code>
object will do more than just wrapping a <code>Future</code> so having this extra
abstraction gives some flexibility.</p>
</blockquote>
<p>You'll notice how we use <code>Pin</code> here to pin the future when we poll it.</p>
<p>Now that you've read so much about <code>Generators</code> and <code>Pin</code> already this should
be rather easy to understand. <code>Future</code> is a state machine, every <code>await</code> point
is a <code>yield</code> point. We could borrow data across <code>await</code> points and we meet the
exact same challenges as we do when borrowing across <code>yield</code> points.</p>
<p>As we explained in that chapter, we use <code>Pin</code> and the guarantees that give us to
allow <code>Futures</code> to have self references.</p>
<h2><a class="header" href="#the-future-implementation" id="the-future-implementation">The <code>Future</code> implementation</a></h2>
<p>In Rust we call an interruptible task a <code>Future</code>. Futures has a well defined interface, which means they can be used across the entire ecosystem. We can chain
these <code>Futures</code> so that once a &quot;leaf future&quot; is ready we'll perform a set of
operations. </p>
<p>These operations can spawn new leaf futures themselves.</p>
<p><strong>Our Future implementation looks like this:</strong></p>
<pre><code class="language-rust noplaypen ignore">// This is the definition of our `Waker`. We use a regular thread-handle here.
// It works but it's not a good solution. It's easy to fix though, I'll explain
// after this code snippet.
#[derive(Clone)]
struct MyWaker {
thread: thread::Thread,
}
// This is the definition of our `Future`. It keeps all the information we
// need. This one holds a reference to our `reactor`, that's just to make
// this example as easy as possible. It doesn't need to hold a reference to
// the whole reactor, but it needs to be able to register itself with the
// reactor.
#[derive(Clone)]
pub struct Task {
id: usize,
reactor: Arc&lt;Mutex&lt;Reactor&gt;&gt;,
data: u64,
is_registered: bool,
}
// These are function definitions we'll use for our waker. Remember the
// &quot;Trait Objects&quot; chapter from the book.
fn mywaker_wake(s: &amp;MyWaker) {
let waker_ptr: *const MyWaker = s;
let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
waker_arc.thread.unpark();
}
// Since we use an `Arc` cloning is just increasing the refcount on the smart
// pointer.
fn mywaker_clone(s: &amp;MyWaker) -&gt; RawWaker {
let arc = unsafe { Arc::from_raw(s).clone() };
std::mem::forget(arc.clone()); // increase ref count
RawWaker::new(Arc::into_raw(arc) as *const (), &amp;VTABLE)
}
// This is actually a &quot;helper funtcion&quot; to create a `Waker` vtable. In contrast
// to when we created a `Trait Object` from scratch we don't need to concern
// ourselves with the actual layout of the `vtable` and only provide a fixed
// set of functions
const VTABLE: RawWakerVTable = unsafe {
RawWakerVTable::new(
|s| mywaker_clone(&amp;*(s as *const MyWaker)), // clone
|s| mywaker_wake(&amp;*(s as *const MyWaker)), // wake
|s| mywaker_wake(*(s as *const &amp;MyWaker)), // wake by ref
|s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
)
};
// Instead of implementing this on the `MyWaker` oject in `impl Mywaker...` we
// just use this pattern instead since it saves us some lines of code.
fn waker_into_waker(s: *const MyWaker) -&gt; Waker {
let raw_waker = RawWaker::new(s as *const (), &amp;VTABLE);
unsafe { Waker::from_raw(raw_waker) }
}
impl Task {
fn new(reactor: Arc&lt;Mutex&lt;Reactor&gt;&gt;, data: u64, id: usize) -&gt; Self {
Task {
id,
reactor,
data,
is_registered: false,
}
}
}
// This is our `Future` implementation
impl Future for Task {
// The output for this kind of `leaf future` is just an `usize`. For other
// futures this could be something more interesting like a bytearray.
type Output = usize;
fn poll(mut self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;'_&gt;) -&gt; Poll&lt;Self::Output&gt; {
let mut r = self.reactor.lock().unwrap();
// we check with the `Reactor` if this future is in its &quot;readylist&quot;
// i.e. if it's `Ready`
if r.is_ready(self.id) {
// if it is, we return the data. In this case it's just the ID of
// the task.
Poll::Ready(self.id)
} else if self.is_registered {
// If the future is registered alredy, we just return `Pending`
Poll::Pending
} else {
// If we get here, it must be the first time this `Future` is polled
// so we register a task with our `reactor`
r.register(self.data, cx.waker().clone(), self.id);
// oh, we have to drop the lock on our `Mutex` here because we can't
// have a shared and exclusive borrow at the same time
drop(r);
self.is_registered = true;
Poll::Pending
}
}
}
</code></pre>
<p>This is mostly pretty straight forward. The confusing part is the strange way
we need to construct the <code>Waker</code>, but since we've already created our own
<em>trait objects</em> from raw parts, this looks pretty familiar. Actually, it's
even a bit easier.</p>
<p>We use an <code>Arc</code> here to pass out a ref-counted borrow of our <code>MyWaker</code>. This
is pretty normal, and makes this easy and safe to work with. Cloning a <code>Waker</code>
is just increasing the refcount in this case.</p>
<p>Dropping a <code>Waker</code> is as easy as decreasing the refcount. Now, in special
cases we could choose to not use an <code>Arc</code>. So this low-level method is there
to allow such cases. </p>
<p>Indeed, if we only used <code>Arc</code> there is no reason for us to go through all the
trouble of creating our own <code>vtable</code> and a <code>RawWaker</code>. We could just implement
a normal trait.</p>
<p>Fortunately, in the future this will probably be possible in the standard
library as well. For now, <a href="https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.13/futures/task/trait.ArcWake.html">this trait lives in the nursery</a>, but mye
guess is that this will be a part of the standard library after som maturing.</p>
<p>We choose to pass in a reference to the whole <code>Reactor</code> here. This isn't normal.
The reactor will often be a global resource which let's us register interests
without passing around a reference.</p>
<h3><a class="header" href="#why-using-thread-parkunpark-is-a-bad-idea-for-a-library" id="why-using-thread-parkunpark-is-a-bad-idea-for-a-library">Why using thread park/unpark is a bad idea for a library</a></h3>
<p>It could deadlock easily since anyone could get a handle to the <code>executor thread</code>
and call park/unpark on it.</p>
<p>If one of our <code>Futures</code> holds a handle to our thread and takes it with it to a different thread the following could happen:</p>
<ol>
<li>A future could call <code>unpark</code> on the executor thread from a different thread</li>
<li>Our <code>executor</code> thinks that data is ready and wakes up and polls the future</li>
<li>The future is not ready yet when polled, but at that exact same time the
<code>Reactor</code> gets an event and calls <code>wake()</code> which also unparks our thread.</li>
<li>This could happen before we go to sleep again since these processes
run in parallel.</li>
<li>Our reactor has called <code>wake</code> but our thread is still sleeping since it was
awake already at that point.</li>
<li>We're deadlocked and our program stops working</li>
</ol>
<p>There are many better solutions, here are some:</p>
<ul>
<li>Use <code>std::sync::CondVar</code></li>
<li>Use <a href="https://docs.rs/crossbeam/0.7.3/crossbeam/sync/struct.Parker.html">crossbeam::sync::Parker</a></li>
</ul>
<h2><a class="header" href="#the-reactor" id="the-reactor">The Reactor</a></h2>
<p>This is the home stretch, and not strictly <code>Future</code> related, but we need one
to have an example to run.</p>
<p>Since concurrency mostly makes sense when interacting with the outside world (or
at least some peripheral), we need something to actually abstract over this
interaction in an asynchronous way. </p>
<p>This is the <code>Reactors</code> job. Most often you'll see reactors in rust use a library called <a href="https://github.com/tokio-rs/mio">Mio</a>, which provides non
blocking APIs and event notification for several platforms.</p>
<p>The reactor will typically give you something like a <code>TcpStream</code> (or any other resource) which you'll use to create an I/O request. What you get in return
is a <code>Future</code>. </p>
<blockquote>
<p>If the <code>Reactor</code> is registered as a global resource (which
is pretty normal), our <code>Task</code> in would instead be a special <code>TcpStream</code> which
registers interest with the global <code>Reactor</code> and no reference is needed.</p>
</blockquote>
<p>We can call this kind of <code>Future</code> a &quot;leaf Future`, since it's some operation
we'll actually wait on and that we can chain operations on which are performed
once the leaf future is ready. </p>
<p><strong>Our Reactor will look like this:</strong></p>
<pre><code class="language-rust noplaypen ignore">// This is a &quot;fake&quot; reactor. It does no real I/O, but that also makes our
// code possible to run in the book and in the playground
struct Reactor {
// we need some way of registering a Task with the reactor. Normally this
// would be an &quot;interest&quot; in an I/O event
dispatcher: Sender&lt;Event&gt;,
handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
// This is a list of tasks that are ready, which means they should be polled
// for data.
readylist: Arc&lt;Mutex&lt;Vec&lt;usize&gt;&gt;&gt;,
}
// We just have two kind of events. A timeout event, a &quot;timeout&quot; event called
// `Simple` and a `Close` event to close down our reactor.
#[derive(Debug)]
enum Event {
Close,
Simple(Waker, u64, usize),
}
impl Reactor {
fn new() -&gt; Self {
// The way we register new events with our reactor is using a regular
// channel
let (tx, rx) = channel::&lt;Event&gt;();
let readylist = Arc::new(Mutex::new(vec![]));
let rl_clone = readylist.clone();
// This `Vec` will hold handles to all threads we spawn so we can
// join them later on and finish our programm in a good manner
let mut handles = vec![];
// This will be the &quot;Reactor thread&quot;
let handle = thread::spawn(move || {
// This simulates some I/O resource
for event in rx {
let rl_clone = rl_clone.clone();
match event {
// If we get a close event we break out of the loop we're in
Event::Close =&gt; break,
Event::Simple(waker, duration, id) =&gt; {
// When we get an event we simply spawn a new thread...
let event_handle = thread::spawn(move || {
//... which will just sleep for the number of seconds
// we provided when creating the `Task`.
thread::sleep(Duration::from_secs(duration));
// When it's done sleeping we put the ID of this task
// on the &quot;readylist&quot;
rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
// Then we call `wake` which will wake up our
// executor and start polling the futures
waker.wake();
});
handles.push(event_handle);
}
}
}
// When we exit the Reactor we first join all the handles on
// the child threads we've spawned so we catch any panics and
// release all resources.
for handle in handles {
handle.join().unwrap();
}
});
Reactor {
readylist,
dispatcher: tx,
handle: Some(handle),
}
}
fn register(&amp;mut self, duration: u64, waker: Waker, data: usize) {
// registering an event is as simple as sending an `Event` through
// the channel.
self.dispatcher
.send(Event::Simple(waker, duration, data))
.unwrap();
}
fn close(&amp;mut self) {
self.dispatcher.send(Event::Close).unwrap();
}
// We need a way to check if any event's are ready. This will simply
// look through the &quot;readylist&quot; for an event macthing the ID we want to
// check for.
fn is_ready(&amp;self, id_to_check: usize) -&gt; bool {
self.readylist
.lock()
.map(|rl| rl.iter().any(|id| *id == id_to_check))
.unwrap()
}
}
// When our `Reactor` is dropped we join the reactor thread with the thread
// owning our `Reactor` so we catch any panics and release all resources.
// It's not needed for this to work, but it really is a best practice to join
// all threads you spawn.
impl Drop for Reactor {
fn drop(&amp;mut self) {
self.handle.take().map(|h| h.join().unwrap()).unwrap();
}
}
</code></pre>
<p>It's a lot of code though, but essentially we just spawn off a new thread
and make it sleep for some time which we specify when we create a <code>Task</code>.</p>
<p>Now, let's test our code and see if it works. This code is actually runnable
if you press the &quot;play&quot; button. Since we're sleeping for a couple of seconds
here, just give it some time to run.</p>
<p>In the last chapter we have the <a href="./8_finished_example.html">whole 200 lines in an editable window</a>. You can
also copy that or edit it right in this book.</p>
<pre><pre class="playpen"><code class="language-rust edition2018"># use std::{
# future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
# task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
# thread::{self, JoinHandle}, time::{Duration, Instant}
# };
#
fn main() {
// This is just to make it easier for us to see when our Future was resolved
let start = Instant::now();
// Many runtimes create a glocal `reactor` we pass it as an argument
let reactor = Reactor::new();
// Since we'll share this between threads we wrap it in a
// atmically-refcounted- mutex.
let reactor = Arc::new(Mutex::new(reactor));
// We create two tasks:
// - first parameter is the `reactor`
// - the second is a timeout in seconds
// - the third is an `id` to identify the task
let future1 = Task::new(reactor.clone(), 2, 1);
let future2 = Task::new(reactor.clone(), 1, 2);
// an `async` block works the same way as an `async fn` in that it compiles
// our code into a state machine, `yielding` at every `await` point.
let fut1 = async {
let val = future1.await;
let dur = (Instant::now() - start).as_secs_f32();
println!(&quot;Future got {} at time: {:.2}.&quot;, val, dur);
};
let fut2 = async {
let val = future2.await;
let dur = (Instant::now() - start).as_secs_f32();
println!(&quot;Future got {} at time: {:.2}.&quot;, val, dur);
};
// Our executor can only run one and one future, this is pretty normal
// though. You have a set of operations containing many futures that
// ends up as a single future that drives them all to completion.
let mainfut = async {
fut1.await;
fut2.await;
};
// This executor will block the main thread until the futures is resolved
block_on(mainfut);
// When we're done, we want to shut down our reactor thread so our program
// ends nicely.
reactor.lock().map(|mut r| r.close()).unwrap();
}
# // ============================ EXECUTOR ====================================
#
# // Our executor takes any object which implements the `Future` trait
# fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
# // the first thing we do is to construct a `Waker` which we'll pass on to
# // the `reactor` so it can wake us up when an event is ready.
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# // The context struct is just a wrapper for a `Waker` object. Maybe in the
# // future this will do more, but right now it's just a wrapper.
# let mut cx = Context::from_waker(&amp;waker);
#
# // We poll in a loop, but it's not a busy loop. It will only run when
# // an event occurs, or a thread has a &quot;spurious wakeup&quot; (an unexpected wakeup
# // that can happen for no good reason).
# let val = loop {
# // So, since we run this on one thread and run one future to completion
# // we can pin the `Future` to the stack. This is unsafe, but saves an
# // allocation. We could `Box::pin` it too if we wanted. This is however
# // safe since we don't move the `Future` here.
# let pinned = unsafe { Pin::new_unchecked(&amp;mut future) };
# match Future::poll(pinned, &amp;mut cx) {
# // when the Future is ready we're finished
# Poll::Ready(val) =&gt; break val,
# // If we get a `pending` future we just go to sleep...
# Poll::Pending =&gt; thread::park(),
# };
# };
# val
# }
#
# // ====================== FUTURE IMPLEMENTATION ==============================
#
# // This is the definition of our `Waker`. We use a regular thread-handle here.
# // It works but it's not a good solution. If one of our `Futures` holds a handle
# // to our thread and takes it with it to a different thread the followinc could
# // happen:
# // 1. Our future calls `unpark` from a different thread
# // 2. Our `executor` thinks that data is ready and wakes up and polls the future
# // 3. The future is not ready yet but one nanosecond later the `Reactor` gets
# // an event and calles `wake()` which also unparks our thread.
# // 4. This could all happen before we go to sleep again since these processes
# // run in parallel.
# // 5. Our reactor has called `wake` but our thread is still sleeping since it was
# // awake alredy at that point.
# // 6. We're deadlocked and our program stops working
# // There are many better soloutions, here are some:
# // - Use `std::sync::CondVar`
# // - Use [crossbeam::sync::Parker](https://docs.rs/crossbeam/0.7.3/crossbeam/sync/# struct.Parker.html)
# #[derive(Clone)]
# struct MyWaker {
# thread: thread::Thread,
# }
#
# // This is the definition of our `Future`. It keeps all the information we
# // need. This one holds a reference to our `reactor`, that's just to make
# // this example as easy as possible. It doesn't need to hold a reference to
# // the whole reactor, but it needs to be able to register itself with the
# // reactor.
# #[derive(Clone)]
# pub struct Task {
# id: usize,
# reactor: Arc&lt;Mutex&lt;Reactor&gt;&gt;,
# data: u64,
# is_registered: bool,
# }
#
# // These are function definitions we'll use for our waker. Remember the
# // &quot;Trait Objects&quot; chapter from the book.
# fn mywaker_wake(s: &amp;MyWaker) {
# let waker_ptr: *const MyWaker = s;
# let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
# waker_arc.thread.unpark();
# }
#
# // Since we use an `Arc` cloning is just increasing the refcount on the smart
# // pointer.
# fn mywaker_clone(s: &amp;MyWaker) -&gt; RawWaker {
# let arc = unsafe { Arc::from_raw(s).clone() };
# std::mem::forget(arc.clone()); // increase ref count
# RawWaker::new(Arc::into_raw(arc) as *const (), &amp;VTABLE)
# }
#
# // This is actually a &quot;helper funtcion&quot; to create a `Waker` vtable. In contrast
# // to when we created a `Trait Object` from scratch we don't need to concern
# // ourselves with the actual layout of the `vtable` and only provide a fixed
# // set of functions
# const VTABLE: RawWakerVTable = unsafe {
# RawWakerVTable::new(
# |s| mywaker_clone(&amp;*(s as *const MyWaker)), // clone
# |s| mywaker_wake(&amp;*(s as *const MyWaker)), // wake
# |s| mywaker_wake(*(s as *const &amp;MyWaker)), // wake by ref
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
# )
# };
#
# // Instead of implementing this on the `MyWaker` oject in `impl Mywaker...` we
# // just use this pattern instead since it saves us some lines of code.
# fn waker_into_waker(s: *const MyWaker) -&gt; Waker {
# let raw_waker = RawWaker::new(s as *const (), &amp;VTABLE);
# unsafe { Waker::from_raw(raw_waker) }
# }
#
# impl Task {
# fn new(reactor: Arc&lt;Mutex&lt;Reactor&gt;&gt;, data: u64, id: usize) -&gt; Self {
# Task {
# id,
# reactor,
# data,
# is_registered: false,
# }
# }
# }
#
# // This is our `Future` implementation
# impl Future for Task {
# // The output for this kind of `leaf future` is just an `usize`. For other
# // futures this could be something more interesting like a byte stream.
# type Output = usize;
# fn poll(mut self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;'_&gt;) -&gt; Poll&lt;Self::Output&gt; {
# let mut r = self.reactor.lock().unwrap();
# // we check with the `Reactor` if this future is in its &quot;readylist&quot;
# if r.is_ready(self.id) {
# // if it is, we return the data. In this case it's just the ID of
# // the task.
# Poll::Ready(self.id)
# } else if self.is_registered {
# // If the future is registered alredy, we just return `Pending`
# Poll::Pending
# } else {
# // If we get here, it must be the first time this `Future` is polled
# // so we register a task with our `reactor`
# r.register(self.data, cx.waker().clone(), self.id);
# // oh, we have to drop the lock on our `Mutex` here because we can't
# // have a shared and exclusive borrow at the same time
# drop(r);
# self.is_registered = true;
# Poll::Pending
# }
# }
# }
#
# // =============================== REACTOR ===================================
# // This is a &quot;fake&quot; reactor. It does no real I/O, but that also makes our
# // code possible to run in the book and in the playground
# struct Reactor {
# // we need some way of registering a Task with the reactor. Normally this
# // would be an &quot;interest&quot; in an I/O event
# dispatcher: Sender&lt;Event&gt;,
# handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
# // This is a list of tasks that are ready, which means they should be polled
# // for data.
# readylist: Arc&lt;Mutex&lt;Vec&lt;usize&gt;&gt;&gt;,
# }
#
# // We just have two kind of events. A timeout event, a &quot;timeout&quot; event called
# // `Simple` and a `Close` event to close down our reactor.
# #[derive(Debug)]
# enum Event {
# Close,
# Simple(Waker, u64, usize),
# }
#
# impl Reactor {
# fn new() -&gt; Self {
# // The way we register new events with our reactor is using a regular
# // channel
# let (tx, rx) = channel::&lt;Event&gt;();
# let readylist = Arc::new(Mutex::new(vec![]));
# let rl_clone = readylist.clone();
#
# // This `Vec` will hold handles to all threads we spawn so we can
# // join them later on and finish our programm in a good manner
# let mut handles = vec![];
# // This will be the &quot;Reactor thread&quot;
# let handle = thread::spawn(move || {
# // This simulates some I/O resource
# for event in rx {
# let rl_clone = rl_clone.clone();
# match event {
# // If we get a close event we break out of the loop we're in
# Event::Close =&gt; break,
# Event::Simple(waker, duration, id) =&gt; {
#
# // When we get an event we simply spawn a new thread...
# let event_handle = thread::spawn(move || {
# //... which will just sleep for the number of seconds
# // we provided when creating the `Task`.
# thread::sleep(Duration::from_secs(duration));
# // When it's done sleeping we put the ID of this task
# // on the &quot;readylist&quot;
# rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
# // Then we call `wake` which will wake up our
# // executor and start polling the futures
# waker.wake();
# });
#
# handles.push(event_handle);
# }
# }
# }
#
# // When we exit the Reactor we first join all the handles on
# // the child threads we've spawned so we catch any panics and
# // release all resources.
# for handle in handles {
# handle.join().unwrap();
# }
# });
#
# Reactor {
# readylist,
# dispatcher: tx,
# handle: Some(handle),
# }
# }
#
# fn register(&amp;mut self, duration: u64, waker: Waker, data: usize) {
# // registering an event is as simple as sending an `Event` through
# // the channel.
# self.dispatcher
# .send(Event::Simple(waker, duration, data))
# .unwrap();
# }
#
# fn close(&amp;mut self) {
# self.dispatcher.send(Event::Close).unwrap();
# }
#
# // We need a way to check if any event's are ready. This will simply
# // look through the &quot;readylist&quot; for an event macthing the ID we want to
# // check for.
# fn is_ready(&amp;self, id_to_check: usize) -&gt; bool {
# self.readylist
# .lock()
# .map(|rl| rl.iter().any(|id| *id == id_to_check))
# .unwrap()
# }
# }
#
# // When our `Reactor` is dropped we join the reactor thread with the thread
# // owning our `Reactor` so we catch any panics and release all resources.
# // It's not needed for this to work, but it really is a best practice to join
# // all threads you spawn.
# impl Drop for Reactor {
# fn drop(&amp;mut self) {
# self.handle.take().map(|h| h.join().unwrap()).unwrap();
# }
# }
</code></pre></pre>
</main>
<nav class="nav-wrapper" aria-label="Page navigation">
<!-- Mobile navigation buttons -->
<a rel="prev" href="4_pin.html" class="mobile-nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
<i class="fa fa-angle-left"></i>
</a>
<a rel="next" href="7_conclusion.html" class="mobile-nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
<i class="fa fa-angle-right"></i>
</a>
<div style="clear: both"></div>
</nav>
</div>
</div>
<nav class="nav-wide-wrapper" aria-label="Page navigation">
<a href="4_pin.html" class="nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
<i class="fa fa-angle-left"></i>
</a>
<a href="7_conclusion.html" class="nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
<i class="fa fa-angle-right"></i>
</a>
</nav>
</div>
<!-- Google Analytics Tag -->
<script type="text/javascript">
var localAddrs = ["localhost", "127.0.0.1", ""];
// make sure we don't activate google analytics if the developer is
// inspecting the book locally...
if (localAddrs.indexOf(document.location.hostname) === -1) {
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-157536992-1', 'auto');
ga('send', 'pageview');
}
</script>
<script src="ace.js" type="text/javascript" charset="utf-8"></script>
<script src="editor.js" type="text/javascript" charset="utf-8"></script>
<script src="mode-rust.js" type="text/javascript" charset="utf-8"></script>
<script src="theme-dawn.js" type="text/javascript" charset="utf-8"></script>
<script src="theme-tomorrow_night.js" type="text/javascript" charset="utf-8"></script>
<script src="elasticlunr.min.js" type="text/javascript" charset="utf-8"></script>
<script src="mark.min.js" type="text/javascript" charset="utf-8"></script>
<script src="searcher.js" type="text/javascript" charset="utf-8"></script>
<script src="clipboard.min.js" type="text/javascript" charset="utf-8"></script>
<script src="highlight.js" type="text/javascript" charset="utf-8"></script>
<script src="book.js" type="text/javascript" charset="utf-8"></script>
<!-- Custom JS scripts -->
</body>
</html>