feat: style
This commit is contained in:
50
src/main.rs
50
src/main.rs
@@ -1,20 +1,21 @@
|
|||||||
#[macro_use] extern crate rust_util;
|
#[macro_use] extern crate rust_util;
|
||||||
|
|
||||||
use std::{
|
use std::env;
|
||||||
collections::HashMap,
|
use std::io::Error as IoError;
|
||||||
env,
|
use std::collections::{HashMap, BTreeMap};
|
||||||
io::Error as IoError,
|
use std::net::SocketAddr;
|
||||||
net::SocketAddr,
|
use std::sync::{Arc, Mutex};
|
||||||
sync::{Arc, Mutex},
|
|
||||||
};
|
|
||||||
use futures_channel::mpsc::{unbounded, UnboundedSender};
|
use futures_channel::mpsc::{unbounded, UnboundedSender};
|
||||||
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
|
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tungstenite::protocol::Message;
|
use tungstenite::protocol::Message;
|
||||||
use rust_util::XResult;
|
use rust_util::XResult;
|
||||||
use std::collections::BTreeMap;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
type Tx = UnboundedSender<Message>;
|
||||||
|
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
|
||||||
|
type RoomMap = Arc<Mutex<BTreeMap<String, BTreeMap<String, SocketAddr>>>>;
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||||
enum RoomMessageType {
|
enum RoomMessageType {
|
||||||
#[serde(rename = "Enter")]
|
#[serde(rename = "Enter")]
|
||||||
@@ -90,24 +91,22 @@ impl RoomMessageDown {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send(&self, tx: &UnboundedSender<Message>) {
|
fn send(&self, tx: &Tx) {
|
||||||
if let Ok(mm) = serde_json::to_string(self) {
|
if let Ok(mm) = serde_json::to_string(self) {
|
||||||
tx.unbounded_send(Message::Text(mm)).ok();
|
tx.unbounded_send(Message::Text(mm)).ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Tx = UnboundedSender<Message>;
|
|
||||||
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
|
|
||||||
type RoomMap = Arc<Mutex<BTreeMap<String, BTreeMap<String, SocketAddr>>>>;
|
|
||||||
|
|
||||||
async fn handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) {
|
async fn handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) {
|
||||||
if let Err(e) = inner_handle_connection(peer_map, room_map, raw_stream, addr).await {
|
if let Err(e) = inner_handle_connection(peer_map, room_map, raw_stream, addr).await {
|
||||||
failure!("Handler connection error: {}", e);
|
failure!("Handler connection error: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) -> XResult<()> {
|
async fn inner_handle_connection(
|
||||||
|
peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr
|
||||||
|
) -> XResult<()> {
|
||||||
information!("Incoming TCP connection from: {}", addr);
|
information!("Incoming TCP connection from: {}", addr);
|
||||||
|
|
||||||
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
||||||
@@ -120,7 +119,7 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea
|
|||||||
let (outgoing, incoming) = ws_stream.split();
|
let (outgoing, incoming) = ws_stream.split();
|
||||||
|
|
||||||
let mut is_admin = false;
|
let mut is_admin = false;
|
||||||
let admin_pass = std::env::var("PASS").ok();
|
let admin_pass = env::var("PASS").ok();
|
||||||
let mut room_id: Option<String> = None;
|
let mut room_id: Option<String> = None;
|
||||||
let mut client_id: Option<String> = None;
|
let mut client_id: Option<String> = None;
|
||||||
let broadcast_incoming = incoming.try_for_each(|msg| {
|
let broadcast_incoming = incoming.try_for_each(|msg| {
|
||||||
@@ -130,18 +129,20 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea
|
|||||||
peer_map.lock().unwrap().remove(&addr);
|
peer_map.lock().unwrap().remove(&addr);
|
||||||
client_exit(&room_map, &room_id, &client_id);
|
client_exit(&room_map, &room_id, &client_id);
|
||||||
},
|
},
|
||||||
Message::Ping(msg) => {
|
Message::Ping(_) => {},
|
||||||
tx.unbounded_send(Message::Pong(msg)).ok();
|
|
||||||
},
|
|
||||||
Message::Pong(_) => {},
|
Message::Pong(_) => {},
|
||||||
Message::Binary(_) => {
|
Message::Binary(_) => {
|
||||||
warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id);
|
warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id);
|
||||||
RoomMessageDown::create_error_reply("unknown error").send(&tx);
|
RoomMessageDown::create_error_reply("binary not supported").send(&tx);
|
||||||
},
|
},
|
||||||
Message::Text(msg) => {
|
Message::Text(msg) => {
|
||||||
if msg.is_empty() {
|
if msg.is_empty() {
|
||||||
return future::ok(());
|
return future::ok(());
|
||||||
}
|
}
|
||||||
|
if msg == "/exit" {
|
||||||
|
tx.unbounded_send(Message::Close(None)).ok();
|
||||||
|
return future::ok(());
|
||||||
|
}
|
||||||
if msg == "/statics" {
|
if msg == "/statics" {
|
||||||
tx.unbounded_send(Message::Text(
|
tx.unbounded_send(Message::Text(
|
||||||
format!(
|
format!(
|
||||||
@@ -169,7 +170,11 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea
|
|||||||
let rooms = room_map.lock().unwrap().keys().map(
|
let rooms = room_map.lock().unwrap().keys().map(
|
||||||
|k| k.clone()
|
|k| k.clone()
|
||||||
).collect::<Vec<_>>().join("\n");
|
).collect::<Vec<_>>().join("\n");
|
||||||
|
if rooms.is_empty() {
|
||||||
|
tx.unbounded_send(Message::Text("rooms: <empty>".into())).ok();
|
||||||
|
} else {
|
||||||
tx.unbounded_send(Message::Text(format!("rooms:\n{}", rooms))).ok();
|
tx.unbounded_send(Message::Text(format!("rooms:\n{}", rooms))).ok();
|
||||||
|
}
|
||||||
return future::ok(());
|
return future::ok(());
|
||||||
}
|
}
|
||||||
if msg.starts_with("/room ") {
|
if msg.starts_with("/room ") {
|
||||||
@@ -211,7 +216,10 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea
|
|||||||
information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id);
|
information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id);
|
||||||
RoomMessageDown::create_success_reply("Duplicate enter to room").send(&tx);
|
RoomMessageDown::create_success_reply("Duplicate enter to room").send(&tx);
|
||||||
} else {
|
} else {
|
||||||
information!("Replace client: {:?} - {:?}, from {:?} -> {:?}", msg_room_id, msg_client_id, peer_addr, addr);
|
information!(
|
||||||
|
"Replace client: {:?} - {:?}, from {:?} -> {:?}",
|
||||||
|
msg_room_id, msg_client_id, peer_addr, addr
|
||||||
|
);
|
||||||
let tx = peer_map.lock().unwrap().remove(peer_addr);
|
let tx = peer_map.lock().unwrap().remove(peer_addr);
|
||||||
if let Some(tx) = tx {
|
if let Some(tx) = tx {
|
||||||
tx.unbounded_send(Message::Close(None)).ok();
|
tx.unbounded_send(Message::Close(None)).ok();
|
||||||
@@ -386,7 +394,7 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn client_not_in_room(tx: &UnboundedSender<Message>) {
|
fn client_not_in_room(tx: &Tx) {
|
||||||
information!("Client is not in room");
|
information!("Client is not in room");
|
||||||
RoomMessageDown::create_error_reply("Client is not in room").send(tx);
|
RoomMessageDown::create_error_reply("Client is not in room").send(tx);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user