Our finished code
Here is the whole example. You can edit it right here in your browser and run it yourself. Have fun!
use std::{
    collections::HashMap,
    future::Future,
    mem,
    pin::Pin,
    sync::{
        mpsc::{channel, Sender},
        Arc, Mutex,
    },
    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
    thread::{self, JoinHandle},
    time::{Duration, Instant},
};

/// Creates a reactor and two timer tasks, awaits them one after the other on the
/// hand-written executor, prints each result with the seconds elapsed since start,
/// and finally asks the reactor thread to shut down.
fn main() {
    let start = Instant::now();
    let reactor = Reactor::new();

    let future1 = Task::new(reactor.clone(), 1, 1);
    let future2 = Task::new(reactor.clone(), 2, 2);

    let fut1 = async {
        let val = future1.await;
        let dur = start.elapsed().as_secs_f32();
        println!("Future got {} at time: {:.2}.", val, dur);
    };

    let fut2 = async {
        let val = future2.await;
        let dur = start.elapsed().as_secs_f32();
        println!("Future got {} at time: {:.2}.", val, dur);
    };

    // `fut2` is not polled (and therefore not registered) until `fut1` has completed.
    let mainfut = async {
        fut1.await;
        fut2.await;
    };

    block_on(mainfut);
    reactor.lock().unwrap().close();
}

// ============================= EXECUTOR ====================================

/// Drives `future` to completion on the calling thread and returns its output.
///
/// The future is polled with a waker that unparks the calling thread; whenever the
/// future reports `Poll::Pending` the thread parks until it is unparked, then polls again.
fn block_on<F: Future>(mut future: F) -> F::Output {
    let mywaker = Arc::new(MyWaker {
        thread: thread::current(),
    });
    // The strong count held by `mywaker` is handed over to the `Waker`, whose drop
    // entry in `VTABLE` releases it again.
    let waker = waker_into_waker(Arc::into_raw(mywaker));
    let mut cx = Context::from_waker(&waker);

    // SAFETY: `future` is shadowed by the pinned reference, so the original binding
    // cannot be moved or otherwise accessed for the rest of this function.
    let mut future = unsafe { Pin::new_unchecked(&mut future) };
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(val) => break val,
            // A wake-up that happens before we park leaves an unpark token behind,
            // so `park` returns immediately; a stray return from `park` just re-polls.
            Poll::Pending => thread::park(),
        };
    }
}

// ====================== FUTURE IMPLEMENTATION ==============================

/// Waker payload: the handle of the thread that should be unparked on wake.
#[derive(Clone)]
struct MyWaker {
    thread: thread::Thread,
}

/// A leaf future that completes with its `id` once the reactor marks that id as ready.
#[derive(Clone)]
pub struct Task {
    /// Key under which this task is tracked in the reactor; also the future's output.
    id: usize,
    /// Shared reactor this task registers itself with.
    reactor: Arc<Mutex<Box<Reactor>>>,
    /// Passed to the reactor as the timeout duration (in seconds) on registration.
    data: u64,
}

/// `wake` entry of the vtable: unparks the thread and releases the strong count
/// that the consumed `Waker` owned.
fn mywaker_wake(s: &MyWaker) {
    let waker_ptr: *const MyWaker = s;
    // SAFETY: `s` points into an allocation created by `Arc::into_raw`, and the
    // caller (the consumed `Waker`) owns exactly one strong count which we take over.
    let waker_arc = unsafe { Arc::from_raw(waker_ptr) };
    waker_arc.thread.unpark();
}

/// `wake_by_ref` entry of the vtable: unparks the thread without touching the
/// reference count, because the `Waker` stays alive and keeps its strong count.
fn mywaker_wake_by_ref(s: &MyWaker) {
    s.thread.unpark();
}

/// `clone` entry of the vtable: bumps the strong count by one and returns a new
/// `RawWaker` pointing at the same `MyWaker`.
fn mywaker_clone(s: &MyWaker) -> RawWaker {
    // SAFETY: `s` points into an allocation created by `Arc::into_raw`. The count
    // reclaimed here is given back by `Arc::into_raw` below, so the original waker
    // still owns its count afterwards.
    let arc = unsafe { Arc::from_raw(s) };
    mem::forget(arc.clone()); // increase ref count
    RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
}

/// Vtable connecting `Waker` operations to `MyWaker`.
///
/// SAFETY (for every entry): the data pointer is always the result of
/// `Arc::<MyWaker>::into_raw`, see `waker_into_waker` and `mywaker_clone`.
const VTABLE: RawWakerVTable = unsafe {
    RawWakerVTable::new(
        |s| mywaker_clone(&*(s as *const MyWaker)),       // clone
        |s| mywaker_wake(&*(s as *const MyWaker)),        // wake (consumes a ref count)
        |s| mywaker_wake_by_ref(&*(s as *const MyWaker)), // wake by ref (keeps ref count)
        |s| drop(Arc::from_raw(s as *const MyWaker)),     // decrease refcount
    )
};

