6.3 KiB
6.3 KiB
Our finished code
Here is the whole example. You can edit it right here in your browser and run it yourself. Have fun!
/// Entry point: runs two reactor-backed tasks to completion on the current thread.
///
/// Each task resolves to its own id once the reactor marks it ready; the
/// elapsed wall-clock time since program start is printed when that happens.
fn main() {
    let started = Instant::now();
    let reactor = Reactor::new();

    // `Task::new(reactor, data, id)`: `data` is forwarded to the reactor as the
    // timeout (in seconds) when the task is first polled.
    let first_task = Task::new(reactor.clone(), 1, 1);
    let second_task = Task::new(reactor.clone(), 2, 2);

    let first = async {
        let val = first_task.await;
        let dur = started.elapsed().as_secs_f32();
        println!("Future got {} at time: {:.2}.", val, dur);
    };

    let second = async {
        let val = second_task.await;
        let dur = started.elapsed().as_secs_f32();
        println!("Future got {} at time: {:.2}.", val, dur);
    };

    // The two futures are awaited one after the other, so the second task is
    // not polled (and therefore not registered) until the first has finished.
    block_on(async {
        first.await;
        second.await;
    });

    // Tell the reactor thread to stop processing events.
    reactor.lock().unwrap().close();
}
use std::{
future::Future, pin::Pin, sync::{ mpsc::{channel, Sender}, Arc, Mutex,},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem,
thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
};
// ============================= EXECUTOR ====================================
/// Minimal single-future executor: polls `future` on the calling thread until
/// it completes and returns its output.
///
/// Whenever the future reports `Poll::Pending` the thread is parked; the
/// waker handed to the future unparks this same thread, after which the
/// future is polled again.
fn block_on<F: Future>(mut future: F) -> F::Output {
    // The waker payload records the current thread so a wake-up can unpark it.
    let payload = Arc::new(MyWaker {
        thread: thread::current(),
    });
    let waker = waker_into_waker(Arc::into_raw(payload));
    let mut cx = Context::from_waker(&waker);

    // SAFETY: we shadow `future` so it can't be accessed again.
    let mut future = unsafe { Pin::new_unchecked(&mut future) };

    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
        // Not ready yet: sleep until the waker unparks this thread.
        thread::park();
    }
}
// ====================== FUTURE IMPLEMENTATION ==============================
/// Payload behind the hand-built `Waker`: remembers which thread to unpark
/// when a wake-up is signalled.
#[derive(Clone)]
struct MyWaker {
/// Handle of the thread that `block_on` parks while its future is pending.
thread: thread::Thread,
}
/// A leaf future that completes once the reactor marks its `id` as ready.
#[derive(Clone)]
pub struct Task {
/// Key under which this task's state is stored in the reactor's `tasks` map;
/// also the value the future resolves to.
id: usize,
/// Shared handle to the reactor that tracks readiness and stores the waker.
reactor: Arc<Mutex<Box<Reactor>>>,
/// Passed to `Reactor::register` as the timeout duration (seconds) on first poll.
data: u64,
}
/// Consuming wake: unparks the thread recorded in the waker payload and
/// releases the strong reference that the caller hands over.
///
/// `s` must point into an allocation created by `Arc<MyWaker>` whose ownership
/// of one strong count is being transferred to this call.
fn mywaker_wake(s: &MyWaker) {
    // SAFETY: the pointer originates from `Arc::into_raw` and the caller gives
    // up one strong count, which `owned` takes over here.
    let owned = unsafe { Arc::from_raw(s as *const MyWaker) };
    owned.thread.unpark();
    // `owned` is dropped at the end of scope, decrementing the refcount.
}
/// Clone entry of the waker vtable: bumps the `Arc<MyWaker>` strong count by
/// one and returns a new `RawWaker` that points at the same payload.
fn mywaker_clone(s: &MyWaker) -> RawWaker {
    let data: *const MyWaker = s;
    // SAFETY: `data` comes from `Arc::into_raw` on an `Arc<MyWaker>` that is
    // still alive for the duration of this call, so incrementing its strong
    // count is valid. The extra count is owned by the returned `RawWaker`.
    unsafe { Arc::increment_strong_count(data) };
    RawWaker::new(data as *const (), &VTABLE)
}
/// Virtual function table for wakers whose data pointer is the raw pointer of
/// an `Arc<MyWaker>` (as produced by `Arc::into_raw`).
///
/// Entries, in the order `RawWakerVTable::new` expects them:
/// 1. `clone`       - add one strong count and hand out another `RawWaker`.
/// 2. `wake`        - consuming wake: unpark the thread and release one count.
/// 3. `wake_by_ref` - non-consuming wake: unpark the thread only.
/// 4. `drop`        - release one strong count.
// SAFETY: every closure relies on the data pointer being a live
// `Arc<MyWaker>` raw pointer, which is how `waker_into_waker` and
// `mywaker_clone` construct it.
const VTABLE: RawWakerVTable = unsafe {
    RawWakerVTable::new(
        |s| mywaker_clone(&*(s as *const MyWaker)), // clone
        |s| mywaker_wake(&*(s as *const MyWaker)),  // wake (consumes one refcount)
        |s| {
            // wake by ref: the waker stays alive afterwards, so only borrow the
            // payload and leave the refcount untouched. The data pointer
            // addresses the `MyWaker` itself, not a `&MyWaker`.
            let waker: &MyWaker = &*(s as *const MyWaker);
            waker.thread.unpark()
        },
        |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
    )
};
/// Wraps a raw `Arc<MyWaker>` pointer in a `Waker` backed by `VTABLE`.
///
/// `s` must come from `Arc::into_raw`; the returned `Waker` takes over the
/// strong count that pointer represents and releases it when dropped.
fn waker_into_waker(s: *const MyWaker) -> Waker {
    // SAFETY: `VTABLE` implements clone/wake/drop for exactly this kind of
    // data pointer, provided the caller passed a valid `Arc<MyWaker>` pointer.
    unsafe { Waker::from_raw(RawWaker::new(s.cast::<()>(), &VTABLE)) }
}
impl Task {
fn new(reactor: Arc<Mutex<Box<Reactor>>>, data: u64, id: usize) -> Self {
Task { id, reactor, data }
}
}
impl Future for Task {
    /// The task resolves to its own id.
    type Output = usize;

