From 7007a773348f0c34728865c088d970ff291de4b8 Mon Sep 17 00:00:00 2001 From: Hatter Jiang Date: Sat, 19 Mar 2022 23:36:45 +0800 Subject: [PATCH] refactor: methods --- __concurrent/async_study/src/main.rs | 49 ++++++++++++++-------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/__concurrent/async_study/src/main.rs b/__concurrent/async_study/src/main.rs index 33a24ff..79c8d24 100644 --- a/__concurrent/async_study/src/main.rs +++ b/__concurrent/async_study/src/main.rs @@ -8,14 +8,14 @@ use std::task::{Context, Poll}; use std::time::Duration; use bytes::BytesMut; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf, ReadHalf}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf, ReadHalf, WriteHalf}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::task::JoinHandle; use tokio::time::sleep; #[tokio::main] async fn main() { - let stream_id = AtomicU64::new(1); + let channel_id = AtomicU64::new(1); let (left_to_right_sender, left_to_right_receiver) = channel::(512); let (right_to_left_sender, right_to_left_receiver) = channel::(512); @@ -28,30 +28,19 @@ async fn main() { let mut joins = vec![]; for _ in 0..3 { - let new_stream_id = stream_id.fetch_add(1, Ordering::SeqCst); + let new_channel_id = channel_id.fetch_add(1, Ordering::SeqCst); - let right_stream = create_stream(new_stream_id, right_to_left_sender.clone(), right_consumer_stream_map.clone()); - let left_stream = create_stream(new_stream_id, left_to_right_sender.clone(), left_consumer_stream_map.clone()); + let right_stream = create_stream(new_channel_id, right_to_left_sender.clone(), right_consumer_stream_map.clone()); + let left_stream = create_stream(new_channel_id, left_to_right_sender.clone(), left_consumer_stream_map.clone()); - let (right_stream_reader, mut right_stream_writer) = tokio::io::split(right_stream); - let (left_stream_reader, mut left_stream_writer) = tokio::io::split(left_stream); + let (right_stream_reader, right_stream_writer) = tokio::io::split(right_stream); + let (left_stream_reader, left_stream_writer) = tokio::io::split(left_stream); - let a = loop_read(right_stream_reader, format!("#{} right reader", new_stream_id)); - let b = loop_read(left_stream_reader, format!("#{} left reader", new_stream_id)); - let c = tokio::spawn(async move { - for i in 0..3 { - sleep(Duration::from_millis(100)).await; - let _ = right_stream_writer.write_all(format!("Right send message: {} - {}", new_stream_id, i).as_bytes()).await; - } - let _ = right_stream_writer.shutdown().await; - }); - let d = tokio::spawn(async move { - for i in 0..3 { - sleep(Duration::from_millis(100)).await; - let _ = left_stream_writer.write_all(format!("Left send message: {} - {}", new_stream_id, i).as_bytes()).await; - } - let _ = left_stream_writer.shutdown().await; - }); + let a = loop_read(right_stream_reader, format!("#{} right reader", new_channel_id)); + let b = loop_read(left_stream_reader, format!("#{} left reader", new_channel_id)); + + let c = loop_send(new_channel_id, right_stream_writer, format!("right sender")); + let d = loop_send(new_channel_id, left_stream_writer, format!("left sender")); joins.extend([a, b, c, d]); } @@ -60,10 +49,10 @@ async fn main() { } } -fn create_stream(channel_id: u64, sender: Sender, left_stream_map: Arc>>>) -> Stream { +fn create_stream(channel_id: u64, sender: Sender, consumer_stream_map: Arc>>>) -> Stream { let (new_sender, receiver) = channel::(16); { - let stream_map_left_locked = &mut left_stream_map.lock().unwrap(); + let stream_map_left_locked = &mut consumer_stream_map.lock().unwrap(); stream_map_left_locked.insert(channel_id, new_sender); } Stream { @@ -73,6 +62,16 @@ fn create_stream(channel_id: u64, sender: Sender, left_stream_map } } +fn loop_send(channel_id: u64, mut writer: WriteHalf, tag: String) -> JoinHandle<()> { + tokio::spawn(async move { + for i in 0..3 { + sleep(Duration::from_millis(100)).await; + let _ = writer.write_all(format!("Send message: [{}] {} - {}", &tag, channel_id, i).as_bytes()).await; + } + let _ = writer.shutdown().await; + }) +} + fn loop_read(mut reader: ReadHalf, tag: String) -> JoinHandle<()> { tokio::spawn(async move { let mut buf = BytesMut::with_capacity(4096);