added main example

This commit is contained in:
Carl Fredrik Samson
2020-01-30 21:02:37 +01:00
parent a171ae55d0
commit 59f00d69e9
26 changed files with 2197 additions and 1966 deletions

View File

@@ -1,268 +0,0 @@
# Naive example
```rust
#![feature(duration_float)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
fn main() {
let readylist = Arc::new(Mutex::new(vec![]));
let mut reactor = Reactor::new();
let mywaker = MyWaker::new(1, thread::current(), readylist.clone());
reactor.register(2, mywaker);
let mywaker = MyWaker::new(2, thread::current(), readylist.clone());
reactor.register(2, mywaker);
executor_run(reactor, readylist);
}
// ====== EXECUTOR ======
fn executor_run(mut reactor: Reactor, rl: Arc<Mutex<Vec<usize>>>) {
let start = Instant::now();
loop {
let mut rl_locked = rl.lock().unwrap();
while let Some(event) = rl_locked.pop() {
let dur = (Instant::now() - start).as_secs_f32();
println!("Event {} just happened at time: {:.2}.", event, dur);
reactor.outstanding.fetch_sub(1, Ordering::Relaxed);
}
drop(rl_locked);
if reactor.outstanding.load(Ordering::Relaxed) == 0 {
reactor.close();
break;
}
thread::park();
}
}
// ====== "FUTURE" IMPL ======
#[derive(Debug)]
struct MyWaker {
id: usize,
thread: thread::Thread,
readylist: Arc<Mutex<Vec<usize>>>,
}
impl MyWaker {
fn new(id: usize, thread: thread::Thread, readylist: Arc<Mutex<Vec<usize>>>) -> Self {
MyWaker {
id,
thread,
readylist,
}
}
fn wake(&self) {
self.readylist.lock().map(|mut rl| rl.push(self.id)).unwrap();
self.thread.unpark();
}
}
#[derive(Debug, Clone)]
pub struct Task {
id: usize,
pending: bool,
}
// ===== REACTOR =====
struct Reactor {
dispatcher: Sender<Event>,
handle: Option<JoinHandle<()>>,
outstanding: AtomicUsize,
}
#[derive(Debug)]
enum Event {
Close,
Simple(MyWaker, u64),
}
impl Reactor {
fn new() -> Self {
let (tx, rx) = channel::<Event>();
let mut handles = vec![];
let handle = thread::spawn(move || {
// This simulates some I/O resource
for event in rx {
match event {
Event::Close => break,
Event::Simple(mywaker, duration) => {
let event_handle = thread::spawn(move || {
thread::sleep(Duration::from_secs(duration));
mywaker.wake();
});
handles.push(event_handle);
}
}
}
for handle in handles {
handle.join().unwrap();
}
});
Reactor {
dispatcher: tx,
handle: Some(handle),
outstanding: AtomicUsize::new(0),
}
}
fn register(&mut self, duration: u64, mywaker: MyWaker) {
self.dispatcher
.send(Event::Simple(mywaker, duration))
.unwrap();
self.outstanding.fetch_add(1, Ordering::Relaxed);
}
fn close(&mut self) {
self.dispatcher.send(Event::Close).unwrap();
}
}
impl Drop for Reactor {
fn drop(&mut self) {
self.handle.take().map(|h| h.join().unwrap()).unwrap();
}
}
```
```rust
#![feature(duration_float)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
fn main() {
let readylist = Arc::new(Mutex::new(vec![]));
let mut reactor = Reactor::new();
let mywaker = MyWaker::new(1, thread::current(), readylist.clone());
reactor.register(2, mywaker);
let mywaker = MyWaker::new(2, thread::current(), readylist.clone());
reactor.register(2, mywaker);
executor_run(reactor, readylist);
}
# // ====== EXECUTOR ======
# fn executor_run(mut reactor: Reactor, rl: Arc<Mutex<Vec<usize>>>) {
# let start = Instant::now();
# loop {
# let mut rl_locked = rl.lock().unwrap();
# while let Some(event) = rl_locked.pop() {
# let dur = (Instant::now() - start).as_secs_f32();
# println!("Event {} just happened at time: {:.2}.", event, dur);
# reactor.outstanding.fetch_sub(1, Ordering::Relaxed);
# }
# drop(rl_locked);
#
# if reactor.outstanding.load(Ordering::Relaxed) == 0 {
# reactor.close();
# break;
# }
#
# thread::park();
# }
# }
#
# // ====== "FUTURE" IMPL ======
# #[derive(Debug)]
# struct MyWaker {
# id: usize,
# thread: thread::Thread,
# readylist: Arc<Mutex<Vec<usize>>>,
# }
#
# impl MyWaker {
# fn new(id: usize, thread: thread::Thread, readylist: Arc<Mutex<Vec<usize>>>) -> Self {
# MyWaker {
# id,
# thread,
# readylist,
# }
# }
#
# fn wake(&self) {
# self.readylist.lock().map(|mut rl| rl.push(self.id)).unwrap();
# self.thread.unpark();
# }
# }
#
#
# #[derive(Debug, Clone)]
# pub struct Task {
# id: usize,
# pending: bool,
# }
#
# // ===== REACTOR =====
# struct Reactor {
# dispatcher: Sender<Event>,
# handle: Option<JoinHandle<()>>,
# outstanding: AtomicUsize,
# }
# #[derive(Debug)]
# enum Event {
# Close,
# Simple(MyWaker, u64),
# }
#
# impl Reactor {
# fn new() -> Self {
# let (tx, rx) = channel::<Event>();
# let mut handles = vec![];
# let handle = thread::spawn(move || {
# // This simulates some I/O resource
# for event in rx {
# match event {
# Event::Close => break,
# Event::Simple(mywaker, duration) => {
# let event_handle = thread::spawn(move || {
# thread::sleep(Duration::from_secs(duration));
# mywaker.wake();
# });
# handles.push(event_handle);
# }
# }
# }
#
# for handle in handles {
# handle.join().unwrap();
# }
# });
#
# Reactor {
# dispatcher: tx,
# handle: Some(handle),
# outstanding: AtomicUsize::new(0),
# }
# }
#
# fn register(&mut self, duration: u64, mywaker: MyWaker) {
# self.dispatcher
# .send(Event::Simple(mywaker, duration))
# .unwrap();
# self.outstanding.fetch_add(1, Ordering::Relaxed);
# }
#
# fn close(&mut self) {
# self.dispatcher.send(Event::Close).unwrap();
# }
# }
#
# impl Drop for Reactor {
# fn drop(&mut self) {
# self.handle.take().map(|h| h.join().unwrap()).unwrap();
# }
# }
```

