feat: add stream fork/join
This commit is contained in:
98
__concurrent/async_study/examples/stream.rs
Normal file
98
__concurrent/async_study/examples/stream.rs
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
use std::future::Future;
|
||||||
|
use std::io::{self, ErrorKind};
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
|
||||||
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let (sender, receiver) = channel::<StreamMessage>(512);
|
||||||
|
let stream = Stream { sender, receiver };
|
||||||
|
let (mut s, mut w) = tokio::io::split(stream);
|
||||||
|
|
||||||
|
let t = tokio::spawn(async move {
|
||||||
|
let mut buf = Vec::<u8>::with_capacity(1024);
|
||||||
|
loop {
|
||||||
|
match s.read_buf(&mut buf).await {
|
||||||
|
Ok(len) => println!("Len: {}, Buf: {:?}", len, &buf[..len]),
|
||||||
|
Err(e) => {
|
||||||
|
println!("Err: {:?}", e);
|
||||||
|
if e.to_string().contains("broken pipe") {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
let _ = w.write_all(b"hello world").await;
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
let _ = w.write_all(b"hello world").await;
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
let _ = w.write_all(b"hello world").await;
|
||||||
|
let a = tokio::join!(w.shutdown());
|
||||||
|
let b = tokio::join!(t);
|
||||||
|
println!("{:?}, {:?}", a, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum StreamMessage {
|
||||||
|
Data(Vec<u8>),
|
||||||
|
Flush,
|
||||||
|
Close,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Stream {
|
||||||
|
sender: Sender<StreamMessage>,
|
||||||
|
receiver: Receiver<StreamMessage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncRead for Stream {
|
||||||
|
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
|
||||||
|
let m = self.receiver.poll_recv(cx);
|
||||||
|
match m {
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
Poll::Ready(m_opt) => match m_opt {
|
||||||
|
None => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, "broken pipe"))),
|
||||||
|
Some(StreamMessage::Close) => {
|
||||||
|
self.receiver.close();
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
Some(StreamMessage::Flush) => Poll::Ready(Ok(())),
|
||||||
|
Some(StreamMessage::Data(mm)) => {
|
||||||
|
buf.put_slice(mm.as_slice());
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncWrite for Stream {
|
||||||
|
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
|
||||||
|
pool_send(self, cx, StreamMessage::Data(Vec::from(buf)), buf.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
pool_send(self, cx, StreamMessage::Flush, ())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
pool_send(self, cx, StreamMessage::Close, ())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pool_send<R>(s: Pin<&mut Stream>, cx: &mut Context<'_>, stream_message: StreamMessage, r: R) -> Poll<io::Result<R>> {
|
||||||
|
let fut = s.sender.send(stream_message);
|
||||||
|
tokio::pin!( fut);
|
||||||
|
match fut.poll(cx) {
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
Poll::Ready(Ok(_)) => Poll::Ready(Ok(r)),
|
||||||
|
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))),
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -9,90 +9,11 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
|
|||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {}
|
||||||
let (sender, receiver) = channel::<StreamMessage>(512);
|
|
||||||
let stream = Stream { sender, receiver };
|
|
||||||
let (mut s, mut w) = tokio::io::split(stream);
|
|
||||||
|
|
||||||
let t = tokio::spawn(async move {
|
|
||||||
let mut buf = Vec::<u8>::with_capacity(1024);
|
|
||||||
loop {
|
|
||||||
match s.read_buf(&mut buf).await {
|
|
||||||
Ok(len) => println!("Len: {}, Buf: {:?}", len, &buf[..len]),
|
|
||||||
Err(e) => {
|
|
||||||
println!("Err: {:?}", e);
|
|
||||||
if e.to_string().contains("broken pipe") {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
sleep(Duration::from_secs(1)).await;
|
|
||||||
let _ = w.write_all(b"hello world").await;
|
|
||||||
sleep(Duration::from_secs(1)).await;
|
|
||||||
let _ = w.write_all(b"hello world").await;
|
|
||||||
sleep(Duration::from_secs(1)).await;
|
|
||||||
let _ = w.write_all(b"hello world").await;
|
|
||||||
let a = tokio::join!(w.shutdown());
|
|
||||||
let b = tokio::join!(t);
|
|
||||||
println!("{:?}, {:?}", a, b)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum StreamMessage {
|
enum StreamPackage {
|
||||||
Data(Vec<u8>),
|
Data(u64, Vec<u8>),
|
||||||
Flush,
|
Flush(u64),
|
||||||
Close,
|
Close(u64),
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Stream {
|
|
||||||
sender: Sender<StreamMessage>,
|
|
||||||
receiver: Receiver<StreamMessage>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncRead for Stream {
|
|
||||||
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
|
|
||||||
let m = self.receiver.poll_recv(cx);
|
|
||||||
match m {
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
Poll::Ready(m_opt) => match m_opt {
|
|
||||||
None => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, "broken pipe"))),
|
|
||||||
Some(StreamMessage::Close) => {
|
|
||||||
self.receiver.close();
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
Some(StreamMessage::Flush) => Poll::Ready(Ok(())),
|
|
||||||
Some(StreamMessage::Data(mm)) => {
|
|
||||||
buf.put_slice(mm.as_slice());
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncWrite for Stream {
|
|
||||||
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
|
|
||||||
pool_send(self, cx, StreamMessage::Data(Vec::from(buf)), buf.len())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
|
||||||
pool_send(self, cx, StreamMessage::Flush, ())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
|
||||||
pool_send(self, cx, StreamMessage::Close, ())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn pool_send<R>(s: Pin<&mut Stream>, cx: &mut Context<'_>, stream_message: StreamMessage, r: R) -> Poll<io::Result<R>> {
|
|
||||||
let fut = s.sender.send(stream_message);
|
|
||||||
tokio::pin!( fut);
|
|
||||||
match fut.poll(cx) {
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
Poll::Ready(Ok(_)) => Poll::Ready(Ok(r)),
|
|
||||||
Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(ErrorKind::BrokenPipe, e.to_string()))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user