changed example

This commit is contained in:
cfsamson
2020-02-05 10:11:21 +01:00
parent 1ea3056b5c
commit 6dd174c9e1
7 changed files with 603 additions and 157 deletions

View File

@@ -814,6 +814,7 @@ fn main() {
# } # }
# } # }
</code></pre></pre> </code></pre></pre>
<h2><a class="header" href="#asyncawait-and-concurrent-futures" id="asyncawait-and-concurrent-futures">Async/Await and concurrent Futures</a></h2>
<p>This is the first time we actually see the <code>async/await</code> syntax so let's <p>This is the first time we actually see the <code>async/await</code> syntax so let's
finish this book by explaining them briefly.</p> finish this book by explaining them briefly.</p>
<p>Hopefully, the <code>await</code> syntax looks pretty familiar. It has a lot in common <p>Hopefully, the <code>await</code> syntax looks pretty familiar. It has a lot in common
@@ -834,11 +835,11 @@ code. For us to actually await multiple futures at the same time we somehow need
to <code>spawn</code> them so they're polled once, but does not cause our thread to sleep to <code>spawn</code> them so they're polled once, but does not cause our thread to sleep
and wait for them one after one.</p> and wait for them one after one.</p>
<p>Our example as it stands now returns this:</p> <p>Our example as it stands now returns this:</p>
<pre><code>Future got 1 at time: 1.00. <pre><code class="language-ignore">Future got 1 at time: 1.00.
Future got 2 at time: 3.00. Future got 2 at time: 3.00.
</code></pre> </code></pre>
<p>If these <code>Futures</code> were executed asynchronously we would expect to see:</p> <p>If these <code>Futures</code> were executed asynchronously we would expect to see:</p>
<pre><code>Future got 1 at time: 1.00. <pre><code class="language-ignore">Future got 1 at time: 1.00.
Future got 2 at time: 2.00. Future got 2 at time: 2.00.
</code></pre> </code></pre>
<p>To accomplish this we can create the simplest possible <code>spawn</code> function I could <p>To accomplish this we can create the simplest possible <code>spawn</code> function I could
@@ -862,7 +863,12 @@ come up with:</p>
} }
</code></pre> </code></pre>
<p>Now if we change our code in <code>main</code> to look like this instead.</p> <p>Now if we change our code in <code>main</code> to look like this instead.</p>
<pre><pre class="playpen"><code class="language-rust">fn main() { <pre><pre class="playpen"><code class="language-rust"># use std::{
# future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
# task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
# thread::{self, JoinHandle}, time::{Duration, Instant}
# };
fn main() {
let start = Instant::now(); let start = Instant::now();
let reactor = Reactor::new(); let reactor = Reactor::new();
let reactor = Arc::new(Mutex::new(reactor)); let reactor = Arc::new(Mutex::new(reactor));
@@ -898,9 +904,174 @@ come up with:</p>
block_on(mainfut); block_on(mainfut);
reactor.lock().map(|mut r| r.close()).unwrap(); reactor.lock().map(|mut r| r.close()).unwrap();
} }
# //// ===== EXECUTOR =====
# fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&amp;waker);
# let val = loop {
# let pinned = unsafe { Pin::new_unchecked(&amp;mut future) };
# match Future::poll(pinned, &amp;mut cx) {
# Poll::Ready(val) =&gt; break val,
# Poll::Pending =&gt; thread::park(),
# };
# };
# val
# }
#
# fn spawn&lt;F: Future&gt;(future: F) -&gt; Pin&lt;Box&lt;F&gt;&gt; {
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&amp;waker);
# let mut boxed = Box::pin(future);
# let _ = Future::poll(boxed.as_mut(), &amp;mut cx);
# boxed
# }
#
# // ===== FUTURE IMPLEMENTATION =====
# #[derive(Clone)]
# struct MyWaker {
# thread: thread::Thread,
# }
#
# #[derive(Clone)]
# pub struct Task {
# id: usize,
# reactor: Arc&lt;Mutex&lt;Reactor&gt;&gt;,
# data: u64,
# is_registered: bool,
# }
#
# fn mywaker_wake(s: &amp;MyWaker) {
# let waker_ptr: *const MyWaker = s;
# let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
# waker_arc.thread.unpark();
# }
#
# fn mywaker_clone(s: &amp;MyWaker) -&gt; RawWaker {
# let arc = unsafe { Arc::from_raw(s).clone() };
# std::mem::forget(arc.clone()); // increase ref count
# RawWaker::new(Arc::into_raw(arc) as *const (), &amp;VTABLE)
# }
#
# const VTABLE: RawWakerVTable = unsafe {
# RawWakerVTable::new(
# |s| mywaker_clone(&amp;*(s as *const MyWaker)), // clone
# |s| mywaker_wake(&amp;*(s as *const MyWaker)), // wake
# |s| mywaker_wake(*(s as *const &amp;MyWaker)), // wake by ref
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
# )
# };
#
# fn waker_into_waker(s: *const MyWaker) -&gt; Waker {
# let raw_waker = RawWaker::new(s as *const (), &amp;VTABLE);
# unsafe { Waker::from_raw(raw_waker) }
# }
#
# impl Task {
# fn new(reactor: Arc&lt;Mutex&lt;Reactor&gt;&gt;, data: u64, id: usize) -&gt; Self {
# Task {
# id,
# reactor,
# data,
# is_registered: false,
# }
# }
# }
#
# impl Future for Task {
# type Output = usize;
# fn poll(mut self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;'_&gt;) -&gt; Poll&lt;Self::Output&gt; {
# let mut r = self.reactor.lock().unwrap();
# if r.is_ready(self.id) {
# Poll::Ready(self.id)
# } else if self.is_registered {
# Poll::Pending
# } else {
# r.register(self.data, cx.waker().clone(), self.id);
# drop(r);
# self.is_registered = true;
# Poll::Pending
# }
# }
# }
#
# // ===== REACTOR =====
# struct Reactor {
# dispatcher: Sender&lt;Event&gt;,
# handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
# readylist: Arc&lt;Mutex&lt;Vec&lt;usize&gt;&gt;&gt;,
# }
# #[derive(Debug)]
# enum Event {
# Close,
# Simple(Waker, u64, usize),
# }
#
# impl Reactor {
# fn new() -&gt; Self {
# let (tx, rx) = channel::&lt;Event&gt;();
# let readylist = Arc::new(Mutex::new(vec![]));
# let rl_clone = readylist.clone();
# let mut handles = vec![];
# let handle = thread::spawn(move || {
# // This simulates some I/O resource
# for event in rx {
# println!(&quot;GOT EVENT: {:?}&quot;, event);
# let rl_clone = rl_clone.clone();
# match event {
# Event::Close =&gt; break,
# Event::Simple(waker, duration, id) =&gt; {
# let event_handle = thread::spawn(move || {
# thread::sleep(Duration::from_secs(duration));
# rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
# waker.wake();
# });
#
# handles.push(event_handle);
# }
# }
# }
#
# for handle in handles {
# handle.join().unwrap();
# }
# });
#
# Reactor {
# readylist,
# dispatcher: tx,
# handle: Some(handle),
# }
# }
#
# fn register(&amp;mut self, duration: u64, waker: Waker, data: usize) {
# self.dispatcher
# .send(Event::Simple(waker, duration, data))
# .unwrap();
# }
#
# fn close(&amp;mut self) {
# self.dispatcher.send(Event::Close).unwrap();
# }
#
# fn is_ready(&amp;self, id_to_check: usize) -&gt; bool {
# self.readylist
# .lock()
# .map(|rl| rl.iter().any(|id| *id == id_to_check))
# .unwrap()
# }
# }
#
# impl Drop for Reactor {
# fn drop(&amp;mut self) {
# self.handle.take().map(|h| h.join().unwrap()).unwrap();
# }
# }
</code></pre></pre> </code></pre></pre>
<p>Now, if we try to run our example again</p>
<p>If you add this code to our example and run it, you'll see:</p> <p>If you add this code to our example and run it, you'll see:</p>
<pre><code>Future got 1 at time: 1.00. <pre><code class="language-ignore">Future got 1 at time: 1.00.
Future got 2 at time: 2.00. Future got 2 at time: 2.00.
</code></pre> </code></pre>
<p>Exactly as we expected.</p> <p>Exactly as we expected.</p>

