feat: update async study

This commit is contained in:
2022-03-19 17:43:50 +08:00
parent 03c3e1dab5
commit f1fcb8d6f4

View File

@@ -10,8 +10,7 @@ use tokio::time::sleep;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let (sender, receiver) = channel::<Vec<u8>>(512); let (sender, receiver) = channel::<StreamMessage>(512);
let stream = Stream { sender, receiver }; let stream = Stream { sender, receiver };
let (mut s, mut w) = tokio::io::split(stream); let (mut s, mut w) = tokio::io::split(stream);
@@ -19,11 +18,9 @@ async fn main() {
let mut buf = Vec::<u8>::with_capacity(1024); let mut buf = Vec::<u8>::with_capacity(1024);
loop { loop {
match s.read_buf(&mut buf).await { match s.read_buf(&mut buf).await {
Ok(len) => { Ok(len) => println!("Len: {}, Buf: {:?}", len, &buf[..len]),
println!("Len: {}, Buf: {:?}", len, &buf[..len]);
}
Err(e) => { Err(e) => {
println!("Err: {}", e); println!("Err: {:?}", e);
if e.to_string().contains("broken pipe") { if e.to_string().contains("broken pipe") {
break; break;
} }
@@ -43,9 +40,16 @@ async fn main() {
println!("{:?}, {:?}", a, b) println!("{:?}, {:?}", a, b)
} }
#[derive(Debug)]
enum StreamMessage {
Data(Vec<u8>),
Flush,
Close,
}
struct Stream { struct Stream {
sender: Sender<Vec<u8>>, sender: Sender<StreamMessage>,
receiver: Receiver<Vec<u8>>, receiver: Receiver<StreamMessage>,
} }
impl AsyncRead for Stream { impl AsyncRead for Stream {
@@ -55,11 +59,14 @@ impl AsyncRead for Stream {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(m_opt) => match m_opt { Poll::Ready(m_opt) => match m_opt {
None => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, "broken pipe"))), None => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, "broken pipe"))),
Some(mm) => { Some(StreamMessage::Close) => {
if "close".as_bytes().to_vec() == mm { self.receiver.close();
self.receiver.close(); Poll::Ready(Ok(()))
} }
Poll::Ready(Ok(buf.put_slice(mm.as_slice()))) Some(StreamMessage::Flush) => Poll::Ready(Ok(())),
Some(StreamMessage::Data(mm)) => {
buf.put_slice(mm.as_slice());
Poll::Ready(Ok(()))
} }
} }
} }
@@ -68,35 +75,24 @@ impl AsyncRead for Stream {
impl AsyncWrite for Stream { impl AsyncWrite for Stream {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
let mut fut = self.sender.send(Vec::from(buf)); pool_send(self, cx, StreamMessage::Data(Vec::from(buf)), buf.len())
let fut = unsafe { Pin::new_unchecked(&mut fut) }; // SAFE??
match fut.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(buf.len())),
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))),
}
} }
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// NEED FLUSH? pool_send(self, cx, StreamMessage::Flush, ())
Poll::Ready(Ok(()))
} }
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// let mut fut = self.sender.send("close".as_bytes().to_vec()); pool_send(self, cx, StreamMessage::Close, ())
// let fut = unsafe { Pin::new_unchecked(&mut fut) }; // SAFE?? }
// match fut.poll(cx) { }
// Poll::Pending => Poll::Pending,
// Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), fn pool_send<R>(s: Pin<&mut Stream>, cx: &mut Context<'_>, stream_message: StreamMessage, r: R) -> Poll<io::Result<R>> {
// Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))), let fut = s.sender.send(stream_message);
// } tokio::pin!( fut);
let mut fut = self.sender.send("close".as_bytes().to_vec()); match fut.poll(cx) {
tokio::pin!( fut); Poll::Pending => Poll::Pending,
match fut.poll(cx) { Poll::Ready(Ok(_)) => Poll::Ready(Ok(r)),
Poll::Pending => Poll::Pending, Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))),
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))),
}
// Poll::Ready(Ok(()))
} }
} }