feat: works

This commit is contained in:
2022-03-19 12:13:59 +08:00
parent 0534c43968
commit 03c3e1dab5
3 changed files with 133 additions and 2 deletions

View File

@@ -1,3 +1,102 @@
fn main() {
println!("Hello, world!");
use std::future::Future;
use std::io::{self, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::sleep;
#[tokio::main]
async fn main() {
let (sender, receiver) = channel::<Vec<u8>>(512);
let stream = Stream { sender, receiver };
let (mut s, mut w) = tokio::io::split(stream);
let t = tokio::spawn(async move {
let mut buf = Vec::<u8>::with_capacity(1024);
loop {
match s.read_buf(&mut buf).await {
Ok(len) => {
println!("Len: {}, Buf: {:?}", len, &buf[..len]);
}
Err(e) => {
println!("Err: {}", e);
if e.to_string().contains("broken pipe") {
break;
}
}
}
}
});
sleep(Duration::from_secs(1)).await;
let _ = w.write_all(b"hello world").await;
sleep(Duration::from_secs(1)).await;
let _ = w.write_all(b"hello world").await;
sleep(Duration::from_secs(1)).await;
let _ = w.write_all(b"hello world").await;
let a = tokio::join!(w.shutdown());
let b = tokio::join!(t);
println!("{:?}, {:?}", a, b)
}
struct Stream {
sender: Sender<Vec<u8>>,
receiver: Receiver<Vec<u8>>,
}
impl AsyncRead for Stream {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let m = self.receiver.poll_recv(cx);
match m {
Poll::Pending => Poll::Pending,
Poll::Ready(m_opt) => match m_opt {
None => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, "broken pipe"))),
Some(mm) => {
if "close".as_bytes().to_vec() == mm {
self.receiver.close();
}
Poll::Ready(Ok(buf.put_slice(mm.as_slice())))
}
}
}
}
}
impl AsyncWrite for Stream {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
let mut fut = self.sender.send(Vec::from(buf));
let fut = unsafe { Pin::new_unchecked(&mut fut) }; // SAFE??
match fut.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(buf.len())),
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))),
}
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// NEED FLUSH?
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// let mut fut = self.sender.send("close".as_bytes().to_vec());
// let fut = unsafe { Pin::new_unchecked(&mut fut) }; // SAFE??
// match fut.poll(cx) {
// Poll::Pending => Poll::Pending,
// Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
// Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))),
// }
let mut fut = self.sender.send("close".as_bytes().to_vec());
tokio::pin!( fut);
match fut.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))),
}
// Poll::Ready(Ok(()))
}
}