feat: works now
This commit is contained in:
276
src/main.rs
276
src/main.rs
@@ -30,14 +30,15 @@ enum RoomMessageType {
|
|||||||
enum RoomMessageDownType {
|
enum RoomMessageDownType {
|
||||||
PeerEnter,
|
PeerEnter,
|
||||||
PeerExit,
|
PeerExit,
|
||||||
|
PeerList,
|
||||||
PeerMessage,
|
PeerMessage,
|
||||||
BroadcastMessage,
|
BroadcastMessage,
|
||||||
ReplayMessage,
|
ReplyMessage,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for RoomMessageDownType {
|
impl Default for RoomMessageDownType {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self::ReplayMessage
|
Self::ReplyMessage
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -56,11 +57,39 @@ struct RoomMessage {
|
|||||||
struct RoomMessageDown {
|
struct RoomMessageDown {
|
||||||
r#type: RoomMessageDownType,
|
r#type: RoomMessageDownType,
|
||||||
peer_id: Option<String>,
|
peer_id: Option<String>,
|
||||||
replay_code: Option<i32>,
|
reply_code: Option<i32>,
|
||||||
replay_message: Option<String>,
|
reply_message: Option<String>,
|
||||||
data: Option<String>,
|
data: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl RoomMessageDown {
|
||||||
|
fn create_error_reply(error_message: String) -> Self {
|
||||||
|
Self {
|
||||||
|
r#type: RoomMessageDownType::ReplyMessage,
|
||||||
|
reply_code: Some(500),
|
||||||
|
reply_message: Some(error_message),
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn create_success_reply(success_message: String) -> Self {
|
||||||
|
Self {
|
||||||
|
r#type: RoomMessageDownType::ReplyMessage,
|
||||||
|
reply_code: Some(200),
|
||||||
|
reply_message: Some(success_message),
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn create_peer_enter(peer_id: String) -> Self {
|
||||||
|
Self {
|
||||||
|
r#type: RoomMessageDownType::PeerEnter,
|
||||||
|
reply_code: Some(200),
|
||||||
|
reply_message: Some(format!("Peer {} entered", peer_id)),
|
||||||
|
peer_id: Some(peer_id),
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type Tx = UnboundedSender<Message>;
|
type Tx = UnboundedSender<Message>;
|
||||||
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
|
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
|
||||||
type RoomMap = Arc<Mutex<BTreeMap<String, BTreeMap<String, SocketAddr>>>>;
|
type RoomMap = Arc<Mutex<BTreeMap<String, BTreeMap<String, SocketAddr>>>>;
|
||||||
@@ -72,10 +101,10 @@ async fn handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: Tcp
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) -> XResult<()> {
|
async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) -> XResult<()> {
|
||||||
println!("Incoming TCP connection from: {}", addr);
|
information!("Incoming TCP connection from: {}", addr);
|
||||||
|
|
||||||
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
||||||
println!("WebSocket connection established: {}", addr);
|
information!("WebSocket connection established: {}", addr);
|
||||||
|
|
||||||
// Insert the write part of this peer to the peer map.
|
// Insert the write part of this peer to the peer map.
|
||||||
let (tx, rx) = unbounded();
|
let (tx, rx) = unbounded();
|
||||||
@@ -89,51 +118,222 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea
|
|||||||
match msg {
|
match msg {
|
||||||
Message::Close(_) => {
|
Message::Close(_) => {
|
||||||
information!("Close connection: {:?} - {:?} from: {}", room_id, client_id, addr);
|
information!("Close connection: {:?} - {:?} from: {}", room_id, client_id, addr);
|
||||||
|
peer_map.lock().unwrap().remove(&addr);
|
||||||
|
client_exit(&room_map, &room_id, &client_id);
|
||||||
},
|
},
|
||||||
Message::Ping(msg) => {
|
Message::Ping(msg) => {
|
||||||
tx.unbounded_send(Message::Pong(msg)).unwrap();
|
tx.unbounded_send(Message::Pong(msg)).ok();
|
||||||
},
|
},
|
||||||
Message::Pong(_) => {},
|
Message::Pong(_) => {},
|
||||||
Message::Binary(_) => {
|
Message::Binary(_) => {
|
||||||
warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id)
|
warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id);
|
||||||
|
let m = RoomMessageDown::create_error_reply("unknown error".into());
|
||||||
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
tx.unbounded_send(Message::Text(mm)).ok();
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Message::Text(msg) => {
|
Message::Text(msg) => {
|
||||||
let msg = serde_json::from_str::<RoomMessage>(&msg);
|
let room_message = serde_json::from_str::<RoomMessage>(&msg);
|
||||||
match msg {
|
match room_message {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warning!("Parse message: from: {:?} - {:?}, failed: {}", room_id, client_id, e);
|
warning!("Parse message: from: {:?} - {:?}, failed: {}", room_id, client_id, e);
|
||||||
let m = RoomMessageDown{
|
let m = RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg));
|
||||||
r#type: RoomMessageDownType::PeerMessage,
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
tx.unbounded_send(Message::Text(mm)).ok();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Ok(room_message) => {
|
||||||
|
match room_message.r#type {
|
||||||
|
RoomMessageType::CreateOrEnter => {
|
||||||
|
if let (Some(_), Some(_)) = (&room_id, &client_id) {
|
||||||
|
warning!("Client is already in room: {:?} - {:?}", room_id, client_id);
|
||||||
|
let m = RoomMessageDown::create_error_reply("Client is already in room".into());
|
||||||
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
tx.unbounded_send(Message::Text(mm)).ok();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) {
|
||||||
|
let mut room_map = room_map.lock().unwrap();
|
||||||
|
match room_map.get_mut(&msg_room_id) {
|
||||||
|
Some(client_map) => {
|
||||||
|
match client_map.get(&msg_client_id) {
|
||||||
|
Some(peer_addr) => {
|
||||||
|
if peer_addr == &addr {
|
||||||
|
information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id);
|
||||||
|
let m = RoomMessageDown::create_success_reply("Duplicate enter to room".into());
|
||||||
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
tx.unbounded_send(Message::Text(mm)).ok();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
information!("Replace client: {:?} - {:?}, from {:?} -> {:?}", msg_room_id, msg_client_id, peer_addr, addr);
|
||||||
|
let tx = peer_map.lock().unwrap().remove(peer_addr);
|
||||||
|
if let Some(tx) = tx {
|
||||||
|
tx.unbounded_send(Message::Close(None)).ok();
|
||||||
|
}
|
||||||
|
client_map.insert(msg_client_id.clone(), addr);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr);
|
||||||
|
client_map.insert(msg_client_id.clone(), addr);
|
||||||
|
|
||||||
|
let m = RoomMessageDown::create_peer_enter(msg_client_id.clone());
|
||||||
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
for (client_id, client_addr) in client_map {
|
||||||
|
if client_id != &msg_client_id {
|
||||||
|
if let Some(client_tx) = peer_map.lock().unwrap().get(client_addr) {
|
||||||
|
client_tx.unbounded_send(Message::Text(mm.clone())).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
information!("Create and enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr);
|
||||||
|
let mut client_map = BTreeMap::new();
|
||||||
|
client_map.insert(msg_client_id.clone(), addr);
|
||||||
|
room_map.insert(msg_room_id.clone(), client_map);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
room_id = Some(msg_room_id);
|
||||||
|
client_id = Some(msg_client_id);
|
||||||
|
} else {
|
||||||
|
let m = RoomMessageDown::create_error_reply("Room id and client id must both assigned".into());
|
||||||
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
tx.unbounded_send(Message::Text(mm)).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
RoomMessageType::Exit => {
|
||||||
|
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
|
||||||
|
let mut room_map = room_map.lock().unwrap();
|
||||||
|
if let Some(client_map) = room_map.get_mut(room_id) {
|
||||||
|
client_map.remove(client_id);
|
||||||
|
peer_map.lock().unwrap().remove(&addr);
|
||||||
|
tx.unbounded_send(Message::Close(None)).ok();
|
||||||
|
} else {
|
||||||
|
warning!("Not in room: {:?} - {:?}", room_id, client_id);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
client_not_in_room(&tx);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
RoomMessageType::Destroy => {
|
||||||
|
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
|
||||||
|
information!("Destroy room: {:?} - {:?}", room_id, client_id);
|
||||||
|
let mut room_map = room_map.lock().unwrap();
|
||||||
|
let client_map = room_map.remove(room_id);
|
||||||
|
if let Some(client_map) = client_map {
|
||||||
|
for (_client_id, client_addr) in client_map {
|
||||||
|
let client_tx = peer_map.lock().unwrap().remove(&client_addr);
|
||||||
|
if let Some(client_tx) = client_tx {
|
||||||
|
client_tx.unbounded_send(Message::Close(None)).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
client_not_in_room(&tx);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
RoomMessageType::ListPeers => {
|
||||||
|
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
|
||||||
|
information!("List room peers: {:?} - {:?}", room_id, client_id);
|
||||||
|
let room_map = room_map.lock().unwrap();
|
||||||
|
let client_map = room_map.get(room_id);
|
||||||
|
let mut client_ids = vec![];
|
||||||
|
if let Some(client_map) = client_map {
|
||||||
|
for (client_id, _client_addr) in client_map {
|
||||||
|
client_ids.push(client_id.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let client_ids_data = serde_json::to_string(&client_ids);
|
||||||
|
if let Ok(client_ids_data) = client_ids_data {
|
||||||
|
let m = RoomMessageDown {
|
||||||
|
r#type: RoomMessageDownType::PeerList,
|
||||||
|
reply_code: Some(200),
|
||||||
|
reply_message: Some("ok".into()),
|
||||||
|
data: Some(client_ids_data),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
tx.unbounded_send(Message::Text(mm)).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
client_not_in_room(&tx);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Ok(msg) => {
|
RoomMessageType::Broadcast => {
|
||||||
|
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
|
||||||
|
information!("Broadcast room message: {:?} - {:?}", room_id, client_id);
|
||||||
|
let room_map = room_map.lock().unwrap();
|
||||||
|
let client_map = room_map.get(room_id);
|
||||||
|
|
||||||
|
let data = room_message.data;
|
||||||
|
let m = RoomMessageDown {
|
||||||
|
r#type: RoomMessageDownType::BroadcastMessage,
|
||||||
|
reply_code: Some(200),
|
||||||
|
reply_message: Some("ok".into()),
|
||||||
|
peer_id: Some(client_id.clone()),
|
||||||
|
data,
|
||||||
|
};
|
||||||
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
if let Some(client_map) = client_map {
|
||||||
|
for (peer_client_id, peer_client_addr) in client_map {
|
||||||
|
if peer_client_id != client_id {
|
||||||
|
if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) {
|
||||||
|
peer_tx.unbounded_send(Message::Text(mm.clone())).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
client_not_in_room(&tx);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
RoomMessageType::Peer => {
|
||||||
|
if let (Some(room_id), Some(client_id)) = (&room_id, &client_id) {
|
||||||
|
information!("Send peer message: {:?} - {:?} -> {:?}", room_id, client_id, room_message.peer_id);
|
||||||
|
if let Some(peer_id) = room_message.peer_id.as_ref() {
|
||||||
|
let room_map = room_map.lock().unwrap();
|
||||||
|
let client_map = room_map.get(room_id);
|
||||||
|
|
||||||
|
let data = room_message.data;
|
||||||
|
let m = RoomMessageDown {
|
||||||
|
r#type: RoomMessageDownType::PeerMessage,
|
||||||
|
reply_code: Some(200),
|
||||||
|
reply_message: Some("ok".into()),
|
||||||
|
peer_id: Some(client_id.clone()),
|
||||||
|
data,
|
||||||
|
};
|
||||||
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
if let Some(client_map) = client_map {
|
||||||
|
if let Some(peer_client_addr) = client_map.get(peer_id) {
|
||||||
|
if let Some(peer_tx) = peer_map.lock().unwrap().get(peer_client_addr) {
|
||||||
|
peer_tx.unbounded_send(Message::Text(mm.clone())).ok();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let m = RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id));
|
||||||
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
tx.unbounded_send(Message::Text(mm)).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
client_not_in_room(&tx);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// TODO ...
|
|
||||||
// create/enter room
|
|
||||||
// exit room
|
|
||||||
// destroy room
|
|
||||||
// list peers
|
|
||||||
// enter peer
|
|
||||||
// exit peer
|
|
||||||
// send message
|
|
||||||
// - broadcast
|
|
||||||
// - peer
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// println!("Received a message from {}: {}", addr, msg.to_text().unwrap());
|
|
||||||
// let peers = peer_map.lock().unwrap();
|
|
||||||
//
|
|
||||||
// // We want to broadcast the message to everyone except ourselves.
|
|
||||||
// let broadcast_recipients =
|
|
||||||
// peers.iter().filter(|(peer_addr, _)| peer_addr != &&addr).map(|(_, ws_sink)| ws_sink);
|
|
||||||
//
|
|
||||||
// for recp in broadcast_recipients {
|
|
||||||
// recp.unbounded_send(msg.clone()).unwrap();
|
|
||||||
// }
|
|
||||||
|
|
||||||
future::ok(())
|
future::ok(())
|
||||||
});
|
});
|
||||||
@@ -150,6 +350,14 @@ async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_strea
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn client_not_in_room(tx: &UnboundedSender<Message>) {
|
||||||
|
information!("Client is not in room");
|
||||||
|
let m = RoomMessageDown::create_error_reply("Client is not in room".into());
|
||||||
|
if let Ok(mm) = serde_json::to_string(&m) {
|
||||||
|
tx.unbounded_send(Message::Text(mm)).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn client_exit(room_map: &RoomMap, room_id: &Option<String>, client_id: &Option<String>) {
|
fn client_exit(room_map: &RoomMap, room_id: &Option<String>, client_id: &Option<String>) {
|
||||||
if let (Some(room_id), Some(client_id)) = (room_id, client_id) {
|
if let (Some(room_id), Some(client_id)) = (room_id, client_id) {
|
||||||
let mut room_map = room_map.lock().unwrap();
|
let mut room_map = room_map.lock().unwrap();
|
||||||
|
|||||||
Reference in New Issue
Block a user