several improvements, see #2 for more details
This commit is contained in:
@@ -23,9 +23,9 @@ Let's start off by getting all our imports right away so you can follow along
|
||||
|
||||
```rust, noplaypen, ignore
|
||||
use std::{
|
||||
future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
|
||||
future::Future, pin::Pin, sync::{ mpsc::{channel, Sender}, Arc, Mutex,},
|
||||
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
|
||||
thread::{self, JoinHandle}, time::{Duration, Instant}
|
||||
thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
|
||||
};
|
||||
```
|
||||
|
||||
@@ -131,9 +131,8 @@ struct MyWaker {
|
||||
#[derive(Clone)]
|
||||
pub struct Task {
|
||||
id: usize,
|
||||
reactor: Arc<Mutex<Reactor>>,
|
||||
reactor: Arc<Mutex<Box<Reactor>>>,
|
||||
data: u64,
|
||||
is_registered: bool,
|
||||
}
|
||||
|
||||
// These are function definitions we'll use for our waker. Remember the
|
||||
@@ -173,48 +172,57 @@ fn waker_into_waker(s: *const MyWaker) -> Waker {
|
||||
}
|
||||
|
||||
impl Task {
|
||||
fn new(reactor: Arc<Mutex<Reactor>>, data: u64, id: usize) -> Self {
|
||||
Task {
|
||||
id,
|
||||
reactor,
|
||||
data,
|
||||
is_registered: false,
|
||||
}
|
||||
fn new(reactor: Arc<Mutex<Box<Reactor>>>, data: u64, id: usize) -> Self {
|
||||
Task { id, reactor, data }
|
||||
}
|
||||
}
|
||||
|
||||
// This is our `Future` implementation
|
||||
impl Future for Task {
|
||||
|
||||
// The output for our kind of `leaf future` is just an `usize`. For other
|
||||
// futures this could be something more interesting like a byte array.
|
||||
type Output = usize;
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
|
||||
// Poll is the what drives the state machine forward and it's the only
|
||||
// method we'll need to call to drive futures to completion.
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
|
||||
// We need to get access the reactor in our `poll` method so we acquire
|
||||
// a lock on that.
|
||||
let mut r = self.reactor.lock().unwrap();
|
||||
|
||||
// we check with the `Reactor` if this future is in its "readylist"
|
||||
// i.e. if it's `Ready`
|
||||
// First we check if the task is marked as ready
|
||||
if r.is_ready(self.id) {
|
||||
|
||||
// if it is, we return the data. In this case it's just the ID of
|
||||
// the task since this is just a very simple example.
|
||||
// If it's ready we set its state to `Finished`
|
||||
*r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
|
||||
Poll::Ready(self.id)
|
||||
} else if self.is_registered {
|
||||
|
||||
// If it isn't finished we check the map we have stored in our Reactor
|
||||
// over id's we have registered and see if it's there
|
||||
} else if r.tasks.contains_key(&self.id) {
|
||||
|
||||
// If the future is registered alredy, we just return `Pending`
|
||||
// This is important. The docs says that on multiple calls to poll,
|
||||
// only the Waker from the Context passed to the most recent call
|
||||
// should be scheduled to receive a wakeup. That's why we insert
|
||||
// this waker into the map (which will return the old one which will
|
||||
// get dropped) before we return `Pending`.
|
||||
r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
|
||||
Poll::Pending
|
||||
} else {
|
||||
|
||||
// If we get here, it must be the first time this `Future` is polled
|
||||
// so we register a task with our `reactor`
|
||||
// If it's not ready, and not in the map it's a new task so we
|
||||
// register that with the Reactor and return `Pending`
|
||||
r.register(self.data, cx.waker().clone(), self.id);
|
||||
|
||||
// oh, we have to drop the lock on our `Mutex` here because we can't
|
||||
// have a shared and exclusive borrow at the same time
|
||||
drop(r);
|
||||
self.is_registered = true;
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
// Note that we're holding a lock on the `Mutex` which protects the
|
||||
// Reactor all the way until the end of this scope. This means that
|
||||
// even if our task were to complete immidiately, it will not be
|
||||
// able to call `wake` while we're in our `Poll` method.
|
||||
|
||||
// Since we can make this guarantee, it's now the Executors job to
|
||||
// handle this possible race condition where `Wake` is called after
|
||||
// `poll` but before our thread goes to sleep.
|
||||
}
|
||||
}
|
||||
```
|
||||
@@ -303,6 +311,15 @@ for the sake of this example.
|
||||
**Our Reactor will look like this:**
|
||||
|
||||
```rust, noplaypen, ignore
|
||||
// This is a "fake" reactor. It does no real I/O, but that also makes our
|
||||
// code possible to run in the book and in the playground
|
||||
// The different states a task can have in this Reactor
|
||||
enum TaskState {
|
||||
Ready,
|
||||
NotReady(Waker),
|
||||
Finished,
|
||||
}
|
||||
|
||||
// This is a "fake" reactor. It does no real I/O, but that also makes our
|
||||
// code possible to run in the book and in the playground
|
||||
struct Reactor {
|
||||
@@ -312,106 +329,118 @@ struct Reactor {
|
||||
dispatcher: Sender<Event>,
|
||||
handle: Option<JoinHandle<()>>,
|
||||
|
||||
// This is a list of tasks that are ready, which means they should be polled
|
||||
// for data.
|
||||
readylist: Arc<Mutex<Vec<usize>>>,
|
||||
// This is a list of tasks
|
||||
tasks: HashMap<usize, TaskState>,
|
||||
}
|
||||
|
||||
// We just have two kind of events. An event called `Timeout`
|
||||
// and a `Close` event to close down our reactor.
|
||||
// This represents the Events we can send to our reactor thread. In this
|
||||
// example it's only a Timeout or a Close event.
|
||||
#[derive(Debug)]
|
||||
enum Event {
|
||||
Close,
|
||||
Timeout(Waker, u64, usize),
|
||||
Timeout(u64, usize),
|
||||
}
|
||||
|
||||
impl Reactor {
|
||||
fn new() -> Self {
|
||||
// The way we register new events with our reactor is using a regular
|
||||
// channel
|
||||
|
||||
// We choose to return an atomic reference counted, mutex protected, heap
|
||||
// allocated `Reactor`. Just to make it easy to explain... No, the reason
|
||||
// we do this is:
|
||||
//
|
||||
// 1. We know that only thread-safe reactors will be created.
|
||||
// 2. By heap allocating it we can obtain a reference to a stable address
|
||||
// that's not dependent on the stack frame of the function that called `new`
|
||||
fn new() -> Arc<Mutex<Box<Self>>> {
|
||||
let (tx, rx) = channel::<Event>();
|
||||
let readylist = Arc::new(Mutex::new(vec![]));
|
||||
let rl_clone = readylist.clone();
|
||||
let reactor = Arc::new(Mutex::new(Box::new(Reactor {
|
||||
dispatcher: tx,
|
||||
handle: None,
|
||||
tasks: HashMap::new(),
|
||||
})));
|
||||
|
||||
// Notice that we'll need to use `weak` reference here. If we don't,
|
||||
// our `Reactor` will not get `dropped` when our main thread is finished
|
||||
// since we're holding internal references to it.
|
||||
|
||||
// This `Vec` will hold handles to all the threads we spawn so we can
|
||||
// join them later on and finish our programm in a good manner
|
||||
let mut handles = vec![];
|
||||
// Since we're collecting all `JoinHandles` from the threads we spawn
|
||||
// and make sure to join them we know that `Reactor` will be alive
|
||||
// longer than any reference held by the threads we spawn here.
|
||||
let reactor_clone = Arc::downgrade(&reactor);
|
||||
|
||||
// This will be the "Reactor thread"
|
||||
// This will be our Reactor-thread. The Reactor-thread will in our case
|
||||
// just spawn new threads which will serve as timers for us.
|
||||
let handle = thread::spawn(move || {
|
||||
let mut handles = vec![];
|
||||
|
||||
// This simulates some I/O resource
|
||||
for event in rx {
|
||||
let rl_clone = rl_clone.clone();
|
||||
println!("REACTOR: {:?}", event);
|
||||
let reactor = reactor_clone.clone();
|
||||
match event {
|
||||
|
||||
// If we get a close event we break out of the loop we're in
|
||||
Event::Close => break,
|
||||
Event::Timeout(waker, duration, id) => {
|
||||
Event::Timeout(duration, id) => {
|
||||
|
||||
// When we get an event we simply spawn a new thread
|
||||
// which will simulate some I/O resource...
|
||||
// We spawn a new thread that will serve as a timer
|
||||
// and will call `wake` on the correct `Waker` once
|
||||
// it's done.
|
||||
let event_handle = thread::spawn(move || {
|
||||
|
||||
//... by sleeping for the number of seconds
|
||||
// we provided when creating the `Task`.
|
||||
thread::sleep(Duration::from_secs(duration));
|
||||
|
||||
// When it's done sleeping we put the ID of this task
|
||||
// on the "readylist"
|
||||
rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
|
||||
|
||||
// Then we call `wake` which will wake up our
|
||||
// executor and start polling the futures
|
||||
waker.wake();
|
||||
let reactor = reactor.upgrade().unwrap();
|
||||
reactor.lock().map(|mut r| r.wake(id)).unwrap();
|
||||
});
|
||||
|
||||
handles.push(event_handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// When we exit the Reactor we first join all the handles on
|
||||
// the child threads we've spawned so we catch any panics and
|
||||
// release any resources.
|
||||
for handle in handles {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
// This is important for us since we need to know that these
|
||||
// threads don't live longer than our Reactor-thread. Our
|
||||
// Reactor-thread will be joined when `Reactor` gets dropped.
|
||||
handles.into_iter().for_each(|handle| handle.join().unwrap());
|
||||
});
|
||||
reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
|
||||
reactor
|
||||
}
|
||||
|
||||
Reactor {
|
||||
readylist,
|
||||
dispatcher: tx,
|
||||
handle: Some(handle),
|
||||
// The wake function will call wake on the waker for the task with the
|
||||
// corresponding id.
|
||||
fn wake(&mut self, id: usize) {
|
||||
self.tasks.get_mut(&id).map(|state| {
|
||||
|
||||
// No matter what state the task was in we can safely set it
|
||||
// to ready at this point. This lets us get ownership over the
|
||||
// the data that was there before we replaced it.
|
||||
match mem::replace(state, TaskState::Ready) {
|
||||
TaskState::NotReady(waker) => waker.wake(),
|
||||
TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
|
||||
_ => unreachable!()
|
||||
}
|
||||
}).unwrap();
|
||||
}
|
||||
|
||||
// Register a new task with the reactor. In this particular example
|
||||
// we panic if a task with the same id get's registered twice
|
||||
fn register(&mut self, duration: u64, waker: Waker, id: usize) {
|
||||
if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
|
||||
panic!("Tried to insert a task with id: '{}', twice!", id);
|
||||
}
|
||||
self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
|
||||
}
|
||||
|
||||
fn register(&mut self, duration: u64, waker: Waker, data: usize) {
|
||||
|
||||
// registering an event is as simple as sending an `Event` through
|
||||
// the channel.
|
||||
self.dispatcher
|
||||
.send(Event::Timeout(waker, duration, data))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// We send a close event to the reactor so it closes down our reactor-thread
|
||||
fn close(&mut self) {
|
||||
self.dispatcher.send(Event::Close).unwrap();
|
||||
}
|
||||
|
||||
// We need a way to check if any event's are ready. This will simply
|
||||
// look through the "readylist" for an event macthing the ID we want to
|
||||
// check for.
|
||||
fn is_ready(&self, id_to_check: usize) -> bool {
|
||||
self.readylist
|
||||
.lock()
|
||||
.map(|rl| rl.iter().any(|id| *id == id_to_check))
|
||||
.unwrap()
|
||||
// We simply checks if a task with this id is in the state `TaskState::Ready`
|
||||
fn is_ready(&self, id: usize) -> bool {
|
||||
self.tasks.get(&id).map(|state| match state {
|
||||
TaskState::Ready => true,
|
||||
_ => false,
|
||||
}).unwrap_or(false)
|
||||
}
|
||||
}
|
||||
|
||||
// When our `Reactor` is dropped we join the reactor thread with the thread
|
||||
// owning our `Reactor` so we catch any panics and release all resources.
|
||||
// It's not needed for this to work, but it really is a best practice to join
|
||||
// all threads you spawn.
|
||||
impl Drop for Reactor {
|
||||
fn drop(&mut self) {
|
||||
self.handle.take().map(|h| h.join().unwrap()).unwrap();
|
||||
@@ -430,9 +459,9 @@ which you can edit and change the way you like.
|
||||
|
||||
```rust, edition2018
|
||||
# use std::{
|
||||
# future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
|
||||
# task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
|
||||
# thread::{self, JoinHandle}, time::{Duration, Instant}
|
||||
# future::Future, pin::Pin, sync::{ mpsc::{channel, Sender}, Arc, Mutex,},
|
||||
# task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem,
|
||||
# thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
|
||||
# };
|
||||
#
|
||||
fn main() {
|
||||
@@ -441,10 +470,6 @@ fn main() {
|
||||
|
||||
// Many runtimes create a glocal `reactor` we pass it as an argument
|
||||
let reactor = Reactor::new();
|
||||
|
||||
// Since we'll share this between threads we wrap it in a
|
||||
// atmically-refcounted- mutex.
|
||||
let reactor = Arc::new(Mutex::new(reactor));
|
||||
|
||||
// We create two tasks:
|
||||
// - first parameter is the `reactor`
|
||||
@@ -482,15 +507,18 @@ fn main() {
|
||||
// ends nicely.
|
||||
reactor.lock().map(|mut r| r.close()).unwrap();
|
||||
}
|
||||
|
||||
# // ============================= EXECUTOR ====================================
|
||||
# fn block_on<F: Future>(mut future: F) -> F::Output {
|
||||
# let mywaker = Arc::new(MyWaker{ thread: thread::current() });
|
||||
# let mywaker = Arc::new(MyWaker {
|
||||
# thread: thread::current(),
|
||||
# });
|
||||
# let waker = waker_into_waker(Arc::into_raw(mywaker));
|
||||
# let mut cx = Context::from_waker(&waker);
|
||||
#
|
||||
# // SAFETY: we shadow `future` so it can't be accessed again.
|
||||
# let mut future = unsafe { Pin::new_unchecked(&mut future) };
|
||||
# let val = loop {
|
||||
# let pinned = unsafe { Pin::new_unchecked(&mut future) };
|
||||
# match Future::poll(pinned, &mut cx) {
|
||||
# match Future::poll(future.as_mut(), &mut cx) {
|
||||
# Poll::Ready(val) => break val,
|
||||
# Poll::Pending => thread::park(),
|
||||
# };
|
||||
@@ -507,29 +535,28 @@ fn main() {
|
||||
# #[derive(Clone)]
|
||||
# pub struct Task {
|
||||
# id: usize,
|
||||
# reactor: Arc<Mutex<Reactor>>,
|
||||
# reactor: Arc<Mutex<Box<Reactor>>>,
|
||||
# data: u64,
|
||||
# is_registered: bool,
|
||||
# }
|
||||
#
|
||||
# fn mywaker_wake(s: &MyWaker) {
|
||||
# let waker_ptr: *const MyWaker = s;
|
||||
# let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
|
||||
# let waker_arc = unsafe { Arc::from_raw(waker_ptr) };
|
||||
# waker_arc.thread.unpark();
|
||||
# }
|
||||
#
|
||||
# fn mywaker_clone(s: &MyWaker) -> RawWaker {
|
||||
# let arc = unsafe { Arc::from_raw(s).clone() };
|
||||
# let arc = unsafe { Arc::from_raw(s) };
|
||||
# std::mem::forget(arc.clone()); // increase ref count
|
||||
# RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
|
||||
# }
|
||||
#
|
||||
# const VTABLE: RawWakerVTable = unsafe {
|
||||
# RawWakerVTable::new(
|
||||
# |s| mywaker_clone(&*(s as *const MyWaker)), // clone
|
||||
# |s| mywaker_wake(&*(s as *const MyWaker)), // wake
|
||||
# |s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
|
||||
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
|
||||
# |s| mywaker_clone(&*(s as *const MyWaker)), // clone
|
||||
# |s| mywaker_wake(&*(s as *const MyWaker)), // wake
|
||||
# |s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
|
||||
# |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
|
||||
# )
|
||||
# };
|
||||
#
|
||||
@@ -539,97 +566,106 @@ fn main() {
|
||||
# }
|
||||
#
|
||||
# impl Task {
|
||||
# fn new(reactor: Arc<Mutex<Reactor>>, data: u64, id: usize) -> Self {
|
||||
# Task {
|
||||
# id,
|
||||
# reactor,
|
||||
# data,
|
||||
# is_registered: false,
|
||||
# }
|
||||
# fn new(reactor: Arc<Mutex<Box<Reactor>>>, data: u64, id: usize) -> Self {
|
||||
# Task { id, reactor, data }
|
||||
# }
|
||||
# }
|
||||
#
|
||||
# impl Future for Task {
|
||||
# type Output = usize;
|
||||
# fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
# fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
# let mut r = self.reactor.lock().unwrap();
|
||||
# if r.is_ready(self.id) {
|
||||
# *r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
|
||||
# Poll::Ready(self.id)
|
||||
# } else if self.is_registered {
|
||||
# } else if r.tasks.contains_key(&self.id) {
|
||||
# r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
|
||||
# Poll::Pending
|
||||
# } else {
|
||||
# r.register(self.data, cx.waker().clone(), self.id);
|
||||
# drop(r);
|
||||
# self.is_registered = true;
|
||||
# Poll::Pending
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
#
|
||||
# // =============================== REACTOR ===================================
|
||||
# enum TaskState {
|
||||
# Ready,
|
||||
# NotReady(Waker),
|
||||
# Finished,
|
||||
# }
|
||||
# struct Reactor {
|
||||
# dispatcher: Sender<Event>,
|
||||
# handle: Option<JoinHandle<()>>,
|
||||
# readylist: Arc<Mutex<Vec<usize>>>,
|
||||
# tasks: HashMap<usize, TaskState>,
|
||||
# }
|
||||
#
|
||||
# #[derive(Debug)]
|
||||
# enum Event {
|
||||
# Close,
|
||||
# Timeout(Waker, u64, usize),
|
||||
# Timeout(u64, usize),
|
||||
# }
|
||||
#
|
||||
# impl Reactor {
|
||||
# fn new() -> Self {
|
||||
# fn new() -> Arc<Mutex<Box<Self>>> {
|
||||
# let (tx, rx) = channel::<Event>();
|
||||
# let readylist = Arc::new(Mutex::new(vec![]));
|
||||
# let rl_clone = readylist.clone();
|
||||
# let mut handles = vec![];
|
||||
# let reactor = Arc::new(Mutex::new(Box::new(Reactor {
|
||||
# dispatcher: tx,
|
||||
# handle: None,
|
||||
# tasks: HashMap::new(),
|
||||
# })));
|
||||
#
|
||||
# let reactor_clone = Arc::downgrade(&reactor);
|
||||
# let handle = thread::spawn(move || {
|
||||
# let mut handles = vec![];
|
||||
# // This simulates some I/O resource
|
||||
# for event in rx {
|
||||
# println!("REACTOR: {:?}", event);
|
||||
# let rl_clone = rl_clone.clone();
|
||||
# let reactor = reactor_clone.clone();
|
||||
# match event {
|
||||
# Event::Close => break,
|
||||
# Event::Timeout(waker, duration, id) => {
|
||||
# Event::Timeout(duration, id) => {
|
||||
# let event_handle = thread::spawn(move || {
|
||||
# thread::sleep(Duration::from_secs(duration));
|
||||
# rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
|
||||
# waker.wake();
|
||||
# let reactor = reactor.upgrade().unwrap();
|
||||
# reactor.lock().map(|mut r| r.wake(id)).unwrap();
|
||||
# });
|
||||
#
|
||||
# handles.push(event_handle);
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
#
|
||||
# for handle in handles {
|
||||
# handle.join().unwrap();
|
||||
# }
|
||||
# handles.into_iter().for_each(|handle| handle.join().unwrap());
|
||||
# });
|
||||
#
|
||||
# Reactor {
|
||||
# readylist,
|
||||
# dispatcher: tx,
|
||||
# handle: Some(handle),
|
||||
# }
|
||||
# reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
|
||||
# reactor
|
||||
# }
|
||||
#
|
||||
# fn register(&mut self, duration: u64, waker: Waker, data: usize) {
|
||||
# self.dispatcher
|
||||
# .send(Event::Timeout(waker, duration, data))
|
||||
# .unwrap();
|
||||
# fn wake(&mut self, id: usize) {
|
||||
# self.tasks.get_mut(&id).map(|state| {
|
||||
# match mem::replace(state, TaskState::Ready) {
|
||||
# TaskState::NotReady(waker) => waker.wake(),
|
||||
# TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
|
||||
# _ => unreachable!()
|
||||
# }
|
||||
# }).unwrap();
|
||||
# }
|
||||
#
|
||||
# fn register(&mut self, duration: u64, waker: Waker, id: usize) {
|
||||
# if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
|
||||
# panic!("Tried to insert a task with id: '{}', twice!", id);
|
||||
# }
|
||||
# self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
|
||||
# }
|
||||
#
|
||||
# fn close(&mut self) {
|
||||
# self.dispatcher.send(Event::Close).unwrap();
|
||||
# }
|
||||
#
|
||||
# fn is_ready(&self, id_to_check: usize) -> bool {
|
||||
# self.readylist
|
||||
# .lock()
|
||||
# .map(|rl| rl.iter().any(|id| *id == id_to_check))
|
||||
# .unwrap()
|
||||
# fn is_ready(&self, id: usize) -> bool {
|
||||
# self.tasks.get(&id).map(|state| match state {
|
||||
# TaskState::Ready => true,
|
||||
# _ => false,
|
||||
# }).unwrap_or(false)
|
||||
# }
|
||||
# }
|
||||
#
|
||||
|
||||
@@ -5,16 +5,9 @@ Here is the whole example. You can edit it right here in your browser and
|
||||
run it yourself. Have fun!
|
||||
|
||||
```rust,editable,edition2018
|
||||
use std::{
|
||||
future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex},
|
||||
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
|
||||
thread::{self, JoinHandle}, time::{Duration, Instant}
|
||||
};
|
||||
|
||||
fn main() {
|
||||
let start = Instant::now();
|
||||
let reactor = Reactor::new();
|
||||
let reactor = Arc::new(Mutex::new(reactor));
|
||||
let future1 = Task::new(reactor.clone(), 1, 1);
|
||||
let future2 = Task::new(reactor.clone(), 2, 2);
|
||||
|
||||
@@ -38,10 +31,17 @@ fn main() {
|
||||
block_on(mainfut);
|
||||
reactor.lock().map(|mut r| r.close()).unwrap();
|
||||
}
|
||||
use std::{
|
||||
future::Future, pin::Pin, sync::{ mpsc::{channel, Sender}, Arc, Mutex,},
|
||||
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem,
|
||||
thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
|
||||
};
|
||||
|
||||
// ============================= EXECUTOR ====================================
|
||||
fn block_on<F: Future>(mut future: F) -> F::Output {
|
||||
let mywaker = Arc::new(MyWaker{ thread: thread::current() });
|
||||
let mywaker = Arc::new(MyWaker {
|
||||
thread: thread::current(),
|
||||
});
|
||||
let waker = waker_into_waker(Arc::into_raw(mywaker));
|
||||
let mut cx = Context::from_waker(&waker);
|
||||
|
||||
@@ -65,14 +65,13 @@ struct MyWaker {
|
||||
#[derive(Clone)]
|
||||
pub struct Task {
|
||||
id: usize,
|
||||
reactor: Arc<Mutex<Reactor>>,
|
||||
reactor: Arc<Mutex<Box<Reactor>>>,
|
||||
data: u64,
|
||||
is_registered: bool,
|
||||
}
|
||||
|
||||
fn mywaker_wake(s: &MyWaker) {
|
||||
let waker_ptr: *const MyWaker = s;
|
||||
let waker_arc = unsafe {Arc::from_raw(waker_ptr)};
|
||||
let waker_arc = unsafe { Arc::from_raw(waker_ptr) };
|
||||
waker_arc.thread.unpark();
|
||||
}
|
||||
|
||||
@@ -84,10 +83,10 @@ fn mywaker_clone(s: &MyWaker) -> RawWaker {
|
||||
|
||||
const VTABLE: RawWakerVTable = unsafe {
|
||||
RawWakerVTable::new(
|
||||
|s| mywaker_clone(&*(s as *const MyWaker)), // clone
|
||||
|s| mywaker_wake(&*(s as *const MyWaker)), // wake
|
||||
|s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
|
||||
|s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
|
||||
|s| mywaker_clone(&*(s as *const MyWaker)), // clone
|
||||
|s| mywaker_wake(&*(s as *const MyWaker)), // wake
|
||||
|s| mywaker_wake(*(s as *const &MyWaker)), // wake by ref
|
||||
|s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
|
||||
)
|
||||
};
|
||||
|
||||
@@ -97,97 +96,106 @@ fn waker_into_waker(s: *const MyWaker) -> Waker {
|
||||
}
|
||||
|
||||
impl Task {
|
||||
fn new(reactor: Arc<Mutex<Reactor>>, data: u64, id: usize) -> Self {
|
||||
Task {
|
||||
id,
|
||||
reactor,
|
||||
data,
|
||||
is_registered: false,
|
||||
}
|
||||
fn new(reactor: Arc<Mutex<Box<Reactor>>>, data: u64, id: usize) -> Self {
|
||||
Task { id, reactor, data }
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Task {
|
||||
type Output = usize;
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut r = self.reactor.lock().unwrap();
|
||||
if r.is_ready(self.id) {
|
||||
*r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
|
||||
Poll::Ready(self.id)
|
||||
} else if self.is_registered {
|
||||
} else if r.tasks.contains_key(&self.id) {
|
||||
r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
|
||||
Poll::Pending
|
||||
} else {
|
||||
r.register(self.data, cx.waker().clone(), self.id);
|
||||
drop(r);
|
||||
self.is_registered = true;
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// =============================== REACTOR ===================================
|
||||
enum TaskState {
|
||||
Ready,
|
||||
NotReady(Waker),
|
||||
Finished,
|
||||
}
|
||||
struct Reactor {
|
||||
dispatcher: Sender<Event>,
|
||||
handle: Option<JoinHandle<()>>,
|
||||
readylist: Arc<Mutex<Vec<usize>>>,
|
||||
tasks: HashMap<usize, TaskState>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Event {
|
||||
Close,
|
||||
Timeout(Waker, u64, usize),
|
||||
Timeout(u64, usize),
|
||||
}
|
||||
|
||||
impl Reactor {
|
||||
fn new() -> Self {
|
||||
fn new() -> Arc<Mutex<Box<Self>>> {
|
||||
let (tx, rx) = channel::<Event>();
|
||||
let readylist = Arc::new(Mutex::new(vec![]));
|
||||
let rl_clone = readylist.clone();
|
||||
let mut handles = vec![];
|
||||
let reactor = Arc::new(Mutex::new(Box::new(Reactor {
|
||||
dispatcher: tx,
|
||||
handle: None,
|
||||
tasks: HashMap::new(),
|
||||
})));
|
||||
|
||||
let reactor_clone = Arc::downgrade(&reactor);
|
||||
let handle = thread::spawn(move || {
|
||||
let mut handles = vec![];
|
||||
// This simulates some I/O resource
|
||||
for event in rx {
|
||||
println!("REACTOR: {:?}", event);
|
||||
let rl_clone = rl_clone.clone();
|
||||
let reactor = reactor_clone.clone();
|
||||
match event {
|
||||
Event::Close => break,
|
||||
Event::Timeout(waker, duration, id) => {
|
||||
Event::Timeout(duration, id) => {
|
||||
let event_handle = thread::spawn(move || {
|
||||
thread::sleep(Duration::from_secs(duration));
|
||||
rl_clone.lock().map(|mut rl| rl.push(id)).unwrap();
|
||||
waker.wake();
|
||||
let reactor = reactor.upgrade().unwrap();
|
||||
reactor.lock().map(|mut r| r.wake(id)).unwrap();
|
||||
});
|
||||
|
||||
handles.push(event_handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
handles.into_iter().for_each(|handle| handle.join().unwrap());
|
||||
});
|
||||
|
||||
Reactor {
|
||||
readylist,
|
||||
dispatcher: tx,
|
||||
handle: Some(handle),
|
||||
}
|
||||
reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
|
||||
reactor
|
||||
}
|
||||
|
||||
fn register(&mut self, duration: u64, waker: Waker, data: usize) {
|
||||
self.dispatcher
|
||||
.send(Event::Timeout(waker, duration, data))
|
||||
.unwrap();
|
||||
fn wake(&mut self, id: usize) {
|
||||
self.tasks.get_mut(&id).map(|state| {
|
||||
match mem::replace(state, TaskState::Ready) {
|
||||
TaskState::NotReady(waker) => waker.wake(),
|
||||
TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
|
||||
_ => unreachable!()
|
||||
}
|
||||
}).unwrap();
|
||||
}
|
||||
|
||||
fn register(&mut self, duration: u64, waker: Waker, id: usize) {
|
||||
if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
|
||||
panic!("Tried to insert a task with id: '{}', twice!", id);
|
||||
}
|
||||
self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
|
||||
}
|
||||
|
||||
fn close(&mut self) {
|
||||
self.dispatcher.send(Event::Close).unwrap();
|
||||
}
|
||||
|
||||
fn is_ready(&self, id_to_check: usize) -> bool {
|
||||
self.readylist
|
||||
.lock()
|
||||
.map(|rl| rl.iter().any(|id| *id == id_to_check))
|
||||
.unwrap()
|
||||
fn is_ready(&self, id: usize) -> bool {
|
||||
self.tasks.get(&id).map(|state| match state {
|
||||
TaskState::Ready => true,
|
||||
_ => false,
|
||||
}).unwrap_or(false)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user