feat: multiple stream by channel

This commit is contained in:
2022-03-19 22:55:26 +08:00
parent db42a77035
commit e0b7a148ba

View File

@@ -23,57 +23,60 @@ async fn main() {
let left_stream_map = Arc::new(Mutex::new(HashMap::<u64, Sender<StreamPackage>>::new())); let left_stream_map = Arc::new(Mutex::new(HashMap::<u64, Sender<StreamPackage>>::new()));
let right_stream_map = Arc::new(Mutex::new(HashMap::<u64, Sender<StreamPackage>>::new())); let right_stream_map = Arc::new(Mutex::new(HashMap::<u64, Sender<StreamPackage>>::new()));
let cloned_left_stream_map = left_stream_map.clone(); let cloned_left_stream_map = left_stream_map.clone();
let cloned_right_stream_map = right_stream_map.clone(); let cloned_right_stream_map = right_stream_map.clone();
loop_consume(left_receiver, cloned_left_stream_map); loop_consume(left_receiver, cloned_left_stream_map);
loop_consume(right_receiver, cloned_right_stream_map); loop_consume(right_receiver, cloned_right_stream_map);
let new_stream_id = stream_id.fetch_add(1, Ordering::SeqCst); let mut joins = vec![];
for _ in 0..3 {
let new_stream_id = stream_id.fetch_add(1, Ordering::SeqCst);
let (left_new_sender, left_new_receiver) = channel::<StreamPackage>(16); let left_stream = create_stream(new_stream_id, right_sender.clone(), left_stream_map.clone());
let right_stream = create_stream(new_stream_id, left_sender.clone(), right_stream_map.clone());
let (left_stream_reader, mut left_stream_writer) = tokio::io::split(left_stream);
let (right_stream_reader, mut right_stream_writer) = tokio::io::split(right_stream);
let a = loop_read(left_stream_reader, format!("#{} left reader", new_stream_id));
let b = loop_read(right_stream_reader, format!("#{} right reader", new_stream_id));
let c = tokio::spawn(async move {
for i in 0..3 {
sleep(Duration::from_millis(100)).await;
let _ = left_stream_writer.write_all(format!("Left message: {} - {}", new_stream_id, i).as_bytes()).await;
}
let _ = left_stream_writer.shutdown().await;
});
let d = tokio::spawn(async move {
for i in 0..3 {
sleep(Duration::from_millis(100)).await;
let _ = right_stream_writer.write_all(format!("Right message: {} - {}", new_stream_id, i).as_bytes()).await;
}
let _ = right_stream_writer.shutdown().await;
});
joins.push(a);
joins.push(b);
joins.push(c);
joins.push(d);
}
for j in joins {
let _ = tokio::join!(j);
}
}
fn create_stream(channel_id: u64, sender: Sender<StreamPackage>, left_stream_map: Arc<Mutex<HashMap<u64, Sender<StreamPackage>>>>) -> Stream {
let (new_sender, receiver) = channel::<StreamPackage>(16);
{ {
let stream_map_left_locked = &mut left_stream_map.lock().unwrap(); let stream_map_left_locked = &mut left_stream_map.lock().unwrap();
stream_map_left_locked.insert(new_stream_id, left_new_sender); stream_map_left_locked.insert(channel_id, new_sender);
} }
let left_stream = Stream { Stream {
channel_id: new_stream_id, channel_id,
sender: right_sender.clone(), sender,
receiver: left_new_receiver, receiver,
};
let (right_new_sender, right_new_receiver) = channel::<StreamPackage>(16);
{
let stream_map_right_locked = &mut right_stream_map.lock().unwrap();
stream_map_right_locked.insert(new_stream_id, right_new_sender);
} }
let right_stream = Stream {
channel_id: new_stream_id,
sender: left_sender.clone(),
receiver: right_new_receiver,
};
let (left_stream_reader, mut left_stream_writer) = tokio::io::split(left_stream);
let (right_stream_reader, mut right_stream_writer) = tokio::io::split(right_stream);
let a = loop_read(left_stream_reader, format!("#{} left reader ", new_stream_id));
let b = loop_read(right_stream_reader, format!("#{} right reader", new_stream_id));
let c = tokio::spawn(async move {
for i in 0..10 {
sleep(Duration::from_millis(100)).await;
let _ = left_stream_writer.write_all(format!("Left message: {}", i).as_bytes()).await;
}
let _ = left_stream_writer.shutdown().await;
});
let d = tokio::spawn(async move {
for i in 0..10 {
sleep(Duration::from_millis(100)).await;
let _ = right_stream_writer.write_all(format!("Right message: {}", i).as_bytes()).await;
}
let _ = right_stream_writer.shutdown().await;
});
let _ = tokio::join!(a, b, c, d);
} }
fn loop_read(mut reader: ReadHalf<Stream>, tag: String) -> JoinHandle<()> { fn loop_read(mut reader: ReadHalf<Stream>, tag: String) -> JoinHandle<()> {