use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
/// Entry point: registers two simulated events with the reactor and then runs the
/// executor on the current thread until both have been reported.
fn main() {
    // Ready list shared between the wakers (which push ids) and the executor (which drains it).
    let ready_queue = Arc::new(Mutex::new(vec![]));
    let mut reactor = Reactor::new();

    // Events 1 and 2 both use a delay of 2 seconds and both wake the current thread.
    for id in 1..=2 {
        let waker = MyWaker::new(id, thread::current(), Arc::clone(&ready_queue));
        reactor.register(2, waker);
    }

    executor_run(reactor, ready_queue);
}
// ====== EXECUTOR ======
/// Runs the executor loop on the calling thread until no registered event is outstanding.
///
/// On every pass the ids currently in `rl` are removed, reported on stdout together with
/// the time elapsed since this function started, and `reactor.outstanding` is decremented
/// once per id. When the counter reads zero, `reactor.close()` is called and the function
/// returns (dropping `reactor`). Otherwise the thread parks and the loop repeats once it
/// resumes.
///
/// # Panics
///
/// Panics if the `rl` mutex is poisoned.
fn executor_run(mut reactor: Reactor, rl: Arc<Mutex<Vec<usize>>>) {
    let start = Instant::now();
    loop {
        // Move the ready ids out while holding the lock only for this one statement, so
        // wakers are never blocked behind the stdout I/O below. `rev()` keeps the
        // newest-first order that popping from the back of the list would give.
        let ready: Vec<usize> = rl.lock().unwrap().drain(..).rev().collect();

        for event in ready {
            let dur = start.elapsed().as_secs_f32();
            println!("Event {} just happened at time: {:.2}.", event, dur);
            reactor.outstanding.fetch_sub(1, Ordering::Relaxed);
        }

        if reactor.outstanding.load(Ordering::Relaxed) == 0 {
            reactor.close();
            break;
        }

        // An `unpark` that raced ahead of us makes this return immediately, and the
        // enclosing loop re-checks the ready list after every wake-up, so no event is missed.
        thread::park();
    }
}
// ====== "FUTURE" IMPL ======
/// Wake-up handle for one event: knows which id to report, which thread to unpark and
/// which shared list to report into.
#[derive(Debug)]
struct MyWaker {
    /// Id pushed onto the ready list when this waker fires.
    id: usize,
    /// Thread that gets unparked by [`MyWaker::wake`].
    thread: thread::Thread,
    /// Shared list collecting the ids of events that have fired.
    readylist: Arc<Mutex<Vec<usize>>>,
}

impl MyWaker {
    /// Bundles the given id, thread handle and ready list into a waker.
    fn new(id: usize, thread: thread::Thread, readylist: Arc<Mutex<Vec<usize>>>) -> Self {
        Self {
            id,
            thread,
            readylist,
        }
    }

    /// Appends this waker's id to the ready list and then unparks the stored thread.
    ///
    /// # Panics
    ///
    /// Panics if the ready-list mutex is poisoned.
    fn wake(&self) {
        {
            let mut ready = self.readylist.lock().unwrap();
            ready.push(self.id);
        } // Guard dropped here, so the lock is free again before the thread is unparked.
        self.thread.unpark();
    }
}
/// Plain record made of an id and a `pending` flag.
///
/// Nothing in the visible source constructs or reads this type.
#[derive(Clone, Debug)]
pub struct Task {
    /// Numeric identifier of the task.
    id: usize,
    /// Presumably `true` while the task has not completed yet (unused in visible code).
    pending: bool,
}
// ===== REACTOR =====
/// Simulated event source.
///
/// A background thread receives [`Event`]s over a channel. Each `Event::Simple` gets its
/// own helper thread that sleeps for the requested number of seconds and then calls the
/// waker. The background thread is stopped and joined when the `Reactor` is dropped.
struct Reactor {
    /// Sending half of the channel feeding the background thread.
    dispatcher: Sender<Event>,
    /// Join handle of the background thread; taken (left as `None`) by `Drop`.
    handle: Option<JoinHandle<()>>,
    /// Number of events registered so far; the executor decrements it per handled event.
    outstanding: AtomicUsize,
}

