feat: v0.1.3, add request_message_id

This commit is contained in:
2023-06-01 00:52:26 +08:00
parent 4fb7a935ed
commit 8f88f834ac
5 changed files with 48 additions and 29 deletions

2
Cargo.lock generated
View File

@@ -597,7 +597,7 @@ dependencies = [
[[package]] [[package]]
name = "room-rs" name = "room-rs"
version = "0.1.2" version = "0.1.3"
dependencies = [ dependencies = [
"chrono", "chrono",
"futures-channel", "futures-channel",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "room-rs" name = "room-rs"
version = "0.1.2" version = "0.1.3"
authors = ["Hatter Jiang@Pixelbook <jht5945@gmail.com>"] authors = ["Hatter Jiang@Pixelbook <jht5945@gmail.com>"]
edition = "2018" edition = "2018"

View File

@@ -62,7 +62,8 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket
Ok(room_message) => room_message, Ok(room_message) => room_message,
Err(e) => { Err(e) => {
warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e); warning!("Parse message: from: {:?} - {:?}, failed: {}", handle_context.room_id, handle_context.client_id, e);
RoomMessageDown::create_error_reply(format!("Message parse failed: {}, message: {}", e, msg)).send(&tx); RoomMessageDown::create_error_reply(
&None, format!("Message parse failed: {}, message: {}", e, msg)).send(&tx);
return Ok(()); return Ok(());
} }
}; };
@@ -70,7 +71,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket
RoomMessageType::CreateOrEnter => { RoomMessageType::CreateOrEnter => {
if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) { if let (Some(room_id), Some(client_id)) = (&handle_context.room_id, &handle_context.client_id) {
warning!("Client is already in room: {:?} - {:?}", room_id, client_id); warning!("Client is already in room: {:?} - {:?}", room_id, client_id);
RoomMessageDown::create_error_reply("Client is already in room").send(tx); RoomMessageDown::create_error_reply(&room_message.message_id, "Client is already in room").send(tx);
return Ok(()); return Ok(());
} }
if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) { if let (Some(msg_room_id), Some(msg_client_id)) = (room_message.room_id, room_message.client_id) {
@@ -81,7 +82,8 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket
Some(peer_addr) => { Some(peer_addr) => {
if peer_addr == &addr { if peer_addr == &addr {
information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id); information!("Duplicate enter to room: {:?} - {:?}", msg_room_id, msg_client_id);
RoomMessageDown::create_success_reply("Duplicate enter to room").send(tx); RoomMessageDown::create_success_reply(
&room_message.message_id, "Duplicate enter to room").send(tx);
} else { } else {
information!( information!(
"Replace client: {:?} - {:?}, from {:?} -> {:?}", "Replace client: {:?} - {:?}, from {:?} -> {:?}",
@@ -92,14 +94,14 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket
tx.send_close(); tx.send_close();
} }
client_map.insert(msg_client_id.clone(), addr); client_map.insert(msg_client_id.clone(), addr);
RoomMessageDown::create_success_reply(client_replaced_message).send(tx); RoomMessageDown::create_success_reply(&room_message.message_id, client_replaced_message).send(tx);
} }
} }
None => { None => {
information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr); information!("Enter room: {:?} - {:?}, addr: {}", msg_room_id, msg_client_id, addr);
client_map.insert(msg_client_id.clone(), addr); client_map.insert(msg_client_id.clone(), addr);
let m = RoomMessageDown::create_peer_enter(msg_client_id.clone()); let m = RoomMessageDown::create_peer_enter(&room_message.message_id, msg_client_id.clone());
let mm = serde_json::to_string(&m)?; let mm = serde_json::to_string(&m)?;
for (client_id, client_addr) in client_map { for (client_id, client_addr) in client_map {
if client_id != &msg_client_id { if client_id != &msg_client_id {
@@ -109,7 +111,8 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket
} }
} }
RoomMessageDown::create_success_reply(format!("Client entered: {:?}", addr)).send(tx); RoomMessageDown::create_success_reply(
&room_message.message_id, format!("Client entered: {:?}", addr)).send(tx);
} }
} }
} }
@@ -118,13 +121,14 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket
let mut client_map = BTreeMap::new(); let mut client_map = BTreeMap::new();
client_map.insert(msg_client_id.clone(), addr); client_map.insert(msg_client_id.clone(), addr);
room_map.insert(msg_room_id.clone(), client_map); room_map.insert(msg_room_id.clone(), client_map);
RoomMessageDown::create_success_reply(format!("Client create room: {:?}", addr)).send(tx); RoomMessageDown::create_success_reply(
&room_message.message_id, format!("Client create room: {:?}", addr)).send(tx);
} }
} }
handle_context.room_id = Some(msg_room_id); handle_context.room_id = Some(msg_room_id);
handle_context.client_id = Some(msg_client_id); handle_context.client_id = Some(msg_client_id);
} else { } else {
RoomMessageDown::create_error_reply("Room id and client id must both assigned").send(tx); RoomMessageDown::create_error_reply(&room_message.message_id, "Room id and client id must both assigned").send(tx);
} }
} }
RoomMessageType::Exit => { RoomMessageType::Exit => {
@@ -166,7 +170,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket
.map(|m| m.keys().cloned().collect::<Vec<_>>()) .map(|m| m.keys().cloned().collect::<Vec<_>>())
.unwrap_or_else(Vec::new); .unwrap_or_else(Vec::new);
let client_ids_data = serde_json::to_string(&client_ids)?; let client_ids_data = serde_json::to_string(&client_ids)?;
RoomMessageDown::create_peerlist_message(Some(client_ids_data)).send(&tx); RoomMessageDown::create_peerlist_message(&room_message.message_id, Some(client_ids_data)).send(&tx);
} else { } else {
client_not_in_room(&tx, addr); client_not_in_room(&tx, addr);
} }
@@ -178,7 +182,7 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket
let client_map = room_map.get(room_id); let client_map = room_map.get(room_id);
let m = RoomMessageDown::create_broadcast_message( let m = RoomMessageDown::create_broadcast_message(
client_id, room_message.data); &room_message.message_id, client_id, room_message.data);
let mm = serde_json::to_string(&m)?; let mm = serde_json::to_string(&m)?;
if let Some(client_map) = client_map { if let Some(client_map) = client_map {
let mut sent_messages = 0; let mut sent_messages = 0;
@@ -190,7 +194,8 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket
} }
} }
} }
RoomMessageDown::create_success_reply(format!("Send message to {} peers", sent_messages)).send(&tx); RoomMessageDown::create_success_reply(
&room_message.message_id, format!("Send message to {} peers", sent_messages)).send(&tx);
} }
} else { } else {
client_not_in_room(&tx, addr); client_not_in_room(&tx, addr);
@@ -203,16 +208,18 @@ fn handle_text_message(handle_context: &mut HandleContext, tx: &Tx, addr: Socket
let room_map = handle_context.room_map.lock().unwrap(); let room_map = handle_context.room_map.lock().unwrap();
let client_map = room_map.get(room_id); let client_map = room_map.get(room_id);
let m = RoomMessageDown::create_peer_message(peer_id, room_message.data); let m = RoomMessageDown::create_peer_message(&room_message.message_id, peer_id, room_message.data);
let mm = serde_json::to_string(&m)?; let mm = serde_json::to_string(&m)?;
if let Some(client_map) = client_map { if let Some(client_map) = client_map {
if let Some(peer_client_addr) = client_map.get(peer_id) { if let Some(peer_client_addr) = client_map.get(peer_id) {
if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) { if let Some(peer_tx) = handle_context.peer_map.lock().unwrap().get(peer_client_addr) {
peer_tx.send_text(mm); peer_tx.send_text(mm);
} }
RoomMessageDown::create_success_reply(format!("Message sent to: {}", peer_id)).send(tx); RoomMessageDown::create_success_reply(
&room_message.message_id, format!("Message sent to: {}", peer_id)).send(tx);
} else { } else {
RoomMessageDown::create_error_reply(format!("Peer not found: {}", peer_id)).send(tx); RoomMessageDown::create_fail_replay(
&room_message.message_id, 404, format!("Peer not found: {}", peer_id)).send(tx);
} }
} }
} }
@@ -255,12 +262,12 @@ async fn inner_handle_connection(
Message::Pong(_) => {} Message::Pong(_) => {}
Message::Binary(_) => { Message::Binary(_) => {
warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id); warning!("Ignore binary message from: {:?} - {:?}", handle_context.room_id, handle_context.client_id);
RoomMessageDown::create_error_reply("Binary message not supported").send(&tx); RoomMessageDown::create_error_reply(&None, "Binary message not supported").send(&tx);
} }
Message::Text(msg) => if !msg.is_empty() { Message::Text(msg) => if !msg.is_empty() {
if let Err(e) = handle_text_message(&mut handle_context, &tx, addr, msg) { if let Err(e) = handle_text_message(&mut handle_context, &tx, addr, msg) {
failure!("Error in process text message: {}", e); failure!("Error in process text message: {}", e);
RoomMessageDown::create_error_reply(format!("Error in process text message: {}", e)); RoomMessageDown::create_error_reply(&None, format!("Error in process text message: {}", e));
} }
}, },
} }
@@ -282,7 +289,7 @@ async fn inner_handle_connection(
fn client_not_in_room(tx: &Tx, addr: SocketAddr) { fn client_not_in_room(tx: &Tx, addr: SocketAddr) {
information!("Client is not in room: {}", addr); information!("Client is not in room: {}", addr);
RoomMessageDown::create_error_reply("Client is not in room").send(tx); RoomMessageDown::create_error_reply(&None, "Client is not in room").send(tx);
} }
fn client_exit(room_map: &RoomMap, room_id: &Option<String>, client_id: &Option<String>) { fn client_exit(room_map: &RoomMap, room_id: &Option<String>, client_id: &Option<String>) {

View File

@@ -32,6 +32,7 @@ impl Default for RoomMessageDownType {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct RoomMessage { pub struct RoomMessage {
pub r#type: RoomMessageType, pub r#type: RoomMessageType,
pub message_id: Option<String>,
pub room_id: Option<String>, pub room_id: Option<String>,
pub client_id: Option<String>, pub client_id: Option<String>,
pub peer_id: Option<String>, pub peer_id: Option<String>,
@@ -42,6 +43,7 @@ pub struct RoomMessage {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct RoomMessageDown { pub struct RoomMessageDown {
pub r#type: RoomMessageDownType, pub r#type: RoomMessageDownType,
pub request_message_id: Option<String>,
pub peer_id: Option<String>, pub peer_id: Option<String>,
pub reply_code: Option<i32>, pub reply_code: Option<i32>,
pub reply_message: Option<String>, pub reply_message: Option<String>,
@@ -49,28 +51,35 @@ pub struct RoomMessageDown {
} }
impl RoomMessageDown { impl RoomMessageDown {
pub fn create_error_reply<S>(error_message: S) -> Self where S: Into<String> { pub fn create_fail_replay<S>(message_id: &Option<String>, code: i32, error_message: S) -> Self where S: Into<String> {
Self { Self {
r#type: RoomMessageDownType::ReplyMessage, r#type: RoomMessageDownType::ReplyMessage,
reply_code: Some(500), request_message_id: message_id.clone(),
reply_code: Some(code),
reply_message: Some(error_message.into()), reply_message: Some(error_message.into()),
..Default::default() ..Default::default()
} }
} }
pub fn create_success_reply<S>(success_message: S) -> Self where S: Into<String> { pub fn create_error_reply<S>(message_id: &Option<String>, error_message: S) -> Self where S: Into<String> {
Self::create_fail_replay(message_id, 500, error_message)
}
pub fn create_success_reply<S>(message_id: &Option<String>, success_message: S) -> Self where S: Into<String> {
Self { Self {
r#type: RoomMessageDownType::ReplyMessage, r#type: RoomMessageDownType::ReplyMessage,
request_message_id: message_id.clone(),
reply_code: Some(200), reply_code: Some(200),
reply_message: Some(success_message.into()), reply_message: Some(success_message.into()),
..Default::default() ..Default::default()
} }
} }
pub fn create_peer_enter<S>(peer_id: S) -> Self where S: Into<String> { pub fn create_peer_enter<S>(message_id: &Option<String>, peer_id: S) -> Self where S: Into<String> {
let peer_id = peer_id.into(); let peer_id = peer_id.into();
Self { Self {
r#type: RoomMessageDownType::PeerEnter, r#type: RoomMessageDownType::PeerEnter,
request_message_id: message_id.clone(),
reply_code: Some(200), reply_code: Some(200),
reply_message: Some(format!("Peer {} entered", peer_id)), reply_message: Some(format!("Peer {} entered", peer_id)),
peer_id: Some(peer_id), peer_id: Some(peer_id),
@@ -78,9 +87,10 @@ impl RoomMessageDown {
} }
} }
pub fn create_peer_message<S>(peer_id: S, data: Option<String>) -> Self where S: Into<String> { pub fn create_peer_message<S>(message_id: &Option<String>, peer_id: S, data: Option<String>) -> Self where S: Into<String> {
RoomMessageDown { RoomMessageDown {
r#type: RoomMessageDownType::PeerMessage, r#type: RoomMessageDownType::PeerMessage,
request_message_id: message_id.clone(),
reply_code: Some(200), reply_code: Some(200),
reply_message: Some("ok".into()), reply_message: Some("ok".into()),
peer_id: Some(peer_id.into()), peer_id: Some(peer_id.into()),
@@ -88,9 +98,10 @@ impl RoomMessageDown {
} }
} }
pub fn create_broadcast_message<S>(peer_id: S, data: Option<String>) -> Self where S: Into<String> { pub fn create_broadcast_message<S>(message_id: &Option<String>, peer_id: S, data: Option<String>) -> Self where S: Into<String> {
RoomMessageDown { RoomMessageDown {
r#type: RoomMessageDownType::BroadcastMessage, r#type: RoomMessageDownType::BroadcastMessage,
request_message_id: message_id.clone(),
reply_code: Some(200), reply_code: Some(200),
reply_message: Some("ok".into()), reply_message: Some("ok".into()),
peer_id: Some(peer_id.into()), peer_id: Some(peer_id.into()),
@@ -98,9 +109,10 @@ impl RoomMessageDown {
} }
} }
pub fn create_peerlist_message(data: Option<String>) -> Self { pub fn create_peerlist_message(message_id: &Option<String>, data: Option<String>) -> Self {
RoomMessageDown { RoomMessageDown {
r#type: RoomMessageDownType::PeerList, r#type: RoomMessageDownType::PeerList,
request_message_id: message_id.clone(),
reply_code: Some(200), reply_code: Some(200),
reply_message: Some("ok".into()), reply_message: Some("ok".into()),
data, data,

View File

@@ -79,7 +79,7 @@ impl HandleTextMessage for HandleTextMessageRooms {
fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) {
if !handle_context.is_admin { if !handle_context.is_admin {
RoomMessageDown::create_error_reply("Not admin").send(tx); RoomMessageDown::create_error_reply(&None,"Not admin").send(tx);
return; return;
} }
let rooms = handle_context.room_map.lock().unwrap().keys().cloned().collect::<Vec<_>>().join("\n"); let rooms = handle_context.room_map.lock().unwrap().keys().cloned().collect::<Vec<_>>().join("\n");
@@ -99,7 +99,7 @@ impl HandleTextMessage for HandleTextMessageRoom {
fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, msg: &str) { fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, msg: &str) {
if !handle_context.is_admin { if !handle_context.is_admin {
RoomMessageDown::create_error_reply("Not admin").send(tx); RoomMessageDown::create_error_reply(&None, "Not admin").send(tx);
return; return;
} }
let room_id = &msg[6..]; let room_id = &msg[6..];
@@ -124,7 +124,7 @@ impl HandleTextMessage for HandleTextMessageVersion {
fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) { fn handle(&self, handle_context: &mut HandleContext, tx: &Tx, _addr: SocketAddr, _msg: &str) {
if !handle_context.is_admin { if !handle_context.is_admin {
RoomMessageDown::create_error_reply("Not admin").send(tx); RoomMessageDown::create_error_reply(&None, "Not admin").send(tx);
return; return;
} }
tx.send_text(format!("{} - v{}", NAME, VERSION)); tx.send_text(format!("{} - v{}", NAME, VERSION));