View File

@@ -1 +0,0 @@
# Proper Waker

View File

@@ -1 +0,0 @@
# Proper Future

View File

@@ -1 +0,0 @@
# Supporting async/await

View File

@@ -1 +0,0 @@
# Bonus: concurrent futures

View File

@@ -503,250 +503,10 @@ they did their unsafe implementation.
Now, the code which is created and the need for `Pin` to allow for borrowing
across `yield` points should be pretty clear.
## Pin
> **Relevant for**
>
> 1. To understand `Generators` and `Futures`
> 2. Knowing how to use `Pin` is required when implementing your own `Future`
> 3. To understand self-referential types in Rust
> 4. This is the way borrowing across `await` points is accomplished
>
> `Pin` was suggested in [RFC#2349][rfc2349]
Ping consists of the `Pin` type and the `Unpin` marker. Let's start off with some general rules:
1. Pin does nothing special, it only prevents the user of an API to violate some assumtions you make when writing your (most likely) unsafe code.
2. Most standard library types implement `Unpin`
3. `Unpin` means it's OK for this type to be moved even when pinned.
4. If you `Box` a value, that boxed value automatcally implements `Unpin`.
5. The main use case for `Pin` is to allow self referential types
6. The implementation behind objects that doens't implement `Unpin` is most likely unsafe
1. `Pin` prevents users from your code to break the assumtions you make when writing the `unsafe` implementation
2. It doesn't solve the fact that you'll have to write unsafe code to actually implement it
7. You're not really meant to be implementing `!Unpin`, but you can on nightly with a feature flag
> Unsafe code does not mean it's litterally "unsafe", it only relieves the
> guarantees you normally get from the compiler. An `unsafe` implementation can
> be perfectly safe to do, but you have no safety net.
Let's take a look at an example:
```rust,editable
use std::pin::Pin;
fn main() {
let mut test1 = Test::new("test1");
test1.init();
let mut test2 = Test::new("test2");
test2.init();
println!("a: {}, b: {}", test1.a(), test1.b());
std::mem::swap(&mut test1, &mut test2); // try commenting out this line
println!("a: {}, b: {}", test2.a(), test2.b());
}
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
}
impl Test {
fn new(txt: &str) -> Self {
let a = String::from(txt);
Test {
a,
b: std::ptr::null(),
}
}
fn init(&mut self) {
let self_ref: *const String = &self.a;
self.b = self_ref;
}
fn a(&self) -> &str {
&self.a
}
fn b(&self) -> &String {
unsafe {&*(self.b)}
}
}
```
As you can see this results in unwanted behavior. The pointer to `b` stays the
same and points to the old value. It's easy to get this to segfault, and fail
in other spectacular ways as well.
Pin essentially prevents the **user** of your unsafe code
(even if that means yourself) move the value after it's pinned.
If we change the example to using `Pin` instead:
```rust,editable
use std::pin::Pin;
use std::marker::PhantomPinned;
pub fn main() {
let mut test1 = Test::new("test1");
test1.init();
let mut test1_pin = unsafe { Pin::new_unchecked(&mut test1) };
let mut test2 = Test::new("test2");
test2.init();
let mut test2_pin = unsafe { Pin::new_unchecked(&mut test2) };
println!(
"a: {}, b: {}",
Test::a(test1_pin.as_ref()),
Test::b(test1_pin.as_ref())
);
// Try to uncomment this and see what happens
// std::mem::swap(test1_pin.as_mut(), test2_pin.as_mut());
println!(
"a: {}, b: {}",
Test::a(test2_pin.as_ref()),
Test::b(test2_pin.as_ref())
);
}
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Self {
let a = String::from(txt);
Test {
a,
b: std::ptr::null(),
// This makes our type `!Unpin`
_marker: PhantomPinned,
}
}
fn init(&mut self) {
let self_ptr: *const String = &self.a;
self.b = self_ptr;
}
fn a<'a>(self: Pin<&'a Self>) -> &'a str {
&self.get_ref().a
}
fn b<'a>(self: Pin<&'a Self>) -> &'a String {
unsafe { &*(self.b) }
}
}
```
Now, what we've done here is pinning a stack address. That will always be
`unsafe` if our type implements `!Unpin`, in other words. That our type is not
`Unpin` which is the norm.
We use some tricks here, including requiring an `init`. If we want to fix that
and let users avoid `unsafe` we need to place our data on the heap.
Stack pinning will always depend on the current stack frame we're in, so we
can't create a self referential object in one stack frame and return it since
any pointers we take to "self" is invalidated.
The next example solves some of our friction at the cost of a heap allocation.
```rust, editbable
use std::pin::Pin;
use std::marker::PhantomPinned;
pub fn main() {
let mut test1 = Test::new("test1");
let mut test2 = Test::new("test2");
println!("a: {}, b: {}",test1.as_ref().a(), test1.as_ref().b());
// Try to uncomment this and see what happens
// std::mem::swap(&mut test1, &mut test2);
println!("a: {}, b: {}",test2.as_ref().a(), test2.as_ref().b());
}
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Pin<Box<Self>> {
let a = String::from(txt);
let t = Test {
a,
b: std::ptr::null(),
_marker: PhantomPinned,
};
let mut boxed = Box::pin(t);
let self_ptr: *const String = &boxed.as_ref().a;
unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };
boxed
}
fn a<'a>(self: Pin<&'a Self>) -> &'a str {
&self.get_ref().a
}
fn b<'a>(self: Pin<&'a Self>) -> &'a String {
unsafe { &*(self.b) }
}
}
```
Seeing this we're ready to sum up with a few more points to remember about
pinning:
1. Pinning only makes sense to do for types that are `!Unpin`
2. Pinning a `!Unpin` pointer to the stack will requires `unsafe`
3. Pinning a boxed value will not require `unsafe`, even if the type is `!Unpin`
4. If T: Unpin (which is the default), then Pin<'a, T> is entirely equivalent to &'a mut T.
5. Getting a `&mut T` to a pinned pointer requires unsafe if `T: !Unpin`
6. Pinning is really only useful when implementing self-referential types.
For all intents and purposes you can think of `!Unpin` = self-referential-type
The fact that boxing (heap allocating) a value that implements `!Unpin` is safe
makes sense. Once the data is allocated on the heap it will have a stable address.
There are ways to safely give some guarantees on stack pinning as well, but right
now you need to use a crate like [pin_utils]:[pin_utils] to do that.
### Projection/structural pinning
In short, projection is using a field on your type. `mystruct.field1` is a
projection. Structural pinning is using `Pin` on struct fields. This has several
caveats and is not something you'll normally see so I refer to the documentation
for that.
### Pin and Drop
The `Pin` guarantee exists from the moment the value is pinned until it's dropped.
In the `Drop` implementation you take a mutabl reference to `self`, which means
extra care must be taken when implementing `Drop` for pinned types.
## Putting it all together
This is exactly what we'll do when we implement our own `Futures` stay tuned,
we're soon finished.
[rfc2033]: https://github.com/rust-lang/rfcs/blob/master/text/2033-experimental-coroutines.md
[greenthreads]: https://cfsamson.gitbook.io/green-threads-explained-in-200-lines-of-rust/
[rfc1823]: https://github.com/rust-lang/rfcs/pull/1823
[rfc1832]: https://github.com/rust-lang/rfcs/pull/1832
[rfc2349]: https://github.com/rust-lang/rfcs/blob/master/text/2349-pin.md
[optimizing-await]: https://tmandry.gitlab.io/blog/posts/optimizing-await-1/
[pin_utils]: https://github.com/rust-lang/rfcs/blob/master/text/2349-pin.md
[optimizing-await]: https://tmandry.gitlab.io/blog/posts/optimizing-await-1/

242
src/1_3_pin.md Normal file
View File

@@ -0,0 +1,242 @@
## Pin
> **Relevant for**
>
> 1. To understand `Generators` and `Futures`
> 2. Knowing how to use `Pin` is required when implementing your own `Future`
> 3. To understand self-referential types in Rust
> 4. This is the way borrowing across `await` points is accomplished
>
> `Pin` was suggested in [RFC#2349][rfc2349]
Ping consists of the `Pin` type and the `Unpin` marker. Let's start off with some general rules:
1. Pin does nothing special, it only prevents the user of an API to violate some assumtions you make when writing your (most likely) unsafe code.
2. Most standard library types implement `Unpin`
3. `Unpin` means it's OK for this type to be moved even when pinned.
4. If you `Box` a value, that boxed value automatcally implements `Unpin`.
5. The main use case for `Pin` is to allow self referential types
6. The implementation behind objects that doens't implement `Unpin` is most likely unsafe
1. `Pin` prevents users from your code to break the assumtions you make when writing the `unsafe` implementation
2. It doesn't solve the fact that you'll have to write unsafe code to actually implement it
7. You're not really meant to be implementing `!Unpin`, but you can on nightly with a feature flag
> Unsafe code does not mean it's litterally "unsafe", it only relieves the
> guarantees you normally get from the compiler. An `unsafe` implementation can
> be perfectly safe to do, but you have no safety net.
Let's take a look at an example:
```rust,editable
use std::pin::Pin;
fn main() {
let mut test1 = Test::new("test1");
test1.init();
let mut test2 = Test::new("test2");
test2.init();
println!("a: {}, b: {}", test1.a(), test1.b());
std::mem::swap(&mut test1, &mut test2); // try commenting out this line
println!("a: {}, b: {}", test2.a(), test2.b());
}
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
}
impl Test {
fn new(txt: &str) -> Self {
let a = String::from(txt);
Test {
a,
b: std::ptr::null(),
}
}
fn init(&mut self) {
let self_ref: *const String = &self.a;
self.b = self_ref;
}
fn a(&self) -> &str {
&self.a
}
fn b(&self) -> &String {
unsafe {&*(self.b)}
}
}
```
As you can see this results in unwanted behavior. The pointer to `b` stays the
same and points to the old value. It's easy to get this to segfault, and fail
in other spectacular ways as well.
Pin essentially prevents the **user** of your unsafe code
(even if that means yourself) move the value after it's pinned.
If we change the example to using `Pin` instead:
```rust,editable
use std::pin::Pin;
use std::marker::PhantomPinned;
pub fn main() {
let mut test1 = Test::new("test1");
test1.init();
let mut test1_pin = unsafe { Pin::new_unchecked(&mut test1) };
let mut test2 = Test::new("test2");
test2.init();
let mut test2_pin = unsafe { Pin::new_unchecked(&mut test2) };
println!(
"a: {}, b: {}",
Test::a(test1_pin.as_ref()),
Test::b(test1_pin.as_ref())
);
// Try to uncomment this and see what happens
// std::mem::swap(test1_pin.as_mut(), test2_pin.as_mut());
println!(
"a: {}, b: {}",
Test::a(test2_pin.as_ref()),
Test::b(test2_pin.as_ref())
);
}
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Self {
let a = String::from(txt);
Test {
a,
b: std::ptr::null(),
// This makes our type `!Unpin`
_marker: PhantomPinned,
}
}
fn init(&mut self) {
let self_ptr: *const String = &self.a;
self.b = self_ptr;
}
fn a<'a>(self: Pin<&'a Self>) -> &'a str {
&self.get_ref().a
}
fn b<'a>(self: Pin<&'a Self>) -> &'a String {
unsafe { &*(self.b) }
}
}
```
Now, what we've done here is pinning a stack address. That will always be
`unsafe` if our type implements `!Unpin`, in other words. That our type is not
`Unpin` which is the norm.
We use some tricks here, including requiring an `init`. If we want to fix that
and let users avoid `unsafe` we need to place our data on the heap.
Stack pinning will always depend on the current stack frame we're in, so we
can't create a self referential object in one stack frame and return it since
any pointers we take to "self" is invalidated.
The next example solves some of our friction at the cost of a heap allocation.
```rust, editbable
use std::pin::Pin;
use std::marker::PhantomPinned;
pub fn main() {
let mut test1 = Test::new("test1");
let mut test2 = Test::new("test2");
println!("a: {}, b: {}",test1.as_ref().a(), test1.as_ref().b());
// Try to uncomment this and see what happens
// std::mem::swap(&mut test1, &mut test2);
println!("a: {}, b: {}",test2.as_ref().a(), test2.as_ref().b());
}
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Pin<Box<Self>> {
let a = String::from(txt);
let t = Test {
a,
b: std::ptr::null(),
_marker: PhantomPinned,
};
let mut boxed = Box::pin(t);
let self_ptr: *const String = &boxed.as_ref().a;
unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };
boxed
}
fn a<'a>(self: Pin<&'a Self>) -> &'a str {
&self.get_ref().a
}
fn b<'a>(self: Pin<&'a Self>) -> &'a String {
unsafe { &*(self.b) }
}
}
```
Seeing this we're ready to sum up with a few more points to remember about
pinning:
1. Pinning only makes sense to do for types that are `!Unpin`
2. Pinning a `!Unpin` pointer to the stack will requires `unsafe`
3. Pinning a boxed value will not require `unsafe`, even if the type is `!Unpin`
4. If T: Unpin (which is the default), then Pin<'a, T> is entirely equivalent to &'a mut T.
5. Getting a `&mut T` to a pinned pointer requires unsafe if `T: !Unpin`
6. Pinning is really only useful when implementing self-referential types.
For all intents and purposes you can think of `!Unpin` = self-referential-type
The fact that boxing (heap allocating) a value that implements `!Unpin` is safe
makes sense. Once the data is allocated on the heap it will have a stable address.
There are ways to safely give some guarantees on stack pinning as well, but right
now you need to use a crate like [pin_utils]:[pin_utils] to do that.
### Projection/structural pinning
In short, projection is using a field on your type. `mystruct.field1` is a
projection. Structural pinning is using `Pin` on struct fields. This has several
caveats and is not something you'll normally see so I refer to the documentation
for that.
### Pin and Drop
The `Pin` guarantee exists from the moment the value is pinned until it's dropped.
In the `Drop` implementation you take a mutable reference to `self`, which means
extra care must be taken when implementing `Drop` for pinned types.
## Putting it all together
This is exactly what we'll do when we implement our own `Futures` stay tuned,
we're soon finished.
[rfc2349]: https://github.com/rust-lang/rfcs/blob/master/text/2349-pin.md
[pin_utils]: https://github.com/rust-lang/rfcs/blob/master/text/2349-pin.md

195
src/2_0_future_example.md Normal file
View File

@@ -0,0 +1,195 @@
# Implementing our own Future
```rust
use std::{
future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
thread::{self, JoinHandle}, time::{Duration, Instant}
};
fn main() {
let start = Instant::now();
// Many runtimes create a glocal `reactor` we pass it as an argument
let reactor = Reactor::new();
let reactor = Arc::new(Mutex::new(reactor));
let future1 = Task::new(reactor.clone(), 2, 1);
let future2 = Task::new(reactor.clone(), 1, 2);
let fut1 = async {
let val = future1.await;
let dur = (Instant::now() - start).as_secs_f32();
println!("Future got {} at time: {:.2}.", val, dur);
};
let fut2 = async {
let val = future2.await;
let dur = (Instant::now() - start).as_secs_f32();
println!("Future got {} at time: {:.2}.", val, dur);
};
let mainfut = async {
fut1.await;
fut2.await;
};
block_on(mainfut);
reactor.lock().map(|mut r| r.close()).unwrap();
}
//// ============================ EXECUTOR ====================================
fn block_on<F: Future>(mut future: F) -> F::Output {
let mywaker = Arc::new(MyWaker{ thread: thread::current() });
let waker = waker_into_waker(Arc::into_raw(mywaker));
let mut cx = Context::from_waker(&waker);
let val = loop {
let pinned = unsafe { Pin::new_unchecked(&mut future) };
match Future::poll(pinned, &mut cx) {
Poll::Ready(val) => break val,
Poll::Pending => thread::park(),
};
};
val
}
// ====================== FUTURE IMPLEMENTATION ==============================
#[derive(Clone)]
struct MyWaker {
thread: thread::Thread,
}
#[derive(Clone)]
pub struct Task {
id: usize,
reactor: Arc<Mutex<Reactor>>,
data: u64,
is_registered: bool,
}
fn mywaker_wake(s: &MyWaker) {
let waker_ptr: *const MyWaker = s;
let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
waker_arc.thread.unpark();
}
fn mywaker_clone(s: &MyWaker) -> RawWaker {
let arc = unsafe { Arc::from_raw(s).clone() };
std::mem::forget(arc.clone()); // increase ref count
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
}
const VTABLE: RawWakerVTable = unsafe {
RawWakerVTable::new(
|s| mywaker_clone(&*(s as *const MyWaker)), // clone
|s| mywaker_wake(&*(s as *const MyWaker)), // wake
|s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
|s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
)
};
fn waker_into_waker(s: *const MyWaker) -> Waker {
let raw_waker = RawWaker::new(s as *const (), &VTABLE);
unsafe { Waker::from_raw(raw_waker) }
}
impl Task {
fn new(reactor: Arc<Mutex<Reactor>>, data: u64, id: usize) -> Self {
Task {
id,
reactor,
data,
is_registered: false,
}
}
}
impl Future for Task {
type Output = usize;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut r = self.reactor.lock().unwrap();
if r.is_ready(self.id) {
Poll::Ready(self.id)
} else if self.is_registered {
Poll::Pending
} else {
r.register(self.data, cx.waker().clone(), self.id);
drop(r);
self.is_registered = true;
Poll::Pending
}
}
}
// =============================== REACTOR ===================================
struct Reactor {
dispatcher: Sender<Event>,
handle: Option<JoinHandle<()>>,
readylist: Arc<Mutex<Vec<usize>>>,
}
#[derive(Debug)]
enum Event {
Close,
Simple(Waker, u64, usize),
}
impl Reactor {
fn new() -> Self {
let (tx, rx) = channel::<Event>();
let readylist = Arc::new(Mutex::new(vec![]));
let rl_clone = readylist.clone();
let mut handles = vec![];
let handle = thread::spawn(move || {
// This simulates some I/O resource
for event in rx {
let rl_clone = rl_clone.clone();
match event {
Event::Close => break,
Event::Simple(waker, duration, id) => {
let event_handle = thread::spawn(move || {
thread::sleep(Duration::from_secs(duration));
rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
waker.wake();
});
handles.push(event_handle);
}
}
}
for handle in handles {
handle.join().unwrap();
}
});
Reactor {
readylist,
dispatcher: tx,
handle: Some(handle),
}
}
fn register(&mut self, duration: u64, waker: Waker, data: usize) {
self.dispatcher
.send(Event::Simple(waker, duration, data))
.unwrap();
}
fn close(&mut self) {
self.dispatcher.send(Event::Close).unwrap();
}
fn is_ready(&self, id_to_check: usize) -> bool {
self.readylist
.lock()
.map(|rl| rl.iter().any(|id| *id == id_to_check))
.unwrap()
}
}
impl Drop for Reactor {
fn drop(&mut self) {
self.handle.take().map(|h| h.join().unwrap()).unwrap();
}
}
```

View File

@@ -0,0 +1 @@
# Bonus 1: concurrent futures

View File

@@ -1,11 +1,9 @@
# Summary
- [Introduction](./0_0_introduction.md)
- [Some background information](./0_1_background_information.md)
- [Trait objects and fat pointers](./0_1_1_trait_objects.md)
- [Generators and Pin](./0_1_2_generators_pin.md)
- [Naive example](./0_2_naive_implementation.md)
- [Proper Waker](./0_3_proper_waker.md)
- [Proper Future](0_4_proper_future.md)
- [Supporting async/await](0_5_async_wait.md)
- [Bonus: concurrent futures](0_6_concurrent_futures.md)
- [Some background information](./1_0_background_information.md)
- [Trait objects and fat pointers](./1_1_trait_objects.md)
- [Generators and Pin](./1_2_generators_pin.md)
- [Pin](./1_3_pin.md)
- [The main example](./2_0_future_example.md)
- [Bonus 1: concurrent futures](2_1_concurrent_futures.md)