minor fixes to the debug printout of the main example

This commit is contained in:
Carl Fredrik Samson
2020-04-11 00:23:39 +02:00
parent 8934c46679
commit 02bb33c6b6
22 changed files with 1881 additions and 1616 deletions

View File

@@ -1,5 +1,5 @@
<!DOCTYPE HTML>
<html lang="en" class="sidebar-visible no-js">
<html lang="en" class="sidebar-visible no-js light">
<head>
<!-- Book generated using mdBook -->
<meta charset="UTF-8">
@@ -32,11 +32,11 @@
</head>
<body class="light">
<body>
<!-- Provide site root to javascript -->
<script type="text/javascript">
var path_to_root = "";
var default_theme = "light";
var default_theme = window.matchMedia("(prefers-color-scheme: dark)").matches ? "light" : "light";
</script>
<!-- Work around some values being stored in localStorage wrapped in quotes -->
@@ -60,8 +60,11 @@
var theme;
try { theme = localStorage.getItem('mdbook-theme'); } catch(e) { }
if (theme === null || theme === undefined) { theme = default_theme; }
document.body.className = theme;
document.querySelector('html').className = theme + ' js';
var html = document.querySelector('html');
html.classList.remove('no-js')
html.classList.remove('light')
html.classList.add(theme);
html.classList.add('js');
</script>
<!-- Hide / unhide sidebar before it is displayed -->
@@ -77,8 +80,8 @@
</script>
<nav id="sidebar" class="sidebar" aria-label="Table of contents">
<div class="sidebar-scrollbox">
<ol class="chapter"><li class="affix"><a href="introduction.html">Introduction</a></li><li><a href="0_background_information.html"><strong aria-hidden="true">1.</strong> Background information</a></li><li><a href="1_futures_in_rust.html"><strong aria-hidden="true">2.</strong> Futures in Rust</a></li><li><a href="2_waker_context.html"><strong aria-hidden="true">3.</strong> Waker and Context</a></li><li><a href="3_generators_async_await.html"><strong aria-hidden="true">4.</strong> Generators and async/await</a></li><li><a href="4_pin.html"><strong aria-hidden="true">5.</strong> Pin</a></li><li><a href="6_future_example.html" class="active"><strong aria-hidden="true">6.</strong> Implementing Futures</a></li><li><a href="8_finished_example.html"><strong aria-hidden="true">7.</strong> Finished example (editable)</a></li><li class="affix"><a href="conclusion.html">Conclusion and exercises</a></li></ol>
<div id="sidebar-scrollbox" class="sidebar-scrollbox">
<ol class="chapter"><li class="expanded affix "><a href="introduction.html">Introduction</a></li><li class="expanded "><a href="0_background_information.html"><strong aria-hidden="true">1.</strong> Background information</a></li><li class="expanded "><a href="1_futures_in_rust.html"><strong aria-hidden="true">2.</strong> Futures in Rust</a></li><li class="expanded "><a href="2_waker_context.html"><strong aria-hidden="true">3.</strong> Waker and Context</a></li><li class="expanded "><a href="3_generators_async_await.html"><strong aria-hidden="true">4.</strong> Generators and async/await</a></li><li class="expanded "><a href="4_pin.html"><strong aria-hidden="true">5.</strong> Pin</a></li><li class="expanded "><a href="6_future_example.html" class="active"><strong aria-hidden="true">6.</strong> Implementing Futures</a></li><li class="expanded "><a href="8_finished_example.html"><strong aria-hidden="true">7.</strong> Finished example (editable)</a></li><li class="expanded affix "><a href="conclusion.html">Conclusion and exercises</a></li></ol>
</div>
<div id="sidebar-resize-handle" class="sidebar-resize-handle"></div>
</nav>
@@ -169,7 +172,7 @@ here will be in <code>main.rs</code></p>
<p>Let's start off by getting all our imports right away so you can follow along</p>
<pre><code class="language-rust noplaypen ignore">use std::{
future::Future, pin::Pin, sync::{ mpsc::{channel, Sender}, Arc, Mutex,},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem,
thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
};
</code></pre>
@@ -569,13 +572,13 @@ and make it sleep for some time which we specify when we create a <code>Task</co
of seconds here, just give it some time to run.</p>
<p>In the last chapter we have the <a href="./8_finished_example.html">whole 200 lines in an editable window</a>
which you can edit and change the way you like.</p>
<pre><pre class="playpen"><code class="language-rust edition2018"># use std::{
# future::Future, pin::Pin, sync::{ mpsc::{channel, Sender}, Arc, Mutex,},
# task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem,
# thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
# };
#
fn main() {
<pre><pre class="playpen"><code class="language-rust edition2018"><span class="boring">use std::{
</span><span class="boring"> future::Future, pin::Pin, sync::{ mpsc::{channel, Sender}, Arc, Mutex,},
</span><span class="boring"> task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem,
</span><span class="boring"> thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
</span><span class="boring">};
</span><span class="boring">
</span>fn main() {
// This is just to make it easier for us to see when our Future was resolved
let start = Instant::now();
@@ -618,179 +621,181 @@ fn main() {
// ends nicely.
reactor.lock().map(|mut r| r.close()).unwrap();
}
# // ============================= EXECUTOR ====================================
# fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
# let mywaker = Arc::new(MyWaker {
# thread: thread::current(),
# });
# let waker = waker_into_waker(Arc::into_raw(mywaker));
# let mut cx = Context::from_waker(&amp;waker);
#
# // SAFETY: we shadow `future` so it can't be accessed again.
# let mut future = unsafe { Pin::new_unchecked(&amp;mut future) };
# let val = loop {
# match Future::poll(future.as_mut(), &amp;mut cx) {
# Poll::Ready(val) =&gt; break val,
# Poll::Pending =&gt; thread::park(),
# };
# };
# val
# }
#
# // ====================== FUTURE IMPLEMENTATION ==============================
# #[derive(Clone)]
# struct MyWaker {
# thread: thread::Thread,
# }
#
# #[derive(Clone)]
# pub struct Task {
# id: usize,
# reactor: Arc&lt;Mutex&lt;Box&lt;Reactor&gt;&gt;&gt;,
# data: u64,
# }
#
# fn mywaker_wake(s: &amp;MyWaker) {
# let waker_ptr: *const MyWaker = s;
# let waker_arc = unsafe { Arc::from_raw(waker_ptr) };
# waker_arc.thread.unpark();
# }
#
# fn mywaker_clone(s: &amp;MyWaker) -&gt; RawWaker {
# let arc = unsafe { Arc::from_raw(s) };
# std::mem::forget(arc.clone()); // increase ref count
# RawWaker::new(Arc::into_raw(arc) as *const (), &amp;VTABLE)
# }
#
# const VTABLE: RawWakerVTable = unsafe {
# RawWakerVTable::new(
# |s| mywaker_clone(&amp;*(s as *const MyWaker)), // clone
# |s| mywaker_wake(&amp;*(s as *const MyWaker)), // wake
# |s| mywaker_wake(*(s as *const &amp;MyWaker)), // wake by ref
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
# )
# };
#
# fn waker_into_waker(s: *const MyWaker) -&gt; Waker {
# let raw_waker = RawWaker::new(s as *const (), &amp;VTABLE);
# unsafe { Waker::from_raw(raw_waker) }
# }
#
# impl Task {
# fn new(reactor: Arc&lt;Mutex&lt;Box&lt;Reactor&gt;&gt;&gt;, data: u64, id: usize) -&gt; Self {
# Task { id, reactor, data }
# }
# }
#
# impl Future for Task {
# type Output = usize;
# fn poll(self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;'_&gt;) -&gt; Poll&lt;Self::Output&gt; {
# let mut r = self.reactor.lock().unwrap();
# if r.is_ready(self.id) {
# *r.tasks.get_mut(&amp;self.id).unwrap() = TaskState::Finished;
# Poll::Ready(self.id)
# } else if r.tasks.contains_key(&amp;self.id) {
# r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
# Poll::Pending
# } else {
# r.register(self.data, cx.waker().clone(), self.id);
# Poll::Pending
# }
# }
# }
#
# // =============================== REACTOR ===================================
# enum TaskState {
# Ready,
# NotReady(Waker),
# Finished,
# }
# struct Reactor {
# dispatcher: Sender&lt;Event&gt;,
# handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
# tasks: HashMap&lt;usize, TaskState&gt;,
# }
#
# #[derive(Debug)]
# enum Event {
# Close,
# Timeout(u64, usize),
# }
#
# impl Reactor {
# fn new() -&gt; Arc&lt;Mutex&lt;Box&lt;Self&gt;&gt;&gt; {
# let (tx, rx) = channel::&lt;Event&gt;();
# let reactor = Arc::new(Mutex::new(Box::new(Reactor {
# dispatcher: tx,
# handle: None,
# tasks: HashMap::new(),
# })));
#
# let reactor_clone = Arc::downgrade(&amp;reactor);
# let handle = thread::spawn(move || {
# let mut handles = vec![];
# // This simulates some I/O resource
# for event in rx {
# println!(&quot;REACTOR: {:?}&quot;, event);
# let reactor = reactor_clone.clone();
# match event {
# Event::Close =&gt; break,
# Event::Timeout(duration, id) =&gt; {
# let event_handle = thread::spawn(move || {
# thread::sleep(Duration::from_secs(duration));
# let reactor = reactor.upgrade().unwrap();
# reactor.lock().map(|mut r| r.wake(id)).unwrap();
# });
# handles.push(event_handle);
# }
# }
# }
# handles.into_iter().for_each(|handle| handle.join().unwrap());
# });
# reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
# reactor
# }
#
# fn wake(&amp;mut self, id: usize) {
# self.tasks.get_mut(&amp;id).map(|state| {
# match mem::replace(state, TaskState::Ready) {
# TaskState::NotReady(waker) =&gt; waker.wake(),
# TaskState::Finished =&gt; panic!(&quot;Called 'wake' twice on task: {}&quot;, id),
# _ =&gt; unreachable!()
# }
# }).unwrap();
# }
#
# fn register(&amp;mut self, duration: u64, waker: Waker, id: usize) {
# if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
# panic!(&quot;Tried to insert a task with id: '{}', twice!&quot;, id);
# }
# self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
# }
#
# fn close(&amp;mut self) {
# self.dispatcher.send(Event::Close).unwrap();
# }
#
# fn is_ready(&amp;self, id: usize) -&gt; bool {
# self.tasks.get(&amp;id).map(|state| match state {
# TaskState::Ready =&gt; true,
# _ =&gt; false,
# }).unwrap_or(false)
# }
# }
#
# impl Drop for Reactor {
# fn drop(&amp;mut self) {
# self.handle.take().map(|h| h.join().unwrap()).unwrap();
# }
# }
</code></pre></pre>
<p>I added a debug printout of the events the reactor registered interest for so we can observe
two things:</p>
<span class="boring">// ============================= EXECUTOR ====================================
</span><span class="boring">fn block_on&lt;F: Future&gt;(mut future: F) -&gt; F::Output {
</span><span class="boring"> let mywaker = Arc::new(MyWaker {
</span><span class="boring"> thread: thread::current(),
</span><span class="boring"> });
</span><span class="boring"> let waker = waker_into_waker(Arc::into_raw(mywaker));
</span><span class="boring"> let mut cx = Context::from_waker(&amp;waker);
</span><span class="boring">
</span><span class="boring"> // SAFETY: we shadow `future` so it can't be accessed again.
</span><span class="boring"> let mut future = unsafe { Pin::new_unchecked(&amp;mut future) };
</span><span class="boring"> let val = loop {
</span><span class="boring"> match Future::poll(future.as_mut(), &amp;mut cx) {
</span><span class="boring"> Poll::Ready(val) =&gt; break val,
</span><span class="boring"> Poll::Pending =&gt; thread::park(),
</span><span class="boring"> };
</span><span class="boring"> };
</span><span class="boring"> val
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">// ====================== FUTURE IMPLEMENTATION ==============================
</span><span class="boring">#[derive(Clone)]
</span><span class="boring">struct MyWaker {
</span><span class="boring"> thread: thread::Thread,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">#[derive(Clone)]
</span><span class="boring">pub struct Task {
</span><span class="boring"> id: usize,
</span><span class="boring"> reactor: Arc&lt;Mutex&lt;Box&lt;Reactor&gt;&gt;&gt;,
</span><span class="boring"> data: u64,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">fn mywaker_wake(s: &amp;MyWaker) {
</span><span class="boring"> let waker_ptr: *const MyWaker = s;
</span><span class="boring"> let waker_arc = unsafe { Arc::from_raw(waker_ptr) };
</span><span class="boring"> waker_arc.thread.unpark();
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">fn mywaker_clone(s: &amp;MyWaker) -&gt; RawWaker {
</span><span class="boring"> let arc = unsafe { Arc::from_raw(s) };
</span><span class="boring"> std::mem::forget(arc.clone()); // increase ref count
</span><span class="boring"> RawWaker::new(Arc::into_raw(arc) as *const (), &amp;VTABLE)
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">const VTABLE: RawWakerVTable = unsafe {
</span><span class="boring"> RawWakerVTable::new(
</span><span class="boring"> |s| mywaker_clone(&amp;*(s as *const MyWaker)), // clone
</span><span class="boring"> |s| mywaker_wake(&amp;*(s as *const MyWaker)), // wake
</span><span class="boring"> |s| mywaker_wake(*(s as *const &amp;MyWaker)), // wake by ref
</span><span class="boring"> |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
</span><span class="boring"> )
</span><span class="boring">};
</span><span class="boring">
</span><span class="boring">fn waker_into_waker(s: *const MyWaker) -&gt; Waker {
</span><span class="boring"> let raw_waker = RawWaker::new(s as *const (), &amp;VTABLE);
</span><span class="boring"> unsafe { Waker::from_raw(raw_waker) }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Task {
</span><span class="boring"> fn new(reactor: Arc&lt;Mutex&lt;Box&lt;Reactor&gt;&gt;&gt;, data: u64, id: usize) -&gt; Self {
</span><span class="boring"> Task { id, reactor, data }
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Future for Task {
</span><span class="boring"> type Output = usize;
</span><span class="boring"> fn poll(self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;'_&gt;) -&gt; Poll&lt;Self::Output&gt; {
</span><span class="boring"> let mut r = self.reactor.lock().unwrap();
</span><span class="boring"> if r.is_ready(self.id) {
</span><span class="boring"> println!(&quot;POLL: TASK {} IS READY&quot;, self.id);
</span><span class="boring"> *r.tasks.get_mut(&amp;self.id).unwrap() = TaskState::Finished;
</span><span class="boring"> Poll::Ready(self.id)
</span><span class="boring"> } else if r.tasks.contains_key(&amp;self.id) {
</span><span class="boring"> println!(&quot;POLL: REPLACED WAKER FOR TASK: {}&quot;, self.id);
</span><span class="boring"> r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
</span><span class="boring"> Poll::Pending
</span><span class="boring"> } else {
</span><span class="boring"> println!(&quot;POLL: REGISTERED TASK: {}, WAKER: {:?}&quot;, self.id, cx.waker());
</span><span class="boring"> r.register(self.data, cx.waker().clone(), self.id);
</span><span class="boring"> Poll::Pending
</span><span class="boring"> }
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">// =============================== REACTOR ===================================
</span><span class="boring">enum TaskState {
</span><span class="boring"> Ready,
</span><span class="boring"> NotReady(Waker),
</span><span class="boring"> Finished,
</span><span class="boring">}
</span><span class="boring">struct Reactor {
</span><span class="boring"> dispatcher: Sender&lt;Event&gt;,
</span><span class="boring"> handle: Option&lt;JoinHandle&lt;()&gt;&gt;,
</span><span class="boring"> tasks: HashMap&lt;usize, TaskState&gt;,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">#[derive(Debug)]
</span><span class="boring">enum Event {
</span><span class="boring"> Close,
</span><span class="boring"> Timeout(u64, usize),
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Reactor {
</span><span class="boring"> fn new() -&gt; Arc&lt;Mutex&lt;Box&lt;Self&gt;&gt;&gt; {
</span><span class="boring"> let (tx, rx) = channel::&lt;Event&gt;();
</span><span class="boring"> let reactor = Arc::new(Mutex::new(Box::new(Reactor {
</span><span class="boring"> dispatcher: tx,
</span><span class="boring"> handle: None,
</span><span class="boring"> tasks: HashMap::new(),
</span><span class="boring"> })));
</span><span class="boring">
</span><span class="boring"> let reactor_clone = Arc::downgrade(&amp;reactor);
</span><span class="boring"> let handle = thread::spawn(move || {
</span><span class="boring"> let mut handles = vec![];
</span><span class="boring"> // This simulates some I/O resource
</span><span class="boring"> for event in rx {
</span><span class="boring"> println!(&quot;REACTOR: {:?}&quot;, event);
</span><span class="boring"> let reactor = reactor_clone.clone();
</span><span class="boring"> match event {
</span><span class="boring"> Event::Close =&gt; break,
</span><span class="boring"> Event::Timeout(duration, id) =&gt; {
</span><span class="boring"> let event_handle = thread::spawn(move || {
</span><span class="boring"> thread::sleep(Duration::from_secs(duration));
</span><span class="boring"> let reactor = reactor.upgrade().unwrap();
</span><span class="boring"> reactor.lock().map(|mut r| r.wake(id)).unwrap();
</span><span class="boring"> });
</span><span class="boring"> handles.push(event_handle);
</span><span class="boring"> }
</span><span class="boring"> }
</span><span class="boring"> }
</span><span class="boring"> handles.into_iter().for_each(|handle| handle.join().unwrap());
</span><span class="boring"> });
</span><span class="boring"> reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
</span><span class="boring"> reactor
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn wake(&amp;mut self, id: usize) {
</span><span class="boring"> self.tasks.get_mut(&amp;id).map(|state| {
</span><span class="boring"> match mem::replace(state, TaskState::Ready) {
</span><span class="boring"> TaskState::NotReady(waker) =&gt; waker.wake(),
</span><span class="boring"> TaskState::Finished =&gt; panic!(&quot;Called 'wake' twice on task: {}&quot;, id),
</span><span class="boring"> _ =&gt; unreachable!()
</span><span class="boring"> }
</span><span class="boring"> }).unwrap();
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn register(&amp;mut self, duration: u64, waker: Waker, id: usize) {
</span><span class="boring"> if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
</span><span class="boring"> panic!(&quot;Tried to insert a task with id: '{}', twice!&quot;, id);
</span><span class="boring"> }
</span><span class="boring"> self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn close(&amp;mut self) {
</span><span class="boring"> self.dispatcher.send(Event::Close).unwrap();
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> fn is_ready(&amp;self, id: usize) -&gt; bool {
</span><span class="boring"> self.tasks.get(&amp;id).map(|state| match state {
</span><span class="boring"> TaskState::Ready =&gt; true,
</span><span class="boring"> _ =&gt; false,
</span><span class="boring"> }).unwrap_or(false)
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Drop for Reactor {
</span><span class="boring"> fn drop(&amp;mut self) {
</span><span class="boring"> self.handle.take().map(|h| h.join().unwrap()).unwrap();
</span><span class="boring"> }
</span><span class="boring">}
</span></code></pre></pre>
<p>I added a some debug printouts so we can observe a couple of things:</p>
<ol>
<li>How the <code>Waker</code> object looks just like the <em>trait object</em> we talked about in an earlier chapter</li>
<li>In what order the events register interest with the reactor</li>
<li>The program flow from start to finish</li>
</ol>
<p>The last point is relevant when we move on the the last paragraph.</p>
<h2><a class="header" href="#asyncawait-and-concurrecy" id="asyncawait-and-concurrecy">Async/Await and concurrecy</a></h2>
@@ -907,6 +912,18 @@ do really hope that you do continue to explore further.</p>
<script type="text/javascript">
window.playpen_line_numbers = true;
</script>
<script type="text/javascript">
window.playpen_copyable = true;
</script>
<script src="ace.js" type="text/javascript" charset="utf-8"></script>
<script src="editor.js" type="text/javascript" charset="utf-8"></script>
<script src="mode-rust.js" type="text/javascript" charset="utf-8"></script>