View File

@@ -160,16 +160,9 @@ run it yourself. Have fun!</p>
fn main() { fn main() {
let start = Instant::now(); let start = Instant::now();
// Many runtimes create a glocal `reactor` we pass it as an argument
let reactor = Reactor::new(); let reactor = Reactor::new();
let reactor = Arc::new(Mutex::new(reactor)); let reactor = Arc::new(Mutex::new(reactor));
let future1 = Task::new(reactor.clone(), 1, 1); let future1 = Task::new(reactor.clone(), 1, 1);
let future2 = Task::new(reactor.clone(), 2, 2); let future2 = Task::new(reactor.clone(), 2, 2);
let fut1 = async { let fut1 = async {
@@ -185,15 +178,17 @@ fn main() {
}; };
let mainfut = async { let mainfut = async {
fut1.await; let handle1 = spawn(fut1);
fut2.await; let handle2 = spawn(fut2);
handle1.await;
handle2.await;
}; };
block_on(mainfut); block_on(mainfut);
reactor.lock().map(|mut r| r.close()).unwrap(); reactor.lock().map(|mut r| r.close()).unwrap();
} }
//// ============================ EXECUTOR ==================================== //// ===== EXECUTOR =====
fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output { fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
let mywaker = Arc::new(MyWaker{ thread: thread::current() }); let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker)); let waker = waker_into_waker(Arc::into_raw(mywaker));
@@ -208,7 +203,16 @@ fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
val val
} }
// ====================== FUTURE IMPLEMENTATION ============================== fn spawn&lt;F: Future&gt;(future: F) -&gt; Pin&lt;Box&lt;F&gt;&gt; {
let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker));
let mut cx = Context::from_waker(&amp;waker);
let mut boxed = Box::pin(future);
let _ = Future::poll(boxed.as_mut(), &amp;mut cx);
boxed
}
// ===== FUTURE IMPLEMENTATION =====
#[derive(Clone)] #[derive(Clone)]
struct MyWaker { struct MyWaker {
thread: thread::Thread, thread: thread::Thread,
@@ -276,7 +280,7 @@ impl Future for Task {
} }
} }
// =============================== REACTOR =================================== // ===== REACTOR =====
struct Reactor { struct Reactor {
dispatcher: Sender&lt;Event&gt;, dispatcher: Sender&lt;Event&gt;,
handle: Option&lt;JoinHandle&lt;()&gt;&gt;, handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
@@ -297,6 +301,7 @@ impl Reactor {
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
// This simulates some I/O resource // This simulates some I/O resource
for event in rx { for event in rx {
println!(&quot;GOT EVENT: {:?}&quot;, event);
let rl_clone = rl_clone.clone(); let rl_clone = rl_clone.clone();
match event { match event {
Event::Close =&gt; break, Event::Close =&gt; break,

View File

@@ -1811,6 +1811,7 @@ fn main() {
# } # }
# } # }
</code></pre></pre> </code></pre></pre>
<h2><a class="header" href="#asyncawait-and-concurrent-futures" id="asyncawait-and-concurrent-futures">Async/Await and concurrent Futures</a></h2>
<p>This is the first time we actually see the <code>async/await</code> syntax so let's <p>This is the first time we actually see the <code>async/await</code> syntax so let's
finish this book by explaining them briefly.</p> finish this book by explaining them briefly.</p>
<p>Hopefully, the <code>await</code> syntax looks pretty familiar. It has a lot in common <p>Hopefully, the <code>await</code> syntax looks pretty familiar. It has a lot in common
@@ -1831,11 +1832,11 @@ code. For us to actually await multiple futures at the same time we somehow need
to <code>spawn</code> them so they're polled once, but does not cause our thread to sleep to <code>spawn</code> them so they're polled once, but does not cause our thread to sleep
and wait for them one after one.</p> and wait for them one after one.</p>
<p>Our example as it stands now returns this:</p> <p>Our example as it stands now returns this:</p>
<pre><code>Future got 1 at time: 1.00. <pre><code class="language-ignore">Future got 1 at time: 1.00.
Future got 2 at time: 3.00. Future got 2 at time: 3.00.
</code></pre> </code></pre>
<p>If these <code>Futures</code> were executed asynchronously we would expect to see:</p> <p>If these <code>Futures</code> were executed asynchronously we would expect to see:</p>
<pre><code>Future got 1 at time: 1.00. <pre><code class="language-ignore">Future got 1 at time: 1.00.
Future got 2 at time: 2.00. Future got 2 at time: 2.00.
</code></pre> </code></pre>
<p>To accomplish this we can create the simplest possible <code>spawn</code> function I could <p>To accomplish this we can create the simplest possible <code>spawn</code> function I could
@@ -1859,7 +1860,12 @@ come up with:</p>
} }
</code></pre> </code></pre>
<p>Now if we change our code in <code>main</code> to look like this instead.</p> <p>Now if we change our code in <code>main</code> to look like this instead.</p>
<pre><pre class="playpen"><code class="language-rust">fn main() { <pre><pre class="playpen"><code class="language-rust"># use std::{
# future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
# task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
# thread::{self, JoinHandle}, time::{Duration, Instant}
# };
fn main() {
let start = Instant::now(); let start = Instant::now();
let reactor = Reactor::new(); let reactor = Reactor::new();
let reactor = Arc::new(Mutex::new(reactor)); let reactor = Arc::new(Mutex::new(reactor));
@@ -1895,9 +1901,174 @@ come up with:</p>
block_on(mainfut); block_on(mainfut);
reactor.lock().map(|mut r| r.close()).unwrap(); reactor.lock().map(|mut r| r.close()).unwrap();
} }
# //// ===== EXECUTOR =====
# fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&amp;waker);
# let val = loop {
# let pinned = unsafe { Pin::new_unchecked(&amp;mut future) };
# match Future::poll(pinned, &amp;mut cx) {
# Poll::Ready(val) =&gt; break val,
# Poll::Pending =&gt; thread::park(),
# };
# };
# val
# }
#
# fn spawn&lt;F: Future&gt;(future: F) -&gt; Pin&lt;Box&lt;F&gt;&gt; {
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&amp;waker);
# let mut boxed = Box::pin(future);
# let _ = Future::poll(boxed.as_mut(), &amp;mut cx);
# boxed
# }
#
# // ===== FUTURE IMPLEMENTATION =====
# #[derive(Clone)]
# struct MyWaker {
# thread: thread::Thread,
# }
#
# #[derive(Clone)]
# pub struct Task {
# id: usize,
# reactor: Arc&lt;Mutex&lt;Reactor&gt;&gt;,
# data: u64,
# is_registered: bool,
# }
#
# fn mywaker_wake(s: &amp;MyWaker) {
# let waker_ptr: *const MyWaker = s;
# let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
# waker_arc.thread.unpark();
# }
#
# fn mywaker_clone(s: &amp;MyWaker) -&gt; RawWaker {
# let arc = unsafe { Arc::from_raw(s).clone() };
# std::mem::forget(arc.clone()); // increase ref count
# RawWaker::new(Arc::into_raw(arc) as *const (), &amp;VTABLE)
# }
#
# const VTABLE: RawWakerVTable = unsafe {
# RawWakerVTable::new(
# |s| mywaker_clone(&amp;*(s as *const MyWaker)), // clone
# |s| mywaker_wake(&amp;*(s as *const MyWaker)), // wake
# |s| mywaker_wake(*(s as *const &amp;MyWaker)), // wake by ref
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
# )
# };
#
# fn waker_into_waker(s: *const MyWaker) -&gt; Waker {
# let raw_waker = RawWaker::new(s as *const (), &amp;VTABLE);
# unsafe { Waker::from_raw(raw_waker) }
# }
#
# impl Task {
# fn new(reactor: Arc&lt;Mutex&lt;Reactor&gt;&gt;, data: u64, id: usize) -&gt; Self {
# Task {
# id,
# reactor,
# data,
# is_registered: false,
# }
# }
# }
#
# impl Future for Task {
# type Output = usize;
# fn poll(mut self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;'_&gt;) -&gt; Poll&lt;Self::Output&gt; {
# let mut r = self.reactor.lock().unwrap();
# if r.is_ready(self.id) {
# Poll::Ready(self.id)
# } else if self.is_registered {
# Poll::Pending
# } else {
# r.register(self.data, cx.waker().clone(), self.id);
# drop(r);
# self.is_registered = true;
# Poll::Pending
# }
# }
# }
#
# // ===== REACTOR =====
# struct Reactor {
# dispatcher: Sender&lt;Event&gt;,
# handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
# readylist: Arc&lt;Mutex&lt;Vec&lt;usize&gt;&gt;&gt;,
# }
# #[derive(Debug)]
# enum Event {
# Close,
# Simple(Waker, u64, usize),
# }
#
# impl Reactor {
# fn new() -&gt; Self {
# let (tx, rx) = channel::&lt;Event&gt;();
# let readylist = Arc::new(Mutex::new(vec![]));
# let rl_clone = readylist.clone();
# let mut handles = vec![];
# let handle = thread::spawn(move || {
# // This simulates some I/O resource
# for event in rx {
# println!(&quot;GOT EVENT: {:?}&quot;, event);
# let rl_clone = rl_clone.clone();
# match event {
# Event::Close =&gt; break,
# Event::Simple(waker, duration, id) =&gt; {
# let event_handle = thread::spawn(move || {
# thread::sleep(Duration::from_secs(duration));
# rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
# waker.wake();
# });
#
# handles.push(event_handle);
# }
# }
# }
#
# for handle in handles {
# handle.join().unwrap();
# }
# });
#
# Reactor {
# readylist,
# dispatcher: tx,
# handle: Some(handle),
# }
# }
#
# fn register(&amp;mut self, duration: u64, waker: Waker, data: usize) {
# self.dispatcher
# .send(Event::Simple(waker, duration, data))
# .unwrap();
# }
#
# fn close(&amp;mut self) {
# self.dispatcher.send(Event::Close).unwrap();
# }
#
# fn is_ready(&amp;self, id_to_check: usize) -&gt; bool {
# self.readylist
# .lock()
# .map(|rl| rl.iter().any(|id| *id == id_to_check))
# .unwrap()
# }
# }
#
# impl Drop for Reactor {
# fn drop(&amp;mut self) {
# self.handle.take().map(|h| h.join().unwrap()).unwrap();
# }
# }
</code></pre></pre> </code></pre></pre>
<p>Now, if we try to run our example again</p>
<p>If you add this code to our example and run it, you'll see:</p> <p>If you add this code to our example and run it, you'll see:</p>
<pre><code>Future got 1 at time: 1.00. <pre><code class="language-ignore">Future got 1 at time: 1.00.
Future got 2 at time: 2.00. Future got 2 at time: 2.00.
</code></pre> </code></pre>
<p>Exactly as we expected.</p> <p>Exactly as we expected.</p>
@@ -1919,16 +2090,9 @@ run it yourself. Have fun!</p>
fn main() { fn main() {
let start = Instant::now(); let start = Instant::now();
// Many runtimes create a glocal `reactor` we pass it as an argument
let reactor = Reactor::new(); let reactor = Reactor::new();
let reactor = Arc::new(Mutex::new(reactor)); let reactor = Arc::new(Mutex::new(reactor));
let future1 = Task::new(reactor.clone(), 1, 1); let future1 = Task::new(reactor.clone(), 1, 1);
let future2 = Task::new(reactor.clone(), 2, 2); let future2 = Task::new(reactor.clone(), 2, 2);
let fut1 = async { let fut1 = async {
@@ -1944,15 +2108,17 @@ fn main() {
}; };
let mainfut = async { let mainfut = async {
fut1.await; let handle1 = spawn(fut1);
fut2.await; let handle2 = spawn(fut2);
handle1.await;
handle2.await;
}; };
block_on(mainfut); block_on(mainfut);
reactor.lock().map(|mut r| r.close()).unwrap(); reactor.lock().map(|mut r| r.close()).unwrap();
} }
//// ============================ EXECUTOR ==================================== //// ===== EXECUTOR =====
fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output { fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
let mywaker = Arc::new(MyWaker{ thread: thread::current() }); let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker)); let waker = waker_into_waker(Arc::into_raw(mywaker));
@@ -1967,7 +2133,16 @@ fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
val val
} }
// ====================== FUTURE IMPLEMENTATION ============================== fn spawn&lt;F: Future&gt;(future: F) -&gt; Pin&lt;Box&lt;F&gt;&gt; {
let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker));
let mut cx = Context::from_waker(&amp;waker);
let mut boxed = Box::pin(future);
let _ = Future::poll(boxed.as_mut(), &amp;mut cx);
boxed
}
// ===== FUTURE IMPLEMENTATION =====
#[derive(Clone)] #[derive(Clone)]
struct MyWaker { struct MyWaker {
thread: thread::Thread, thread: thread::Thread,
@@ -2035,7 +2210,7 @@ impl Future for Task {
} }
} }
// =============================== REACTOR =================================== // ===== REACTOR =====
struct Reactor { struct Reactor {
dispatcher: Sender&lt;Event&gt;, dispatcher: Sender&lt;Event&gt;,
handle: Option&lt;JoinHandle&lt;()&gt;&gt;, handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
@@ -2056,6 +2231,7 @@ impl Reactor {
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
// This simulates some I/O resource // This simulates some I/O resource
for event in rx { for event in rx {
println!(&quot;GOT EVENT: {:?}&quot;, event);
let rl_clone = rl_clone.clone(); let rl_clone = rl_clone.clone();
match event { match event {
Event::Close =&gt; break, Event::Close =&gt; break,

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -403,7 +403,7 @@ here, just give it some time to run.
In the last chapter we have the [whole 200 lines in an editable window](./8_finished_example.md). You can In the last chapter we have the [whole 200 lines in an editable window](./8_finished_example.md). You can
also copy that or edit it right in this book. also copy that or edit it right in this book.
```rust,edition2018 ```rust, edition2018
# use std::{ # use std::{
# future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex}, # future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
# task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, # task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
@@ -456,65 +456,36 @@ fn main() {
reactor.lock().map(|mut r| r.close()).unwrap(); reactor.lock().map(|mut r| r.close()).unwrap();
} }
# // ============================ EXECUTOR ==================================== # // ============================= EXECUTOR ====================================
#
# // Our executor takes any object which implements the `Future` trait
# fn block_on<F: Future>(mut future: F) -> F::Output { # fn block_on<F: Future>(mut future: F) -> F::Output {
# // the first thing we do is to construct a `Waker` which we'll pass on to
# // the `reactor` so it can wake us up when an event is ready.
# let mywaker = Arc::new(MyWaker{ thread: thread::current() }); # let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker)); # let waker = waker_into_waker(Arc::into_raw(mywaker));
# // The context struct is just a wrapper for a `Waker` object. Maybe in the
# // future this will do more, but right now it's just a wrapper.
# let mut cx = Context::from_waker(&waker); # let mut cx = Context::from_waker(&waker);
#
# // We poll in a loop, but it's not a busy loop. It will only run when
# // an event occurs, or a thread has a "spurious wakeup" (an unexpected wakeup
# // that can happen for no good reason).
# let val = loop { # let val = loop {
# // So, since we run this on one thread and run one future to completion
# // we can pin the `Future` to the stack. This is unsafe, but saves an
# // allocation. We could `Box::pin` it too if we wanted. This is however
# // safe since we don't move the `Future` here.
# let pinned = unsafe { Pin::new_unchecked(&mut future) }; # let pinned = unsafe { Pin::new_unchecked(&mut future) };
# match Future::poll(pinned, &mut cx) { # match Future::poll(pinned, &mut cx) {
# // when the Future is ready we're finished
# Poll::Ready(val) => break val, # Poll::Ready(val) => break val,
# // If we get a `pending` future we just go to sleep...
# Poll::Pending => thread::park(), # Poll::Pending => thread::park(),
# }; # };
# }; # };
# val # val
# } # }
# #
# // ====================== FUTURE IMPLEMENTATION ============================== # fn spawn<F: Future>(future: F) -> Pin<Box<F>> {
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&waker);
# let mut boxed = Box::pin(future);
# let _ = Future::poll(boxed.as_mut(), &mut cx);
# boxed
# }
# #
# // This is the definition of our `Waker`. We use a regular thread-handle here. # // ====================== FUTURE IMPLEMENTATION ==============================
# // It works but it's not a good solution. If one of our `Futures` holds a handle
# // to our thread and takes it with it to a different thread the followinc could
# // happen:
# // 1. Our future calls `unpark` from a different thread
# // 2. Our `executor` thinks that data is ready and wakes up and polls the future
# // 3. The future is not ready yet but one nanosecond later the `Reactor` gets
# // an event and calles `wake()` which also unparks our thread.
# // 4. This could all happen before we go to sleep again since these processes
# // run in parallel.
# // 5. Our reactor has called `wake` but our thread is still sleeping since it was
# // awake alredy at that point.
# // 6. We're deadlocked and our program stops working
# // There are many better soloutions, here are some:
# // - Use `std::sync::CondVar`
# // - Use [crossbeam::sync::Parker](https://docs.rs/crossbeam/0.7.3/crossbeam/sync/# struct.Parker.html)
# #[derive(Clone)] # #[derive(Clone)]
# struct MyWaker { # struct MyWaker {
# thread: thread::Thread, # thread: thread::Thread,
# } # }
# #
# // This is the definition of our `Future`. It keeps all the information we
# // need. This one holds a reference to our `reactor`, that's just to make
# // this example as easy as possible. It doesn't need to hold a reference to
# // the whole reactor, but it needs to be able to register itself with the
# // reactor.
# #[derive(Clone)] # #[derive(Clone)]
# pub struct Task { # pub struct Task {
# id: usize, # id: usize,
@@ -523,26 +494,18 @@ fn main() {
# is_registered: bool, # is_registered: bool,
# } # }
# #
# // These are function definitions we'll use for our waker. Remember the
# // "Trait Objects" chapter from the book.
# fn mywaker_wake(s: &MyWaker) { # fn mywaker_wake(s: &MyWaker) {
# let waker_ptr: *const MyWaker = s; # let waker_ptr: *const MyWaker = s;
# let waker_arc = unsafe {Arc::from_raw(waker_ptr)}; # let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
# waker_arc.thread.unpark(); # waker_arc.thread.unpark();
# } # }
# #
# // Since we use an `Arc` cloning is just increasing the refcount on the smart
# // pointer.
# fn mywaker_clone(s: &MyWaker) -> RawWaker { # fn mywaker_clone(s: &MyWaker) -> RawWaker {
# let arc = unsafe { Arc::from_raw(s).clone() }; # let arc = unsafe { Arc::from_raw(s).clone() };
# std::mem::forget(arc.clone()); // increase ref count # std::mem::forget(arc.clone()); // increase ref count
# RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE) # RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
# } # }
# #
# // This is actually a "helper funtcion" to create a `Waker` vtable. In contrast
# // to when we created a `Trait Object` from scratch we don't need to concern
# // ourselves with the actual layout of the `vtable` and only provide a fixed
# // set of functions
# const VTABLE: RawWakerVTable = unsafe { # const VTABLE: RawWakerVTable = unsafe {
# RawWakerVTable::new( # RawWakerVTable::new(
# |s| mywaker_clone(&*(s as *const MyWaker)), // clone # |s| mywaker_clone(&*(s as *const MyWaker)), // clone
@@ -552,8 +515,6 @@ fn main() {
# ) # )
# }; # };
# #
# // Instead of implementing this on the `MyWaker` oject in `impl Mywaker...` we
# // just use this pattern instead since it saves us some lines of code.
# fn waker_into_waker(s: *const MyWaker) -> Waker { # fn waker_into_waker(s: *const MyWaker) -> Waker {
# let raw_waker = RawWaker::new(s as *const (), &VTABLE); # let raw_waker = RawWaker::new(s as *const (), &VTABLE);
# unsafe { Waker::from_raw(raw_waker) } # unsafe { Waker::from_raw(raw_waker) }
@@ -570,27 +531,16 @@ fn main() {
# } # }
# } # }
# #
# // This is our `Future` implementation
# impl Future for Task { # impl Future for Task {
# // The output for this kind of `leaf future` is just an `usize`. For other
# // futures this could be something more interesting like a byte stream.
# type Output = usize; # type Output = usize;
# fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
# let mut r = self.reactor.lock().unwrap(); # let mut r = self.reactor.lock().unwrap();
# // we check with the `Reactor` if this future is in its "readylist"
# if r.is_ready(self.id) { # if r.is_ready(self.id) {
# // if it is, we return the data. In this case it's just the ID of
# // the task.
# Poll::Ready(self.id) # Poll::Ready(self.id)
# } else if self.is_registered { # } else if self.is_registered {
# // If the future is registered alredy, we just return `Pending`
# Poll::Pending # Poll::Pending
# } else { # } else {
# // If we get here, it must be the first time this `Future` is polled
# // so we register a task with our `reactor`
# r.register(self.data, cx.waker().clone(), self.id); # r.register(self.data, cx.waker().clone(), self.id);
# // oh, we have to drop the lock on our `Mutex` here because we can't
# // have a shared and exclusive borrow at the same time
# drop(r); # drop(r);
# self.is_registered = true; # self.is_registered = true;
# Poll::Pending # Poll::Pending
@@ -599,20 +549,11 @@ fn main() {
# } # }
# #
# // =============================== REACTOR =================================== # // =============================== REACTOR ===================================
# // This is a "fake" reactor. It does no real I/O, but that also makes our
# // code possible to run in the book and in the playground
# struct Reactor { # struct Reactor {
# // we need some way of registering a Task with the reactor. Normally this
# // would be an "interest" in an I/O event
# dispatcher: Sender<Event>, # dispatcher: Sender<Event>,
# handle: Option<JoinHandle<()>>, # handle: Option<JoinHandle<()>>,
# // This is a list of tasks that are ready, which means they should be polled
# // for data.
# readylist: Arc<Mutex<Vec<usize>>>, # readylist: Arc<Mutex<Vec<usize>>>,
# } # }
#
# // We just have two kind of events. A timeout event, a "timeout" event called
# // `Timeout` and a `Close` event to close down our reactor.
# #[derive(Debug)] # #[derive(Debug)]
# enum Event { # enum Event {
# Close, # Close,
@@ -621,35 +562,21 @@ fn main() {
# #
# impl Reactor { # impl Reactor {
# fn new() -> Self { # fn new() -> Self {
# // The way we register new events with our reactor is using a regular
# // channel
# let (tx, rx) = channel::<Event>(); # let (tx, rx) = channel::<Event>();
# let readylist = Arc::new(Mutex::new(vec![])); # let readylist = Arc::new(Mutex::new(vec![]));
# let rl_clone = readylist.clone(); # let rl_clone = readylist.clone();
#
# // This `Vec` will hold handles to all threads we spawn so we can
# // join them later on and finish our programm in a good manner
# let mut handles = vec![]; # let mut handles = vec![];
# // This will be the "Reactor thread"
# let handle = thread::spawn(move || { # let handle = thread::spawn(move || {
# // This simulates some I/O resource # // This simulates some I/O resource
# for event in rx { # for event in rx {
# println!("GOT EVENT: {:?}", event);
# let rl_clone = rl_clone.clone(); # let rl_clone = rl_clone.clone();
# match event { # match event {
# // If we get a close event we break out of the loop we're in
# Event::Close => break, # Event::Close => break,
# Event::Timeout(waker, duration, id) => { # Event::Timeout(waker, duration, id) => {
#
# // When we get an event we simply spawn a new thread...
# let event_handle = thread::spawn(move || { # let event_handle = thread::spawn(move || {
# //... which will just sleep for the number of seconds
# // we provided when creating the `Task`.
# thread::sleep(Duration::from_secs(duration)); # thread::sleep(Duration::from_secs(duration));
# // When it's done sleeping we put the ID of this task
# // on the "readylist"
# rl_clone.lock().map(|mut rl| rl.push(id)).unwrap(); # rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
# // Then we call `wake` which will wake up our
# // executor and start polling the futures
# waker.wake(); # waker.wake();
# }); # });
# #
@@ -658,9 +585,6 @@ fn main() {
# } # }
# } # }
# #
# // When we exit the Reactor we first join all the handles on
# // the child threads we've spawned so we catch any panics and
# // release all resources.
# for handle in handles { # for handle in handles {
# handle.join().unwrap(); # handle.join().unwrap();
# } # }
@@ -674,8 +598,6 @@ fn main() {
# } # }
# #
# fn register(&mut self, duration: u64, waker: Waker, data: usize) { # fn register(&mut self, duration: u64, waker: Waker, data: usize) {
# // registering an event is as simple as sending an `Event` through
# // the channel.
# self.dispatcher # self.dispatcher
# .send(Event::Timeout(waker, duration, data)) # .send(Event::Timeout(waker, duration, data))
# .unwrap(); # .unwrap();
@@ -685,9 +607,6 @@ fn main() {
# self.dispatcher.send(Event::Close).unwrap(); # self.dispatcher.send(Event::Close).unwrap();
# } # }
# #
# // We need a way to check if any event's are ready. This will simply
# // look through the "readylist" for an event macthing the ID we want to
# // check for.
# fn is_ready(&self, id_to_check: usize) -> bool { # fn is_ready(&self, id_to_check: usize) -> bool {
# self.readylist # self.readylist
# .lock() # .lock()
@@ -696,10 +615,6 @@ fn main() {
# } # }
# } # }
# #
# // When our `Reactor` is dropped we join the reactor thread with the thread
# // owning our `Reactor` so we catch any panics and release all resources.
# // It's not needed for this to work, but it really is a best practice to join
# // all threads you spawn.
# impl Drop for Reactor { # impl Drop for Reactor {
# fn drop(&mut self) { # fn drop(&mut self) {
# self.handle.take().map(|h| h.join().unwrap()).unwrap(); # self.handle.take().map(|h| h.join().unwrap()).unwrap();
@@ -707,6 +622,8 @@ fn main() {
# } # }
``` ```
## Async/Await and concurrent Futures
This is the first time we actually see the `async/await` syntax so let's This is the first time we actually see the `async/await` syntax so let's
finish this book by explaining them briefly. finish this book by explaining them briefly.
@@ -772,7 +689,12 @@ fn spawn<F: Future>(future: F) -> Pin<Box<F>> {
Now if we change our code in `main` to look like this instead. Now if we change our code in `main` to look like this instead.
```rust, ignore, noplaypen ```rust, edition2018
# use std::{
# future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
# task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
# thread::{self, JoinHandle}, time::{Duration, Instant}
# };
fn main() { fn main() {
let start = Instant::now(); let start = Instant::now();
let reactor = Reactor::new(); let reactor = Reactor::new();
@@ -809,15 +731,182 @@ fn main() {
block_on(mainfut); block_on(mainfut);
reactor.lock().map(|mut r| r.close()).unwrap(); reactor.lock().map(|mut r| r.close()).unwrap();
} }
# // ============================= EXECUTOR ====================================
# fn block_on<F: Future>(mut future: F) -> F::Output {
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&waker);
# let val = loop {
# let pinned = unsafe { Pin::new_unchecked(&mut future) };
# match Future::poll(pinned, &mut cx) {
# Poll::Ready(val) => break val,
# Poll::Pending => thread::park(),
# };
# };
# val
# }
#
# fn spawn<F: Future>(future: F) -> Pin<Box<F>> {
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&waker);
# let mut boxed = Box::pin(future);
# let _ = Future::poll(boxed.as_mut(), &mut cx);
# boxed
# }
#
# // ====================== FUTURE IMPLEMENTATION ==============================
# #[derive(Clone)]
# struct MyWaker {
# thread: thread::Thread,
# }
#
# #[derive(Clone)]
# pub struct Task {
# id: usize,
# reactor: Arc<Mutex<Reactor>>,
# data: u64,
# is_registered: bool,
# }
#
# fn mywaker_wake(s: &MyWaker) {
# let waker_ptr: *const MyWaker = s;
# let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
# waker_arc.thread.unpark();
# }
#
# fn mywaker_clone(s: &MyWaker) -> RawWaker {
# let arc = unsafe { Arc::from_raw(s).clone() };
# std::mem::forget(arc.clone()); // increase ref count
# RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
# }
#
# const VTABLE: RawWakerVTable = unsafe {
# RawWakerVTable::new(
# |s| mywaker_clone(&*(s as *const MyWaker)), // clone
# |s| mywaker_wake(&*(s as *const MyWaker)), // wake
# |s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
# )
# };
#
# fn waker_into_waker(s: *const MyWaker) -> Waker {
# let raw_waker = RawWaker::new(s as *const (), &VTABLE);
# unsafe { Waker::from_raw(raw_waker) }
# }
#
# impl Task {
# fn new(reactor: Arc<Mutex<Reactor>>, data: u64, id: usize) -> Self {
# Task {
# id,
# reactor,
# data,
# is_registered: false,
# }
# }
# }
#
# impl Future for Task {
# type Output = usize;
# fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
# let mut r = self.reactor.lock().unwrap();
# if r.is_ready(self.id) {
# Poll::Ready(self.id)
# } else if self.is_registered {
# Poll::Pending
# } else {
# r.register(self.data, cx.waker().clone(), self.id);
# drop(r);
# self.is_registered = true;
# Poll::Pending
# }
# }
# }
#
# // =============================== REACTOR ===================================
# struct Reactor {
# dispatcher: Sender<Event>,
# handle: Option<JoinHandle<()>>,
# readylist: Arc<Mutex<Vec<usize>>>,
# }
# #[derive(Debug)]
# enum Event {
# Close,
# Timeout(Waker, u64, usize),
# }
#
# impl Reactor {
# fn new() -> Self {
# let (tx, rx) = channel::<Event>();
# let readylist = Arc::new(Mutex::new(vec![]));
# let rl_clone = readylist.clone();
# let mut handles = vec![];
# let handle = thread::spawn(move || {
# // This simulates some I/O resource
# for event in rx {
# println!("GOT EVENT: {:?}", event);
# let rl_clone = rl_clone.clone();
# match event {
# Event::Close => break,
# Event::Timeout(waker, duration, id) => {
# let event_handle = thread::spawn(move || {
# thread::sleep(Duration::from_secs(duration));
# rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
# waker.wake();
# });
#
# handles.push(event_handle);
# }
# }
# }
#
# for handle in handles {
# handle.join().unwrap();
# }
# });
#
# Reactor {
# readylist,
# dispatcher: tx,
# handle: Some(handle),
# }
# }
#
# fn register(&mut self, duration: u64, waker: Waker, data: usize) {
# self.dispatcher
# .send(Event::Timeout(waker, duration, data))
# .unwrap();
# }
#
# fn close(&mut self) {
# self.dispatcher.send(Event::Close).unwrap();
# }
#
# fn is_ready(&self, id_to_check: usize) -> bool {
# self.readylist
# .lock()
# .map(|rl| rl.iter().any(|id| *id == id_to_check))
# .unwrap()
# }
# }
#
# impl Drop for Reactor {
# fn drop(&mut self) {
# self.handle.take().map(|h| h.join().unwrap()).unwrap();
# }
# }
``` ```
Now, if we try to run our example again
If you add this code to our example and run it, you'll see: If you add this code to our example and run it, you'll see:
```ignore ```ignore
Future got 1 at time: 1.00. Future got 1 at time: 1.00.
Future got 2 at time: 2.00. Future got 2 at time: 2.00.
``` ```
Exactly as we expected. Exactly as we expected.
Now this `spawn` method is not very sophisticated but it explains the concept. Now this `spawn` method is not very sophisticated but it explains the concept.

View File

@@ -13,16 +13,9 @@ use std::{
fn main() { fn main() {
let start = Instant::now(); let start = Instant::now();
// Many runtimes create a glocal `reactor` we pass it as an argument
let reactor = Reactor::new(); let reactor = Reactor::new();
let reactor = Arc::new(Mutex::new(reactor)); let reactor = Arc::new(Mutex::new(reactor));
let future1 = Task::new(reactor.clone(), 1, 1); let future1 = Task::new(reactor.clone(), 1, 1);
let future2 = Task::new(reactor.clone(), 2, 2); let future2 = Task::new(reactor.clone(), 2, 2);
let fut1 = async { let fut1 = async {
@@ -38,15 +31,17 @@ fn main() {
}; };
let mainfut = async { let mainfut = async {
fut1.await; let handle1 = spawn(fut1);
fut2.await; let handle2 = spawn(fut2);
handle1.await;
handle2.await;
}; };
block_on(mainfut); block_on(mainfut);
reactor.lock().map(|mut r| r.close()).unwrap(); reactor.lock().map(|mut r| r.close()).unwrap();
} }
//// ============================ EXECUTOR ==================================== // ====================== FUTURE IMPLEMENTATION ==============================
fn block_on<F: Future>(mut future: F) -> F::Output { fn block_on<F: Future>(mut future: F) -> F::Output {
let mywaker = Arc::new(MyWaker{ thread: thread::current() }); let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker)); let waker = waker_into_waker(Arc::into_raw(mywaker));
@@ -61,6 +56,15 @@ fn block_on<F: Future>(mut future: F) -> F::Output {
val val
} }
fn spawn<F: Future>(future: F) -> Pin<Box<F>> {
let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker));
let mut cx = Context::from_waker(&waker);
let mut boxed = Box::pin(future);
let _ = Future::poll(boxed.as_mut(), &mut cx);
boxed
}
// ====================== FUTURE IMPLEMENTATION ============================== // ====================== FUTURE IMPLEMENTATION ==============================
#[derive(Clone)] #[derive(Clone)]
struct MyWaker { struct MyWaker {
@@ -150,6 +154,7 @@ impl Reactor {
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
// This simulates some I/O resource // This simulates some I/O resource
for event in rx { for event in rx {
println!("GOT EVENT: {:?}", event);
let rl_clone = rl_clone.clone(); let rl_clone = rl_clone.clone();
match event { match event {
Event::Close => break, Event::Close => break,