feat: add handle context

This commit is contained in:
2021-05-17 01:36:49 +08:00
parent 5b06d18f7b
commit 9f5ceedafe

View File

@@ -101,14 +101,23 @@ impl RoomMessageDown {
}
}
async fn handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) {
if let Err(e) = inner_handle_connection(peer_map, room_map, raw_stream, addr).await {
#[derive(Debug, Clone)]
struct HandleContext {
peer_map: PeerMap,
room_map: RoomMap,
is_admin: bool,
room_id: Option<String>,
client_id: Option<String>,
}
async fn handle_connection(handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr) {
if let Err(e) = inner_handle_connection(handle_context, raw_stream, addr).await {
failure!("Handler connection error: {}", e);
}
}
async fn inner_handle_connection(
peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr
mut handle_context: HandleContext, raw_stream: TcpStream, addr: SocketAddr
) -> XResult<()> {
information!("Incoming TCP connection from: {}", addr);
@@ -117,25 +126,22 @@ async fn inner_handle_connection(
// Insert the write part of this peer to the peer map.
let (tx, rx) = unbounded();
{ peer_map.lock().unwrap().insert(addr, tx.clone()); }
{ handle_context.peer_map.lock().unwrap().insert(addr, tx.clone()); }
let (outgoing, incoming) = ws_stream.split();
let mut is_admin = false;
let admin_pass = env::var("PASS").ok();
let mut room_id: Option<String> = None;
let mut client_id: Option<String> = None;
let broadcast_incoming = incoming.try_for_each(|msg| {
match msg {
Message::Close(_) => {
information!("Close connection: {:?} - {:?} from: {}", room_id, client_id, addr);
peer_map.lock().unwrap().remove(&addr);
client_exit(&room_map, &room_id, &client_id);
information!("Close connection: {:?} - {:?} from: {}", handle_context.room_id, handle_context.client_id, addr);
handle_context.peer_map.lock().unwrap().remove(&addr);
client_exit(&handle_context.room_map, &handle_context.room_id, &handle_context.client_id);
},
Message::Ping(_) => {},
Message::Pong(_) => {},
Message::Binary(_) => {
warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id);
warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id);
RoomMessageDown::create_error_reply("binary not supported").send(&tx);
},
Message::Text(msg) => {
@@ -150,8 +156,8 @@ async fn inner_handle_connection(
tx.unbounded_send(Message::Text(
format!(
"room count: {}\npeer count: {}",
room_map.lock().unwrap().len(),
peer_map.lock().unwrap().len(),
handle_context.room_map.lock().unwrap().len(),
handle_context.peer_map.lock().unwrap().len(),
)
)).ok();
return future::ok(());
@@ -160,7 +166,7 @@ async fn inner_handle_connection(
let pass = &msg[6..];
if let Some(admin_pass) = &admin_pass {
if admin_pass == pass {
is_admin = true;
handle_context.is_admin = true;
tx.unbounded_send(Message::Text("Admin password success".into())).ok();
} else {
tx.unbounded_send(Message::Text("Admin password error".into())).ok();
@@ -168,9 +174,9 @@ async fn inner_handle_connection(
}
return future::ok(());
}
if is_admin {
if handle_context.is_admin {
if msg == "/rooms" {
let rooms = room_map.lock().unwrap().keys().map(
let rooms = handle_context.room_map.lock().unwrap().keys().map(
|k| k.clone()
).collect::<Vec<_>>().join("\n");
if rooms.is_empty() {
@@ -182,7 +188,7 @@ async fn inner_handle_connection(
}
if msg.starts_with("/room ") {
let room_id = &msg[6..];
let room_map = room_map.lock().unwrap();
let room_map = handle_context.room_map.lock().unwrap();
let mut client_ids = vec![];
if let Some(client_map) = room_map.get(room_id) {
for peer_client_id in client_map.keys() {
@@ -204,19 +210,19 @@ async fn inner_handle_connection(
let room_message = match serde_json::from_str::<RoomMessage>(&msg) {
Ok(room_message) => room_message,
Err(e) => {
warning!("Parse message: from: {:?} - {:?}, failed: {}", room_id, client_id, e);
warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e);
RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx);
return future::ok(());
}
};
match room_message.r#type {
RoomMessageType::CreateOrEnter => {
if let (Some(_), Some(_)) = (&room_id, &client_id) {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
warning!("Client is already in room: {:?} - {:?}", room_id, client_id);
RoomMessageDown::create_error_reply("Client is already in room").send(&tx);
} else {
if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) {
let mut room_map = room_map.lock().unwrap();
let mut room_map = handle_context.room_map.lock().unwrap();
match room_map.get_mut(&msg_room_id) {
Some(client_map) => {
match client_map.get(&msg_client_id) {
@@ -229,11 +235,12 @@ async fn inner_handle_connection(
"Replace client: {:?} - {:?}, from {:?} -> {:?}",
msg_room_id, msg_client_id, peer_addr, addr
);
if let Some(tx) = peer_map.lock().unwrap().remove(peer_addr) {
let client_replaced_message = format!("Client replaced {:?} -> {:?}", peer_addr, addr);
if let Some(tx) = handle_context.peer_map.lock().unwrap().remove(peer_addr) {
tx.unbounded_send(Message::Close(None)).ok();
}
client_map.insert(msg_client_id.clone(), addr);
RoomMessageDown::create_success_reply(format!("Client replaced {:?} -> {:?}", peer_addr, addr)).send(&tx);
RoomMessageDown::create_success_reply(client_replaced_message).send(&tx);
}
},
None => {
@@ -244,7 +251,7 @@ async fn inner_handle_connection(
if let Ok(mm) = serde_json::to_string(&m) {
for (client_id, client_addr) in client_map {
if client_id != &msg_client_id {
if let Some(client_tx) = peer_map.lock().unwrap().get(client_addr) {
if let Some(client_tx) = handle_context.peer_map.lock().unwrap().get(client_addr) {
client_tx.unbounded_send(Message::Text(mm.clone())).ok();
}
}
@@ -262,19 +269,19 @@ async fn inner_handle_connection(
RoomMessageDown::create_success_reply(format!("Client create room: {:?}", addr)).send(&tx);
},
}
room_id = Some(msg_room_id);
client_id = Some(msg_client_id);
handle_context.room_id = Some(msg_room_id);
handle_context.client_id = Some(msg_client_id);
} else {
RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(&tx);
}
}
},
RoomMessageType::Exit => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
let mut room_map = room_map.lock().unwrap();
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
let mut room_map = handle_context.room_map.lock().unwrap();
if let Some(client_map) = room_map.get_mut(room_id) {
client_map.remove(client_id);
peer_map.lock().unwrap().remove(&addr);
handle_context.peer_map.lock().unwrap().remove(&addr);
tx.unbounded_send(Message::Close(None)).ok();
} else {
warning!("Not in room: {:?} - {:?}", room_id, client_id);
@@ -284,13 +291,13 @@ async fn inner_handle_connection(
}
},
RoomMessageType::Destroy => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
information!("Destroy room: {:?} - {:?}", room_id, client_id);
let mut room_map = room_map.lock().unwrap();
let mut room_map = handle_context.room_map.lock().unwrap();
let client_map = room_map.remove(room_id);
if let Some(client_map) = client_map {
for (_client_id, client_addr) in client_map {
if let Some(client_tx) = peer_map.lock().unwrap().remove(&client_addr) {
if let Some(client_tx) = handle_context.peer_map.lock().unwrap().remove(&client_addr) {
client_tx.unbounded_send(Message::Close(None)).ok();
}
}
@@ -300,9 +307,9 @@ async fn inner_handle_connection(
}
},
RoomMessageType::ListPeers => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
information!("List room peers: {:?} - {:?}", room_id, client_id);
let room_map = room_map.lock().unwrap();
let room_map = handle_context.room_map.lock().unwrap();
let client_map = room_map.get(room_id);
let mut client_ids = vec![];
if let Some(client_map) = client_map {
@@ -326,9 +333,9 @@ async fn inner_handle_connection(
}
},
RoomMessageType::Broadcast => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
information!("Broadcast room message: {:?} - {:?}", room_id, client_id);
let room_map = room_map.lock().unwrap();
let room_map = handle_context.room_map.lock().unwrap();
let client_map = room_map.get(room_id);
let data = room_message.data;
@@ -344,7 +351,7 @@ async fn inner_handle_connection(
let mut sent_messages = 0;
for (peer_client_id, peer_client_addr) in client_map {
if peer_client_id != client_id {
if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) {
if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) {
sent_messages += 1;
peer_tx.unbounded_send(Message::Text(mm.clone())).ok();
}
@@ -358,10 +365,10 @@ async fn inner_handle_connection(
}
},
RoomMessageType::Peer => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id);
if let Some(peer_id) = room_message.peer_id.as_ref() {
let room_map = room_map.lock().unwrap();
let room_map = handle_context.room_map.lock().unwrap();
let client_map = room_map.get(room_id);
let data = room_message.data;
@@ -375,7 +382,7 @@ async fn inner_handle_connection(
if let Ok(mm) = serde_json::to_string(&m) {
if let Some(client_map) = client_map {
if let Some(peer_client_addr) = client_map.get(peer_id) {
if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) {
if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) {
peer_tx.unbounded_send(Message::Text(mm.clone())).ok();
}
RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(&tx);
@@ -402,8 +409,8 @@ async fn inner_handle_connection(
future::select(broadcast_incoming, receive_from_others).await;
information!("{} disconnected", &addr);
client_exit(&room_map, &room_id, &client_id);
peer_map.lock().unwrap().remove(&addr);
client_exit(&handle_context.room_map, &handle_context.room_id, &handle_context.client_id);
handle_context.peer_map.lock().unwrap().remove(&addr);
Ok(())
}
@@ -434,6 +441,14 @@ async fn main() -> Result<(), IoError> {
let state = PeerMap::new(Mutex::new(HashMap::new()));
let room = RoomMap::new(Mutex::new(BTreeMap::new()));
let handle_context = HandleContext {
peer_map: state,
room_map: room,
is_admin: false,
room_id: None,
client_id: None,
};
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
let listener = try_socket.expect(&format!("Failed to bind ok: {}", addr));
@@ -441,7 +456,7 @@ async fn main() -> Result<(), IoError> {
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(state.clone(), room.clone(), stream, addr));
tokio::spawn(handle_connection(handle_context.clone(), stream, addr));
}
Ok(())