From 18b7b4707a5f64c5fd1f60a025caee5f1d9fffca Mon Sep 17 00:00:00 2001 From: Hatter Jiang Date: Sun, 20 Mar 2022 23:46:17 +0800 Subject: [PATCH] feat: add mini tokio --- __concurrent/async_study/Cargo.lock | 72 +++++++++++++++++ __concurrent/async_study/Cargo.toml | 1 + .../async_study/examples/mini_tokio.rs | 77 +++++++++++++++++++ 3 files changed, 150 insertions(+) create mode 100644 __concurrent/async_study/examples/mini_tokio.rs diff --git a/__concurrent/async_study/Cargo.lock b/__concurrent/async_study/Cargo.lock index 784fb33..a22f357 100644 --- a/__concurrent/async_study/Cargo.lock +++ b/__concurrent/async_study/Cargo.lock @@ -7,6 +7,7 @@ name = "async_study" version = "0.1.0" dependencies = [ "bytes", + "futures", "futures-util", "tokio", ] @@ -29,12 +30,71 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" + +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" + [[package]] name = "futures-task" version = "0.3.21" @@ -47,10 +107,16 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -217,6 +283,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" + [[package]] name = "smallvec" version = "1.8.0" diff --git a/__concurrent/async_study/Cargo.toml b/__concurrent/async_study/Cargo.toml index f8e3525..cae8074 100644 --- a/__concurrent/async_study/Cargo.toml +++ b/__concurrent/async_study/Cargo.toml @@ -9,3 +9,4 @@ edition = "2021" tokio = { version = "1.0", features = ["full"] } futures-util = { version = "0.3", default-features = false } bytes = "1.1" +futures = "0.3" diff --git a/__concurrent/async_study/examples/mini_tokio.rs b/__concurrent/async_study/examples/mini_tokio.rs new file mode 100644 index 0000000..c542fbf --- /dev/null +++ b/__concurrent/async_study/examples/mini_tokio.rs @@ -0,0 +1,77 @@ +use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +use futures::task; + +// from: https://tokio.rs/tokio/tutorial/async +fn main() { + let mut mini_tokio = MiniTokio::new(); + + mini_tokio.spawn(async { + let when = Instant::now() + Duration::from_millis(10); + let future = Delay { when }; + + let out = future.await; + assert_eq!(out, "done"); + }); + + mini_tokio.run(); +} + +struct MiniTokio { + tasks: VecDeque, +} + +type Task = Pin + Send>>; + +impl MiniTokio { + fn new() -> MiniTokio { + MiniTokio { + tasks: VecDeque::new(), + } + } + + /// Spawn a future onto the mini-tokio instance. + fn spawn(&mut self, future: F) + where + F: Future + Send + 'static, + { + self.tasks.push_back(Box::pin(future)); + } + + fn run(&mut self) { + let waker = task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + while let Some(mut task) = self.tasks.pop_front() { + if task.as_mut().poll(&mut cx).is_pending() { + self.tasks.push_back(task); + } + } + } +} + + +struct Delay { + when: Instant, +} + +impl Future for Delay { + type Output = &'static str; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll<&'static str> + { + if Instant::now() >= self.when { + println!("Hello world"); + Poll::Ready("done") + } else { + // Ignore this line for now. + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} \ No newline at end of file