diff --git a/Cargo.lock b/Cargo.lock index 0f1e584..693c948 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -915,4 +915,5 @@ dependencies = [ "rustyline", "tokio", "tokio-tungstenite", + "url", ] diff --git a/Cargo.toml b/Cargo.toml index 2111456..9283fec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +url = "2.0.0" clap = "2.33.3" rust_util = "0.6.39" rustyline = "8.2.0" diff --git a/src/main.rs b/src/main.rs index 348661c..6d543f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,11 +7,7 @@ use rust_util::XResult; use futures_util::{StreamExt, SinkExt}; use tokio_tungstenite::tungstenite::Message; use tokio::sync::mpsc::channel; - -enum SimpleMessage { - End, - Msg(Message), -} +use url::Url; #[tokio::main] async fn main() { @@ -23,39 +19,29 @@ async fn main() { failure_and_exit!("Arg --connect is required."); }); - let r = connect_to_and_loop(connect).await; + let url = Url::parse(connect).unwrap_or_else(|e| { + failure_and_exit!("Parse connect url: {}, failed: {}", connect, e); + }); + + let r = connect_to_and_loop(&url).await; match r { Ok(_) => success!("Success"), Err(e) => failure!("Error: {}", e), } } -async fn connect_to_and_loop(url: &str) -> XResult<()> { +async fn connect_to_and_loop(url: &Url) -> XResult<()> { let (ws_stream, _) = connect_async(url).await?; let (mut ws_write, mut ws_read) = ws_stream.split(); let (sender, mut receiver) = channel(2); - let cloned_sender = sender.clone(); - let end_simple_msg_loop = || async { - cloned_sender.send(SimpleMessage::End).await.ok(); - }; - tokio::spawn(async move { - loop { - let a = receiver.recv().await; - match a { - None => {} - Some(simple_msg) => { - match simple_msg { - SimpleMessage::End => { - break; - } - SimpleMessage::Msg(msg) => { - ws_write.send(msg).await.unwrap(); - } - } - } + while let Some(msg) = receiver.recv().await { + if let Err(e) = ws_write.send(msg).await { + failure!("Send message to server failed: {}", e); + ws_write.close().await.ok(); + break; } } }); @@ -65,30 +51,32 @@ async fn connect_to_and_loop(url: &str) -> XResult<()> { match next { None => { information!("WebSocket ended."); - end_simple_msg_loop().await; return Ok(()); } Some(Err(e)) => { - end_simple_msg_loop().await; return simple_error!("WebSocket got error: {}", e); } Some(Ok(message)) => { match message { Message::Ping(msg) => { information!("Received ping message: {:?}", msg); - let r = sender.send(SimpleMessage::Msg(Message::Pong(msg))).await; + let r = sender.send(Message::Pong(msg)).await; r.ok(); } Message::Pong(msg) => { information!("Received pong message: {:?}", msg); } - Message::Close(close_frame) => {} - Message::Binary(msg) => {} - Message::Text(msg) => {} + Message::Close(close_frame) => { + information!("WebSocket closed: {:?}", close_frame); + } + Message::Binary(msg) => { + information!("Received binary message: {:?}", msg); + } + Message::Text(msg) => { + information!("Received text message: {}", msg); + } } } } } - - Ok(()) }