This commit is contained in:
Carl Fredrik Samson
2020-01-26 20:41:41 +01:00
parent d0a018bfd1
commit c52fa3085a
23 changed files with 1267 additions and 914 deletions

View File

@@ -1,5 +1,5 @@
<!DOCTYPE HTML>
<html lang="en" class="sidebar-visible no-js light">
<html lang="en" class="sidebar-visible no-js">
<head>
<!-- Book generated using mdBook -->
<meta charset="UTF-8">
@@ -34,11 +34,11 @@
</head>
<body>
<body class="light">
<!-- Provide site root to javascript -->
<script type="text/javascript">
var path_to_root = "";
var default_theme = window.matchMedia("(prefers-color-scheme: dark)").matches ? "light" : "light";
var default_theme = "light";
</script>
<!-- Work around some values being stored in localStorage wrapped in quotes -->
@@ -62,11 +62,8 @@
var theme;
try { theme = localStorage.getItem('mdbook-theme'); } catch(e) { }
if (theme === null || theme === undefined) { theme = default_theme; }
var html = document.querySelector('html');
html.classList.remove('no-js')
html.classList.remove('light')
html.classList.add(theme);
html.classList.add('js');
document.body.className = theme;
document.querySelector('html').className = theme + ' js';
</script>
<!-- Hide / unhide sidebar before it is displayed -->
@@ -82,8 +79,8 @@
</script>
<nav id="sidebar" class="sidebar" aria-label="Table of contents">
<div id="sidebar-scrollbox" class="sidebar-scrollbox">
<ol class="chapter"><li class="expanded "><a href="0_0_introduction.html"><strong aria-hidden="true">1.</strong> Introduction</a></li><li class="expanded "><a href="0_1_background_information.html"><strong aria-hidden="true">2.</strong> Some background information</a></li><li class="expanded "><a href="0_2_naive_implementation.html"><strong aria-hidden="true">3.</strong> Naive example</a></li><li class="expanded "><a href="0_3_proper_waker.html"><strong aria-hidden="true">4.</strong> Proper Waker</a></li><li class="expanded "><a href="0_4_proper_future.html"><strong aria-hidden="true">5.</strong> Proper Future</a></li><li class="expanded "><a href="0_5_async_wait.html"><strong aria-hidden="true">6.</strong> Supporting async/await</a></li><li class="expanded "><a href="0_6_concurrent_futures.html"><strong aria-hidden="true">7.</strong> Bonus: concurrent futures</a></li></ol>
<div class="sidebar-scrollbox">
<ol class="chapter"><li><a href="0_0_introduction.html"><strong aria-hidden="true">1.</strong> Introduction</a></li><li><a href="0_1_background_information.html"><strong aria-hidden="true">2.</strong> Some background information</a></li><li><ol class="section"><li><a href="0_1_1_trait_objects.html"><strong aria-hidden="true">2.1.</strong> Trait objects and fat pointers</a></li><li><a href="0_1_2_generators_pin.html"><strong aria-hidden="true">2.2.</strong> Generators and Pin</a></li></ol></li><li><a href="0_2_naive_implementation.html"><strong aria-hidden="true">3.</strong> Naive example</a></li><li><a href="0_3_proper_waker.html"><strong aria-hidden="true">4.</strong> Proper Waker</a></li><li><a href="0_4_proper_future.html"><strong aria-hidden="true">5.</strong> Proper Future</a></li><li><a href="0_5_async_wait.html"><strong aria-hidden="true">6.</strong> Supporting async/await</a></li><li><a href="0_6_concurrent_futures.html"><strong aria-hidden="true">7.</strong> Bonus: concurrent futures</a></li></ol>
</div>
<div id="sidebar-resize-handle" class="sidebar-resize-handle"></div>
</nav>
@@ -150,7 +147,77 @@
<div id="content" class="content">
<main>
<h1><a class="header" href="#introduction" id="introduction">Introduction</a></h1>
<h1><a class="header" href="#futures-explained-in-200-lines-of-rust" id="futures-explained-in-200-lines-of-rust">Futures Explained in 200 Lines of Rust</a></h1>
<p>This book aims to explain <code>Futures</code> in Rust using an example driven approach.</p>
<p>The goal is to get a better understanding of <code>Futures</code> by implementing a toy
<code>Reactor</code>, a very simple <code>Executor</code> and our own <code>Futures</code>. </p>
<p>We'll start off solving a small problem without <code>Futures</code>, <code>Wakers</code> or async/await
and then gradually adapt our example so it implements all these concepts, and
can be solved using the executor provided by both <code>tokio</code> and <code>async_str</code>.</p>
<p>In the end I've made some reader excercises you can do if you want to fix some
of the most glaring ommissions and shortcuts we took and create a slightly better
example yourself.</p>
<h2><a class="header" href="#what-does-this-book-give-you-that-isnt-covered-elsewhere" id="what-does-this-book-give-you-that-isnt-covered-elsewhere">What does this book give you that isn't covered elsewhere?</a></h2>
<p>That's a valid question. There are many good resources and examples already. First
of all, this book will point you to some background information that I have found
very valuable to get an understanding of concurrent programming in general.</p>
<p>I find that many discussions arise, not because <code>Futures</code> is a hard concept to
grasp, but that concurrent programming is a hard concept in general.</p>
<p>Secondly, I've always found small runnable examples very exiting to learn from. It's
all code that you can download, play with and learn from.</p>
<h2><a class="header" href="#what-well-do-and-not" id="what-well-do-and-not">What we'll do and not</a></h2>
<p><strong>We'll:</strong></p>
<ul>
<li>Implement our own <code>Futures</code> and get to know the <code>Reactor/Executor</code> pattern</li>
<li>Implement our own waker and learn why it's a bit foreign compared to other types</li>
<li>Talk a bit about runtime complexity and what to keep in mind when writing async Rust.</li>
<li>Make sure all examples can be run on the playground</li>
<li>Not rely on any helpers or libraries, but try to face the complexity and learn</li>
</ul>
<p><strong>We'll not:</strong></p>
<ul>
<li>Talk about how futures are implemented in Rust the language, the state machine and so on</li>
<li>Explain how the different runtimes differ, however, you'll hopefully be a bit
better off if you read this before you go research them</li>
<li>Explain concurrent programming, but I will supply sources</li>
</ul>
<p>I do want to explore Rusts internal implementation but that will be for a later
book.</p>
<h2><a class="header" href="#credits-and-thanks" id="credits-and-thanks">Credits and thanks</a></h2>
<p>I'll like to take the chance of thanking the people behind <code>mio</code>, <code>tokio</code>,
<code>async_std</code>, <code>Futures</code>, <code>libc</code>, <code>crossbeam</code> and many other libraries which so
much is built upon. Reading and exploring some of this code is nothing less than
impressive.</p>
<h2><a class="header" href="#why-is-futures-in-rust-hard-to-understand" id="why-is-futures-in-rust-hard-to-understand">Why is <code>Futures</code> in Rust hard to understand</a></h2>
<p>Well, I think it has to do with several things:</p>
<ol>
<li>
<p>Futures has a very interesting implementation, compiling down to a state
machine using generators to suspend and resume execution. In a language such as
Rust this is pretty hard to do ergonomically and safely. You are exposed to some
if this complexity when working with futures and want to understand them, not
only learn how to use them.</p>
</li>
<li>
<p>Rust doesn't provide a runtime. That means you'll actually have to choose one
yourself and actually know what a <code>Reactor</code> and an <code>Executor</code> is. While not
too difficult, you need to make more choices than you need in GO and other
languages designed with a concurrent programming in mind and ships with a
runtime.</p>
</li>
<li>
<p>Futures exist in two versions, Futures 1.0 and Futures 3.0. Futures 1.0 was
known to have some issues regarding ergonomics. Turns out that modelling
async coding after <code>Promises</code> in JavaScript can turn in to extremely long errors
and type signatures with a type system as Rust.</p>
</li>
</ol>
<p>Futures 3.0 are not compatible with Futures 1.0 without performing some work.</p>
<ol start="4">
<li>Async await syntax was recently stabilized</li>
</ol>
<p>what we'll
really do is to stub out a <code>Reactor</code>, and <code>Executor</code> and implement</p>
<h1><a class="header" href="#some-background-information" id="some-background-information">Some background information</a></h1>
<p>Before we start implementing our <code>Futures</code> , we'll go through some background
information that will help demystify some of the concepts we encounter.</p>
@@ -176,8 +243,8 @@ object we construct our selves.</p>
<h2><a class="header" href="#fat-pointers-in-rust" id="fat-pointers-in-rust">Fat pointers in Rust</a></h2>
<p>Let's take a look at the size of some different pointer types in Rust. If we
run the following code:</p>
<pre><pre class="playpen"><code class="language-rust"><span class="boring">use std::mem::size_of;
</span>trait SomeTrait { }
<pre><pre class="playpen"><code class="language-rust"># use std::mem::size_of;
trait SomeTrait { }
fn main() {
println!(&quot;Size of Box&lt;i32&gt;: {}&quot;, size_of::&lt;Box&lt;i32&gt;&gt;());
@@ -330,6 +397,8 @@ preferred runtime to actually do the scheduling and I/O queues.</p>
<p>It's important to know that Rust doesn't provide a runtime, so you have to choose
one. <a href="https://github.com/async-rs/async-std">async std</a> and <a href="https://github.com/tokio-rs/tokio">tokio</a> are two popular ones.</p>
<p>With that out of the way, let's move on to our main example.</p>
<h1><a class="header" href="#trait-objects-and-fat-pointers" id="trait-objects-and-fat-pointers">Trait objects and fat pointers</a></h1>
<h1><a class="header" href="#generators-and-pin" id="generators-and-pin">Generators and Pin</a></h1>
<h1><a class="header" href="#naive-example" id="naive-example">Naive example</a></h1>
<pre><pre class="playpen"><code class="language-rust">use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
@@ -479,118 +548,118 @@ fn main() {
executor_run(reactor, readylist);
}
<span class="boring">// ====== EXECUTOR ======
</span><span class="boring">fn executor_run(mut reactor: Reactor, rl: Arc&lt;Mutex&lt;Vec&lt;usize&gt;&gt;&gt;) {
</span><span class="boring"> let start = Instant::now();
</span><span class="boring"> loop {
</span><span class="boring"> let mut rl_locked = rl.lock().unwrap();
</span><span class="boring"> while let Some(event) = rl_locked.pop() {
</span><span class="boring"> let dur = (Instant::now() - start).as_secs_f32();
</span><span class="boring"> println!(&quot;Event {} just happened at time: {:.2}.&quot;, event, dur);
</span><span class="boring"> reactor.outstanding.fetch_sub(1, Ordering::Relaxed);
</span><span class="boring"> }
</span><span class="boring"> drop(rl_locked);
</span><span class="boring">
</span><span class="boring"> if reactor.outstanding.load(Ordering::Relaxed) == 0 {
</span><span class="boring"> reactor.close();
</span><span class="boring"> break;
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> thread::park();
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">// ====== &quot;FUTURE&quot; IMPL ======
</span><span class="boring">#[derive(Debug)]
</span><span class="boring">struct MyWaker {
</span><span class="boring"> id: usize,
</span><span class="boring"> thread: thread::Thread,
</span><span class="boring"> readylist: Arc&lt;Mutex&lt;Vec&lt;usize&gt;&gt;&gt;,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl MyWaker {
</span><span class="boring"> fn new(id: usize, thread: thread::Thread, readylist: Arc&lt;Mutex&lt;Vec&lt;usize&gt;&gt;&gt;) -&gt; Self {
</span><span class="boring"> MyWaker {
</span><span class="boring"> id,
</span><span class="boring"> thread,
</span><span class="boring"> readylist,
</span><span class="boring"> }
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn wake(&amp;self) {
</span><span class="boring"> self.readylist.lock().map(|mut rl| rl.push(self.id)).unwrap();
</span><span class="boring"> self.thread.unpark();
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">
</span><span class="boring">#[derive(Debug, Clone)]
</span><span class="boring">pub struct Task {
</span><span class="boring"> id: usize,
</span><span class="boring"> pending: bool,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">// ===== REACTOR =====
</span><span class="boring">struct Reactor {
</span><span class="boring"> dispatcher: Sender&lt;Event&gt;,
</span><span class="boring"> handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
</span><span class="boring"> outstanding: AtomicUsize,
</span><span class="boring">}
</span><span class="boring">#[derive(Debug)]
</span><span class="boring">enum Event {
</span><span class="boring"> Close,
</span><span class="boring"> Simple(MyWaker, u64),
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Reactor {
</span><span class="boring"> fn new() -&gt; Self {
</span><span class="boring"> let (tx, rx) = channel::&lt;Event&gt;();
</span><span class="boring"> let mut handles = vec![];
</span><span class="boring"> let handle = thread::spawn(move || {
</span><span class="boring"> // This simulates some I/O resource
</span><span class="boring"> for event in rx {
</span><span class="boring"> match event {
</span><span class="boring"> Event::Close =&gt; break,
</span><span class="boring"> Event::Simple(mywaker, duration) =&gt; {
</span><span class="boring"> let event_handle = thread::spawn(move || {
</span><span class="boring"> thread::sleep(Duration::from_secs(duration));
</span><span class="boring"> mywaker.wake();
</span><span class="boring"> });
</span><span class="boring"> handles.push(event_handle);
</span><span class="boring"> }
</span><span class="boring"> }
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> for handle in handles {
</span><span class="boring"> handle.join().unwrap();
</span><span class="boring"> }
</span><span class="boring"> });
</span><span class="boring">
</span><span class="boring"> Reactor {
</span><span class="boring"> dispatcher: tx,
</span><span class="boring"> handle: Some(handle),
</span><span class="boring"> outstanding: AtomicUsize::new(0),
</span><span class="boring"> }
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn register(&amp;mut self, duration: u64, mywaker: MyWaker) {
</span><span class="boring"> self.dispatcher
</span><span class="boring"> .send(Event::Simple(mywaker, duration))
</span><span class="boring"> .unwrap();
</span><span class="boring"> self.outstanding.fetch_add(1, Ordering::Relaxed);
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn close(&amp;mut self) {
</span><span class="boring"> self.dispatcher.send(Event::Close).unwrap();
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Drop for Reactor {
</span><span class="boring"> fn drop(&amp;mut self) {
</span><span class="boring"> self.handle.take().map(|h| h.join().unwrap()).unwrap();
</span><span class="boring"> }
</span><span class="boring">}
</span></code></pre></pre>
# // ====== EXECUTOR ======
# fn executor_run(mut reactor: Reactor, rl: Arc&lt;Mutex&lt;Vec&lt;usize&gt;&gt;&gt;) {
# let start = Instant::now();
# loop {
# let mut rl_locked = rl.lock().unwrap();
# while let Some(event) = rl_locked.pop() {
# let dur = (Instant::now() - start).as_secs_f32();
# println!(&quot;Event {} just happened at time: {:.2}.&quot;, event, dur);
# reactor.outstanding.fetch_sub(1, Ordering::Relaxed);
# }
# drop(rl_locked);
#
# if reactor.outstanding.load(Ordering::Relaxed) == 0 {
# reactor.close();
# break;
# }
#
# thread::park();
# }
# }
#
# // ====== &quot;FUTURE&quot; IMPL ======
# #[derive(Debug)]
# struct MyWaker {
# id: usize,
# thread: thread::Thread,
# readylist: Arc&lt;Mutex&lt;Vec&lt;usize&gt;&gt;&gt;,
# }
#
# impl MyWaker {
# fn new(id: usize, thread: thread::Thread, readylist: Arc&lt;Mutex&lt;Vec&lt;usize&gt;&gt;&gt;) -&gt; Self {
# MyWaker {
# id,
# thread,
# readylist,
# }
# }
#
# fn wake(&amp;self) {
# self.readylist.lock().map(|mut rl| rl.push(self.id)).unwrap();
# self.thread.unpark();
# }
# }
#
#
# #[derive(Debug, Clone)]
# pub struct Task {
# id: usize,
# pending: bool,
# }
#
# // ===== REACTOR =====
# struct Reactor {
# dispatcher: Sender&lt;Event&gt;,
# handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
# outstanding: AtomicUsize,
# }
# #[derive(Debug)]
# enum Event {
# Close,
# Simple(MyWaker, u64),
# }
#
# impl Reactor {
# fn new() -&gt; Self {
# let (tx, rx) = channel::&lt;Event&gt;();
# let mut handles = vec![];
# let handle = thread::spawn(move || {
# // This simulates some I/O resource
# for event in rx {
# match event {
# Event::Close =&gt; break,
# Event::Simple(mywaker, duration) =&gt; {
# let event_handle = thread::spawn(move || {
# thread::sleep(Duration::from_secs(duration));
# mywaker.wake();
# });
# handles.push(event_handle);
# }
# }
# }
#
# for handle in handles {
# handle.join().unwrap();
# }
# });
#
# Reactor {
# dispatcher: tx,
# handle: Some(handle),
# outstanding: AtomicUsize::new(0),
# }
# }
#
# fn register(&amp;mut self, duration: u64, mywaker: MyWaker) {
# self.dispatcher
# .send(Event::Simple(mywaker, duration))
# .unwrap();
# self.outstanding.fetch_add(1, Ordering::Relaxed);
# }
#
# fn close(&amp;mut self) {
# self.dispatcher.send(Event::Close).unwrap();
# }
# }
#
# impl Drop for Reactor {
# fn drop(&amp;mut self) {
# self.handle.take().map(|h| h.join().unwrap()).unwrap();
# }
# }
</code></pre></pre>
<h1><a class="header" href="#proper-waker" id="proper-waker">Proper Waker</a></h1>
<h1><a class="header" href="#proper-future" id="proper-future">Proper Future</a></h1>
<h1><a class="header" href="#supporting-asyncawait" id="supporting-asyncawait">Supporting async/await</a></h1>
@@ -618,30 +687,7 @@ fn main() {
</div>
<!-- Livereload script (if served using the cli tool) -->
<script type="text/javascript">
var socket = new WebSocket("ws://localhost:3001");
socket.onmessage = function (event) {
if (event.data === "reload") {
socket.close();
location.reload(true); // force reload from server (not from cache)
}
};
window.onbeforeunload = function() {
socket.close();
}
</script>
<script type="text/javascript">
window.playpen_copyable = true;
</script>