diff --git a/__concurrent/async_study/Cargo.lock b/__concurrent/async_study/Cargo.lock index a22f357..2a6ce3a 100644 --- a/__concurrent/async_study/Cargo.lock +++ b/__concurrent/async_study/Cargo.lock @@ -8,8 +8,10 @@ version = "0.1.0" dependencies = [ "bytes", "futures", + "futures-core", "futures-util", "tokio", + "tokio-stream", ] [[package]] @@ -347,6 +349,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "unicode-xid" version = "0.2.2" diff --git a/__concurrent/async_study/Cargo.toml b/__concurrent/async_study/Cargo.toml index cae8074..e7e67d9 100644 --- a/__concurrent/async_study/Cargo.toml +++ b/__concurrent/async_study/Cargo.toml @@ -7,6 +7,8 @@ edition = "2021" [dependencies] tokio = { version = "1.0", features = ["full"] } +futures-core = "0.3" futures-util = { version = "0.3", default-features = false } bytes = "1.1" futures = "0.3" +tokio-stream = "0.1" \ No newline at end of file diff --git a/__concurrent/async_study/examples/tokio_stream.rs b/__concurrent/async_study/examples/tokio_stream.rs new file mode 100644 index 0000000..295029b --- /dev/null +++ b/__concurrent/async_study/examples/tokio_stream.rs @@ -0,0 +1,10 @@ +use tokio_stream::{self as stream, StreamExt}; + +#[tokio::main] +async fn main() { + let mut stream = stream::iter(vec![0, 1, 2]); + + while let Some(value) = stream.next().await { + println!("Got {}", value); + } +} \ No newline at end of file diff --git a/__concurrent/async_study/examples/tokio_stream2.rs b/__concurrent/async_study/examples/tokio_stream2.rs new file mode 100644 index 0000000..3cc9e6a --- /dev/null +++ b/__concurrent/async_study/examples/tokio_stream2.rs @@ -0,0 +1,29 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_core::Stream; +use tokio_stream::StreamExt; + +#[tokio::main] +async fn main() { + let mut stream = MyStream(0); + + while let Some(value) = stream.next().await { + println!("Got {}", value); + } +} + +struct MyStream(i32); + +impl Stream for MyStream { + type Item = i32; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if self.0 < 10 { + self.0 += 1; + Poll::Ready(Some(self.0)) + } else { + Poll::Ready(None) + } + } +} \ No newline at end of file