reflect: TEXT_MESSAGE_HANDLES
This commit is contained in:
151
src/main.rs
151
src/main.rs
@@ -1,3 +1,4 @@
|
|||||||
|
#[macro_use] extern crate lazy_static;
|
||||||
#[macro_use] extern crate rust_util;
|
#[macro_use] extern crate rust_util;
|
||||||
|
|
||||||
use std::env;
|
use std::env;
|
||||||
@@ -105,54 +106,45 @@ impl RoomMessageDown {
|
|||||||
struct HandleContext {
|
struct HandleContext {
|
||||||
peer_map: PeerMap,
|
peer_map: PeerMap,
|
||||||
room_map: RoomMap,
|
room_map: RoomMap,
|
||||||
|
admin_pass: Option<String>,
|
||||||
is_admin: bool,
|
is_admin: bool,
|
||||||
room_id: Option<String>,
|
room_id: Option<String>,
|
||||||
client_id: Option<String>,
|
client_id: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr) {
|
lazy_static! {
|
||||||
if let Err(e) = inner_handle_connection(handle_context, raw_stream, addr).await {
|
static ref TEXT_MESSAGE_HANDLES: Vec<Box<dyn HandleTextMessage>>= {
|
||||||
failure!("Handler connection error: {}", e);
|
let mut handles:Vec<Box<dyn HandleTextMessage>> = vec![];
|
||||||
|
handles.push(Box::new(HandleTextMessageExit));
|
||||||
|
handles.push(Box::new(HandleTextMessageStatics));
|
||||||
|
handles.push(Box::new(HandleTextMessagePass));
|
||||||
|
handles
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
trait HandleTextMessage: Sync {
|
||||||
|
fn is_matches(&self, handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: &str) -> bool;
|
||||||
|
fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: &str);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct HandleTextMessageExit;
|
||||||
|
impl HandleTextMessage for HandleTextMessageExit {
|
||||||
|
fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool {
|
||||||
|
msg == "/exit"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle(&self, _handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) {
|
||||||
|
tx.unbounded_send(Message::Close(None)).ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn inner_handle_connection(
|
struct HandleTextMessageStatics;
|
||||||
mut handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr
|
impl HandleTextMessage for HandleTextMessageStatics {
|
||||||
) -> XResult<()> {
|
fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool {
|
||||||
information!("Incoming TCP connection from: {}", addr);
|
msg == "/statics"
|
||||||
|
|
||||||
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
|
||||||
information!("WebSocket connection established: {}", addr);
|
|
||||||
|
|
||||||
// Insert the write part of this peer to the peer map.
|
|
||||||
let (tx, rx) = unbounded();
|
|
||||||
{ handle_context.peer_map.lock().unwrap().insert(addr, tx.clone()); }
|
|
||||||
|
|
||||||
let (outgoing, incoming) = ws_stream.split();
|
|
||||||
|
|
||||||
let admin_pass = env::var("PASS").ok();
|
|
||||||
let broadcast_incoming = incoming.try_for_each(|msg| {
|
|
||||||
match msg {
|
|
||||||
Message::Close(_) => {
|
|
||||||
information!("Close connection: {:?} - {:?} from: {}", handle_context.room_id, handle_context.client_id, addr);
|
|
||||||
handle_context.peer_map.lock().unwrap().remove(&addr);
|
|
||||||
client_exit(&handle_context.room_map, &handle_context.room_id, &handle_context.client_id);
|
|
||||||
},
|
|
||||||
Message::Ping(_) => {},
|
|
||||||
Message::Pong(_) => {},
|
|
||||||
Message::Binary(_) => {
|
|
||||||
warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id);
|
|
||||||
RoomMessageDown::create_error_reply("binary not supported").send(&tx);
|
|
||||||
},
|
|
||||||
Message::Text(msg) => {
|
|
||||||
if msg.is_empty() {
|
|
||||||
return future::ok(());
|
|
||||||
}
|
}
|
||||||
if msg == "/exit" {
|
|
||||||
tx.unbounded_send(Message::Close(None)).ok();
|
fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) {
|
||||||
return future::ok(());
|
|
||||||
}
|
|
||||||
if msg == "/statics" {
|
|
||||||
tx.unbounded_send(Message::Text(
|
tx.unbounded_send(Message::Text(
|
||||||
format!(
|
format!(
|
||||||
"room count: {}\npeer count: {}",
|
"room count: {}\npeer count: {}",
|
||||||
@@ -160,11 +152,18 @@ async fn inner_handle_connection(
|
|||||||
handle_context.peer_map.lock().unwrap().len(),
|
handle_context.peer_map.lock().unwrap().len(),
|
||||||
)
|
)
|
||||||
)).ok();
|
)).ok();
|
||||||
return future::ok(());
|
|
||||||
}
|
}
|
||||||
if msg.starts_with("/pass ") {
|
}
|
||||||
|
|
||||||
|
struct HandleTextMessagePass;
|
||||||
|
impl HandleTextMessage for HandleTextMessagePass {
|
||||||
|
fn is_matches(&self, _handle_context: &mut HandleContext, _tx: &Tx, _addr: SocketAddr, msg: &str) -> bool {
|
||||||
|
msg.starts_with("/pass ")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, msg: &str) {
|
||||||
let pass = &msg[6..];
|
let pass = &msg[6..];
|
||||||
if let Some(admin_pass) = &admin_pass {
|
if let Some(admin_pass) = &handle_context.admin_pass {
|
||||||
if admin_pass == pass {
|
if admin_pass == pass {
|
||||||
handle_context.is_admin = true;
|
handle_context.is_admin = true;
|
||||||
tx.unbounded_send(Message::Text("Admin password success".into())).ok();
|
tx.unbounded_send(Message::Text("Admin password success".into())).ok();
|
||||||
@@ -172,8 +171,17 @@ async fn inner_handle_connection(
|
|||||||
tx.unbounded_send(Message::Text("Admin password error".into())).ok();
|
tx.unbounded_send(Message::Text("Admin password error".into())).ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return future::ok(());
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: SocketAddr, msg: String) {
|
||||||
|
// process all registered handles
|
||||||
|
for handle in &*TEXT_MESSAGE_HANDLES {
|
||||||
|
if handle.is_matches(handle_context, tx, addr, &msg) {
|
||||||
|
return handle.handle(handle_context, tx, addr, &msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if handle_context.is_admin {
|
if handle_context.is_admin {
|
||||||
if msg == "/rooms" {
|
if msg == "/rooms" {
|
||||||
let rooms = handle_context.room_map.lock().unwrap().keys().map(
|
let rooms = handle_context.room_map.lock().unwrap().keys().map(
|
||||||
@@ -184,7 +192,7 @@ async fn inner_handle_connection(
|
|||||||
} else {
|
} else {
|
||||||
tx.unbounded_send(Message::Text(format!("rooms:\n{}", rooms))).ok();
|
tx.unbounded_send(Message::Text(format!("rooms:\n{}", rooms))).ok();
|
||||||
}
|
}
|
||||||
return future::ok(());
|
return;
|
||||||
}
|
}
|
||||||
if msg.starts_with("/room ") {
|
if msg.starts_with("/room ") {
|
||||||
let room_id = &msg[6..];
|
let room_id = &msg[6..];
|
||||||
@@ -198,13 +206,13 @@ async fn inner_handle_connection(
|
|||||||
} else {
|
} else {
|
||||||
tx.unbounded_send(Message::Text(format!("room not found: {}", room_id))).ok();
|
tx.unbounded_send(Message::Text(format!("room not found: {}", room_id))).ok();
|
||||||
}
|
}
|
||||||
return future::ok(());
|
return;
|
||||||
}
|
}
|
||||||
if msg == "/version" {
|
if msg == "/version" {
|
||||||
tx.unbounded_send(Message::Text(
|
tx.unbounded_send(Message::Text(
|
||||||
format!("{} - v{}", NAME, VERSION)
|
format!("{} - v{}", NAME, VERSION)
|
||||||
)).ok();
|
)).ok();
|
||||||
return future::ok(());
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let room_message = match serde_json::from_str::<RoomMessage>(&msg) {
|
let room_message = match serde_json::from_str::<RoomMessage>(&msg) {
|
||||||
@@ -212,7 +220,7 @@ async fn inner_handle_connection(
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e);
|
warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e);
|
||||||
RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx);
|
RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx);
|
||||||
return future::ok(());
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
match room_message.r#type {
|
match room_message.r#type {
|
||||||
@@ -397,6 +405,45 @@ async fn inner_handle_connection(
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr) {
|
||||||
|
if let Err(e) = inner_handle_connection(handle_context, raw_stream, addr).await {
|
||||||
|
failure!("Handler connection error: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn inner_handle_connection(
|
||||||
|
mut handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr
|
||||||
|
) -> XResult<()> {
|
||||||
|
information!("Incoming TCP connection from: {}", addr);
|
||||||
|
|
||||||
|
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
||||||
|
information!("WebSocket connection established: {}", addr);
|
||||||
|
|
||||||
|
// Insert the write part of this peer to the peer map.
|
||||||
|
let (tx, rx) = unbounded();
|
||||||
|
{ handle_context.peer_map.lock().unwrap().insert(addr, tx.clone()); }
|
||||||
|
|
||||||
|
let (outgoing, incoming) = ws_stream.split();
|
||||||
|
|
||||||
|
let broadcast_incoming = incoming.try_for_each(|msg| {
|
||||||
|
match msg {
|
||||||
|
Message::Close(_) => {
|
||||||
|
information!("Close connection: {:?} - {:?} from: {}", handle_context.room_id, handle_context.client_id, addr);
|
||||||
|
handle_context.peer_map.lock().unwrap().remove(&addr);
|
||||||
|
client_exit(&handle_context.room_map, &handle_context.room_id, &handle_context.client_id);
|
||||||
|
},
|
||||||
|
Message::Ping(_) => {},
|
||||||
|
Message::Pong(_) => {},
|
||||||
|
Message::Binary(_) => {
|
||||||
|
warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id);
|
||||||
|
RoomMessageDown::create_error_reply("binary not supported").send(&tx);
|
||||||
|
},
|
||||||
|
Message::Text(msg) => {
|
||||||
|
if !msg.is_empty() {
|
||||||
|
handle_text_message(&mut handle_context, &tx, addr, msg);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -436,7 +483,8 @@ fn client_exit(room_map: &RoomMap, room_id: &Option<String>, client_id: &Option<
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), IoError> {
|
async fn main() -> Result<(), IoError> {
|
||||||
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
|
let admin_pass = env::var("PASS").ok();
|
||||||
|
let listen_addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
|
||||||
|
|
||||||
let state = PeerMap::new(Mutex::new(HashMap::new()));
|
let state = PeerMap::new(Mutex::new(HashMap::new()));
|
||||||
let room = RoomMap::new(Mutex::new(BTreeMap::new()));
|
let room = RoomMap::new(Mutex::new(BTreeMap::new()));
|
||||||
@@ -444,15 +492,16 @@ async fn main() -> Result<(), IoError> {
|
|||||||
let handle_context = HandleContext {
|
let handle_context = HandleContext {
|
||||||
peer_map: state,
|
peer_map: state,
|
||||||
room_map: room,
|
room_map: room,
|
||||||
|
admin_pass,
|
||||||
is_admin: false,
|
is_admin: false,
|
||||||
room_id: None,
|
room_id: None,
|
||||||
client_id: None,
|
client_id: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Create the event loop and TCP listener we'll accept connections on.
|
// Create the event loop and TCP listener we'll accept connections on.
|
||||||
let try_socket = TcpListener::bind(&addr).await;
|
let try_socket = TcpListener::bind(&listen_addr).await;
|
||||||
let listener = try_socket.expect(&format!("Failed to bind ok: {}", addr));
|
let listener = try_socket.expect(&format!("Failed to bind ok: {}", listen_addr));
|
||||||
success!("Listening on: {}", addr);
|
success!("Listening on: {}", listen_addr);
|
||||||
|
|
||||||
// Let's spawn the handling of each connection in a separate task.
|
// Let's spawn the handling of each connection in a separate task.
|
||||||
while let Ok((stream, addr)) = listener.accept().await {
|
while let Ok((stream, addr)) = listener.accept().await {
|
||||||
|
|||||||
Reference in New Issue
Block a user