From 9f5ceedafeb65c0989e1474f473052f892955d53 Mon Sep 17 00:00:00 2001 From: "Hatter Jiang@Pixelbook" Date: Mon, 17 May 2021 01:36:49 +0800 Subject: [PATCH] feat: add handle context --- src/main.rs | 99 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 57 insertions(+), 42 deletions(-) diff --git a/src/main.rs b/src/main.rs index 61011ab..1b1a076 100644 --- a/src/main.rs +++ b/src/main.rs @@ -101,14 +101,23 @@ impl RoomMessageDown { } } -async fn handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) { - if let Err(e) = inner_handle_connection(peer_map, room_map, raw_stream, addr).await { +#[derive(Debug, Clone)] +struct HandleContext { + peer_map: PeerMap, + room_map: RoomMap, + is_admin: bool, + room_id: Option, + client_id: Option, +} + +async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr) { + if let Err(e) = inner_handle_connection(handle_context, raw_stream, addr).await { failure!("Handler connection error: {}", e); } } async fn inner_handle_connection( - peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr + mut handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr ) -> XResult<()> { information!("Incoming TCP connection from: {}", addr); @@ -117,25 +126,22 @@ async fn inner_handle_connection( // Insert the write part of this peer to the peer map. let (tx, rx) = unbounded(); - { peer_map.lock().unwrap().insert(addr, tx.clone()); } + { handle_context.peer_map.lock().unwrap().insert(addr, tx.clone()); } let (outgoing, incoming) = ws_stream.split(); - let mut is_admin = false; let admin_pass = env::var("PASS").ok(); - let mut room_id: Option = None; - let mut client_id: Option = None; let broadcast_incoming = incoming.try_for_each(|msg| { match msg { Message::Close(_) => { - information!("Close connection: {:?} - {:?} from: {}", room_id, client_id, addr); - peer_map.lock().unwrap().remove(&addr); - client_exit(&room_map, &room_id, &client_id); + information!("Close connection: {:?} - {:?} from: {}", handle_context.room_id, handle_context.client_id, addr); + handle_context.peer_map.lock().unwrap().remove(&addr); + client_exit(&handle_context.room_map, &handle_context.room_id, &handle_context.client_id); }, Message::Ping(_) => {}, Message::Pong(_) => {}, Message::Binary(_) => { - warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id); + warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id); RoomMessageDown::create_error_reply("binary not supported").send(&tx); }, Message::Text(msg) => { @@ -150,8 +156,8 @@ async fn inner_handle_connection( tx.unbounded_send(Message::Text( format!( "room count: {}\npeer count: {}", - room_map.lock().unwrap().len(), - peer_map.lock().unwrap().len(), + handle_context.room_map.lock().unwrap().len(), + handle_context.peer_map.lock().unwrap().len(), ) )).ok(); return future::ok(()); @@ -160,7 +166,7 @@ async fn inner_handle_connection( let pass = &msg[6..]; if let Some(admin_pass) = &admin_pass { if admin_pass == pass { - is_admin = true; + handle_context.is_admin = true; tx.unbounded_send(Message::Text("Admin password success".into())).ok(); } else { tx.unbounded_send(Message::Text("Admin password error".into())).ok(); @@ -168,9 +174,9 @@ async fn inner_handle_connection( } return future::ok(()); } - if is_admin { + if handle_context.is_admin { if msg == "/rooms" { - let rooms = room_map.lock().unwrap().keys().map( + let rooms = handle_context.room_map.lock().unwrap().keys().map( |k| k.clone() ).collect::>().join("\n"); if rooms.is_empty() { @@ -182,7 +188,7 @@ async fn inner_handle_connection( } if msg.starts_with("/room ") { let room_id = &msg[6..]; - let room_map = room_map.lock().unwrap(); + let room_map = handle_context.room_map.lock().unwrap(); let mut client_ids = vec![]; if let Some(client_map) = room_map.get(room_id) { for peer_client_id in client_map.keys() { @@ -204,19 +210,19 @@ async fn inner_handle_connection( let room_message = match serde_json::from_str::(&msg) { Ok(room_message) => room_message, Err(e) => { - warning!("Parse message: from: {:?} - {:?}, failed: {}", room_id, client_id, e); + warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e); RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx); return future::ok(()); } }; match room_message.r#type { RoomMessageType::CreateOrEnter => { - if let (Some(_), Some(_)) = (&room_id, &client_id) { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { warning!("Client is already in room: {:?} - {:?}", room_id, client_id); RoomMessageDown::create_error_reply("Client is already in room").send(&tx); } else { if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { - let mut room_map = room_map.lock().unwrap(); + let mut room_map = handle_context.room_map.lock().unwrap(); match room_map.get_mut(&msg_room_id) { Some(client_map) => { match client_map.get(&msg_client_id) { @@ -229,11 +235,12 @@ async fn inner_handle_connection( "Replace client: {:?} - {:?}, from {:?} -> {:?}", msg_room_id, msg_client_id, peer_addr, addr ); - if let Some(tx) = peer_map.lock().unwrap().remove(peer_addr) { + let client_replaced_message = format!("Client replaced {:?} -> {:?}", peer_addr, addr); + if let Some(tx) = handle_context.peer_map.lock().unwrap().remove(peer_addr) { tx.unbounded_send(Message::Close(None)).ok(); } client_map.insert(msg_client_id.clone(), addr); - RoomMessageDown::create_success_reply(format!("Client replaced {:?} -> {:?}", peer_addr, addr)).send(&tx); + RoomMessageDown::create_success_reply(client_replaced_message).send(&tx); } }, None => { @@ -244,7 +251,7 @@ async fn inner_handle_connection( if let Ok(mm) = serde_json::to_string(&m) { for (client_id, client_addr) in client_map { if client_id != &msg_client_id { - if let Some(client_tx) = peer_map.lock().unwrap().get(client_addr) { + if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) { client_tx.unbounded_send(Message::Text(mm.clone())).ok(); } } @@ -262,19 +269,19 @@ async fn inner_handle_connection( RoomMessageDown::create_success_reply(format!("Client create room: {:?}", addr)).send(&tx); }, } - room_id = Some(msg_room_id); - client_id = Some(msg_client_id); + handle_context.room_id = Some(msg_room_id); + handle_context.client_id = Some(msg_client_id); } else { RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(&tx); } } }, RoomMessageType::Exit => { - if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { - let mut room_map = room_map.lock().unwrap(); + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { + let mut room_map = handle_context.room_map.lock().unwrap(); if let Some(client_map) = room_map.get_mut(room_id) { client_map.remove(client_id); - peer_map.lock().unwrap().remove(&addr); + handle_context.peer_map.lock().unwrap().remove(&addr); tx.unbounded_send(Message::Close(None)).ok(); } else { warning!("Not in room: {:?} - {:?}", room_id, client_id); @@ -284,13 +291,13 @@ async fn inner_handle_connection( } }, RoomMessageType::Destroy => { - if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("Destroy room: {:?} - {:?}", room_id, client_id); - let mut room_map = room_map.lock().unwrap(); + let mut room_map = handle_context.room_map.lock().unwrap(); let client_map = room_map.remove(room_id); if let Some(client_map) = client_map { for (_client_id, client_addr) in client_map { - if let Some(client_tx) = peer_map.lock().unwrap().remove(&client_addr) { + if let Some(client_tx) = handle_context.peer_map.lock().unwrap().remove(&client_addr) { client_tx.unbounded_send(Message::Close(None)).ok(); } } @@ -300,9 +307,9 @@ async fn inner_handle_connection( } }, RoomMessageType::ListPeers => { - if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("List room peers: {:?} - {:?}", room_id, client_id); - let room_map = room_map.lock().unwrap(); + let room_map = handle_context.room_map.lock().unwrap(); let client_map = room_map.get(room_id); let mut client_ids = vec![]; if let Some(client_map) = client_map { @@ -326,9 +333,9 @@ async fn inner_handle_connection( } }, RoomMessageType::Broadcast => { - if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("Broadcast room message: {:?} - {:?}", room_id, client_id); - let room_map = room_map.lock().unwrap(); + let room_map = handle_context.room_map.lock().unwrap(); let client_map = room_map.get(room_id); let data = room_message.data; @@ -344,7 +351,7 @@ async fn inner_handle_connection( let mut sent_messages = 0; for (peer_client_id, peer_client_addr) in client_map { if peer_client_id != client_id { - if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) { + if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { sent_messages += 1; peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); } @@ -358,10 +365,10 @@ async fn inner_handle_connection( } }, RoomMessageType::Peer => { - if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id); if let Some(peer_id) = room_message.peer_id.as_ref() { - let room_map = room_map.lock().unwrap(); + let room_map = handle_context.room_map.lock().unwrap(); let client_map = room_map.get(room_id); let data = room_message.data; @@ -375,7 +382,7 @@ async fn inner_handle_connection( if let Ok(mm) = serde_json::to_string(&m) { if let Some(client_map) = client_map { if let Some(peer_client_addr) = client_map.get(peer_id) { - if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) { + if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); } RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(&tx); @@ -402,8 +409,8 @@ async fn inner_handle_connection( future::select(broadcast_incoming, receive_from_others).await; information!("{} disconnected", &addr); - client_exit(&room_map, &room_id, &client_id); - peer_map.lock().unwrap().remove(&addr); + client_exit(&handle_context.room_map, &handle_context.room_id, &handle_context.client_id); + handle_context.peer_map.lock().unwrap().remove(&addr); Ok(()) } @@ -434,6 +441,14 @@ async fn main() -> Result<(), IoError> { let state = PeerMap::new(Mutex::new(HashMap::new())); let room = RoomMap::new(Mutex::new(BTreeMap::new())); + let handle_context = HandleContext { + peer_map: state, + room_map: room, + is_admin: false, + room_id: None, + client_id: None, + }; + // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(&addr).await; let listener = try_socket.expect(&format!("Failed to bind ok: {}", addr)); @@ -441,7 +456,7 @@ async fn main() -> Result<(), IoError> { // Let's spawn the handling of each connection in a separate task. while let Ok((stream, addr)) = listener.accept().await { - tokio::spawn(handle_connection(state.clone(), room.clone(), stream, addr)); + tokio::spawn(handle_connection(handle_context.clone(), stream, addr)); } Ok(())