diff --git a/src/main.rs b/src/main.rs index 0bab7bc..418fe9a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,12 +43,13 @@ lazy_static! { }; } -fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: String) { +fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: String) -> XResult<()> { // process all registered handles if msg.starts_with('/') { for handle in &*TEXT_MESSAGE_HANDLES { if handle.is_matches(handle_context, tx, addr, &msg) { - return handle.handle(handle_context, tx, addr, &msg); + handle.handle(handle_context, tx, addr, &msg); + return Ok(()) } } } @@ -58,7 +59,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket Err(e) => { warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e); RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx); - return; + return Ok(()); } }; match room_message.r#type { @@ -66,7 +67,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { warning!("Client is already in room: {:?} - {:?}", room_id, client_id); RoomMessageDown::create_error_reply("Client is already in room").send(tx); - return; + return Ok(()); } if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { let mut room_map = handle_context.room_map.lock().unwrap(); @@ -95,15 +96,15 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket client_map.insert(msg_client_id.clone(), addr); let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); - if let Ok(mm) = serde_json::to_string(&m) { - for (client_id, client_addr) in client_map { - if client_id != &msg_client_id { - if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) { - client_tx.send_text(mm.clone()); - } + let mm = serde_json::to_string(&m)?; + for (client_id, client_addr) in client_map { + if client_id != &msg_client_id { + if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) { + client_tx.send_text(mm.clone()); } } } + RoomMessageDown::create_success_reply(format!("Client entered: {:?}", addr)).send(tx); }, } @@ -163,17 +164,15 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket client_ids.push(client_id.clone()); } } - let client_ids_data = serde_json::to_string(&client_ids); - if let Ok(client_ids_data) = client_ids_data { - let m = RoomMessageDown { - r#type: RoomMessageDownType::PeerList, - reply_code: Some(200), - reply_message: Some("ok".into()), - data: Some(client_ids_data), - ..Default::default() - }; - m.send(&tx); - } + let client_ids_data = serde_json::to_string(&client_ids)?; + let m = RoomMessageDown { + r#type: RoomMessageDownType::PeerList, + reply_code: Some(200), + reply_message: Some("ok".into()), + data: Some(client_ids_data), + ..Default::default() + }; + m.send(&tx); } else { client_not_in_room(&tx, addr); } @@ -192,19 +191,18 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket peer_id: Some(client_id.clone()), data, }; - if let Ok(mm) = serde_json::to_string(&m) { - if let Some(client_map) = client_map { - let mut sent_messages = 0; - for (peer_client_id, peer_client_addr) in client_map { - if peer_client_id != client_id { - if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { - sent_messages += 1; - peer_tx.send_text(mm.clone()); - } + let mm = serde_json::to_string(&m)?; + if let Some(client_map) = client_map { + let mut sent_messages = 0; + for (peer_client_id, peer_client_addr) in client_map { + if peer_client_id != client_id { + if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { + sent_messages += 1; + peer_tx.send_text(mm.clone()); } } - RoomMessageDown::create_success_reply(format!("Send message to {} peers", sent_messages)).send(&tx); } + RoomMessageDown::create_success_reply(format!("Send message to {} peers", sent_messages)).send(&tx); } } else { client_not_in_room(&tx, addr); @@ -225,16 +223,15 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket peer_id: Some(client_id.clone()), data, }; - if let Ok(mm) = serde_json::to_string(&m) { - if let Some(client_map) = client_map { - if let Some(peer_client_addr) = client_map.get(peer_id) { - if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { - peer_tx.send_text(mm); - } - RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(tx); - } else { - RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(tx); + let mm = serde_json::to_string(&m)?; + if let Some(client_map) = client_map { + if let Some(peer_client_addr) = client_map.get(peer_id) { + if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { + peer_tx.send_text(mm); } + RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(tx); + } else { + RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(tx); } } } @@ -243,6 +240,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket } }, } + Ok(()) } async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr) { @@ -280,7 +278,10 @@ async fn inner_handle_connection( }, Message::Text(msg) => { if !msg.is_empty() { - handle_text_message(&mut handle_context, &tx, addr, msg); + if let Err(e) = handle_text_message(&mut handle_context, &tx, addr, msg) { + failure!("Error in process text message: {}", e); + RoomMessageDown::create_error_reply(format!("Error in process text message: {}", e)); + } } }, }