Files
room-rs/src/main.rs
2023-11-05 16:02:16 +08:00

342 lines
16 KiB
Rust

#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate rust_util;
use std::collections::BTreeMap;
use std::env;
use std::io::Error as IoError;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use futures_channel::mpsc::unbounded;
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
use rust_util::{util_time, XResult};
use tokio::net::{TcpListener, TcpStream};
use tungstenite::protocol::Message;
use crate::msg::{RoomMessage, RoomMessageDown, RoomMessageType};
use crate::slash_handles::{
HandleContext, HandleRestMessage,
HandleTextMessage, HandleTextMessageExit,
HandleTextMessagePass, HandleTextMessageRoom,
HandleTextMessageRooms, HandleTextMessageStatistic,
HandleTextMessageTime, HandleTextMessageVersion,
};
use crate::types::{Tx, TxSendMessage};
mod types;
mod msg;
mod slash_handles;
/// Package name, taken from the `CARGO_PKG_NAME` variable at compile time.
const NAME: &str = env!("CARGO_PKG_NAME");
/// Package version, taken from the `CARGO_PKG_VERSION` variable at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Count of TCP connections accepted so far; `main` increments it once per accepted socket.
static TOTAL_CREATED_CONN: AtomicU64 = AtomicU64::new(0);
lazy_static! {
    /// Current time in milliseconds, captured the first time this value is dereferenced.
    static ref STARTUP_MILLIS: u128 = util_time::get_current_millis();

    /// Handlers offered, in this order, every text message that starts with `/`.
    /// The first handler whose `is_matches` returns true processes the message,
    /// so the order of the entries is significant.
    static ref TEXT_MESSAGE_HANDLES: Vec<Box<dyn HandleTextMessage>> = vec![
        Box::new(HandleTextMessageExit),
        Box::new(HandleTextMessageStatistic),
        Box::new(HandleTextMessagePass),
        Box::new(HandleTextMessageRooms),
        Box::new(HandleTextMessageRoom),
        Box::new(HandleTextMessageVersion),
        Box::new(HandleTextMessageTime),
        Box::new(HandleRestMessage),
    ];
}
#[tokio::main]
async fn main() -> Result<(), IoError> {
let admin_pass = env::var("PASS").ok();
let listen_addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
let handle_context_template = HandleContext::new_context(admin_pass);
let try_socket = TcpListener::bind(&listen_addr).await;
let listener = try_socket.unwrap_or_else(|_| panic!("Failed to bind ok: {}", listen_addr));
success!("Listening on: {}, startup at millis: {}", listen_addr, *STARTUP_MILLIS);
while let Ok((stream, addr)) = listener.accept().await {
TOTAL_CREATED_CONN.fetch_add(1, Ordering::Relaxed);
tokio::spawn(handle_connection(handle_context_template.clone(), stream, addr));
}
Ok(())
}
/// Serves one accepted connection and logs the error if serving it fails.
///
/// This is the task body spawned by `main`; it never returns an error itself.
async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr) {
    let outcome = inner_handle_connection(handle_context, raw_stream, addr).await;
    match outcome {
        Ok(()) => {}
        Err(e) => {
            failure!("Handler connection error: {}", e);
        }
    }
}
async fn inner_handle_connection(
mut handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr,
) -> XResult<()> {
information!("Incoming TCP connection from: {}", addr);
let ws_stream = opt_result!(tokio_tungstenite::accept_async(raw_stream).await, "Accept websocket failed: {}");
information!("WebSocket connection established: {}", addr);
// Insert the write part of this peer to the peer map.
let (tx, rx) = unbounded();
{ handle_context.peer_map.lock().unwrap().insert(addr, tx.clone()); }
let (outgoing, incoming) = ws_stream.split();
let broadcast_incoming = incoming.try_for_each(|msg| {
match msg {
Message::Close(_) => {
information!("Close connection: {:?} - {:?} from: {}", handle_context.room_id, handle_context.client_id, addr);
handle_context.peer_map.lock().unwrap().remove(&addr);
client_exit(&handle_context, &handle_context.room_id, &handle_context.client_id);
}
Message::Ping(_) => {}
Message::Pong(_) => {}
Message::Binary(_) => {
warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id);
RoomMessageDown::create_error_reply(&None, "Binary message not supported").send(&tx);
}
Message::Text(msg) => if !msg.is_empty() {
if let Err(e) = handle_text_message(&mut handle_context, &tx, addr, msg) {
failure!("Error in process text message: {}", e);
RoomMessageDown::create_error_reply(&None, format!("Error in process text message: {}", e));
}
},
Message::Frame(_) => {}
}
future::ok(())
});
let receive_from_others = rx.map(Ok).forward(outgoing);
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
information!("Client disconnected: {}", &addr);
client_exit(&handle_context, &handle_context.room_id, &handle_context.client_id);
handle_context.peer_map.lock().unwrap().remove(&addr);
Ok(())
}
/// Logs that the client at `addr` has not joined a room and sends it the
/// matching error reply over `tx`.
fn client_not_in_room(tx: &Tx, addr: SocketAddr) {
    information!("Client is not in room: {}", addr);
    let reply = RoomMessageDown::create_error_reply(&None, "Client is not in room");
    reply.send(tx);
}
fn client_exit(handle_context: &HandleContext, room_id: &Option<String>, client_id: &Option<String>) {
let room_map = &handle_context.room_map;
if let (Some(room_id), Some(client_id)) = (room_id, client_id) {
let mut room_map = room_map.lock().unwrap();
if let Some(client_map) = room_map.get_mut(room_id) {
information!("Client: {} exit from room: {}", client_id, room_id);
client_map.remove(client_id);
if client_map.is_empty() {
information!("Room is empty, close room: {}", room_id);
room_map.remove(room_id);
} else {
let m = RoomMessageDown::create_peer_exit(&None, client_id.clone());
if let Ok(mm) = serde_json::to_string(&m) {
for client_addr in client_map.values_mut() {
if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) {
client_tx.send_text(mm.clone());
}
}
}
}
}
}
}
fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: String) -> XResult<()> {
// process all registered handles
if msg.starts_with('/') {
for handle in &*TEXT_MESSAGE_HANDLES {
if handle.is_matches(handle_context, tx, addr, &msg) {
handle.handle(handle_context, tx, addr, &msg);
return Ok(());
}
}
}
let room_message = match serde_json::from_str::<RoomMessage>(&msg) {
Ok(room_message) => room_message,
Err(e) => {
warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e);
RoomMessageDown::create_error_reply(
&None, format!("Message parse failed: {}, message: {}", e, msg)).send(tx);
return Ok(());
}
};
match room_message.r#type {
RoomMessageType::CreateOrEnter => {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
warning!("Client is already in room: {:?} - {:?}", room_id, client_id);
RoomMessageDown::create_error_reply(&room_message.message_id, "Client is already in room").send(tx);
return Ok(());
}
if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) {
let mut room_map = handle_context.room_map.lock().unwrap();
match room_map.get_mut(&msg_room_id) {
Some(client_map) => {
match client_map.get(&msg_client_id) {
Some(peer_addr) => {
if peer_addr == &addr {
information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id);
RoomMessageDown::create_success_reply(
&room_message.message_id, "Duplicate enter to room").send(tx);
} else {
information!(
"Replace client: {:?} - {:?}, from {:?} -> {:?}",
msg_room_id, msg_client_id, peer_addr, addr
);
let client_replaced_message = format!("Client replaced {:?} -> {:?}", peer_addr, addr);
if let Some(tx) = handle_context.peer_map.lock().unwrap().remove(peer_addr) {
tx.send_close();
}
client_map.insert(msg_client_id.clone(), addr);
RoomMessageDown::create_success_reply(&room_message.message_id, client_replaced_message).send(tx);
}
}
None => {
information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr);
client_map.insert(msg_client_id.clone(), addr);
let m = RoomMessageDown::create_peer_enter(&room_message.message_id, msg_client_id.clone());
let mm = serde_json::to_string(&m)?;
for (client_id, client_addr) in client_map {
if client_id != &msg_client_id {
if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) {
client_tx.send_text(mm.clone());
}
}
}
RoomMessageDown::create_success_reply(
&room_message.message_id, format!("Client entered: {:?}", addr)).send(tx);
}
}
}
None => {
information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr);
let mut client_map = BTreeMap::new();
client_map.insert(msg_client_id.clone(), addr);
room_map.insert(msg_room_id.clone(), client_map);
RoomMessageDown::create_success_reply(
&room_message.message_id, format!("Client create room: {:?}", addr)).send(tx);
}
}
handle_context.room_id = Some(msg_room_id);
handle_context.client_id = Some(msg_client_id);
} else {
RoomMessageDown::create_error_reply(&room_message.message_id, "Room id and client id must both assigned").send(tx);
}
}
RoomMessageType::Exit => {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
let mut room_map = handle_context.room_map.lock().unwrap();
if let Some(client_map) = room_map.get_mut(room_id) {
client_map.remove(client_id);
handle_context.peer_map.lock().unwrap().remove(&addr);
tx.send_close();
} else {
warning!("Not in room: {:?} - {:?}", room_id, client_id);
}
} else {
client_not_in_room(tx, addr);
}
}
RoomMessageType::Destroy => {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
information!("Destroy room: {:?} - {:?}", room_id, client_id);
let mut room_map = handle_context.room_map.lock().unwrap();
let client_map = room_map.remove(room_id);
if let Some(client_map) = client_map {
for (_client_id, client_addr) in client_map {
if let Some(client_tx) = handle_context.peer_map.lock().unwrap().remove(&client_addr) {
client_tx.send_close();
}
}
}
} else {
client_not_in_room(tx, addr);
}
}
RoomMessageType::ListPeers => {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
information!("List room peers: {:?} - {:?}", room_id, client_id);
let room_map = handle_context.room_map.lock().unwrap();
let client_map = room_map.get(room_id);
let client_ids = client_map
.map(|m| m.keys().cloned().collect::<Vec<_>>())
.unwrap_or_else(Vec::new);
let client_ids_data = serde_json::to_string(&client_ids)?;
RoomMessageDown::create_peerlist_message(&room_message.message_id, Some(client_ids_data)).send(tx);
} else {
client_not_in_room(tx, addr);
}
}
RoomMessageType::Broadcast => {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
information!("Broadcast room message: {:?} - {:?}", room_id, client_id);
let room_map = handle_context.room_map.lock().unwrap();
let client_map = room_map.get(room_id);
let m = RoomMessageDown::create_broadcast_message(
&room_message.message_id, client_id, room_message.data);
let mm = serde_json::to_string(&m)?;
if let Some(client_map) = client_map {
let mut sent_messages = 0;
for (peer_client_id, peer_client_addr) in client_map {
if peer_client_id != client_id {
if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) {
sent_messages += 1;
peer_tx.send_text(mm.clone());
}
}
}
RoomMessageDown::create_success_reply(
&room_message.message_id, format!("Send message to {} peers", sent_messages)).send(tx);
}
} else {
client_not_in_room(tx, addr);
}
}
RoomMessageType::Peer => {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id);
if let Some(peer_id) = room_message.peer_id.as_ref() {
let room_map = handle_context.room_map.lock().unwrap();
let client_map = room_map.get(room_id);
let m = RoomMessageDown::create_peer_message(&room_message.message_id, peer_id, room_message.data);
let mm = serde_json::to_string(&m)?;
if let Some(client_map) = client_map {
if let Some(peer_client_addr) = client_map.get(peer_id) {
if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) {
peer_tx.send_text(mm);
}
RoomMessageDown::create_success_reply(
&room_message.message_id, format!("Message sent to: {}", peer_id)).send(tx);
} else {
RoomMessageDown::create_fail_replay(
&room_message.message_id, 404, format!("Peer not found: {}", peer_id)).send(tx);
}
}
}
} else {
client_not_in_room(tx, addr);
}
}
}
Ok(())
}