#[macro_use] extern crate lazy_static; #[macro_use] extern crate rust_util; mod types; mod msg; mod slash_handles; use std::env; use std::io::Error as IoError; use std::collections::{HashMap, BTreeMap}; use std::net::SocketAddr; use std::sync::Mutex; use std::sync::atomic::{AtomicU64, Ordering}; use futures_channel::mpsc::unbounded; use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt}; use tokio::net::{TcpListener, TcpStream}; use tungstenite::protocol::Message; use rust_util::XResult; use crate::types::{PeerMap, RoomMap, Tx, TxSendMessage}; use crate::msg::{RoomMessageDown, RoomMessage, RoomMessageType, RoomMessageDownType}; use crate::slash_handles::{HandleContext, HandleTextMessage, HandleTextMessageTime}; use crate::slash_handles::{HandleTextMessageExit, HandleTextMessageStatics}; use crate::slash_handles::{HandleTextMessagePass, HandleTextMessageRoom}; use crate::slash_handles::{HandleTextMessageRooms, HandleTextMessageVersion}; const NAME: &str = env!("CARGO_PKG_NAME"); const VERSION: &str = env!("CARGO_PKG_VERSION"); static TOTAL_CREATED_CONN: AtomicU64 = AtomicU64::new(0); lazy_static! { static ref TEXT_MESSAGE_HANDLES: Vec>= { let handles:Vec> = vec![ Box::new(HandleTextMessageExit), Box::new(HandleTextMessageStatics), Box::new(HandleTextMessagePass), Box::new(HandleTextMessageRooms), Box::new(HandleTextMessageRoom), Box::new(HandleTextMessageVersion), Box::new(HandleTextMessageTime), ]; handles }; } fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: String) -> XResult<()> { // process all registered handles if msg.starts_with('/') { for handle in &*TEXT_MESSAGE_HANDLES { if handle.is_matches(handle_context, tx, addr, &msg) { handle.handle(handle_context, tx, addr, &msg); return Ok(()) } } } let room_message = match serde_json::from_str::(&msg) { Ok(room_message) => room_message, Err(e) => { warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e); RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx); return Ok(()); } }; match room_message.r#type { RoomMessageType::CreateOrEnter => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { warning!("Client is already in room: {:?} - {:?}", room_id, client_id); RoomMessageDown::create_error_reply("Client is already in room").send(tx); return Ok(()); } if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { let mut room_map = handle_context.room_map.lock().unwrap(); match room_map.get_mut(&msg_room_id) { Some(client_map) => { match client_map.get(&msg_client_id) { Some(peer_addr) => { if peer_addr == &addr { information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id); RoomMessageDown::create_success_reply("Duplicate enter to room").send(tx); } else { information!( "Replace client: {:?} - {:?}, from {:?} -> {:?}", msg_room_id, msg_client_id, peer_addr, addr ); let client_replaced_message = format!("Client replaced {:?} -> {:?}", peer_addr, addr); if let Some(tx) = handle_context.peer_map.lock().unwrap().remove(peer_addr) { tx.send_close(); } client_map.insert(msg_client_id.clone(), addr); RoomMessageDown::create_success_reply(client_replaced_message).send(tx); } }, None => { information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); client_map.insert(msg_client_id.clone(), addr); let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); let mm = serde_json::to_string(&m)?; for (client_id, client_addr) in client_map { if client_id != &msg_client_id { if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) { client_tx.send_text(mm.clone()); } } } RoomMessageDown::create_success_reply(format!("Client entered: {:?}", addr)).send(tx); }, } }, None => { information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); let mut client_map = BTreeMap::new(); client_map.insert(msg_client_id.clone(), addr); room_map.insert(msg_room_id.clone(), client_map); RoomMessageDown::create_success_reply(format!("Client create room: {:?}", addr)).send(tx); }, } handle_context.room_id = Some(msg_room_id); handle_context.client_id = Some(msg_client_id); } else { RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(tx); } }, RoomMessageType::Exit => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { let mut room_map = handle_context.room_map.lock().unwrap(); if let Some(client_map) = room_map.get_mut(room_id) { client_map.remove(client_id); handle_context.peer_map.lock().unwrap().remove(&addr); tx.send_close(); } else { warning!("Not in room: {:?} - {:?}", room_id, client_id); } } else { client_not_in_room(&tx, addr); } }, RoomMessageType::Destroy => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("Destroy room: {:?} - {:?}", room_id, client_id); let mut room_map = handle_context.room_map.lock().unwrap(); let client_map = room_map.remove(room_id); if let Some(client_map) = client_map { for (_client_id, client_addr) in client_map { if let Some(client_tx) = handle_context.peer_map.lock().unwrap().remove(&client_addr) { client_tx.send_close(); } } } } else { client_not_in_room(&tx, addr); } }, RoomMessageType::ListPeers => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("List room peers: {:?} - {:?}", room_id, client_id); let room_map = handle_context.room_map.lock().unwrap(); let client_map = room_map.get(room_id); let mut client_ids = vec![]; if let Some(client_map) = client_map { for client_id in client_map.keys() { client_ids.push(client_id.clone()); } } let client_ids_data = serde_json::to_string(&client_ids)?; let m = RoomMessageDown { r#type: RoomMessageDownType::PeerList, reply_code: Some(200), reply_message: Some("ok".into()), data: Some(client_ids_data), ..Default::default() }; m.send(&tx); } else { client_not_in_room(&tx, addr); } }, RoomMessageType::Broadcast => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("Broadcast room message: {:?} - {:?}", room_id, client_id); let room_map = handle_context.room_map.lock().unwrap(); let client_map = room_map.get(room_id); let data = room_message.data; let m = RoomMessageDown { r#type: RoomMessageDownType::BroadcastMessage, reply_code: Some(200), reply_message: Some("ok".into()), peer_id: Some(client_id.clone()), data, }; let mm = serde_json::to_string(&m)?; if let Some(client_map) = client_map { let mut sent_messages = 0; for (peer_client_id, peer_client_addr) in client_map { if peer_client_id != client_id { if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { sent_messages += 1; peer_tx.send_text(mm.clone()); } } } RoomMessageDown::create_success_reply(format!("Send message to {} peers", sent_messages)).send(&tx); } } else { client_not_in_room(&tx, addr); } }, RoomMessageType::Peer => { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id); if let Some(peer_id) = room_message.peer_id.as_ref() { let room_map = handle_context.room_map.lock().unwrap(); let client_map = room_map.get(room_id); let data = room_message.data; let m = RoomMessageDown { r#type: RoomMessageDownType::PeerMessage, reply_code: Some(200), reply_message: Some("ok".into()), peer_id: Some(client_id.clone()), data, }; let mm = serde_json::to_string(&m)?; if let Some(client_map) = client_map { if let Some(peer_client_addr) = client_map.get(peer_id) { if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { peer_tx.send_text(mm); } RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(tx); } else { RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(tx); } } } } else { client_not_in_room(&tx, addr); } }, } Ok(()) } async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr) { if let Err(e) = inner_handle_connection(handle_context, raw_stream, addr).await { failure!("Handler connection error: {}", e); } } async fn inner_handle_connection( mut handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr ) -> XResult<()> { information!("Incoming TCP connection from: {}", addr); let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; information!("WebSocket connection established: {}", addr); // Insert the write part of this peer to the peer map. let (tx, rx) = unbounded(); { handle_context.peer_map.lock().unwrap().insert(addr, tx.clone()); } let (outgoing, incoming) = ws_stream.split(); let broadcast_incoming = incoming.try_for_each(|msg| { match msg { Message::Close(_) => { information!("Close connection: {:?} - {:?} from: {}", handle_context.room_id, handle_context.client_id, addr); handle_context.peer_map.lock().unwrap().remove(&addr); client_exit(&handle_context.room_map, &handle_context.room_id, &handle_context.client_id); }, Message::Ping(_) => {}, Message::Pong(_) => {}, Message::Binary(_) => { warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id); RoomMessageDown::create_error_reply("Binary message not supported").send(&tx); }, Message::Text(msg) => { if !msg.is_empty() { if let Err(e) = handle_text_message(&mut handle_context, &tx, addr, msg) { failure!("Error in process text message: {}", e); RoomMessageDown::create_error_reply(format!("Error in process text message: {}", e)); } } }, } future::ok(()) }); let receive_from_others = rx.map(Ok).forward(outgoing); pin_mut!(broadcast_incoming, receive_from_others); future::select(broadcast_incoming, receive_from_others).await; information!("Client disconnected: {}", &addr); client_exit(&handle_context.room_map, &handle_context.room_id, &handle_context.client_id); handle_context.peer_map.lock().unwrap().remove(&addr); Ok(()) } fn client_not_in_room(tx: &Tx, addr: SocketAddr) { information!("Client is not in room: {}", addr); RoomMessageDown::create_error_reply("Client is not in room").send(tx); } fn client_exit(room_map: &RoomMap, room_id: &Option, client_id: &Option) { if let (Some(room_id), Some(client_id)) = (room_id, client_id) { let mut room_map = room_map.lock().unwrap(); if let Some(client_map) = room_map.get_mut(room_id) { information!("Client: {} exit from room: {}", client_id, room_id); client_map.remove(client_id); if client_map.is_empty() { information!("Room is empty, close room: {}", room_id); room_map.remove(room_id); } } } } #[tokio::main] async fn main() -> Result<(), IoError> { let admin_pass = env::var("PASS").ok(); let listen_addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()); let handle_context = HandleContext { peer_map: PeerMap::new(Mutex::new(HashMap::new())), room_map: RoomMap::new(Mutex::new(BTreeMap::new())), admin_pass, is_admin: false, room_id: None, client_id: None, }; let try_socket = TcpListener::bind(&listen_addr).await; let listener = try_socket.unwrap_or_else(|_| panic!("Failed to bind ok: {}", listen_addr)); success!("Listening on: {}", listen_addr); while let Ok((stream, addr)) = listener.accept().await { TOTAL_CREATED_CONN.fetch_add(1, Ordering::Relaxed); tokio::spawn(handle_connection(handle_context.clone(), stream, addr)); } Ok(()) }