/// Messages understood by the reactor's background thread.
#[derive(Debug)]
enum Event {
    /// Stop accepting events, wait for the timers already started, then exit.
    Close,
    /// Call the waker after the given number of seconds.
    Simple(MyWaker, u64),
}

impl Reactor {
    /// Creates the channel and spawns the background thread that serves it.
    fn new() -> Self {
        let (tx, rx) = channel::<Event>();
        let handle = thread::spawn(move || {
            // Stand-in for a real I/O resource: one sleeping thread per registered event.
            let mut timers = vec![];
            for event in rx {
                match event {
                    Event::Close => break,
                    Event::Simple(mywaker, duration) => {
                        timers.push(thread::spawn(move || {
                            thread::sleep(Duration::from_secs(duration));
                            mywaker.wake();
                        }));
                    }
                }
            }

            // Do not exit before every timer that was started has called its waker.
            for timer in timers {
                timer.join().unwrap();
            }
        });

        Reactor {
            dispatcher: tx,
            handle: Some(handle),
            outstanding: AtomicUsize::new(0),
        }
    }

    /// Registers an event that calls `mywaker.wake()` after `duration` seconds and counts
    /// it as outstanding.
    ///
    /// # Panics
    ///
    /// Panics if the background thread is no longer receiving (for example after
    /// [`Reactor::close`]).
    fn register(&mut self, duration: u64, mywaker: MyWaker) {
        self.dispatcher
            .send(Event::Simple(mywaker, duration))
            .unwrap();
        self.outstanding.fetch_add(1, Ordering::Relaxed);
    }

    /// Asks the background thread to stop. Safe to call more than once.
    fn close(&mut self) {
        // A send error only means the receiver is gone, i.e. the background thread has
        // already stopped, which is exactly the state being requested.
        let _ = self.dispatcher.send(Event::Close);
    }
}

