From e8fcd412c54e51bd271e570c9966a6729a108b9c Mon Sep 17 00:00:00 2001 From: Hatter Jiang Date: Sun, 5 Nov 2023 16:02:16 +0800 Subject: [PATCH] v0.1.7 --- Cargo.lock | 2 +- Cargo.toml | 6 +- src/main.rs | 262 +++++++++++++++++++++---------------------- src/slash_handles.rs | 53 +++++++-- 4 files changed, 179 insertions(+), 144 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 90d6678..0d253d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -571,7 +571,7 @@ dependencies = [ [[package]] name = "room-rs" -version = "0.1.6" +version = "0.1.7" dependencies = [ "chrono", "futures-channel", diff --git a/Cargo.toml b/Cargo.toml index 5dfb0b1..ce6f949 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "room-rs" -version = "0.1.6" +version = "0.1.7" authors = ["Hatter Jiang@Pixelbook "] edition = "2018" @@ -10,9 +10,9 @@ edition = "2018" rust_util = "0.6" simpledateformat = "0.1" chrono = "0.4" -tungstenite = {version = "0.20", default-features = false} +tungstenite = { version = "0.20", default-features = false } tokio-tungstenite = "0.20" -tokio = { version = "1.28", features = ["full"]} +tokio = { version = "1.28", features = ["full"] } futures-util = "0.3" lazy_static = "1.4" futures-channel = "0.3" diff --git a/src/main.rs b/src/main.rs index c025ad9..3fd0c53 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,29 +3,32 @@ extern crate lazy_static; #[macro_use] extern crate rust_util; +use std::collections::BTreeMap; +use std::env; +use std::io::Error as IoError; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicU64, Ordering}; + +use futures_channel::mpsc::unbounded; +use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt}; +use rust_util::{util_time, XResult}; +use tokio::net::{TcpListener, TcpStream}; +use tungstenite::protocol::Message; + +use crate::msg::{RoomMessage, RoomMessageDown, RoomMessageType}; +use crate::slash_handles::{ + HandleContext, HandleRestMessage, + HandleTextMessage, HandleTextMessageExit, + HandleTextMessagePass, HandleTextMessageRoom, + HandleTextMessageRooms, HandleTextMessageStatistic, + HandleTextMessageTime, HandleTextMessageVersion, +}; +use crate::types::{Tx, TxSendMessage}; + mod types; mod msg; mod slash_handles; -use std::env; -use std::io::Error as IoError; -use std::collections::{HashMap, BTreeMap}; -use std::net::SocketAddr; -use std::sync::Mutex; -use std::sync::atomic::{AtomicU64, Ordering}; -use futures_channel::mpsc::unbounded; -use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt}; -use tokio::net::{TcpListener, TcpStream}; -use tungstenite::protocol::Message; -use rust_util::util_time; -use rust_util::XResult; -use crate::types::{PeerMap, RoomMap, Tx, TxSendMessage}; -use crate::msg::{RoomMessageDown, RoomMessage, RoomMessageType}; -use crate::slash_handles::{HandleContext, HandleTextMessage, HandleTextMessageTime}; -use crate::slash_handles::{HandleTextMessageExit, HandleTextMessageStatics}; -use crate::slash_handles::{HandleTextMessagePass, HandleTextMessageRoom}; -use crate::slash_handles::{HandleTextMessageRooms, HandleTextMessageVersion}; - const NAME: &str = env!("CARGO_PKG_NAME"); const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -36,17 +39,124 @@ lazy_static! { static ref TEXT_MESSAGE_HANDLES: Vec>= { let handles:Vec> = vec![ Box::new(HandleTextMessageExit), - Box::new(HandleTextMessageStatics), + Box::new(HandleTextMessageStatistic), Box::new(HandleTextMessagePass), Box::new(HandleTextMessageRooms), Box::new(HandleTextMessageRoom), Box::new(HandleTextMessageVersion), Box::new(HandleTextMessageTime), + Box::new(HandleRestMessage), ]; handles }; } + +#[tokio::main] +async fn main() -> Result<(), IoError> { + let admin_pass = env::var("PASS").ok(); + let listen_addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()); + + let handle_context_template = HandleContext::new_context(admin_pass); + + let try_socket = TcpListener::bind(&listen_addr).await; + let listener = try_socket.unwrap_or_else(|_| panic!("Failed to bind ok: {}", listen_addr)); + success!("Listening on: {}, startup at millis: {}", listen_addr, *STARTUP_MILLIS); + + while let Ok((stream, addr)) = listener.accept().await { + TOTAL_CREATED_CONN.fetch_add(1, Ordering::Relaxed); + tokio::spawn(handle_connection(handle_context_template.clone(), stream, addr)); + } + + Ok(()) +} + +async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr) { + if let Err(e) = inner_handle_connection(handle_context, raw_stream, addr).await { + failure!("Handler connection error: {}", e); + } +} + +async fn inner_handle_connection( + mut handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr, +) -> XResult<()> { + information!("Incoming TCP connection from: {}", addr); + + let ws_stream = opt_result!(tokio_tungstenite::accept_async(raw_stream).await, "Accept websocket failed: {}"); + information!("WebSocket connection established: {}", addr); + + // Insert the write part of this peer to the peer map. + let (tx, rx) = unbounded(); + { handle_context.peer_map.lock().unwrap().insert(addr, tx.clone()); } + + let (outgoing, incoming) = ws_stream.split(); + + let broadcast_incoming = incoming.try_for_each(|msg| { + match msg { + Message::Close(_) => { + information!("Close connection: {:?} - {:?} from: {}", handle_context.room_id, handle_context.client_id, addr); + handle_context.peer_map.lock().unwrap().remove(&addr); + client_exit(&handle_context, &handle_context.room_id, &handle_context.client_id); + } + Message::Ping(_) => {} + Message::Pong(_) => {} + Message::Binary(_) => { + warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id); + RoomMessageDown::create_error_reply(&None, "Binary message not supported").send(&tx); + } + Message::Text(msg) => if !msg.is_empty() { + if let Err(e) = handle_text_message(&mut handle_context, &tx, addr, msg) { + failure!("Error in process text message: {}", e); + RoomMessageDown::create_error_reply(&None, format!("Error in process text message: {}", e)); + } + }, + Message::Frame(_) => {} + } + + future::ok(()) + }); + + let receive_from_others = rx.map(Ok).forward(outgoing); + + pin_mut!(broadcast_incoming, receive_from_others); + future::select(broadcast_incoming, receive_from_others).await; + + information!("Client disconnected: {}", &addr); + client_exit(&handle_context, &handle_context.room_id, &handle_context.client_id); + handle_context.peer_map.lock().unwrap().remove(&addr); + + Ok(()) +} + +fn client_not_in_room(tx: &Tx, addr: SocketAddr) { + information!("Client is not in room: {}", addr); + RoomMessageDown::create_error_reply(&None, "Client is not in room").send(tx); +} + +fn client_exit(handle_context: &HandleContext, room_id: &Option, client_id: &Option) { + let room_map = &handle_context.room_map; + if let (Some(room_id), Some(client_id)) = (room_id, client_id) { + let mut room_map = room_map.lock().unwrap(); + if let Some(client_map) = room_map.get_mut(room_id) { + information!("Client: {} exit from room: {}", client_id, room_id); + client_map.remove(client_id); + if client_map.is_empty() { + information!("Room is empty, close room: {}", room_id); + room_map.remove(room_id); + } else { + let m = RoomMessageDown::create_peer_exit(&None, client_id.clone()); + if let Ok(mm) = serde_json::to_string(&m) { + for client_addr in client_map.values_mut() { + if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) { + client_tx.send_text(mm.clone()); + } + } + } + } + } + } +} + fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: String) -> XResult<()> { // process all registered handles if msg.starts_with('/') { @@ -228,117 +338,5 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket } } } - Ok(()) -} - -async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr) { - if let Err(e) = inner_handle_connection(handle_context, raw_stream, addr).await { - failure!("Handler connection error: {}", e); - } -} - -async fn inner_handle_connection( - mut handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr, -) -> XResult<()> { - information!("Incoming TCP connection from: {}", addr); - - let ws_stream = opt_result!(tokio_tungstenite::accept_async(raw_stream).await, "Accept websocket failed: {}"); - information!("WebSocket connection established: {}", addr); - - // Insert the write part of this peer to the peer map. - let (tx, rx) = unbounded(); - { handle_context.peer_map.lock().unwrap().insert(addr, tx.clone()); } - - let (outgoing, incoming) = ws_stream.split(); - - let broadcast_incoming = incoming.try_for_each(|msg| { - match msg { - Message::Close(_) => { - information!("Close connection: {:?} - {:?} from: {}", handle_context.room_id, handle_context.client_id, addr); - handle_context.peer_map.lock().unwrap().remove(&addr); - client_exit(&handle_context, &handle_context.room_id, &handle_context.client_id); - } - Message::Ping(_) => {} - Message::Pong(_) => {} - Message::Binary(_) => { - warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id); - RoomMessageDown::create_error_reply(&None, "Binary message not supported").send(&tx); - } - Message::Text(msg) => if !msg.is_empty() { - if let Err(e) = handle_text_message(&mut handle_context, &tx, addr, msg) { - failure!("Error in process text message: {}", e); - RoomMessageDown::create_error_reply(&None, format!("Error in process text message: {}", e)); - } - }, - Message::Frame(_) => {} - } - - future::ok(()) - }); - - let receive_from_others = rx.map(Ok).forward(outgoing); - - pin_mut!(broadcast_incoming, receive_from_others); - future::select(broadcast_incoming, receive_from_others).await; - - information!("Client disconnected: {}", &addr); - client_exit(&handle_context, &handle_context.room_id, &handle_context.client_id); - handle_context.peer_map.lock().unwrap().remove(&addr); - - Ok(()) -} - -fn client_not_in_room(tx: &Tx, addr: SocketAddr) { - information!("Client is not in room: {}", addr); - RoomMessageDown::create_error_reply(&None, "Client is not in room").send(tx); -} - -fn client_exit(handle_context: &HandleContext, room_id: &Option, client_id: &Option) { - let room_map = &handle_context.room_map; - if let (Some(room_id), Some(client_id)) = (room_id, client_id) { - let mut room_map = room_map.lock().unwrap(); - if let Some(client_map) = room_map.get_mut(room_id) { - information!("Client: {} exit from room: {}", client_id, room_id); - client_map.remove(client_id); - if client_map.is_empty() { - information!("Room is empty, close room: {}", room_id); - room_map.remove(room_id); - } else { - let m = RoomMessageDown::create_peer_exit(&None, client_id.clone()); - if let Ok(mm) = serde_json::to_string(&m) { - for client_addr in client_map.values_mut() { - if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) { - client_tx.send_text(mm.clone()); - } - } - } - } - } - } -} - -#[tokio::main] -async fn main() -> Result<(), IoError> { - let admin_pass = env::var("PASS").ok(); - let listen_addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()); - - let handle_context = HandleContext { - peer_map: PeerMap::new(Mutex::new(HashMap::new())), - room_map: RoomMap::new(Mutex::new(BTreeMap::new())), - admin_pass, - is_admin: false, - room_id: None, - client_id: None, - }; - - let try_socket = TcpListener::bind(&listen_addr).await; - let listener = try_socket.unwrap_or_else(|_| panic!("Failed to bind ok: {}", listen_addr)); - success!("Listening on: {}, startup at millis: {}", listen_addr, *STARTUP_MILLIS); - - while let Ok((stream, addr)) = listener.accept().await { - TOTAL_CREATED_CONN.fetch_add(1, Ordering::Relaxed); - tokio::spawn(handle_connection(handle_context.clone(), stream, addr)); - } - Ok(()) } \ No newline at end of file diff --git a/src/slash_handles.rs b/src/slash_handles.rs index 4da8788..31f2a1d 100644 --- a/src/slash_handles.rs +++ b/src/slash_handles.rs @@ -1,12 +1,16 @@ +use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; use std::sync::atomic::Ordering; +use std::sync::Mutex; +use std::time::Duration; + use chrono::Local; use rust_util::util_time; +use simpledateformat::format_human; + +use crate::{NAME, TOTAL_CREATED_CONN, VERSION}; use crate::msg::RoomMessageDown; use crate::types::{PeerMap, RoomMap, Tx, TxSendMessage}; -use crate::{NAME, VERSION, TOTAL_CREATED_CONN}; -use simpledateformat::format_human; -use std::time::Duration; #[derive(Debug, Clone)] pub struct HandleContext { @@ -18,11 +22,26 @@ pub struct HandleContext { pub client_id: Option, } +impl HandleContext { + pub fn new_context(admin_pass: Option) -> Self { + HandleContext { + peer_map: PeerMap::new(Mutex::new(HashMap::new())), + room_map: RoomMap::new(Mutex::new(BTreeMap::new())), + admin_pass, + is_admin: false, + room_id: None, + client_id: None, + } + } +} + pub trait HandleTextMessage: Sync { fn is_matches(&self, handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: &str) -> bool; fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: &str); } + pub struct HandleTextMessageExit; + impl HandleTextMessage for HandleTextMessageExit { fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { msg == "/exit" @@ -33,10 +52,11 @@ impl HandleTextMessage for HandleTextMessageExit { } } -pub struct HandleTextMessageStatics; -impl HandleTextMessage for HandleTextMessageStatics { +pub struct HandleTextMessageStatistic; + +impl HandleTextMessage for HandleTextMessageStatistic { fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { - msg == "/statics" + msg == "/statistic" // formal: "/statics" } fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { @@ -53,6 +73,7 @@ impl HandleTextMessage for HandleTextMessageStatics { } pub struct HandleTextMessagePass; + impl HandleTextMessage for HandleTextMessagePass { fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { msg.starts_with("/pass ") @@ -72,6 +93,7 @@ impl HandleTextMessage for HandleTextMessagePass { } pub struct HandleTextMessageRooms; + impl HandleTextMessage for HandleTextMessageRooms { fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { msg == "/rooms" @@ -79,7 +101,7 @@ impl HandleTextMessage for HandleTextMessageRooms { fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { if !handle_context.is_admin { - RoomMessageDown::create_error_reply(&None,"Not admin").send(tx); + RoomMessageDown::create_error_reply(&None, "Not admin").send(tx); return; } let rooms = handle_context.room_map.lock().unwrap().keys().cloned().collect::>().join("\n"); @@ -92,6 +114,7 @@ impl HandleTextMessage for HandleTextMessageRooms { } pub struct HandleTextMessageRoom; + impl HandleTextMessage for HandleTextMessageRoom { fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { msg.starts_with("/room ") @@ -117,6 +140,7 @@ impl HandleTextMessage for HandleTextMessageRoom { } pub struct HandleTextMessageVersion; + impl HandleTextMessage for HandleTextMessageVersion { fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { msg == "/version" @@ -132,6 +156,7 @@ impl HandleTextMessage for HandleTextMessageVersion { } pub struct HandleTextMessageTime; + impl HandleTextMessage for HandleTextMessageTime { fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool { msg == "/time" @@ -139,7 +164,19 @@ impl HandleTextMessage for HandleTextMessageTime { fn handle(&self, _handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { simpledateformat::fmt("yyyy-MM-dd HH:mm:ss z").map( - |f| tx.send_text(format!("Time: {}", f.format(&Local::now()))) + |f| tx.send_text(format!("Time: {}", f.format(&Local::now()))) ).ok(); } } + +pub struct HandleRestMessage; + +impl HandleTextMessage for HandleRestMessage { + fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, _msg: &str) -> bool { + true + } + + fn handle(&self, _handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, msg: &str) { + tx.send_text(format!("Unknown command: {}", msg)); + } +}