diff --git a/Cargo.lock b/Cargo.lock index e1c201b..13ab58d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -597,7 +597,7 @@ dependencies = [ [[package]] name = "room-rs" -version = "0.1.2" +version = "0.1.3" dependencies = [ "chrono", "futures-channel", diff --git a/Cargo.toml b/Cargo.toml index 0707512..75d560b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "room-rs" -version = "0.1.2" +version = "0.1.3" authors = ["Hatter Jiang@Pixelbook "] edition = "2018" diff --git a/src/main.rs b/src/main.rs index 29f18f0..92a5a1e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -62,7 +62,8 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket Ok(room_message) => room_message, Err(e) => { warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e); - RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx); + RoomMessageDown::create_error_reply( + &None, format!("Message parse failed: {}, message: {}", e, msg)).send(&tx); return Ok(()); } }; @@ -70,7 +71,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket RoomMessageType::CreateOrEnter => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { warning!("Client is already in room: {:?} - {:?}", room_id, client_id); - RoomMessageDown::create_error_reply("Client is already in room").send(tx); + RoomMessageDown::create_error_reply(&room_message.message_id, "Client is already in room").send(tx); return Ok(()); } if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { @@ -81,7 +82,8 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket Some(peer_addr) => { if peer_addr == &addr { information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id); - RoomMessageDown::create_success_reply("Duplicate enter to room").send(tx); + RoomMessageDown::create_success_reply( + &room_message.message_id, "Duplicate enter to room").send(tx); } else { information!( "Replace client: {:?} - {:?}, from {:?} -> {:?}", @@ -92,14 +94,14 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket tx.send_close(); } client_map.insert(msg_client_id.clone(), addr); - RoomMessageDown::create_success_reply(client_replaced_message).send(tx); + RoomMessageDown::create_success_reply(&room_message.message_id, client_replaced_message).send(tx); } } None => { information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); client_map.insert(msg_client_id.clone(), addr); - let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); + let m = RoomMessageDown::create_peer_enter(&room_message.message_id, msg_client_id.clone()); let mm = serde_json::to_string(&m)?; for (client_id, client_addr) in client_map { if client_id != &msg_client_id { @@ -109,7 +111,8 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket } } - RoomMessageDown::create_success_reply(format!("Client entered: {:?}", addr)).send(tx); + RoomMessageDown::create_success_reply( + &room_message.message_id, format!("Client entered: {:?}", addr)).send(tx); } } } @@ -118,13 +121,14 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket let mut client_map = BTreeMap::new(); client_map.insert(msg_client_id.clone(), addr); room_map.insert(msg_room_id.clone(), client_map); - RoomMessageDown::create_success_reply(format!("Client create room: {:?}", addr)).send(tx); + RoomMessageDown::create_success_reply( + &room_message.message_id, format!("Client create room: {:?}", addr)).send(tx); } } handle_context.room_id = Some(msg_room_id); handle_context.client_id = Some(msg_client_id); } else { - RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(tx); + RoomMessageDown::create_error_reply(&room_message.message_id, "Room id and client id must both assigned").send(tx); } } RoomMessageType::Exit => { @@ -166,7 +170,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket .map(|m| m.keys().cloned().collect::>()) .unwrap_or_else(Vec::new); let client_ids_data = serde_json::to_string(&client_ids)?; - RoomMessageDown::create_peerlist_message(Some(client_ids_data)).send(&tx); + RoomMessageDown::create_peerlist_message(&room_message.message_id, Some(client_ids_data)).send(&tx); } else { client_not_in_room(&tx, addr); } @@ -178,7 +182,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket let client_map = room_map.get(room_id); let m = RoomMessageDown::create_broadcast_message( - client_id, room_message.data); + &room_message.message_id, client_id, room_message.data); let mm = serde_json::to_string(&m)?; if let Some(client_map) = client_map { let mut sent_messages = 0; @@ -190,7 +194,8 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket } } } - RoomMessageDown::create_success_reply(format!("Send message to {} peers", sent_messages)).send(&tx); + RoomMessageDown::create_success_reply( + &room_message.message_id, format!("Send message to {} peers", sent_messages)).send(&tx); } } else { client_not_in_room(&tx, addr); @@ -203,16 +208,18 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket let room_map = handle_context.room_map.lock().unwrap(); let client_map = room_map.get(room_id); - let m = RoomMessageDown::create_peer_message(peer_id, room_message.data); + let m = RoomMessageDown::create_peer_message(&room_message.message_id, peer_id, room_message.data); let mm = serde_json::to_string(&m)?; if let Some(client_map) = client_map { if let Some(peer_client_addr) = client_map.get(peer_id) { if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { peer_tx.send_text(mm); } - RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(tx); + RoomMessageDown::create_success_reply( + &room_message.message_id, format!("Message sent to: {}", peer_id)).send(tx); } else { - RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(tx); + RoomMessageDown::create_fail_replay( + &room_message.message_id, 404, format!("Peer not found: {}", peer_id)).send(tx); } } } @@ -255,12 +262,12 @@ async fn inner_handle_connection( Message::Pong(_) => {} Message::Binary(_) => { warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id); - RoomMessageDown::create_error_reply("Binary message not supported").send(&tx); + RoomMessageDown::create_error_reply(&None, "Binary message not supported").send(&tx); } Message::Text(msg) => if !msg.is_empty() { if let Err(e) = handle_text_message(&mut handle_context, &tx, addr, msg) { failure!("Error in process text message: {}", e); - RoomMessageDown::create_error_reply(format!("Error in process text message: {}", e)); + RoomMessageDown::create_error_reply(&None, format!("Error in process text message: {}", e)); } }, } @@ -282,7 +289,7 @@ async fn inner_handle_connection( fn client_not_in_room(tx: &Tx, addr: SocketAddr) { information!("Client is not in room: {}", addr); - RoomMessageDown::create_error_reply("Client is not in room").send(tx); + RoomMessageDown::create_error_reply(&None, "Client is not in room").send(tx); } fn client_exit(room_map: &RoomMap, room_id: &Option, client_id: &Option) { diff --git a/src/msg.rs b/src/msg.rs index 9064008..160a710 100644 --- a/src/msg.rs +++ b/src/msg.rs @@ -32,6 +32,7 @@ impl Default for RoomMessageDownType { #[serde(rename_all = "camelCase")] pub struct RoomMessage { pub r#type: RoomMessageType, + pub message_id: Option, pub room_id: Option, pub client_id: Option, pub peer_id: Option, @@ -42,6 +43,7 @@ pub struct RoomMessage { #[serde(rename_all = "camelCase")] pub struct RoomMessageDown { pub r#type: RoomMessageDownType, + pub request_message_id: Option, pub peer_id: Option, pub reply_code: Option, pub reply_message: Option, @@ -49,28 +51,35 @@ pub struct RoomMessageDown { } impl RoomMessageDown { - pub fn create_error_reply(error_message: S) -> Self where S: Into { + pub fn create_fail_replay(message_id: &Option, code: i32, error_message: S) -> Self where S: Into { Self { r#type: RoomMessageDownType::ReplyMessage, - reply_code: Some(500), + request_message_id: message_id.clone(), + reply_code: Some(code), reply_message: Some(error_message.into()), ..Default::default() } } - pub fn create_success_reply(success_message: S) -> Self where S: Into { + pub fn create_error_reply(message_id: &Option, error_message: S) -> Self where S: Into { + Self::create_fail_replay(message_id, 500, error_message) + } + + pub fn create_success_reply(message_id: &Option, success_message: S) -> Self where S: Into { Self { r#type: RoomMessageDownType::ReplyMessage, + request_message_id: message_id.clone(), reply_code: Some(200), reply_message: Some(success_message.into()), ..Default::default() } } - pub fn create_peer_enter(peer_id: S) -> Self where S: Into { + pub fn create_peer_enter(message_id: &Option, peer_id: S) -> Self where S: Into { let peer_id = peer_id.into(); Self { r#type: RoomMessageDownType::PeerEnter, + request_message_id: message_id.clone(), reply_code: Some(200), reply_message: Some(format!("Peer {} entered", peer_id)), peer_id: Some(peer_id), @@ -78,9 +87,10 @@ impl RoomMessageDown { } } - pub fn create_peer_message(peer_id: S, data: Option) -> Self where S: Into { + pub fn create_peer_message(message_id: &Option, peer_id: S, data: Option) -> Self where S: Into { RoomMessageDown { r#type: RoomMessageDownType::PeerMessage, + request_message_id: message_id.clone(), reply_code: Some(200), reply_message: Some("ok".into()), peer_id: Some(peer_id.into()), @@ -88,9 +98,10 @@ impl RoomMessageDown { } } - pub fn create_broadcast_message(peer_id: S, data: Option) -> Self where S: Into { + pub fn create_broadcast_message(message_id: &Option, peer_id: S, data: Option) -> Self where S: Into { RoomMessageDown { r#type: RoomMessageDownType::BroadcastMessage, + request_message_id: message_id.clone(), reply_code: Some(200), reply_message: Some("ok".into()), peer_id: Some(peer_id.into()), @@ -98,9 +109,10 @@ impl RoomMessageDown { } } - pub fn create_peerlist_message(data: Option) -> Self { + pub fn create_peerlist_message(message_id: &Option, data: Option) -> Self { RoomMessageDown { r#type: RoomMessageDownType::PeerList, + request_message_id: message_id.clone(), reply_code: Some(200), reply_message: Some("ok".into()), data, diff --git a/src/slash_handles.rs b/src/slash_handles.rs index f7743df..4da8788 100644 --- a/src/slash_handles.rs +++ b/src/slash_handles.rs @@ -79,7 +79,7 @@ impl HandleTextMessage for HandleTextMessageRooms { fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { if !handle_context.is_admin { - RoomMessageDown::create_error_reply("Not admin").send(tx); + RoomMessageDown::create_error_reply(&None,"Not admin").send(tx); return; } let rooms = handle_context.room_map.lock().unwrap().keys().cloned().collect::>().join("\n"); @@ -99,7 +99,7 @@ impl HandleTextMessage for HandleTextMessageRoom { fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, msg: &str) { if !handle_context.is_admin { - RoomMessageDown::create_error_reply("Not admin").send(tx); + RoomMessageDown::create_error_reply(&None, "Not admin").send(tx); return; } let room_id = &msg[6..]; @@ -124,7 +124,7 @@ impl HandleTextMessage for HandleTextMessageVersion { fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { if !handle_context.is_admin { - RoomMessageDown::create_error_reply("Not admin").send(tx); + RoomMessageDown::create_error_reply(&None, "Not admin").send(tx); return; } tx.send_text(format!("{} - v{}", NAME, VERSION));