diff --git a/__concurrent/async_study/examples/stream.rs b/__concurrent/async_study/examples/stream.rs new file mode 100644 index 0000000..2becc30 --- /dev/null +++ b/__concurrent/async_study/examples/stream.rs @@ -0,0 +1,98 @@ +use std::future::Future; +use std::io::{self, ErrorKind}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::time::sleep; + +#[tokio::main] +async fn main() { + let (sender, receiver) = channel::(512); + let stream = Stream { sender, receiver }; + let (mut s, mut w) = tokio::io::split(stream); + + let t = tokio::spawn(async move { + let mut buf = Vec::::with_capacity(1024); + loop { + match s.read_buf(&mut buf).await { + Ok(len) => println!("Len: {}, Buf: {:?}", len, &buf[..len]), + Err(e) => { + println!("Err: {:?}", e); + if e.to_string().contains("broken pipe") { + break; + } + } + } + } + }); + + sleep(Duration::from_secs(1)).await; + let _ = w.write_all(b"hello world").await; + sleep(Duration::from_secs(1)).await; + let _ = w.write_all(b"hello world").await; + sleep(Duration::from_secs(1)).await; + let _ = w.write_all(b"hello world").await; + let a = tokio::join!(w.shutdown()); + let b = tokio::join!(t); + println!("{:?}, {:?}", a, b) +} + +#[derive(Debug)] +enum StreamMessage { + Data(Vec), + Flush, + Close, +} + +struct Stream { + sender: Sender, + receiver: Receiver, +} + +impl AsyncRead for Stream { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let m = self.receiver.poll_recv(cx); + match m { + Poll::Pending => Poll::Pending, + Poll::Ready(m_opt) => match m_opt { + None => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, "broken pipe"))), + Some(StreamMessage::Close) => { + self.receiver.close(); + Poll::Ready(Ok(())) + } + Some(StreamMessage::Flush) => Poll::Ready(Ok(())), + Some(StreamMessage::Data(mm)) => { + buf.put_slice(mm.as_slice()); + Poll::Ready(Ok(())) + } + } + } + } +} + +impl AsyncWrite for Stream { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + pool_send(self, cx, StreamMessage::Data(Vec::from(buf)), buf.len()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + pool_send(self, cx, StreamMessage::Flush, ()) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + pool_send(self, cx, StreamMessage::Close, ()) + } +} + +fn pool_send(s: Pin<&mut Stream>, cx: &mut Context<'_>, stream_message: StreamMessage, r: R) -> Poll> { + let fut = s.sender.send(stream_message); + tokio::pin!( fut); + match fut.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(_)) => Poll::Ready(Ok(r)), + Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), + } +} \ No newline at end of file diff --git a/__concurrent/async_study/src/main.rs b/__concurrent/async_study/src/main.rs index 2becc30..e68090e 100644 --- a/__concurrent/async_study/src/main.rs +++ b/__concurrent/async_study/src/main.rs @@ -9,90 +9,11 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::sleep; #[tokio::main] -async fn main() { - let (sender, receiver) = channel::(512); - let stream = Stream { sender, receiver }; - let (mut s, mut w) = tokio::io::split(stream); - - let t = tokio::spawn(async move { - let mut buf = Vec::::with_capacity(1024); - loop { - match s.read_buf(&mut buf).await { - Ok(len) => println!("Len: {}, Buf: {:?}", len, &buf[..len]), - Err(e) => { - println!("Err: {:?}", e); - if e.to_string().contains("broken pipe") { - break; - } - } - } - } - }); - - sleep(Duration::from_secs(1)).await; - let _ = w.write_all(b"hello world").await; - sleep(Duration::from_secs(1)).await; - let _ = w.write_all(b"hello world").await; - sleep(Duration::from_secs(1)).await; - let _ = w.write_all(b"hello world").await; - let a = tokio::join!(w.shutdown()); - let b = tokio::join!(t); - println!("{:?}, {:?}", a, b) -} +async fn main() {} #[derive(Debug)] -enum StreamMessage { - Data(Vec), - Flush, - Close, +enum StreamPackage { + Data(u64, Vec), + Flush(u64), + Close(u64), } - -struct Stream { - sender: Sender, - receiver: Receiver, -} - -impl AsyncRead for Stream { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - let m = self.receiver.poll_recv(cx); - match m { - Poll::Pending => Poll::Pending, - Poll::Ready(m_opt) => match m_opt { - None => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, "broken pipe"))), - Some(StreamMessage::Close) => { - self.receiver.close(); - Poll::Ready(Ok(())) - } - Some(StreamMessage::Flush) => Poll::Ready(Ok(())), - Some(StreamMessage::Data(mm)) => { - buf.put_slice(mm.as_slice()); - Poll::Ready(Ok(())) - } - } - } - } -} - -impl AsyncWrite for Stream { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - pool_send(self, cx, StreamMessage::Data(Vec::from(buf)), buf.len()) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pool_send(self, cx, StreamMessage::Flush, ()) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - pool_send(self, cx, StreamMessage::Close, ()) - } -} - -fn pool_send(s: Pin<&mut Stream>, cx: &mut Context<'_>, stream_message: StreamMessage, r: R) -> Poll> { - let fut = s.sender.send(stream_message); - tokio::pin!( fut); - match fut.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) => Poll::Ready(Ok(r)), - Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), - } -} \ No newline at end of file