diff --git a/__concurrent/async_study/src/main.rs b/__concurrent/async_study/src/main.rs index d7acaa2..2becc30 100644 --- a/__concurrent/async_study/src/main.rs +++ b/__concurrent/async_study/src/main.rs @@ -10,8 +10,7 @@ use tokio::time::sleep; #[tokio::main] async fn main() { - let (sender, receiver) = channel::>(512); - + let (sender, receiver) = channel::(512); let stream = Stream { sender, receiver }; let (mut s, mut w) = tokio::io::split(stream); @@ -19,11 +18,9 @@ async fn main() { let mut buf = Vec::::with_capacity(1024); loop { match s.read_buf(&mut buf).await { - Ok(len) => { - println!("Len: {}, Buf: {:?}", len, &buf[..len]); - } + Ok(len) => println!("Len: {}, Buf: {:?}", len, &buf[..len]), Err(e) => { - println!("Err: {}", e); + println!("Err: {:?}", e); if e.to_string().contains("broken pipe") { break; } @@ -43,9 +40,16 @@ async fn main() { println!("{:?}, {:?}", a, b) } +#[derive(Debug)] +enum StreamMessage { + Data(Vec), + Flush, + Close, +} + struct Stream { - sender: Sender>, - receiver: Receiver>, + sender: Sender, + receiver: Receiver, } impl AsyncRead for Stream { @@ -55,11 +59,14 @@ impl AsyncRead for Stream { Poll::Pending => Poll::Pending, Poll::Ready(m_opt) => match m_opt { None => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, "broken pipe"))), - Some(mm) => { - if "close".as_bytes().to_vec() == mm { - self.receiver.close(); - } - Poll::Ready(Ok(buf.put_slice(mm.as_slice()))) + Some(StreamMessage::Close) => { + self.receiver.close(); + Poll::Ready(Ok(())) + } + Some(StreamMessage::Flush) => Poll::Ready(Ok(())), + Some(StreamMessage::Data(mm)) => { + buf.put_slice(mm.as_slice()); + Poll::Ready(Ok(())) } } } @@ -68,35 +75,24 @@ impl AsyncRead for Stream { impl AsyncWrite for Stream { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - let mut fut = self.sender.send(Vec::from(buf)); - let fut = unsafe { Pin::new_unchecked(&mut fut) }; // SAFE?? - match fut.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) => Poll::Ready(Ok(buf.len())), - Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), - } + pool_send(self, cx, StreamMessage::Data(Vec::from(buf)), buf.len()) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - // NEED FLUSH? - Poll::Ready(Ok(())) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + pool_send(self, cx, StreamMessage::Flush, ()) } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // let mut fut = self.sender.send("close".as_bytes().to_vec()); - // let fut = unsafe { Pin::new_unchecked(&mut fut) }; // SAFE?? - // match fut.poll(cx) { - // Poll::Pending => Poll::Pending, - // Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - // Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), - // } - let mut fut = self.sender.send("close".as_bytes().to_vec()); - tokio::pin!( fut); - match fut.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), - } - // Poll::Ready(Ok(())) + pool_send(self, cx, StreamMessage::Close, ()) + } +} + +fn pool_send(s: Pin<&mut Stream>, cx: &mut Context<'_>, stream_message: StreamMessage, r: R) -> Poll> { + let fut = s.sender.send(stream_message); + tokio::pin!( fut); + match fut.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(_)) => Poll::Ready(Ok(r)), + Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), } } \ No newline at end of file