From e0b7a148bad5e57a3109e953024ec3bffd1c40da Mon Sep 17 00:00:00 2001 From: Hatter Jiang Date: Sat, 19 Mar 2022 22:55:26 +0800 Subject: [PATCH] feat: multiple stream by channel --- __concurrent/async_study/src/main.rs | 85 ++++++++++++++-------------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/__concurrent/async_study/src/main.rs b/__concurrent/async_study/src/main.rs index 06d7767..00ede24 100644 --- a/__concurrent/async_study/src/main.rs +++ b/__concurrent/async_study/src/main.rs @@ -23,57 +23,60 @@ async fn main() { let left_stream_map = Arc::new(Mutex::new(HashMap::>::new())); let right_stream_map = Arc::new(Mutex::new(HashMap::>::new())); - let cloned_left_stream_map = left_stream_map.clone(); let cloned_right_stream_map = right_stream_map.clone(); loop_consume(left_receiver, cloned_left_stream_map); loop_consume(right_receiver, cloned_right_stream_map); - let new_stream_id = stream_id.fetch_add(1, Ordering::SeqCst); + let mut joins = vec![]; + for _ in 0..3 { + let new_stream_id = stream_id.fetch_add(1, Ordering::SeqCst); - let (left_new_sender, left_new_receiver) = channel::(16); + let left_stream = create_stream(new_stream_id, right_sender.clone(), left_stream_map.clone()); + let right_stream = create_stream(new_stream_id, left_sender.clone(), right_stream_map.clone()); + + let (left_stream_reader, mut left_stream_writer) = tokio::io::split(left_stream); + let (right_stream_reader, mut right_stream_writer) = tokio::io::split(right_stream); + + let a = loop_read(left_stream_reader, format!("#{} left reader", new_stream_id)); + let b = loop_read(right_stream_reader, format!("#{} right reader", new_stream_id)); + + let c = tokio::spawn(async move { + for i in 0..3 { + sleep(Duration::from_millis(100)).await; + let _ = left_stream_writer.write_all(format!("Left message: {} - {}", new_stream_id, i).as_bytes()).await; + } + let _ = left_stream_writer.shutdown().await; + }); + let d = tokio::spawn(async move { + for i in 0..3 { + sleep(Duration::from_millis(100)).await; + let _ = right_stream_writer.write_all(format!("Right message: {} - {}", new_stream_id, i).as_bytes()).await; + } + let _ = right_stream_writer.shutdown().await; + }); + + joins.push(a); + joins.push(b); + joins.push(c); + joins.push(d); + } + for j in joins { + let _ = tokio::join!(j); + } +} + +fn create_stream(channel_id: u64, sender: Sender, left_stream_map: Arc>>>) -> Stream { + let (new_sender, receiver) = channel::(16); { let stream_map_left_locked = &mut left_stream_map.lock().unwrap(); - stream_map_left_locked.insert(new_stream_id, left_new_sender); + stream_map_left_locked.insert(channel_id, new_sender); } - let left_stream = Stream { - channel_id: new_stream_id, - sender: right_sender.clone(), - receiver: left_new_receiver, - }; - let (right_new_sender, right_new_receiver) = channel::(16); - { - let stream_map_right_locked = &mut right_stream_map.lock().unwrap(); - stream_map_right_locked.insert(new_stream_id, right_new_sender); + Stream { + channel_id, + sender, + receiver, } - let right_stream = Stream { - channel_id: new_stream_id, - sender: left_sender.clone(), - receiver: right_new_receiver, - }; - - let (left_stream_reader, mut left_stream_writer) = tokio::io::split(left_stream); - let (right_stream_reader, mut right_stream_writer) = tokio::io::split(right_stream); - - let a = loop_read(left_stream_reader, format!("#{} left reader ", new_stream_id)); - let b = loop_read(right_stream_reader, format!("#{} right reader", new_stream_id)); - - let c = tokio::spawn(async move { - for i in 0..10 { - sleep(Duration::from_millis(100)).await; - let _ = left_stream_writer.write_all(format!("Left message: {}", i).as_bytes()).await; - } - let _ = left_stream_writer.shutdown().await; - }); - let d = tokio::spawn(async move { - for i in 0..10 { - sleep(Duration::from_millis(100)).await; - let _ = right_stream_writer.write_all(format!("Right message: {}", i).as_bytes()).await; - } - let _ = right_stream_writer.shutdown().await; - }); - - let _ = tokio::join!(a, b, c, d); } fn loop_read(mut reader: ReadHalf, tag: String) -> JoinHandle<()> {