use std::future::Future; use std::io::{self, ErrorKind}; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::sleep; #[tokio::main] async fn main() { let (sender, receiver) = channel::(512); let stream = Stream { sender, receiver }; let (mut s, mut w) = tokio::io::split(stream); let t = tokio::spawn(async move { let mut buf = Vec::::with_capacity(1024); loop { match s.read_buf(&mut buf).await { Ok(len) => println!("Len: {}, Buf: {:?}", len, &buf[..len]), Err(e) => { println!("Err: {:?}", e); if e.to_string().contains("broken pipe") { break; } } } } }); sleep(Duration::from_secs(1)).await; let _ = w.write_all(b"hello world").await; sleep(Duration::from_secs(1)).await; let _ = w.write_all(b"hello world").await; sleep(Duration::from_secs(1)).await; let _ = w.write_all(b"hello world").await; let a = tokio::join!(w.shutdown()); let b = tokio::join!(t); println!("{:?}, {:?}", a, b) } #[derive(Debug)] enum StreamMessage { Data(Vec), Flush, Close, } struct Stream { sender: Sender, receiver: Receiver, } impl AsyncRead for Stream { fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { let m = self.receiver.poll_recv(cx); match m { Poll::Pending => Poll::Pending, Poll::Ready(m_opt) => match m_opt { None => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, "broken pipe"))), Some(StreamMessage::Close) => { self.receiver.close(); Poll::Ready(Ok(())) } Some(StreamMessage::Flush) => Poll::Ready(Ok(())), Some(StreamMessage::Data(mm)) => { buf.put_slice(mm.as_slice()); Poll::Ready(Ok(())) } } } } } impl AsyncWrite for Stream { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { pool_send(self, cx, StreamMessage::Data(Vec::from(buf)), buf.len()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pool_send(self, cx, StreamMessage::Flush, ()) } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pool_send(self, cx, StreamMessage::Close, ()) } } fn pool_send(s: Pin<&mut Stream>, cx: &mut Context<'_>, stream_message: StreamMessage, r: R) -> Poll> { let fut = s.sender.send(stream_message); tokio::pin!( fut); match fut.poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Ok(_)) => Poll::Ready(Ok(r)), Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), } }