diff --git a/__concurrent/async_study/src/main.rs b/__concurrent/async_study/src/main.rs index 00ede24..4f34879 100644 --- a/__concurrent/async_study/src/main.rs +++ b/__concurrent/async_study/src/main.rs @@ -17,30 +17,27 @@ use tokio::time::sleep; async fn main() { let stream_id = AtomicU64::new(1); - let (left_sender, left_receiver) = channel::(512); - let (right_sender, right_receiver) = channel::(512); + let (left_to_right_sender, left_to_right_receiver) = channel::(512); + let (right_to_left_sender, right_to_left_receiver) = channel::(512); - let left_stream_map = Arc::new(Mutex::new(HashMap::>::new())); - let right_stream_map = Arc::new(Mutex::new(HashMap::>::new())); + let right_consumer_stream_map = Arc::new(Mutex::new(HashMap::>::new())); + let left_consumer_stream_map = Arc::new(Mutex::new(HashMap::>::new())); - let cloned_left_stream_map = left_stream_map.clone(); - let cloned_right_stream_map = right_stream_map.clone(); - loop_consume(left_receiver, cloned_left_stream_map); - loop_consume(right_receiver, cloned_right_stream_map); + loop_consume(left_to_right_receiver, right_consumer_stream_map.clone()); + loop_consume(right_to_left_receiver, left_consumer_stream_map.clone()); let mut joins = vec![]; for _ in 0..3 { let new_stream_id = stream_id.fetch_add(1, Ordering::SeqCst); - let left_stream = create_stream(new_stream_id, right_sender.clone(), left_stream_map.clone()); - let right_stream = create_stream(new_stream_id, left_sender.clone(), right_stream_map.clone()); + let left_stream = create_stream(new_stream_id, right_to_left_sender.clone(), right_consumer_stream_map.clone()); + let right_stream = create_stream(new_stream_id, left_to_right_sender.clone(), left_consumer_stream_map.clone()); let (left_stream_reader, mut left_stream_writer) = tokio::io::split(left_stream); let (right_stream_reader, mut right_stream_writer) = tokio::io::split(right_stream); let a = loop_read(left_stream_reader, format!("#{} left reader", new_stream_id)); let b = loop_read(right_stream_reader, format!("#{} right reader", new_stream_id)); - let c = tokio::spawn(async move { for i in 0..3 { sleep(Duration::from_millis(100)).await; @@ -56,10 +53,7 @@ async fn main() { let _ = right_stream_writer.shutdown().await; }); - joins.push(a); - joins.push(b); - joins.push(c); - joins.push(d); + joins.extend([a, b, c, d]); } for j in joins { let _ = tokio::join!(j);