From d62ca7173585ae2922bf1a4d6b331f9df55ef1a1 Mon Sep 17 00:00:00 2001 From: "Hatter Jiang@Pixelbook" Date: Fri, 14 May 2021 23:01:55 +0800 Subject: [PATCH] feat: works now --- src/main.rs | 280 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 244 insertions(+), 36 deletions(-) diff --git a/src/main.rs b/src/main.rs index ba0f757..8a0941f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -30,14 +30,15 @@ enum RoomMessageType { enum RoomMessageDownType { PeerEnter, PeerExit, + PeerList, PeerMessage, BroadcastMessage, - ReplayMessage, + ReplyMessage, } impl Default for RoomMessageDownType { fn default() -> Self { - Self::ReplayMessage + Self::ReplyMessage } } @@ -56,11 +57,39 @@ struct RoomMessage { struct RoomMessageDown { r#type: RoomMessageDownType, peer_id: Option, - replay_code: Option, - replay_message: Option, + reply_code: Option, + reply_message: Option, data: Option, } +impl RoomMessageDown { + fn create_error_reply(error_message: String) -> Self { + Self { + r#type: RoomMessageDownType::ReplyMessage, + reply_code: Some(500), + reply_message: Some(error_message), + ..Default::default() + } + } + fn create_success_reply(success_message: String) -> Self { + Self { + r#type: RoomMessageDownType::ReplyMessage, + reply_code: Some(200), + reply_message: Some(success_message), + ..Default::default() + } + } + fn create_peer_enter(peer_id: String) -> Self { + Self { + r#type: RoomMessageDownType::PeerEnter, + reply_code: Some(200), + reply_message: Some(format!("Peer {} entered", peer_id)), + peer_id: Some(peer_id), + ..Default::default() + } + } +} + type Tx = UnboundedSender; type PeerMap = Arc>>; type RoomMap = Arc>>>; @@ -72,10 +101,10 @@ async fn handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: Tcp } async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) -> XResult<()> { - println!("Incoming TCP connection from: {}", addr); + information!("Incoming TCP connection from: {}", addr); let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; - println!("WebSocket connection established: {}", addr); + information!("WebSocket connection established: {}", addr); // Insert the write part of this peer to the peer map. let (tx, rx) = unbounded(); @@ -89,52 +118,223 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea match msg { Message::Close(_) => { information!("Close connection: {:?} - {:?} from: {}", room_id, client_id, addr); + peer_map.lock().unwrap().remove(&addr); + client_exit(&room_map, &room_id, &client_id); }, Message::Ping(msg) => { - tx.unbounded_send(Message::Pong(msg)).unwrap(); + tx.unbounded_send(Message::Pong(msg)).ok(); }, Message::Pong(_) => {}, Message::Binary(_) => { - warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id) + warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id); + let m = RoomMessageDown::create_error_reply("unknown error".into()); + if let Ok(mm) = serde_json::to_string(&m) { + tx.unbounded_send(Message::Text(mm)).ok(); + } }, Message::Text(msg) => { - let msg = serde_json::from_str::(&msg); - match msg { + let room_message = serde_json::from_str::(&msg); + match room_message { Err(e) => { warning!("Parse message: from: {:?} - {:?}, failed: {}", room_id, client_id, e); - let m = RoomMessageDown{ - r#type: RoomMessageDownType::PeerMessage, - ..Default::default() - }; + let m = RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)); + if let Ok(mm) = serde_json::to_string(&m) { + tx.unbounded_send(Message::Text(mm)).ok(); + } }, - Ok(msg) => { + Ok(room_message) => { + match room_message.r#type { + RoomMessageType::CreateOrEnter => { + if let (Some(_), Some(_)) = (&room_id, &client_id) { + warning!("Client is already in room: {:?} - {:?}", room_id, client_id); + let m = RoomMessageDown::create_error_reply("Client is already in room".into()); + if let Ok(mm) = serde_json::to_string(&m) { + tx.unbounded_send(Message::Text(mm)).ok(); + } + } else { + if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { + let mut room_map = room_map.lock().unwrap(); + match room_map.get_mut(&msg_room_id) { + Some(client_map) => { + match client_map.get(&msg_client_id) { + Some(peer_addr) => { + if peer_addr == &addr { + information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id); + let m = RoomMessageDown::create_success_reply("Duplicate enter to room".into()); + if let Ok(mm) = serde_json::to_string(&m) { + tx.unbounded_send(Message::Text(mm)).ok(); + } + } else { + information!("Replace client: {:?} - {:?}, from {:?} -> {:?}", msg_room_id, msg_client_id, peer_addr, addr); + let tx = peer_map.lock().unwrap().remove(peer_addr); + if let Some(tx) = tx { + tx.unbounded_send(Message::Close(None)).ok(); + } + client_map.insert(msg_client_id.clone(), addr); + } + }, + None => { + information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); + client_map.insert(msg_client_id.clone(), addr); + let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); + if let Ok(mm) = serde_json::to_string(&m) { + for (client_id, client_addr) in client_map { + if client_id != &msg_client_id { + if let Some(client_tx) = peer_map.lock().unwrap().get(client_addr) { + client_tx.unbounded_send(Message::Text(mm.clone())).ok(); + } + } + } + } + }, + } + }, + None => { + information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); + let mut client_map = BTreeMap::new(); + client_map.insert(msg_client_id.clone(), addr); + room_map.insert(msg_room_id.clone(), client_map); + }, + } + room_id = Some(msg_room_id); + client_id = Some(msg_client_id); + } else { + let m = RoomMessageDown::create_error_reply("Room id and client id must both assigned".into()); + if let Ok(mm) = serde_json::to_string(&m) { + tx.unbounded_send(Message::Text(mm)).ok(); + } + } + } + }, + RoomMessageType::Exit => { + if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + let mut room_map = room_map.lock().unwrap(); + if let Some(client_map) = room_map.get_mut(room_id) { + client_map.remove(client_id); + peer_map.lock().unwrap().remove(&addr); + tx.unbounded_send(Message::Close(None)).ok(); + } else { + warning!("Not in room: {:?} - {:?}", room_id, client_id); + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::Destroy => { + if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + information!("Destroy room: {:?} - {:?}", room_id, client_id); + let mut room_map = room_map.lock().unwrap(); + let client_map = room_map.remove(room_id); + if let Some(client_map) = client_map { + for (_client_id, client_addr) in client_map { + let client_tx = peer_map.lock().unwrap().remove(&client_addr); + if let Some(client_tx) = client_tx { + client_tx.unbounded_send(Message::Close(None)).ok(); + } + } + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::ListPeers => { + if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + information!("List room peers: {:?} - {:?}", room_id, client_id); + let room_map = room_map.lock().unwrap(); + let client_map = room_map.get(room_id); + let mut client_ids = vec![]; + if let Some(client_map) = client_map { + for (client_id, _client_addr) in client_map { + client_ids.push(client_id.clone()); + } + } + let client_ids_data = serde_json::to_string(&client_ids); + if let Ok(client_ids_data) = client_ids_data { + let m = RoomMessageDown { + r#type: RoomMessageDownType::PeerList, + reply_code: Some(200), + reply_message: Some("ok".into()), + data: Some(client_ids_data), + ..Default::default() + }; + if let Ok(mm) = serde_json::to_string(&m) { + tx.unbounded_send(Message::Text(mm)).ok(); + } + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::Broadcast => { + if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + information!("Broadcast room message: {:?} - {:?}", room_id, client_id); + let room_map = room_map.lock().unwrap(); + let client_map = room_map.get(room_id); + + let data = room_message.data; + let m = RoomMessageDown { + r#type: RoomMessageDownType::BroadcastMessage, + reply_code: Some(200), + reply_message: Some("ok".into()), + peer_id: Some(client_id.clone()), + data, + }; + if let Ok(mm) = serde_json::to_string(&m) { + if let Some(client_map) = client_map { + for (peer_client_id, peer_client_addr) in client_map { + if peer_client_id != client_id { + if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) { + peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); + } + } + } + } + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::Peer => { + if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id); + if let Some(peer_id) = room_message.peer_id.as_ref() { + let room_map = room_map.lock().unwrap(); + let client_map = room_map.get(room_id); + + let data = room_message.data; + let m = RoomMessageDown { + r#type: RoomMessageDownType::PeerMessage, + reply_code: Some(200), + reply_message: Some("ok".into()), + peer_id: Some(client_id.clone()), + data, + }; + if let Ok(mm) = serde_json::to_string(&m) { + if let Some(client_map) = client_map { + if let Some(peer_client_addr) = client_map.get(peer_id) { + if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) { + peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); + } + } else { + let m = RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)); + if let Ok(mm) = serde_json::to_string(&m) { + tx.unbounded_send(Message::Text(mm)).ok(); + } + } + } + } + } + } else { + client_not_in_room(&tx); + } + }, + } }, } - // TODO ... - // create/enter room - // exit room - // destroy room - // list peers - // enter peer - // exit peer - // send message - // - broadcast - // - peer }, } - // println!("Received a message from {}: {}", addr, msg.to_text().unwrap()); - // let peers = peer_map.lock().unwrap(); - // - // // We want to broadcast the message to everyone except ourselves. - // let broadcast_recipients = - // peers.iter().filter(|(peer_addr, _)| peer_addr != &&addr).map(|(_, ws_sink)| ws_sink); - // - // for recp in broadcast_recipients { - // recp.unbounded_send(msg.clone()).unwrap(); - // } - future::ok(()) }); @@ -150,6 +350,14 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea Ok(()) } +fn client_not_in_room(tx: &UnboundedSender) { + information!("Client is not in room"); + let m = RoomMessageDown::create_error_reply("Client is not in room".into()); + if let Ok(mm) = serde_json::to_string(&m) { + tx.unbounded_send(Message::Text(mm)).ok(); + } +} + fn client_exit(room_map: &RoomMap, room_id: &Option, client_id: &Option) { if let (Some(room_id), Some(client_id)) = (room_id, client_id) { let mut room_map = room_map.lock().unwrap();