impl Drop for Reactor {
    /// Stops the background thread (if it is still running) and waits for it to finish.
    fn drop(&mut self) {
        // The background thread leaves its loop only on `Event::Close` or once the sender
        // is gone, and the sender is still owned by `self` at this point. Without this
        // request, dropping a reactor that was never closed would block on `join` forever.
        let _ = self.dispatcher.send(Event::Close);

        if let Some(handle) = self.handle.take() {
            if let Err(payload) = handle.join() {
                // Surface a panic from the background thread, but never while this thread
                // is already unwinding: a second panic would abort the process.
                if !thread::panicking() {
                    std::panic::resume_unwind(payload);
                }
            }
        }
    }
}
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
/// Entry point: registers two simulated events with the reactor and then runs the
/// executor on the current thread until both have been reported.
fn main() {
    // Ready list shared between the wakers (which push ids) and the executor (which drains it).
    let ready_queue = Arc::new(Mutex::new(vec![]));
    let mut reactor = Reactor::new();

    // Events 1 and 2 both use a delay of 2 seconds and both wake the current thread.
    for id in 1..=2 {
        let waker = MyWaker::new(id, thread::current(), Arc::clone(&ready_queue));
        reactor.register(2, waker);
    }

    executor_run(reactor, ready_queue);
}
# // ====== EXECUTOR ======
# fn executor_run(mut reactor: Reactor, rl: Arc<Mutex<Vec<usize>>>) { // Loops until `reactor.outstanding` reads zero, then closes the reactor and returns.
# let start = Instant::now(); // Reference point for the timestamps printed below.
# loop {
# let mut rl_locked = rl.lock().unwrap(); // Panics if the ready-list mutex is poisoned.
# while let Some(event) = rl_locked.pop() { // Drain the ready ids, most recently pushed first.
# let dur = (Instant::now() - start).as_secs_f32();
# println!("Event {} just happened at time: {:.2}.", event, dur);
# reactor.outstanding.fetch_sub(1, Ordering::Relaxed); // One fewer event still pending.
# }
# drop(rl_locked); // Release the ready list before parking so other threads can lock it.
#
# if reactor.outstanding.load(Ordering::Relaxed) == 0 {
# reactor.close();
# break;
# }
#
# thread::park(); // Block until this thread is unparked; the loop then re-checks the ready list.
# }
# }
#
# // ====== "FUTURE" IMPL ======
# #[derive(Debug)]
# struct MyWaker { // Pairs an event id with the thread to unpark and the shared ready list.
# id: usize,
# thread: thread::Thread,
# readylist: Arc<Mutex<Vec<usize>>>,
# }
#
# impl MyWaker {
# fn new(id: usize, thread: thread::Thread, readylist: Arc<Mutex<Vec<usize>>>) -> Self { // Stores the three arguments unchanged.
# MyWaker {
# id,
# thread,
# readylist,
# }
# }
#
# fn wake(&self) { // Pushes `id` onto the ready list, then unparks `thread`.
# self.readylist.lock().map(|mut rl| rl.push(self.id)).unwrap(); // Guard is dropped when the closure returns; panics if the mutex is poisoned.
# self.thread.unpark();
# }
# }
#
#
# #[derive(Debug, Clone)]
# pub struct Task { // Plain id + flag record; nothing in the visible source constructs or reads it.
# id: usize,
# pending: bool, // Presumably true while the task has not completed; unused in visible code.
# }
#
# // ===== REACTOR =====
# struct Reactor { // Simulated event source backed by one background thread.
# dispatcher: Sender<Event>, // Sending half of the channel read by the background thread.
# handle: Option<JoinHandle<()>>, // Background thread handle; taken by `Drop`.
# outstanding: AtomicUsize, // Incremented by `register`; the executor decrements it per handled event.
# }
# #[derive(Debug)]
# enum Event { // Messages understood by the background thread.
# Close, // Stop receiving events.
# Simple(MyWaker, u64), // Call the waker after the given number of seconds.
# }
#
# impl Reactor {
# fn new() -> Self { // Creates the channel and spawns the background thread serving it.
# let (tx, rx) = channel::<Event>();
# let mut handles = vec![]; // Join handles of the per-event timer threads; moved into the closure below.
# let handle = thread::spawn(move || {
# // Stand-in for a real I/O resource: every `Simple` event gets its own sleeping thread.
# for event in rx {
# match event {
# Event::Close => break,
# Event::Simple(mywaker, duration) => {
# let event_handle = thread::spawn(move || {
# thread::sleep(Duration::from_secs(duration));
# mywaker.wake();
# });
# handles.push(event_handle);
# }
# }
# }
#
# for handle in handles { // Wait for every timer thread that was started before exiting.
# handle.join().unwrap();
# }
# });
#
# Reactor {
# dispatcher: tx,
# handle: Some(handle),
# outstanding: AtomicUsize::new(0),
# }
# }
#
# fn register(&mut self, duration: u64, mywaker: MyWaker) { // Queues a timer of `duration` seconds for `mywaker` and counts it as outstanding.
# self.dispatcher
# .send(Event::Simple(mywaker, duration))
# .unwrap(); // Panics if the receiving end of the channel is gone.
# self.outstanding.fetch_add(1, Ordering::Relaxed);
# }
#
# fn close(&mut self) { // Asks the background thread to stop; panics if the receiver is already gone.
# self.dispatcher.send(Event::Close).unwrap();
# }
# }
#
# impl Drop for Reactor {
# fn drop(&mut self) { // Waits for the background thread to finish.
# self.handle.take().map(|h| h.join().unwrap()).unwrap(); // NOTE(review): blocks forever unless `close` was called first, because the thread only exits on `Event::Close` or a dropped sender, and the sender is still owned by `self` here.
# }
# }