feat: works

This commit is contained in:
2021-05-14 23:40:30 +08:00
parent d62ca71735
commit e749f6628a

View File

@@ -63,23 +63,24 @@ struct RoomMessageDown {
}
impl RoomMessageDown {
fn create_error_reply(error_message: String) -> Self {
fn create_error_reply<S>(error_message: S) -> Self where S: Into<String> {
Self {
r#type: RoomMessageDownType::ReplyMessage,
reply_code: Some(500),
reply_message: Some(error_message),
reply_message: Some(error_message.into()),
..Default::default()
}
}
fn create_success_reply(success_message: String) -> Self {
fn create_success_reply<S>(success_message: S) -> Self where S: Into<String> {
Self {
r#type: RoomMessageDownType::ReplyMessage,
reply_code: Some(200),
reply_message: Some(success_message),
reply_message: Some(success_message.into()),
..Default::default()
}
}
fn create_peer_enter(peer_id: String) -> Self {
fn create_peer_enter<S>(peer_id: S) -> Self where S: Into<String> {
let peer_id = peer_id.into();
Self {
r#type: RoomMessageDownType::PeerEnter,
reply_code: Some(200),
@@ -88,6 +89,12 @@ impl RoomMessageDown {
..Default::default()
}
}
fn send(&self, tx: &UnboundedSender<Message>) {
if let Ok(mm) = serde_json::to_string(self) {
tx.unbounded_send(Message::Text(mm)).ok();
}
}
}
type Tx = UnboundedSender<Message>;
@@ -112,6 +119,8 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea
let (outgoing, incoming) = ws_stream.split();
let mut is_admin = false;
let admin_pass = std::env::var("PASS").ok();
let mut room_id: Option<String> = None;
let mut client_id: Option<String> = None;
let broadcast_incoming = incoming.try_for_each(|msg| {
@@ -127,208 +136,235 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea
Message::Pong(_) => {},
Message::Binary(_) => {
warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id);
let m = RoomMessageDown::create_error_reply("unknown error".into());
if let Ok(mm) = serde_json::to_string(&m) {
tx.unbounded_send(Message::Text(mm)).ok();
}
RoomMessageDown::create_error_reply("unknown error").send(&tx);
},
Message::Text(msg) => {
let room_message = serde_json::from_str::<RoomMessage>(&msg);
match room_message {
if msg.is_empty() {
return future::ok(());
}
if msg == "/statics" {
tx.unbounded_send(Message::Text(
format!(
"room count: {}\npeer count: {}",
room_map.lock().unwrap().len(),
peer_map.lock().unwrap().len(),
)
)).ok();
return future::ok(());
}
if msg.starts_with("/pass ") {
let pass = &msg[6..];
if let Some(admin_pass) = &admin_pass {
if admin_pass == pass {
is_admin = true;
tx.unbounded_send(Message::Text("Admin password success".into())).ok();
} else {
tx.unbounded_send(Message::Text("Admin password error".into())).ok();
}
}
return future::ok(());
}
if is_admin {
if msg == "/rooms" {
let rooms = room_map.lock().unwrap().keys().map(
|k| k.clone()
).collect::<Vec<_>>().join("\n");
tx.unbounded_send(Message::Text(format!("rooms:\n{}", rooms))).ok();
return future::ok(());
}
if msg.starts_with("/room ") {
let room_id = &msg[6..];
let room_map = room_map.lock().unwrap();
let mut client_ids = vec![];
if let Some(client_map) = room_map.get(room_id) {
for peer_client_id in client_map.keys() {
client_ids.push(peer_client_id.clone());
}
tx.unbounded_send(Message::Text(format!("clients in room {}:\n{}", room_id, client_ids.join("\n")))).ok();
} else {
tx.unbounded_send(Message::Text(format!("room not found: {}", room_id))).ok();
}
return future::ok(());
}
}
let room_message = match serde_json::from_str::<RoomMessage>(&msg) {
Ok(room_message) => room_message,
Err(e) => {
warning!("Parse message: from: {:?} - {:?}, failed: {}", room_id, client_id, e);
let m = RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg));
if let Ok(mm) = serde_json::to_string(&m) {
tx.unbounded_send(Message::Text(mm)).ok();
}
},
Ok(room_message) => {
match room_message.r#type {
RoomMessageType::CreateOrEnter => {
if let (Some(_), Some(_)) = (&room_id, &client_id) {
warning!("Client is already in room: {:?} - {:?}", room_id, client_id);
let m = RoomMessageDown::create_error_reply("Client is already in room".into());
if let Ok(mm) = serde_json::to_string(&m) {
tx.unbounded_send(Message::Text(mm)).ok();
}
} else {
if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) {
let mut room_map = room_map.lock().unwrap();
match room_map.get_mut(&msg_room_id) {
Some(client_map) => {
match client_map.get(&msg_client_id) {
Some(peer_addr) => {
if peer_addr == &addr {
information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id);
let m = RoomMessageDown::create_success_reply("Duplicate enter to room".into());
if let Ok(mm) = serde_json::to_string(&m) {
tx.unbounded_send(Message::Text(mm)).ok();
}
} else {
information!("Replace client: {:?} - {:?}, from {:?} -> {:?}", msg_room_id, msg_client_id, peer_addr, addr);
let tx = peer_map.lock().unwrap().remove(peer_addr);
if let Some(tx) = tx {
tx.unbounded_send(Message::Close(None)).ok();
}
client_map.insert(msg_client_id.clone(), addr);
}
},
None => {
information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr);
client_map.insert(msg_client_id.clone(), addr);
let m = RoomMessageDown::create_peer_enter(msg_client_id.clone());
if let Ok(mm) = serde_json::to_string(&m) {
for (client_id, client_addr) in client_map {
if client_id != &msg_client_id {
if let Some(client_tx) = peer_map.lock().unwrap().get(client_addr) {
client_tx.unbounded_send(Message::Text(mm.clone())).ok();
}
}
}
}
},
RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx);
return future::ok(());
}
};
match room_message.r#type {
RoomMessageType::CreateOrEnter => {
if let (Some(_), Some(_)) = (&room_id, &client_id) {
warning!("Client is already in room: {:?} - {:?}", room_id, client_id);
RoomMessageDown::create_error_reply("Client is already in room").send(&tx);
} else {
if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) {
let mut room_map = room_map.lock().unwrap();
match room_map.get_mut(&msg_room_id) {
Some(client_map) => {
match client_map.get(&msg_client_id) {
Some(peer_addr) => {
if peer_addr == &addr {
information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id);
RoomMessageDown::create_success_reply("Duplicate enter to room").send(&tx);
} else {
information!("Replace client: {:?} - {:?}, from {:?} -> {:?}", msg_room_id, msg_client_id, peer_addr, addr);
let tx = peer_map.lock().unwrap().remove(peer_addr);
if let Some(tx) = tx {
tx.unbounded_send(Message::Close(None)).ok();
}
client_map.insert(msg_client_id.clone(), addr);
}
},
None => {
information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr);
let mut client_map = BTreeMap::new();
information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr);
client_map.insert(msg_client_id.clone(), addr);
room_map.insert(msg_room_id.clone(), client_map);
let m = RoomMessageDown::create_peer_enter(msg_client_id.clone());
if let Ok(mm) = serde_json::to_string(&m) {
for (client_id, client_addr) in client_map {
if client_id != &msg_client_id {
if let Some(client_tx) = peer_map.lock().unwrap().get(client_addr) {
client_tx.unbounded_send(Message::Text(mm.clone())).ok();
}
}
}
}
},
}
room_id = Some(msg_room_id);
client_id = Some(msg_client_id);
} else {
let m = RoomMessageDown::create_error_reply("Room id and client id must both assigned".into());
if let Ok(mm) = serde_json::to_string(&m) {
tx.unbounded_send(Message::Text(mm)).ok();
}
},
None => {
information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr);
let mut client_map = BTreeMap::new();
client_map.insert(msg_client_id.clone(), addr);
room_map.insert(msg_room_id.clone(), client_map);
},
}
room_id = Some(msg_room_id);
client_id = Some(msg_client_id);
} else {
RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(&tx);
}
}
},
RoomMessageType::Exit => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
let mut room_map = room_map.lock().unwrap();
if let Some(client_map) = room_map.get_mut(room_id) {
client_map.remove(client_id);
peer_map.lock().unwrap().remove(&addr);
tx.unbounded_send(Message::Close(None)).ok();
} else {
warning!("Not in room: {:?} - {:?}", room_id, client_id);
}
} else {
client_not_in_room(&tx);
}
},
RoomMessageType::Destroy => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
information!("Destroy room: {:?} - {:?}", room_id, client_id);
let mut room_map = room_map.lock().unwrap();
let client_map = room_map.remove(room_id);
if let Some(client_map) = client_map {
for (_client_id, client_addr) in client_map {
let client_tx = peer_map.lock().unwrap().remove(&client_addr);
if let Some(client_tx) = client_tx {
client_tx.unbounded_send(Message::Close(None)).ok();
}
}
},
RoomMessageType::Exit => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
let mut room_map = room_map.lock().unwrap();
if let Some(client_map) = room_map.get_mut(room_id) {
client_map.remove(client_id);
peer_map.lock().unwrap().remove(&addr);
tx.unbounded_send(Message::Close(None)).ok();
} else {
warning!("Not in room: {:?} - {:?}", room_id, client_id);
}
} else {
client_not_in_room(&tx);
}
} else {
client_not_in_room(&tx);
}
},
RoomMessageType::ListPeers => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
information!("List room peers: {:?} - {:?}", room_id, client_id);
let room_map = room_map.lock().unwrap();
let client_map = room_map.get(room_id);
let mut client_ids = vec![];
if let Some(client_map) = client_map {
for (client_id, _client_addr) in client_map {
client_ids.push(client_id.clone());
}
},
RoomMessageType::Destroy => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
information!("Destroy room: {:?} - {:?}", room_id, client_id);
let mut room_map = room_map.lock().unwrap();
let client_map = room_map.remove(room_id);
if let Some(client_map) = client_map {
for (_client_id, client_addr) in client_map {
let client_tx = peer_map.lock().unwrap().remove(&client_addr);
if let Some(client_tx) = client_tx {
client_tx.unbounded_send(Message::Close(None)).ok();
}
}
}
} else {
client_not_in_room(&tx);
}
},
RoomMessageType::ListPeers => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
information!("List room peers: {:?} - {:?}", room_id, client_id);
let room_map = room_map.lock().unwrap();
let client_map = room_map.get(room_id);
let mut client_ids = vec![];
if let Some(client_map) = client_map {
for (client_id, _client_addr) in client_map {
client_ids.push(client_id.clone());
}
}
let client_ids_data = serde_json::to_string(&client_ids);
if let Ok(client_ids_data) = client_ids_data {
let m = RoomMessageDown {
r#type: RoomMessageDownType::PeerList,
reply_code: Some(200),
reply_message: Some("ok".into()),
data: Some(client_ids_data),
..Default::default()
};
if let Ok(mm) = serde_json::to_string(&m) {
tx.unbounded_send(Message::Text(mm)).ok();
}
}
} else {
client_not_in_room(&tx);
}
},
RoomMessageType::Broadcast => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
information!("Broadcast room message: {:?} - {:?}", room_id, client_id);
let room_map = room_map.lock().unwrap();
let client_map = room_map.get(room_id);
}
let client_ids_data = serde_json::to_string(&client_ids);
if let Ok(client_ids_data) = client_ids_data {
let m = RoomMessageDown {
r#type: RoomMessageDownType::PeerList,
reply_code: Some(200),
reply_message: Some("ok".into()),
data: Some(client_ids_data),
..Default::default()
};
m.send(&tx);
}
} else {
client_not_in_room(&tx);
}
},
RoomMessageType::Broadcast => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
information!("Broadcast room message: {:?} - {:?}", room_id, client_id);
let room_map = room_map.lock().unwrap();
let client_map = room_map.get(room_id);
let data = room_message.data;
let m = RoomMessageDown {
r#type: RoomMessageDownType::BroadcastMessage,
reply_code: Some(200),
reply_message: Some("ok".into()),
peer_id: Some(client_id.clone()),
data,
};
if let Ok(mm) = serde_json::to_string(&m) {
if let Some(client_map) = client_map {
for (peer_client_id, peer_client_addr) in client_map {
if peer_client_id != client_id {
if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) {
peer_tx.unbounded_send(Message::Text(mm.clone())).ok();
}
}
let data = room_message.data;
let m = RoomMessageDown {
r#type: RoomMessageDownType::BroadcastMessage,
reply_code: Some(200),
reply_message: Some("ok".into()),
peer_id: Some(client_id.clone()),
data,
};
if let Ok(mm) = serde_json::to_string(&m) {
if let Some(client_map) = client_map {
for (peer_client_id, peer_client_addr) in client_map {
if peer_client_id != client_id {
if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) {
peer_tx.unbounded_send(Message::Text(mm.clone())).ok();
}
}
}
} else {
client_not_in_room(&tx);
}
},
RoomMessageType::Peer => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id);
if let Some(peer_id) = room_message.peer_id.as_ref() {
let room_map = room_map.lock().unwrap();
let client_map = room_map.get(room_id);
}
} else {
client_not_in_room(&tx);
}
},
RoomMessageType::Peer => {
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id);
if let Some(peer_id) = room_message.peer_id.as_ref() {
let room_map = room_map.lock().unwrap();
let client_map = room_map.get(room_id);
let data = room_message.data;
let m = RoomMessageDown {
r#type: RoomMessageDownType::PeerMessage,
reply_code: Some(200),
reply_message: Some("ok".into()),
peer_id: Some(client_id.clone()),
data,
};
if let Ok(mm) = serde_json::to_string(&m) {
if let Some(client_map) = client_map {
if let Some(peer_client_addr) = client_map.get(peer_id) {
if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) {
peer_tx.unbounded_send(Message::Text(mm.clone())).ok();
}
} else {
let m = RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id));
if let Ok(mm) = serde_json::to_string(&m) {
tx.unbounded_send(Message::Text(mm)).ok();
}
}
let data = room_message.data;
let m = RoomMessageDown {
r#type: RoomMessageDownType::PeerMessage,
reply_code: Some(200),
reply_message: Some("ok".into()),
peer_id: Some(client_id.clone()),
data,
};
if let Ok(mm) = serde_json::to_string(&m) {
if let Some(client_map) = client_map {
if let Some(peer_client_addr) = client_map.get(peer_id) {
if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) {
peer_tx.unbounded_send(Message::Text(mm.clone())).ok();
}
} else {
RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(&tx);
}
}
} else {
client_not_in_room(&tx);
}
},
}
} else {
client_not_in_room(&tx);
}
},
}
@@ -352,10 +388,7 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea
fn client_not_in_room(tx: &UnboundedSender<Message>) {
information!("Client is not in room");
let m = RoomMessageDown::create_error_reply("Client is not in room".into());
if let Ok(mm) = serde_json::to_string(&m) {
tx.unbounded_send(Message::Text(mm)).ok();
}
RoomMessageDown::create_error_reply("Client is not in room").send(tx);
}
fn client_exit(room_map: &RoomMap, room_id: &Option<String>, client_id: &Option<String>) {
@@ -381,8 +414,8 @@ async fn main() -> Result<(), IoError> {
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
let listener = try_socket.expect("Failed to bind");
println!("Listening on: {}", addr);
let listener = try_socket.expect(&format!("Failed to bind ok: {}", addr));
success!("Listening on: {}", addr);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {