From e749f6628a45317037a1d081c853f424efff7145 Mon Sep 17 00:00:00 2001 From: "Hatter Jiang@Pixelbook" Date: Fri, 14 May 2021 23:40:30 +0800 Subject: [PATCH] feat: works --- src/main.rs | 413 ++++++++++++++++++++++++++++------------------------ 1 file changed, 223 insertions(+), 190 deletions(-) diff --git a/src/main.rs b/src/main.rs index 8a0941f..b71abde 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,23 +63,24 @@ struct RoomMessageDown { } impl RoomMessageDown { - fn create_error_reply(error_message: String) -> Self { + fn create_error_reply(error_message: S) -> Self where S: Into { Self { r#type: RoomMessageDownType::ReplyMessage, reply_code: Some(500), - reply_message: Some(error_message), + reply_message: Some(error_message.into()), ..Default::default() } } - fn create_success_reply(success_message: String) -> Self { + fn create_success_reply(success_message: S) -> Self where S: Into { Self { r#type: RoomMessageDownType::ReplyMessage, reply_code: Some(200), - reply_message: Some(success_message), + reply_message: Some(success_message.into()), ..Default::default() } } - fn create_peer_enter(peer_id: String) -> Self { + fn create_peer_enter(peer_id: S) -> Self where S: Into { + let peer_id = peer_id.into(); Self { r#type: RoomMessageDownType::PeerEnter, reply_code: Some(200), @@ -88,6 +89,12 @@ impl RoomMessageDown { ..Default::default() } } + + fn send(&self, tx: &UnboundedSender) { + if let Ok(mm) = serde_json::to_string(self) { + tx.unbounded_send(Message::Text(mm)).ok(); + } + } } type Tx = UnboundedSender; @@ -112,6 +119,8 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea let (outgoing, incoming) = ws_stream.split(); + let mut is_admin = false; + let admin_pass = std::env::var("PASS").ok(); let mut room_id: Option = None; let mut client_id: Option = None; let broadcast_incoming = incoming.try_for_each(|msg| { @@ -127,208 +136,235 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea Message::Pong(_) => {}, Message::Binary(_) => { warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id); - let m = RoomMessageDown::create_error_reply("unknown error".into()); - if let Ok(mm) = serde_json::to_string(&m) { - tx.unbounded_send(Message::Text(mm)).ok(); - } + RoomMessageDown::create_error_reply("unknown error").send(&tx); }, Message::Text(msg) => { - let room_message = serde_json::from_str::(&msg); - match room_message { + if msg.is_empty() { + return future::ok(()); + } + if msg == "/statics" { + tx.unbounded_send(Message::Text( + format!( + "room count: {}\npeer count: {}", + room_map.lock().unwrap().len(), + peer_map.lock().unwrap().len(), + ) + )).ok(); + return future::ok(()); + } + if msg.starts_with("/pass ") { + let pass = &msg[6..]; + if let Some(admin_pass) = &admin_pass { + if admin_pass == pass { + is_admin = true; + tx.unbounded_send(Message::Text("Admin password success".into())).ok(); + } else { + tx.unbounded_send(Message::Text("Admin password error".into())).ok(); + } + } + return future::ok(()); + } + if is_admin { + if msg == "/rooms" { + let rooms = room_map.lock().unwrap().keys().map( + |k| k.clone() + ).collect::>().join("\n"); + tx.unbounded_send(Message::Text(format!("rooms:\n{}", rooms))).ok(); + return future::ok(()); + } + if msg.starts_with("/room ") { + let room_id = &msg[6..]; + let room_map = room_map.lock().unwrap(); + let mut client_ids = vec![]; + if let Some(client_map) = room_map.get(room_id) { + for peer_client_id in client_map.keys() { + client_ids.push(peer_client_id.clone()); + } + tx.unbounded_send(Message::Text(format!("clients in room {}:\n{}", room_id, client_ids.join("\n")))).ok(); + } else { + tx.unbounded_send(Message::Text(format!("room not found: {}", room_id))).ok(); + } + return future::ok(()); + } + } + let room_message = match serde_json::from_str::(&msg) { + Ok(room_message) => room_message, Err(e) => { warning!("Parse message: from: {:?} - {:?}, failed: {}", room_id, client_id, e); - let m = RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)); - if let Ok(mm) = serde_json::to_string(&m) { - tx.unbounded_send(Message::Text(mm)).ok(); - } - }, - Ok(room_message) => { - match room_message.r#type { - RoomMessageType::CreateOrEnter => { - if let (Some(_), Some(_)) = (&room_id, &client_id) { - warning!("Client is already in room: {:?} - {:?}", room_id, client_id); - let m = RoomMessageDown::create_error_reply("Client is already in room".into()); - if let Ok(mm) = serde_json::to_string(&m) { - tx.unbounded_send(Message::Text(mm)).ok(); - } - } else { - if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { - let mut room_map = room_map.lock().unwrap(); - match room_map.get_mut(&msg_room_id) { - Some(client_map) => { - match client_map.get(&msg_client_id) { - Some(peer_addr) => { - if peer_addr == &addr { - information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id); - let m = RoomMessageDown::create_success_reply("Duplicate enter to room".into()); - if let Ok(mm) = serde_json::to_string(&m) { - tx.unbounded_send(Message::Text(mm)).ok(); - } - } else { - information!("Replace client: {:?} - {:?}, from {:?} -> {:?}", msg_room_id, msg_client_id, peer_addr, addr); - let tx = peer_map.lock().unwrap().remove(peer_addr); - if let Some(tx) = tx { - tx.unbounded_send(Message::Close(None)).ok(); - } - client_map.insert(msg_client_id.clone(), addr); - } - }, - None => { - information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); - client_map.insert(msg_client_id.clone(), addr); - - let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); - if let Ok(mm) = serde_json::to_string(&m) { - for (client_id, client_addr) in client_map { - if client_id != &msg_client_id { - if let Some(client_tx) = peer_map.lock().unwrap().get(client_addr) { - client_tx.unbounded_send(Message::Text(mm.clone())).ok(); - } - } - } - } - }, + RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx); + return future::ok(()); + } + }; + match room_message.r#type { + RoomMessageType::CreateOrEnter => { + if let (Some(_), Some(_)) = (&room_id, &client_id) { + warning!("Client is already in room: {:?} - {:?}", room_id, client_id); + RoomMessageDown::create_error_reply("Client is already in room").send(&tx); + } else { + if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { + let mut room_map = room_map.lock().unwrap(); + match room_map.get_mut(&msg_room_id) { + Some(client_map) => { + match client_map.get(&msg_client_id) { + Some(peer_addr) => { + if peer_addr == &addr { + information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id); + RoomMessageDown::create_success_reply("Duplicate enter to room").send(&tx); + } else { + information!("Replace client: {:?} - {:?}, from {:?} -> {:?}", msg_room_id, msg_client_id, peer_addr, addr); + let tx = peer_map.lock().unwrap().remove(peer_addr); + if let Some(tx) = tx { + tx.unbounded_send(Message::Close(None)).ok(); + } + client_map.insert(msg_client_id.clone(), addr); } }, None => { - information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); - let mut client_map = BTreeMap::new(); + information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); client_map.insert(msg_client_id.clone(), addr); - room_map.insert(msg_room_id.clone(), client_map); + + let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); + if let Ok(mm) = serde_json::to_string(&m) { + for (client_id, client_addr) in client_map { + if client_id != &msg_client_id { + if let Some(client_tx) = peer_map.lock().unwrap().get(client_addr) { + client_tx.unbounded_send(Message::Text(mm.clone())).ok(); + } + } + } + } }, } - room_id = Some(msg_room_id); - client_id = Some(msg_client_id); - } else { - let m = RoomMessageDown::create_error_reply("Room id and client id must both assigned".into()); - if let Ok(mm) = serde_json::to_string(&m) { - tx.unbounded_send(Message::Text(mm)).ok(); - } + }, + None => { + information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); + let mut client_map = BTreeMap::new(); + client_map.insert(msg_client_id.clone(), addr); + room_map.insert(msg_room_id.clone(), client_map); + }, + } + room_id = Some(msg_room_id); + client_id = Some(msg_client_id); + } else { + RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(&tx); + } + } + }, + RoomMessageType::Exit => { + if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + let mut room_map = room_map.lock().unwrap(); + if let Some(client_map) = room_map.get_mut(room_id) { + client_map.remove(client_id); + peer_map.lock().unwrap().remove(&addr); + tx.unbounded_send(Message::Close(None)).ok(); + } else { + warning!("Not in room: {:?} - {:?}", room_id, client_id); + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::Destroy => { + if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + information!("Destroy room: {:?} - {:?}", room_id, client_id); + let mut room_map = room_map.lock().unwrap(); + let client_map = room_map.remove(room_id); + if let Some(client_map) = client_map { + for (_client_id, client_addr) in client_map { + let client_tx = peer_map.lock().unwrap().remove(&client_addr); + if let Some(client_tx) = client_tx { + client_tx.unbounded_send(Message::Close(None)).ok(); } } - }, - RoomMessageType::Exit => { - if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { - let mut room_map = room_map.lock().unwrap(); - if let Some(client_map) = room_map.get_mut(room_id) { - client_map.remove(client_id); - peer_map.lock().unwrap().remove(&addr); - tx.unbounded_send(Message::Close(None)).ok(); - } else { - warning!("Not in room: {:?} - {:?}", room_id, client_id); - } - } else { - client_not_in_room(&tx); + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::ListPeers => { + if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + information!("List room peers: {:?} - {:?}", room_id, client_id); + let room_map = room_map.lock().unwrap(); + let client_map = room_map.get(room_id); + let mut client_ids = vec![]; + if let Some(client_map) = client_map { + for (client_id, _client_addr) in client_map { + client_ids.push(client_id.clone()); } - }, - RoomMessageType::Destroy => { - if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { - information!("Destroy room: {:?} - {:?}", room_id, client_id); - let mut room_map = room_map.lock().unwrap(); - let client_map = room_map.remove(room_id); - if let Some(client_map) = client_map { - for (_client_id, client_addr) in client_map { - let client_tx = peer_map.lock().unwrap().remove(&client_addr); - if let Some(client_tx) = client_tx { - client_tx.unbounded_send(Message::Close(None)).ok(); - } - } - } - } else { - client_not_in_room(&tx); - } - }, - RoomMessageType::ListPeers => { - if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { - information!("List room peers: {:?} - {:?}", room_id, client_id); - let room_map = room_map.lock().unwrap(); - let client_map = room_map.get(room_id); - let mut client_ids = vec![]; - if let Some(client_map) = client_map { - for (client_id, _client_addr) in client_map { - client_ids.push(client_id.clone()); - } - } - let client_ids_data = serde_json::to_string(&client_ids); - if let Ok(client_ids_data) = client_ids_data { - let m = RoomMessageDown { - r#type: RoomMessageDownType::PeerList, - reply_code: Some(200), - reply_message: Some("ok".into()), - data: Some(client_ids_data), - ..Default::default() - }; - if let Ok(mm) = serde_json::to_string(&m) { - tx.unbounded_send(Message::Text(mm)).ok(); - } - } - } else { - client_not_in_room(&tx); - } - }, - RoomMessageType::Broadcast => { - if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { - information!("Broadcast room message: {:?} - {:?}", room_id, client_id); - let room_map = room_map.lock().unwrap(); - let client_map = room_map.get(room_id); + } + let client_ids_data = serde_json::to_string(&client_ids); + if let Ok(client_ids_data) = client_ids_data { + let m = RoomMessageDown { + r#type: RoomMessageDownType::PeerList, + reply_code: Some(200), + reply_message: Some("ok".into()), + data: Some(client_ids_data), + ..Default::default() + }; + m.send(&tx); + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::Broadcast => { + if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + information!("Broadcast room message: {:?} - {:?}", room_id, client_id); + let room_map = room_map.lock().unwrap(); + let client_map = room_map.get(room_id); - let data = room_message.data; - let m = RoomMessageDown { - r#type: RoomMessageDownType::BroadcastMessage, - reply_code: Some(200), - reply_message: Some("ok".into()), - peer_id: Some(client_id.clone()), - data, - }; - if let Ok(mm) = serde_json::to_string(&m) { - if let Some(client_map) = client_map { - for (peer_client_id, peer_client_addr) in client_map { - if peer_client_id != client_id { - if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) { - peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); - } - } + let data = room_message.data; + let m = RoomMessageDown { + r#type: RoomMessageDownType::BroadcastMessage, + reply_code: Some(200), + reply_message: Some("ok".into()), + peer_id: Some(client_id.clone()), + data, + }; + if let Ok(mm) = serde_json::to_string(&m) { + if let Some(client_map) = client_map { + for (peer_client_id, peer_client_addr) in client_map { + if peer_client_id != client_id { + if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) { + peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); } } } - } else { - client_not_in_room(&tx); } - }, - RoomMessageType::Peer => { - if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { - information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id); - if let Some(peer_id) = room_message.peer_id.as_ref() { - let room_map = room_map.lock().unwrap(); - let client_map = room_map.get(room_id); + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::Peer => { + if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) { + information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id); + if let Some(peer_id) = room_message.peer_id.as_ref() { + let room_map = room_map.lock().unwrap(); + let client_map = room_map.get(room_id); - let data = room_message.data; - let m = RoomMessageDown { - r#type: RoomMessageDownType::PeerMessage, - reply_code: Some(200), - reply_message: Some("ok".into()), - peer_id: Some(client_id.clone()), - data, - }; - if let Ok(mm) = serde_json::to_string(&m) { - if let Some(client_map) = client_map { - if let Some(peer_client_addr) = client_map.get(peer_id) { - if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) { - peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); - } - } else { - let m = RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)); - if let Ok(mm) = serde_json::to_string(&m) { - tx.unbounded_send(Message::Text(mm)).ok(); - } - } + let data = room_message.data; + let m = RoomMessageDown { + r#type: RoomMessageDownType::PeerMessage, + reply_code: Some(200), + reply_message: Some("ok".into()), + peer_id: Some(client_id.clone()), + data, + }; + if let Ok(mm) = serde_json::to_string(&m) { + if let Some(client_map) = client_map { + if let Some(peer_client_addr) = client_map.get(peer_id) { + if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) { + peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); } + } else { + RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(&tx); } } - } else { - client_not_in_room(&tx); } - }, + } + } else { + client_not_in_room(&tx); } }, } @@ -352,10 +388,7 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea fn client_not_in_room(tx: &UnboundedSender) { information!("Client is not in room"); - let m = RoomMessageDown::create_error_reply("Client is not in room".into()); - if let Ok(mm) = serde_json::to_string(&m) { - tx.unbounded_send(Message::Text(mm)).ok(); - } + RoomMessageDown::create_error_reply("Client is not in room").send(tx); } fn client_exit(room_map: &RoomMap, room_id: &Option, client_id: &Option) { @@ -381,8 +414,8 @@ async fn main() -> Result<(), IoError> { // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(&addr).await; - let listener = try_socket.expect("Failed to bind"); - println!("Listening on: {}", addr); + let listener = try_socket.expect(&format!("Failed to bind ok: {}", addr)); + success!("Listening on: {}", addr); // Let's spawn the handling of each connection in a separate task. while let Ok((stream, addr)) = listener.accept().await {