    /// Checks the reactor for this task's state.
    ///
    /// * Ready in the reactor: mark it `Finished` and return `Poll::Ready(id)`.
    /// * Already known but not ready: store the current waker and stay pending.
    /// * Unknown: register the task (with `data` as timeout) and stay pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let id = self.id;
        let mut reactor = self.reactor.lock().unwrap();

        if reactor.is_ready(id) {
            println!("POLL: TASK {} IS READY", id);
            *reactor.tasks.get_mut(&id).unwrap() = TaskState::Finished;
            return Poll::Ready(id);
        }

        if reactor.tasks.contains_key(&id) {
            // The waker may differ between polls, so always keep the latest one.
            println!("POLL: REPLACED WAKER FOR TASK: {}", id);
            reactor
                .tasks
                .insert(id, TaskState::NotReady(cx.waker().clone()));
        } else {
            println!("POLL: REGISTERED TASK: {}, WAKER: {:?}", id, cx.waker());
            reactor.register(self.data, cx.waker().clone(), id);
        }

        Poll::Pending
    }
}
// =============================== REACTOR ===================================
/// Per-task bookkeeping stored in `Reactor::tasks`.
enum TaskState {
/// `Reactor::wake` has run for the task, but `poll` has not consumed it yet.
Ready,
/// Still waiting; holds the waker to invoke when the task is woken.
NotReady(Waker),
/// `poll` has already returned `Poll::Ready` for the task.
Finished,
}
/// Timer reactor: forwards timeout requests to a background thread and keeps
/// the readiness state of every registered task.
struct Reactor {
/// Sending half of the channel consumed by the reactor thread.
dispatcher: Sender<Event>,
/// Join handle of the reactor thread; set by `Reactor::new`, taken in `Drop`.
handle: Option<JoinHandle<()>>,
/// State of each registered task, keyed by task id.
tasks: HashMap<usize, TaskState>,
}
/// Messages sent to the reactor thread over the `dispatcher` channel.
#[derive(Debug)]
enum Event {
/// Stop reading further events.
Close,
/// `Timeout(duration, id)`: after `duration` seconds, wake the task `id`.
Timeout(u64, usize),
}
impl Reactor {
    /// Creates the reactor and starts its background thread.
    ///
    /// The thread reads events from the channel until it sees `Event::Close`
    /// (or the channel closes). For every `Event::Timeout` it spawns a timer
    /// thread that sleeps for the requested number of seconds and then calls
    /// `wake` for the task. All timer threads are joined before it exits.
    fn new() -> Arc<Mutex<Box<Self>>> {
        let (dispatcher, events) = channel::<Event>();
        let reactor = Arc::new(Mutex::new(Box::new(Reactor {
            dispatcher,
            handle: None,
            tasks: HashMap::new(),
        })));

        // The background thread only holds a weak handle to the reactor.
        let weak_reactor = Arc::downgrade(&reactor);
        let handle = thread::spawn(move || {
            let mut timers = Vec::new();
            for event in events {
                let (duration, id) = match event {
                    Event::Close => break,
                    Event::Timeout(duration, id) => (duration, id),
                };
                let weak_reactor = weak_reactor.clone();
                timers.push(thread::spawn(move || {
                    thread::sleep(Duration::from_secs(duration));
                    let reactor = weak_reactor.upgrade().unwrap();
                    reactor.lock().unwrap().wake(id);
                }));
            }
            for timer in timers {
                timer.join().unwrap();
            }
        });

        reactor.lock().unwrap().handle = Some(handle);
        reactor
    }

    /// Marks task `id` as `Ready` and invokes the waker stored for it.
    ///
    /// Panics if the task is unknown, or if it was not in the `NotReady` state.
    fn wake(&mut self, id: usize) {
        let state = self.tasks.get_mut(&id).unwrap();
        match mem::replace(state, TaskState::Ready) {
            TaskState::NotReady(waker) => waker.wake(),
            TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
            TaskState::Ready => unreachable!(),
        }
    }

    /// Records `waker` for a new task `id` and asks the reactor thread to wake
    /// it after `duration` seconds.
    ///
    /// Panics if `id` is already registered or the reactor thread is gone.
    fn register(&mut self, duration: u64, waker: Waker, id: usize) {
        let previous = self.tasks.insert(id, TaskState::NotReady(waker));
        if previous.is_some() {
            panic!("Tried to insert a task with id: '{}', twice!", id);
        }
        self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
    }

    /// Asks the reactor thread to stop processing events.
    ///
    /// Panics if the event could not be sent.
    fn close(&mut self) {
        self.dispatcher.send(Event::Close).unwrap();
    }

    /// Returns `true` only when task `id` exists and is in the `Ready` state.
    fn is_ready(&self, id: usize) -> bool {
        matches!(self.tasks.get(&id), Some(TaskState::Ready))
    }
}
impl Drop for Reactor {
fn drop(&mut self) {
self.handle.take().map(|h| h.join().unwrap()).unwrap();
}
}