diff --git a/__concurrent/async_study/Cargo.lock b/__concurrent/async_study/Cargo.lock index f9a0d2c..0453b09 100644 --- a/__concurrent/async_study/Cargo.lock +++ b/__concurrent/async_study/Cargo.lock @@ -6,6 +6,7 @@ version = 3 name = "async_study" version = "0.1.0" dependencies = [ + "futures-util", "tokio", ] @@ -27,6 +28,30 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "futures-core" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" + +[[package]] +name = "futures-task" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" + +[[package]] +name = "futures-util" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "pin-utils", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -143,6 +168,12 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro2" version = "1.0.36" diff --git a/__concurrent/async_study/Cargo.toml b/__concurrent/async_study/Cargo.toml index ad47309..de8e4e3 100644 --- a/__concurrent/async_study/Cargo.toml +++ b/__concurrent/async_study/Cargo.toml @@ -7,3 +7,4 @@ edition = "2021" [dependencies] tokio = { version = "1.0", features = ["full"] } +futures-util = { version = "0.3", default-features = false } diff --git a/__concurrent/async_study/src/main.rs b/__concurrent/async_study/src/main.rs index e7a11a9..d7acaa2 100644 --- a/__concurrent/async_study/src/main.rs +++ b/__concurrent/async_study/src/main.rs @@ -1,3 +1,102 @@ -fn main() { - println!("Hello, world!"); +use std::future::Future; +use std::io::{self, ErrorKind}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::time::sleep; + +#[tokio::main] +async fn main() { + let (sender, receiver) = channel::>(512); + + let stream = Stream { sender, receiver }; + let (mut s, mut w) = tokio::io::split(stream); + + let t = tokio::spawn(async move { + let mut buf = Vec::::with_capacity(1024); + loop { + match s.read_buf(&mut buf).await { + Ok(len) => { + println!("Len: {}, Buf: {:?}", len, &buf[..len]); + } + Err(e) => { + println!("Err: {}", e); + if e.to_string().contains("broken pipe") { + break; + } + } + } + } + }); + + sleep(Duration::from_secs(1)).await; + let _ = w.write_all(b"hello world").await; + sleep(Duration::from_secs(1)).await; + let _ = w.write_all(b"hello world").await; + sleep(Duration::from_secs(1)).await; + let _ = w.write_all(b"hello world").await; + let a = tokio::join!(w.shutdown()); + let b = tokio::join!(t); + println!("{:?}, {:?}", a, b) } + +struct Stream { + sender: Sender>, + receiver: Receiver>, +} + +impl AsyncRead for Stream { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let m = self.receiver.poll_recv(cx); + match m { + Poll::Pending => Poll::Pending, + Poll::Ready(m_opt) => match m_opt { + None => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, "broken pipe"))), + Some(mm) => { + if "close".as_bytes().to_vec() == mm { + self.receiver.close(); + } + Poll::Ready(Ok(buf.put_slice(mm.as_slice()))) + } + } + } + } +} + +impl AsyncWrite for Stream { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let mut fut = self.sender.send(Vec::from(buf)); + let fut = unsafe { Pin::new_unchecked(&mut fut) }; // SAFE?? + match fut.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(_)) => Poll::Ready(Ok(buf.len())), + Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), + } + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // NEED FLUSH? + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // let mut fut = self.sender.send("close".as_bytes().to_vec()); + // let fut = unsafe { Pin::new_unchecked(&mut fut) }; // SAFE?? + // match fut.poll(cx) { + // Poll::Pending => Poll::Pending, + // Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + // Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), + // } + let mut fut = self.sender.send("close".as_bytes().to_vec()); + tokio::pin!( fut); + match fut.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), + } + // Poll::Ready(Ok(())) + } +} \ No newline at end of file