diff --git a/src/main.rs b/src/main.rs index 3a9ef66..d0c6826 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,244 +1,44 @@ #[macro_use] extern crate lazy_static; #[macro_use] extern crate rust_util; +mod types; +mod msg; +mod slash_handles; + use std::env; use std::io::Error as IoError; use std::collections::{HashMap, BTreeMap}; use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; -use futures_channel::mpsc::{unbounded, UnboundedSender}; +use std::sync::Mutex; +use futures_channel::mpsc::unbounded; use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt}; use tokio::net::{TcpListener, TcpStream}; use tungstenite::protocol::Message; use rust_util::XResult; -use serde::{Deserialize, Serialize}; - -type Tx = UnboundedSender; -type PeerMap = Arc>>; -type RoomMap = Arc>>>; +use crate::types::{PeerMap, RoomMap, Tx}; +use crate::msg::{RoomMessageDown, RoomMessage, RoomMessageType, RoomMessageDownType}; +use crate::slash_handles::{HandleContext, HandleTextMessage}; +use crate::slash_handles::{HandleTextMessageExit, HandleTextMessageStatics}; +use crate::slash_handles::{HandleTextMessagePass, HandleTextMessageRoom}; +use crate::slash_handles::{HandleTextMessageRooms, HandleTextMessageVersion}; const NAME: &str = env!("CARGO_PKG_NAME"); const VERSION: &str = env!("CARGO_PKG_VERSION"); -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -enum RoomMessageType { - #[serde(rename = "Enter")] - CreateOrEnter, - Exit, - Destroy, - ListPeers, - Broadcast, - Peer, -} - -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -enum RoomMessageDownType { - PeerEnter, - PeerExit, - PeerList, - PeerMessage, - BroadcastMessage, - ReplyMessage, -} - -impl Default for RoomMessageDownType { - fn default() -> Self { - Self::ReplyMessage - } -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -struct RoomMessage { - r#type: RoomMessageType, - room_id: Option, - client_id: Option, - peer_id: Option, - data: Option, -} - -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -struct RoomMessageDown { - r#type: RoomMessageDownType, - peer_id: Option, - reply_code: Option, - reply_message: Option, - data: Option, -} - -impl RoomMessageDown { - fn create_error_reply(error_message: S) -> Self where S: Into { - Self { - r#type: RoomMessageDownType::ReplyMessage, - reply_code: Some(500), - reply_message: Some(error_message.into()), - ..Default::default() - } - } - fn create_success_reply(success_message: S) -> Self where S: Into { - Self { - r#type: RoomMessageDownType::ReplyMessage, - reply_code: Some(200), - reply_message: Some(success_message.into()), - ..Default::default() - } - } - fn create_peer_enter(peer_id: S) -> Self where S: Into { - let peer_id = peer_id.into(); - Self { - r#type: RoomMessageDownType::PeerEnter, - reply_code: Some(200), - reply_message: Some(format!("Peer {} entered", peer_id)), - peer_id: Some(peer_id), - ..Default::default() - } - } - - fn send(&self, tx: &Tx) { - if let Ok(mm) = serde_json::to_string(self) { - tx.unbounded_send(Message::Text(mm)).ok(); - } - } -} - -#[derive(Debug, Clone)] -struct HandleContext { - peer_map: PeerMap, - room_map: RoomMap, - admin_pass: Option, - is_admin: bool, - room_id: Option, - client_id: Option, -} - lazy_static! { static ref TEXT_MESSAGE_HANDLES: Vec>= { - let mut handles:Vec> = vec![]; - handles.push(Box::new(HandleTextMessageExit)); - handles.push(Box::new(HandleTextMessageStatics)); - handles.push(Box::new(HandleTextMessagePass)); - handles.push(Box::new(HandleTextMessageRooms)); - handles.push(Box::new(HandleTextMessageRoom)); - handles.push(Box::new(HandleTextMessageVersion)); + let handles:Vec> = vec![ + Box::new(HandleTextMessageExit), + Box::new(HandleTextMessageStatics), + Box::new(HandleTextMessagePass), + Box::new(HandleTextMessageRooms), + Box::new(HandleTextMessageRoom), + Box::new(HandleTextMessageVersion), + ]; handles }; } -trait HandleTextMessage: Sync { - fn is_matches(&self, handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: &str) -> bool; - fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: &str); -} - -struct HandleTextMessageExit; -impl HandleTextMessage for HandleTextMessageExit { - fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { - msg == "/exit" - } - - fn handle(&self, _handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { - tx.unbounded_send(Message::Close(None)).ok(); - } -} - -struct HandleTextMessageStatics; -impl HandleTextMessage for HandleTextMessageStatics { - fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { - msg == "/statics" - } - - fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { - tx.unbounded_send(Message::Text( - format!( - "room count: {}\npeer count: {}", - handle_context.room_map.lock().unwrap().len(), - handle_context.peer_map.lock().unwrap().len(), - ) - )).ok(); - } -} - -struct HandleTextMessagePass; -impl HandleTextMessage for HandleTextMessagePass { - fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { - msg.starts_with("/pass ") - } - - fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, msg: &str) { - let pass = &msg[6..]; - if let Some(admin_pass) = &handle_context.admin_pass { - if admin_pass == pass { - handle_context.is_admin = true; - tx.unbounded_send(Message::Text("Admin password success".into())).ok(); - } else { - tx.unbounded_send(Message::Text("Admin password error".into())).ok(); - } - } - } -} - -struct HandleTextMessageRooms; -impl HandleTextMessage for HandleTextMessageRooms { - fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { - msg == "/rooms" - } - - fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { - if !handle_context.is_admin { - RoomMessageDown::create_error_reply("Not admin").send(tx); - return; - } - let rooms = handle_context.room_map.lock().unwrap().keys().map( - |k| k.clone() - ).collect::>().join("\n"); - if rooms.is_empty() { - tx.unbounded_send(Message::Text("rooms: ".into())).ok(); - } else { - tx.unbounded_send(Message::Text(format!("rooms:\n{}", rooms))).ok(); - } - } -} - -struct HandleTextMessageRoom; -impl HandleTextMessage for HandleTextMessageRoom { - fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { - msg.starts_with("/room ") - } - - fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, msg: &str) { - if !handle_context.is_admin { - RoomMessageDown::create_error_reply("Not admin").send(tx); - return; - } - let room_id = &msg[6..]; - let room_map = handle_context.room_map.lock().unwrap(); - let mut client_ids = vec![]; - if let Some(client_map) = room_map.get(room_id) { - for peer_client_id in client_map.keys() { - client_ids.push(peer_client_id.clone()); - } - tx.unbounded_send(Message::Text(format!("clients in room {}:\n{}", room_id, client_ids.join("\n")))).ok(); - } else { - tx.unbounded_send(Message::Text(format!("room not found: {}", room_id))).ok(); - } - } -} - -struct HandleTextMessageVersion; -impl HandleTextMessage for HandleTextMessageVersion { - fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { - msg == "/version" - } - - fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { - if !handle_context.is_admin { - RoomMessageDown::create_error_reply("Not admin").send(tx); - return; - } - tx.unbounded_send(Message::Text(format!("{} - v{}", NAME, VERSION))).ok(); - } -} - fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: String) { // process all registered handles if msg.starts_with('/') { @@ -261,61 +61,61 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket RoomMessageType::CreateOrEnter => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { warning!("Client is already in room: {:?} - {:?}", room_id, client_id); - RoomMessageDown::create_error_reply("Client is already in room").send(&tx); - } else { - if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { - let mut room_map = handle_context.room_map.lock().unwrap(); - match room_map.get_mut(&msg_room_id) { - Some(client_map) => { - match client_map.get(&msg_client_id) { - Some(peer_addr) => { - if peer_addr == &addr { - information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id); - RoomMessageDown::create_success_reply("Duplicate enter to room").send(&tx); - } else { - information!( - "Replace client: {:?} - {:?}, from {:?} -> {:?}", - msg_room_id, msg_client_id, peer_addr, addr - ); - let client_replaced_message = format!("Client replaced {:?} -> {:?}", peer_addr, addr); - if let Some(tx) = handle_context.peer_map.lock().unwrap().remove(peer_addr) { - tx.unbounded_send(Message::Close(None)).ok(); - } - client_map.insert(msg_client_id.clone(), addr); - RoomMessageDown::create_success_reply(client_replaced_message).send(&tx); + RoomMessageDown::create_error_reply("Client is already in room").send(tx); + return; + } + if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { + let mut room_map = handle_context.room_map.lock().unwrap(); + match room_map.get_mut(&msg_room_id) { + Some(client_map) => { + match client_map.get(&msg_client_id) { + Some(peer_addr) => { + if peer_addr == &addr { + information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id); + RoomMessageDown::create_success_reply("Duplicate enter to room").send(tx); + } else { + information!( + "Replace client: {:?} - {:?}, from {:?} -> {:?}", + msg_room_id, msg_client_id, peer_addr, addr + ); + let client_replaced_message = format!("Client replaced {:?} -> {:?}", peer_addr, addr); + if let Some(tx) = handle_context.peer_map.lock().unwrap().remove(peer_addr) { + tx.unbounded_send(Message::Close(None)).ok(); } - }, - None => { - information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); client_map.insert(msg_client_id.clone(), addr); + RoomMessageDown::create_success_reply(client_replaced_message).send(tx); + } + }, + None => { + information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); + client_map.insert(msg_client_id.clone(), addr); - let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); - if let Ok(mm) = serde_json::to_string(&m) { - for (client_id, client_addr) in client_map { - if client_id != &msg_client_id { - if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) { - client_tx.unbounded_send(Message::Text(mm.clone())).ok(); - } + let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); + if let Ok(mm) = serde_json::to_string(&m) { + for (client_id, client_addr) in client_map { + if client_id != &msg_client_id { + if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) { + client_tx.unbounded_send(Message::Text(mm.clone())).ok(); } } } - RoomMessageDown::create_success_reply(format!("Client entered: {:?}", addr)).send(&tx); - }, - } - }, - None => { - information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); - let mut client_map = BTreeMap::new(); - client_map.insert(msg_client_id.clone(), addr); - room_map.insert(msg_room_id.clone(), client_map); - RoomMessageDown::create_success_reply(format!("Client create room: {:?}", addr)).send(&tx); - }, - } - handle_context.room_id = Some(msg_room_id); - handle_context.client_id = Some(msg_client_id); - } else { - RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(&tx); + } + RoomMessageDown::create_success_reply(format!("Client entered: {:?}", addr)).send(tx); + }, + } + }, + None => { + information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); + let mut client_map = BTreeMap::new(); + client_map.insert(msg_client_id.clone(), addr); + room_map.insert(msg_room_id.clone(), client_map); + RoomMessageDown::create_success_reply(format!("Client create room: {:?}", addr)).send(tx); + }, } + handle_context.room_id = Some(msg_room_id); + handle_context.client_id = Some(msg_client_id); + } else { + RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(tx); } }, RoomMessageType::Exit => { @@ -355,7 +155,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket let client_map = room_map.get(room_id); let mut client_ids = vec![]; if let Some(client_map) = client_map { - for (client_id, _client_addr) in client_map { + for client_id in client_map.keys() { client_ids.push(client_id.clone()); } } @@ -425,11 +225,11 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket if let Some(client_map) = client_map { if let Some(peer_client_addr) = client_map.get(peer_id) { if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { - peer_tx.unbounded_send(Message::Text(mm.clone())).ok(); + peer_tx.unbounded_send(Message::Text(mm)).ok(); } - RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(&tx); + RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(tx); } else { - RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(&tx); + RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(tx); } } } @@ -472,7 +272,7 @@ async fn inner_handle_connection( Message::Pong(_) => {}, Message::Binary(_) => { warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id); - RoomMessageDown::create_error_reply("binary not supported").send(&tx); + RoomMessageDown::create_error_reply("Binary message not supported").send(&tx); }, Message::Text(msg) => { if !msg.is_empty() { @@ -534,7 +334,7 @@ async fn main() -> Result<(), IoError> { // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(&listen_addr).await; - let listener = try_socket.expect(&format!("Failed to bind ok: {}", listen_addr)); + let listener = try_socket.unwrap_or_else(|_| panic!("Failed to bind ok: {}", listen_addr)); success!("Listening on: {}", listen_addr); // Let's spawn the handling of each connection in a separate task. diff --git a/src/msg.rs b/src/msg.rs new file mode 100644 index 0000000..f8948d6 --- /dev/null +++ b/src/msg.rs @@ -0,0 +1,85 @@ +use tungstenite::Message; +use serde::{Deserialize, Serialize}; +use crate::types::Tx; + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub enum RoomMessageType { + #[serde(rename = "Enter")] + CreateOrEnter, + Exit, + Destroy, + ListPeers, + Broadcast, + Peer, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub enum RoomMessageDownType { + PeerEnter, + PeerExit, + PeerList, + PeerMessage, + BroadcastMessage, + ReplyMessage, +} + +impl Default for RoomMessageDownType { + fn default() -> Self { + Self::ReplyMessage + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RoomMessage { + pub r#type: RoomMessageType, + pub room_id: Option, + pub client_id: Option, + pub peer_id: Option, + pub data: Option, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RoomMessageDown { + pub r#type: RoomMessageDownType, + pub peer_id: Option, + pub reply_code: Option, + pub reply_message: Option, + pub data: Option, +} + +impl RoomMessageDown { + pub fn create_error_reply(error_message: S) -> Self where S: Into { + Self { + r#type: RoomMessageDownType::ReplyMessage, + reply_code: Some(500), + reply_message: Some(error_message.into()), + ..Default::default() + } + } + pub fn create_success_reply(success_message: S) -> Self where S: Into { + Self { + r#type: RoomMessageDownType::ReplyMessage, + reply_code: Some(200), + reply_message: Some(success_message.into()), + ..Default::default() + } + } + pub fn create_peer_enter(peer_id: S) -> Self where S: Into { + let peer_id = peer_id.into(); + Self { + r#type: RoomMessageDownType::PeerEnter, + reply_code: Some(200), + reply_message: Some(format!("Peer {} entered", peer_id)), + peer_id: Some(peer_id), + ..Default::default() + } + } + + pub fn send(&self, tx: &Tx) { + if let Ok(mm) = serde_json::to_string(self) { + tx.unbounded_send(Message::Text(mm)).ok(); + } + } +} diff --git a/src/slash_handles.rs b/src/slash_handles.rs new file mode 100644 index 0000000..996ae59 --- /dev/null +++ b/src/slash_handles.rs @@ -0,0 +1,127 @@ +use crate::types::{PeerMap, RoomMap, Tx}; +use std::net::SocketAddr; +use tungstenite::Message; +use crate::msg::RoomMessageDown; +use crate::{NAME, VERSION}; + +#[derive(Debug, Clone)] +pub struct HandleContext { + pub peer_map: PeerMap, + pub room_map: RoomMap, + pub admin_pass: Option, + pub is_admin: bool, + pub room_id: Option, + pub client_id: Option, +} + +pub trait HandleTextMessage: Sync { + fn is_matches(&self, handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: &str) -> bool; + fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: &str); +} + +pub struct HandleTextMessageExit; +impl HandleTextMessage for HandleTextMessageExit { + fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { + msg == "/exit" + } + + fn handle(&self, _handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { + tx.unbounded_send(Message::Close(None)).ok(); + } +} + +pub struct HandleTextMessageStatics; +impl HandleTextMessage for HandleTextMessageStatics { + fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { + msg == "/statics" + } + + fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { + tx.unbounded_send(Message::Text( + format!( + "room count: {}\npeer count: {}", + handle_context.room_map.lock().unwrap().len(), + handle_context.peer_map.lock().unwrap().len(), + ) + )).ok(); + } +} + +pub struct HandleTextMessagePass; +impl HandleTextMessage for HandleTextMessagePass { + fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { + msg.starts_with("/pass ") + } + + fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, msg: &str) { + let pass = &msg[6..]; + if let Some(admin_pass) = &handle_context.admin_pass { + if admin_pass == pass { + handle_context.is_admin = true; + tx.unbounded_send(Message::Text("Admin password success".into())).ok(); + } else { + tx.unbounded_send(Message::Text("Admin password error".into())).ok(); + } + } + } +} + +pub struct HandleTextMessageRooms; +impl HandleTextMessage for HandleTextMessageRooms { + fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { + msg == "/rooms" + } + + fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { + if !handle_context.is_admin { + RoomMessageDown::create_error_reply("Not admin").send(tx); + return; + } + let rooms = handle_context.room_map.lock().unwrap().keys().cloned().collect::>().join("\n"); + if rooms.is_empty() { + tx.unbounded_send(Message::Text("rooms: ".into())).ok(); + } else { + tx.unbounded_send(Message::Text(format!("rooms:\n{}", rooms))).ok(); + } + } +} + +pub struct HandleTextMessageRoom; +impl HandleTextMessage for HandleTextMessageRoom { + fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { + msg.starts_with("/room ") + } + + fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, msg: &str) { + if !handle_context.is_admin { + RoomMessageDown::create_error_reply("Not admin").send(tx); + return; + } + let room_id = &msg[6..]; + let room_map = handle_context.room_map.lock().unwrap(); + let mut client_ids = vec![]; + if let Some(client_map) = room_map.get(room_id) { + for peer_client_id in client_map.keys() { + client_ids.push(peer_client_id.clone()); + } + tx.unbounded_send(Message::Text(format!("clients in room {}:\n{}", room_id, client_ids.join("\n")))).ok(); + } else { + tx.unbounded_send(Message::Text(format!("room not found: {}", room_id))).ok(); + } + } +} + +pub struct HandleTextMessageVersion; +impl HandleTextMessage for HandleTextMessageVersion { + fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { + msg == "/version" + } + + fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { + if !handle_context.is_admin { + RoomMessageDown::create_error_reply("Not admin").send(tx); + return; + } + tx.unbounded_send(Message::Text(format!("{} - v{}", NAME, VERSION))).ok(); + } +} diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 0000000..878df4d --- /dev/null +++ b/src/types.rs @@ -0,0 +1,9 @@ +use futures_channel::mpsc::UnboundedSender; +use tungstenite::Message; +use std::net::SocketAddr; +use std::collections::{HashMap, BTreeMap}; +use std::sync::{Mutex, Arc}; + +pub type Tx = UnboundedSender; +pub type PeerMap = Arc>>; +pub type RoomMap = Arc>>>;