diff --git a/src/main.rs b/src/main.rs index 1b1a076..9d307c6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#[macro_use] extern crate lazy_static; #[macro_use] extern crate rust_util; use std::env; @@ -105,11 +106,307 @@ impl RoomMessageDown { struct HandleContext { peer_map: PeerMap, room_map: RoomMap, + admin_pass: Option, is_admin: bool, room_id: Option, client_id: Option, } +lazy_static! { + static ref TEXT_MESSAGE_HANDLES: Vec>= { + let mut handles:Vec> = vec![]; + handles.push(Box::new(HandleTextMessageExit)); + handles.push(Box::new(HandleTextMessageStatics)); + handles.push(Box::new(HandleTextMessagePass)); + handles + }; +} + +trait HandleTextMessage: Sync { + fn is_matches(&self, handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: &str) -> bool; + fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: &str); +} + +struct HandleTextMessageExit; +impl HandleTextMessage for HandleTextMessageExit { + fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { + msg == "/exit" + } + + fn handle(&self, _handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { + tx.unbounded_send(Message::Close(None)).ok(); + } +} + +struct HandleTextMessageStatics; +impl HandleTextMessage for HandleTextMessageStatics { + fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { + msg == "/statics" + } + + fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { + tx.unbounded_send(Message::Text( + format!( + "room count: {}\npeer count: {}", + handle_context.room_map.lock().unwrap().len(), + handle_context.peer_map.lock().unwrap().len(), + ) + )).ok(); + } +} + +struct HandleTextMessagePass; +impl HandleTextMessage for HandleTextMessagePass { + fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { + msg.starts_with("/pass ") + } + + fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, msg: &str) { + let pass = &msg[6..]; + if let Some(admin_pass) = &handle_context.admin_pass { + if admin_pass == pass { + handle_context.is_admin = true; + tx.unbounded_send(Message::Text("Admin password success".into())).ok(); + } else { + tx.unbounded_send(Message::Text("Admin password error".into())).ok(); + } + } + } +} + +fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: String) { + // process all registered handles + for handle in &*TEXT_MESSAGE_HANDLES { + if handle.is_matches(handle_context, tx, addr, &msg) { + return handle.handle(handle_context, tx, addr, &msg); + } + } + + if handle_context.is_admin { + if msg == "/rooms" { + let rooms = handle_context.room_map.lock().unwrap().keys().map( + |k| k.clone() + ).collect::>().join("\n"); + if rooms.is_empty() { + tx.unbounded_send(Message::Text("rooms: ".into())).ok(); + } else { + tx.unbounded_send(Message::Text(format!("rooms:\n{}", rooms))).ok(); + } + return; + } + if msg.starts_with("/room ") { + let room_id = &msg[6..]; + let room_map = handle_context.room_map.lock().unwrap(); + let mut client_ids = vec![]; + if let Some(client_map) = room_map.get(room_id) { + for peer_client_id in client_map.keys() { + client_ids.push(peer_client_id.clone()); + } + tx.unbounded_send(Message::Text(format!("clients in room {}:\n{}", room_id, client_ids.join("\n")))).ok(); + } else { + tx.unbounded_send(Message::Text(format!("room not found: {}", room_id))).ok(); + } + return; + } + if msg == "/version" { + tx.unbounded_send(Message::Text( + format!("{} - v{}", NAME, VERSION) + )).ok(); + return; + } + } + let room_message = match serde_json::from_str::(&msg) { + Ok(room_message) => room_message, + Err(e) => { + warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e); + RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx); + return; + } + }; + match room_message.r#type { + RoomMessageType::CreateOrEnter => { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { + warning!("Client is already in room: {:?} - {:?}", room_id, client_id); + RoomMessageDown::create_error_reply("Client is already in room").send(&tx); + } else { + if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { + let mut room_map = handle_context.room_map.lock().unwrap(); + match room_map.get_mut(&msg_room_id) { + Some(client_map) => { + match client_map.get(&msg_client_id) { + Some(peer_addr) => { + if peer_addr == &addr { + information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id); + RoomMessageDown::create_success_reply("Duplicate enter to room").send(&tx); + } else { + information!( + "Replace client: {:?} - {:?}, from {:?} -> {:?}", + msg_room_id, msg_client_id, peer_addr, addr + ); + let client_replaced_message = format!("Client replaced {:?} -> {:?}", peer_addr, addr); + if let Some(tx) = handle_context.peer_map.lock().unwrap().remove(peer_addr) { + tx.unbounded_send(Message::Close(None)).ok(); + } + client_map.insert(msg_client_id.clone(), addr); + RoomMessageDown::create_success_reply(client_replaced_message).send(&tx); + } + }, + None => { + information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); + client_map.insert(msg_client_id.clone(), addr); + + let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); + if let Ok(mm) = serde_json::to_string(&m) { + for (client_id, client_addr) in client_map { + if client_id != &msg_client_id { + if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) { + client_tx.unbounded_send(Message::Text(mm.clone())).ok(); + } + } + } + } + RoomMessageDown::create_success_reply(format!("Client entered: {:?}", addr)).send(&tx); + }, + } + }, + None => { + information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); + let mut client_map = BTreeMap::new(); + client_map.insert(msg_client_id.clone(), addr); + room_map.insert(msg_room_id.clone(), client_map); + RoomMessageDown::create_success_reply(format!("Client create room: {:?}", addr)).send(&tx); + }, + } + handle_context.room_id = Some(msg_room_id); + handle_context.client_id = Some(msg_client_id); + } else { + RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(&tx); + } + } + }, + RoomMessageType::Exit => { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { + let mut room_map = handle_context.room_map.lock().unwrap(); + if let Some(client_map) = room_map.get_mut(room_id) { + client_map.remove(client_id); + handle_context.peer_map.lock().unwrap().remove(&addr); + tx.unbounded_send(Message::Close(None)).ok(); + } else { + warning!("Not in room: {:?} - {:?}", room_id, client_id); + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::Destroy => { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { + information!("Destroy room: {:?} - {:?}", room_id, client_id); + let mut room_map = handle_context.room_map.lock().unwrap(); + let client_map = room_map.remove(room_id); + if let Some(client_map) = client_map { + for (_client_id, client_addr) in client_map { + if let Some(client_tx) = handle_context.peer_map.lock().unwrap().remove(&client_addr) { + client_tx.unbounded_send(Message::Close(None)).ok(); + } + } + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::ListPeers => { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { + information!("List room peers: {:?} - {:?}", room_id, client_id); + let room_map = handle_context.room_map.lock().unwrap(); + let client_map = room_map.get(room_id); + let mut client_ids = vec![]; + if let Some(client_map) = client_map { + for (client_id, _client_addr) in client_map { + client_ids.push(client_id.clone()); + } + } + let client_ids_data = serde_json::to_string(&client_ids); + if let Ok(client_ids_data) = client_ids_data { + let m = RoomMessageDown { + r#type: RoomMessageDownType::PeerList, + reply_code: Some(200), + reply_message: Some("ok".into()), + data: Some(client_ids_data), + ..Default::default() + }; + m.send(&tx); + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::Broadcast => { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { + information!("Broadcast room message: {:?} - {:?}", room_id, client_id); + let room_map = handle_context.room_map.lock().unwrap(); + let client_map = room_map.get(room_id); + + let data = room_message.data; + let m = RoomMessageDown { + r#type: RoomMessageDownType::BroadcastMessage, + reply_code: Some(200), + reply_message: Some("ok".into()), + peer_id: Some(client_id.clone()), + data, + }; + if let Ok(mm) = serde_json::to_string(&m) { + if let Some(client_map) = client_map { + let mut sent_messages = 0; + for (peer_client_id, peer_client_addr) in client_map { + if peer_client_id != client_id { + if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { + sent_messages += 1; + peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); + } + } + } + RoomMessageDown::create_success_reply(format!("Send message to {} peers", sent_messages)).send(&tx); + } + } + } else { + client_not_in_room(&tx); + } + }, + RoomMessageType::Peer => { + if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { + information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id); + if let Some(peer_id) = room_message.peer_id.as_ref() { + let room_map = handle_context.room_map.lock().unwrap(); + let client_map = room_map.get(room_id); + + let data = room_message.data; + let m = RoomMessageDown { + r#type: RoomMessageDownType::PeerMessage, + reply_code: Some(200), + reply_message: Some("ok".into()), + peer_id: Some(client_id.clone()), + data, + }; + if let Ok(mm) = serde_json::to_string(&m) { + if let Some(client_map) = client_map { + if let Some(peer_client_addr) = client_map.get(peer_id) { + if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { + peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); + } + RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(&tx); + } else { + RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(&tx); + } + } + } + } + } else { + client_not_in_room(&tx); + } + }, + } +} + async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr) { if let Err(e) = inner_handle_connection(handle_context, raw_stream, addr).await { failure!("Handler connection error: {}", e); @@ -130,7 +427,6 @@ async fn inner_handle_connection( let (outgoing, incoming) = ws_stream.split(); - let admin_pass = env::var("PASS").ok(); let broadcast_incoming = incoming.try_for_each(|msg| { match msg { Message::Close(_) => { @@ -145,257 +441,8 @@ async fn inner_handle_connection( RoomMessageDown::create_error_reply("binary not supported").send(&tx); }, Message::Text(msg) => { - if msg.is_empty() { - return future::ok(()); - } - if msg == "/exit" { - tx.unbounded_send(Message::Close(None)).ok(); - return future::ok(()); - } - if msg == "/statics" { - tx.unbounded_send(Message::Text( - format!( - "room count: {}\npeer count: {}", - handle_context.room_map.lock().unwrap().len(), - handle_context.peer_map.lock().unwrap().len(), - ) - )).ok(); - return future::ok(()); - } - if msg.starts_with("/pass ") { - let pass = &msg[6..]; - if let Some(admin_pass) = &admin_pass { - if admin_pass == pass { - handle_context.is_admin = true; - tx.unbounded_send(Message::Text("Admin password success".into())).ok(); - } else { - tx.unbounded_send(Message::Text("Admin password error".into())).ok(); - } - } - return future::ok(()); - } - if handle_context.is_admin { - if msg == "/rooms" { - let rooms = handle_context.room_map.lock().unwrap().keys().map( - |k| k.clone() - ).collect::>().join("\n"); - if rooms.is_empty() { - tx.unbounded_send(Message::Text("rooms: ".into())).ok(); - } else { - tx.unbounded_send(Message::Text(format!("rooms:\n{}", rooms))).ok(); - } - return future::ok(()); - } - if msg.starts_with("/room ") { - let room_id = &msg[6..]; - let room_map = handle_context.room_map.lock().unwrap(); - let mut client_ids = vec![]; - if let Some(client_map) = room_map.get(room_id) { - for peer_client_id in client_map.keys() { - client_ids.push(peer_client_id.clone()); - } - tx.unbounded_send(Message::Text(format!("clients in room {}:\n{}", room_id, client_ids.join("\n")))).ok(); - } else { - tx.unbounded_send(Message::Text(format!("room not found: {}", room_id))).ok(); - } - return future::ok(()); - } - if msg == "/version" { - tx.unbounded_send(Message::Text( - format!("{} - v{}", NAME, VERSION) - )).ok(); - return future::ok(()); - } - } - let room_message = match serde_json::from_str::(&msg) { - Ok(room_message) => room_message, - Err(e) => { - warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e); - RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx); - return future::ok(()); - } - }; - match room_message.r#type { - RoomMessageType::CreateOrEnter => { - if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { - warning!("Client is already in room: {:?} - {:?}", room_id, client_id); - RoomMessageDown::create_error_reply("Client is already in room").send(&tx); - } else { - if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { - let mut room_map = handle_context.room_map.lock().unwrap(); - match room_map.get_mut(&msg_room_id) { - Some(client_map) => { - match client_map.get(&msg_client_id) { - Some(peer_addr) => { - if peer_addr == &addr { - information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id); - RoomMessageDown::create_success_reply("Duplicate enter to room").send(&tx); - } else { - information!( - "Replace client: {:?} - {:?}, from {:?} -> {:?}", - msg_room_id, msg_client_id, peer_addr, addr - ); - let client_replaced_message = format!("Client replaced {:?} -> {:?}", peer_addr, addr); - if let Some(tx) = handle_context.peer_map.lock().unwrap().remove(peer_addr) { - tx.unbounded_send(Message::Close(None)).ok(); - } - client_map.insert(msg_client_id.clone(), addr); - RoomMessageDown::create_success_reply(client_replaced_message).send(&tx); - } - }, - None => { - information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); - client_map.insert(msg_client_id.clone(), addr); - - let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); - if let Ok(mm) = serde_json::to_string(&m) { - for (client_id, client_addr) in client_map { - if client_id != &msg_client_id { - if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) { - client_tx.unbounded_send(Message::Text(mm.clone())).ok(); - } - } - } - } - RoomMessageDown::create_success_reply(format!("Client entered: {:?}", addr)).send(&tx); - }, - } - }, - None => { - information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); - let mut client_map = BTreeMap::new(); - client_map.insert(msg_client_id.clone(), addr); - room_map.insert(msg_room_id.clone(), client_map); - RoomMessageDown::create_success_reply(format!("Client create room: {:?}", addr)).send(&tx); - }, - } - handle_context.room_id = Some(msg_room_id); - handle_context.client_id = Some(msg_client_id); - } else { - RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(&tx); - } - } - }, - RoomMessageType::Exit => { - if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { - let mut room_map = handle_context.room_map.lock().unwrap(); - if let Some(client_map) = room_map.get_mut(room_id) { - client_map.remove(client_id); - handle_context.peer_map.lock().unwrap().remove(&addr); - tx.unbounded_send(Message::Close(None)).ok(); - } else { - warning!("Not in room: {:?} - {:?}", room_id, client_id); - } - } else { - client_not_in_room(&tx); - } - }, - RoomMessageType::Destroy => { - if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { - information!("Destroy room: {:?} - {:?}", room_id, client_id); - let mut room_map = handle_context.room_map.lock().unwrap(); - let client_map = room_map.remove(room_id); - if let Some(client_map) = client_map { - for (_client_id, client_addr) in client_map { - if let Some(client_tx) = handle_context.peer_map.lock().unwrap().remove(&client_addr) { - client_tx.unbounded_send(Message::Close(None)).ok(); - } - } - } - } else { - client_not_in_room(&tx); - } - }, - RoomMessageType::ListPeers => { - if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { - information!("List room peers: {:?} - {:?}", room_id, client_id); - let room_map = handle_context.room_map.lock().unwrap(); - let client_map = room_map.get(room_id); - let mut client_ids = vec![]; - if let Some(client_map) = client_map { - for (client_id, _client_addr) in client_map { - client_ids.push(client_id.clone()); - } - } - let client_ids_data = serde_json::to_string(&client_ids); - if let Ok(client_ids_data) = client_ids_data { - let m = RoomMessageDown { - r#type: RoomMessageDownType::PeerList, - reply_code: Some(200), - reply_message: Some("ok".into()), - data: Some(client_ids_data), - ..Default::default() - }; - m.send(&tx); - } - } else { - client_not_in_room(&tx); - } - }, - RoomMessageType::Broadcast => { - if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { - information!("Broadcast room message: {:?} - {:?}", room_id, client_id); - let room_map = handle_context.room_map.lock().unwrap(); - let client_map = room_map.get(room_id); - - let data = room_message.data; - let m = RoomMessageDown { - r#type: RoomMessageDownType::BroadcastMessage, - reply_code: Some(200), - reply_message: Some("ok".into()), - peer_id: Some(client_id.clone()), - data, - }; - if let Ok(mm) = serde_json::to_string(&m) { - if let Some(client_map) = client_map { - let mut sent_messages = 0; - for (peer_client_id, peer_client_addr) in client_map { - if peer_client_id != client_id { - if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { - sent_messages += 1; - peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); - } - } - } - RoomMessageDown::create_success_reply(format!("Send message to {} peers", sent_messages)).send(&tx); - } - } - } else { - client_not_in_room(&tx); - } - }, - RoomMessageType::Peer => { - if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { - information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id); - if let Some(peer_id) = room_message.peer_id.as_ref() { - let room_map = handle_context.room_map.lock().unwrap(); - let client_map = room_map.get(room_id); - - let data = room_message.data; - let m = RoomMessageDown { - r#type: RoomMessageDownType::PeerMessage, - reply_code: Some(200), - reply_message: Some("ok".into()), - peer_id: Some(client_id.clone()), - data, - }; - if let Ok(mm) = serde_json::to_string(&m) { - if let Some(client_map) = client_map { - if let Some(peer_client_addr) = client_map.get(peer_id) { - if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { - peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); - } - RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(&tx); - } else { - RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(&tx); - } - } - } - } - } else { - client_not_in_room(&tx); - } - }, + if !msg.is_empty() { + handle_text_message(&mut handle_context, &tx, addr, msg); } }, } @@ -436,7 +483,8 @@ fn client_exit(room_map: &RoomMap, room_id: &Option, client_id: &Option< #[tokio::main] async fn main() -> Result<(), IoError> { - let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()); + let admin_pass = env::var("PASS").ok(); + let listen_addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()); let state = PeerMap::new(Mutex::new(HashMap::new())); let room = RoomMap::new(Mutex::new(BTreeMap::new())); @@ -444,15 +492,16 @@ async fn main() -> Result<(), IoError> { let handle_context = HandleContext { peer_map: state, room_map: room, + admin_pass, is_admin: false, room_id: None, client_id: None, }; // Create the event loop and TCP listener we'll accept connections on. - let try_socket = TcpListener::bind(&addr).await; - let listener = try_socket.expect(&format!("Failed to bind ok: {}", addr)); - success!("Listening on: {}", addr); + let try_socket = TcpListener::bind(&listen_addr).await; + let listener = try_socket.expect(&format!("Failed to bind ok: {}", listen_addr)); + success!("Listening on: {}", listen_addr); // Let's spawn the handling of each connection in a separate task. while let Ok((stream, addr)) = listener.accept().await {