diff --git a/src/main.rs b/src/main.rs index 6202e0a..5689620 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ -#[macro_use] extern crate lazy_static; -#[macro_use] extern crate rust_util; +#[macro_use] +extern crate lazy_static; +#[macro_use] +extern crate rust_util; mod types; mod msg; @@ -51,13 +53,14 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket for handle in &*TEXT_MESSAGE_HANDLES { if handle.is_matches(handle_context, tx, addr, &msg) { handle.handle(handle_context, tx, addr, &msg); - return Ok(()) + return Ok(()); } } } let room_message = match serde_json::from_str::(&msg) { - Ok(room_message) => room_message, Err(e) => { + Ok(room_message) => room_message, + Err(e) => { warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e); RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx); return Ok(()); @@ -91,7 +94,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket client_map.insert(msg_client_id.clone(), addr); RoomMessageDown::create_success_reply(client_replaced_message).send(tx); } - }, + } None => { information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); client_map.insert(msg_client_id.clone(), addr); @@ -107,23 +110,23 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket } RoomMessageDown::create_success_reply(format!("Client entered: {:?}", addr)).send(tx); - }, + } } - }, + } None => { information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); let mut client_map = BTreeMap::new(); client_map.insert(msg_client_id.clone(), addr); room_map.insert(msg_room_id.clone(), client_map); RoomMessageDown::create_success_reply(format!("Client create room: {:?}", addr)).send(tx); - }, + } } handle_context.room_id = Some(msg_room_id); handle_context.client_id = Some(msg_client_id); } else { RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(tx); } - }, + } RoomMessageType::Exit => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { let mut room_map = handle_context.room_map.lock().unwrap(); @@ -137,7 +140,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket } else { client_not_in_room(&tx, addr); } - }, + } RoomMessageType::Destroy => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("Destroy room: {:?} - {:?}", room_id, client_id); @@ -153,7 +156,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket } else { client_not_in_room(&tx, addr); } - }, + } RoomMessageType::ListPeers => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("List room peers: {:?} - {:?}", room_id, client_id); @@ -163,31 +166,19 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket .map(|m| m.keys().cloned().collect::>()) .unwrap_or_else(Vec::new); let client_ids_data = serde_json::to_string(&client_ids)?; - let m = RoomMessageDown { - r#type: RoomMessageDownType::PeerList, - reply_code: Some(200), - reply_message: Some("ok".into()), - data: Some(client_ids_data), - ..Default::default() - }; - m.send(&tx); + RoomMessageDown::create_peerlist_message(Some(client_ids_data)).send(&tx); } else { client_not_in_room(&tx, addr); } - }, + } RoomMessageType::Broadcast => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("Broadcast room message: {:?} - {:?}", room_id, client_id); let room_map = handle_context.room_map.lock().unwrap(); let client_map = room_map.get(room_id); - let m = RoomMessageDown { - r#type: RoomMessageDownType::BroadcastMessage, - reply_code: Some(200), - reply_message: Some("ok".into()), - peer_id: Some(client_id.clone()), - data: room_message.data, - }; + let m = RoomMessageDown::create_broadcast_message( + client_id, room_message.data); let mm = serde_json::to_string(&m)?; if let Some(client_map) = client_map { let mut sent_messages = 0; @@ -204,7 +195,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket } else { client_not_in_room(&tx, addr); } - }, + } RoomMessageType::Peer => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id); @@ -228,7 +219,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket } else { client_not_in_room(&tx, addr); } - }, + } } Ok(()) } @@ -240,7 +231,7 @@ async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, } async fn inner_handle_connection( - mut handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr + mut handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr, ) -> XResult<()> { information!("Incoming TCP connection from: {}", addr); @@ -259,13 +250,13 @@ async fn inner_handle_connection( information!("Close connection: {:?} - {:?} from: {}", handle_context.room_id, handle_context.client_id, addr); handle_context.peer_map.lock().unwrap().remove(&addr); client_exit(&handle_context.room_map, &handle_context.room_id, &handle_context.client_id); - }, - Message::Ping(_) => {}, - Message::Pong(_) => {}, + } + Message::Ping(_) => {} + Message::Pong(_) => {} Message::Binary(_) => { warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id); RoomMessageDown::create_error_reply("Binary message not supported").send(&tx); - }, + } Message::Text(msg) => if !msg.is_empty() { if let Err(e) = handle_text_message(&mut handle_context, &tx, addr, msg) { failure!("Error in process text message: {}", e); diff --git a/src/msg.rs b/src/msg.rs index 0205e2f..b38f9ff 100644 --- a/src/msg.rs +++ b/src/msg.rs @@ -88,6 +88,26 @@ impl RoomMessageDown { } } + pub fn create_broadcast_message(peer_id: S, data: Option) -> Self where S: Into { + RoomMessageDown { + r#type: RoomMessageDownType::BroadcastMessage, + reply_code: Some(200), + reply_message: Some("ok".into()), + peer_id: Some(peer_id.into()), + data, + } + } + + pub fn create_peerlist_message(data: Option) -> Self { + RoomMessageDown { + r#type: RoomMessageDownType::PeerList, + reply_code: Some(200), + reply_message: Some("ok".into()), + data: Some(client_ids_data), + ..Default::default() + } + } + pub fn send(&self, tx: &Tx) { if let Ok(mm) = serde_json::to_string(self) { tx.send_text(mm);