/// Wraps a pointer obtained from `Arc::<MyWaker>::into_raw` in a `Waker`.
///
/// The returned `Waker` takes over the strong count that belonged to the raw pointer.
fn waker_into_waker(s: *const MyWaker) -> Waker {
    let raw_waker = RawWaker::new(s as *const (), &VTABLE);
    // SAFETY: `VTABLE` implements clone/wake/wake_by_ref/drop for a data pointer
    // produced by `Arc::<MyWaker>::into_raw`, which is what callers pass as `s`.
    unsafe { Waker::from_raw(raw_waker) }
}

impl Task {
    /// Creates a task tracked under `id` that asks `reactor` for a timeout of `data`
    /// seconds the first time it is polled.
    fn new(reactor: Arc<Mutex<Box<Reactor>>>, data: u64, id: usize) -> Self {
        Task { id, reactor, data }
    }
}

impl Future for Task {
    type Output = usize;

    /// Resolves to `self.id` once the reactor has marked the task `Ready`.
    ///
    /// Otherwise stores the current waker with the reactor: an already known task
    /// gets its stored waker replaced, an unknown task is registered (which also
    /// dispatches its timeout event).
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut r = self.reactor.lock().unwrap();
        if r.is_ready(self.id) {
            *r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
            Poll::Ready(self.id)
        } else if r.tasks.contains_key(&self.id) {
            // Always keep the most recent waker; the task may have moved between polls.
            r.tasks
                .insert(self.id, TaskState::NotReady(cx.waker().clone()));
            Poll::Pending
        } else {
            r.register(self.data, cx.waker().clone(), self.id);
            Poll::Pending
        }
    }
}

// =============================== REACTOR ===================================

/// State the reactor keeps per task id.
enum TaskState {
    /// The event fired; the next poll of the task completes it.
    Ready,
    /// The event has not fired yet; holds the waker to call when it does.
    NotReady(Waker),
    /// The task has been polled to completion.
    Finished,
}

/// Timer "reactor": receives events over a channel on its own thread and wakes the
/// matching task once its timeout has elapsed.
struct Reactor {
    /// Sending half of the channel read by the reactor thread.
    dispatcher: Sender<Event>,
    /// Join handle of the reactor thread; joined when the reactor is dropped.
    handle: Option<JoinHandle<()>>,
    /// Per-task state, keyed by task id.
    tasks: HashMap<usize, TaskState>,
}

/// Messages understood by the reactor thread.
#[derive(Debug)]
enum Event {
    /// Stop receiving events.
    Close,
    /// Wake the task with the given id after the given number of seconds.
    Timeout(u64, usize),
}

impl Reactor {
    /// Creates the reactor and spawns its event thread.
    ///
    /// The thread only holds a `Weak` reference to the reactor, so it does not keep
    /// the reactor alive on its own.
    fn new() -> Arc<Mutex<Box<Self>>> {
        let (tx, rx) = channel::<Event>();
        let reactor = Arc::new(Mutex::new(Box::new(Reactor {
            dispatcher: tx,
            handle: None,
            tasks: HashMap::new(),
        })));

        let reactor_clone = Arc::downgrade(&reactor);
        let handle = thread::spawn(move || {
            let mut handles = vec![];
            // This simulates some I/O resource
            for event in rx {
                println!("REACTOR: {:?}", event);
                let reactor = reactor_clone.clone();
                match event {
                    Event::Close => break,
                    Event::Timeout(duration, id) => {
                        // One sleeping thread per timeout stands in for a real timer.
                        let event_handle = thread::spawn(move || {
                            thread::sleep(Duration::from_secs(duration));
                            let reactor = reactor.upgrade().unwrap();
                            reactor.lock().unwrap().wake(id);
                        });
                        handles.push(event_handle);
                    }
                }
            }
            // Do not finish before every outstanding timeout thread has finished.
            for handle in handles {
                handle.join().unwrap();
            }
        });

        reactor.lock().unwrap().handle = Some(handle);
        reactor
    }

    /// Marks task `id` as `Ready` and calls the waker that was stored for it.
    ///
    /// # Panics
    ///
    /// Panics if `id` is unknown, or if the task was not in the `NotReady` state.
    fn wake(&mut self, id: usize) {
        let state = self.tasks.get_mut(&id).unwrap();
        match mem::replace(state, TaskState::Ready) {
            TaskState::NotReady(waker) => waker.wake(),
            TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
            _ => unreachable!(),
        }
    }

    /// Stores `waker` for a new task `id` and dispatches a timeout of `duration` seconds.
    ///
    /// # Panics
    ///
    /// Panics if `id` is already registered or if the event cannot be sent.
    fn register(&mut self, duration: u64, waker: Waker, id: usize) {
        if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
            panic!("Tried to insert a task with id: '{}', twice!", id);
        }
        self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
    }

    /// Tells the reactor thread to stop receiving events.
    fn close(&mut self) {
        self.dispatcher.send(Event::Close).unwrap();
    }

    /// Returns `true` only if task `id` is known and currently in the `Ready` state.
    fn is_ready(&self, id: usize) -> bool {
        matches!(self.tasks.get(&id), Some(TaskState::Ready))
    }
}

impl Drop for Reactor {
    /// Waits for the reactor thread to finish, propagating a panic from it.
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            handle.join().unwrap();
        }
    }
}