From a7980bd51a9d7410ec0fdd6b267c84f3f429b713 Mon Sep 17 00:00:00 2001 From: Hatter Jiang Date: Sun, 5 Nov 2023 18:26:25 +0800 Subject: [PATCH] feat: v0.1.1 --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/main.rs | 155 +++++++++++++++++++++++++++++----------------------- 3 files changed, 88 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9d8c57a..308801a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1226,7 +1226,7 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "wscat-rs" -version = "0.1.0" +version = "0.1.1" dependencies = [ "clap", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index 3abd906..37d7bf5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wscat-rs" -version = "0.1.0" +version = "0.1.1" authors = ["Hatter Jiang "] edition = "2018" diff --git a/src/main.rs b/src/main.rs index dda77af..0b446ee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,21 @@ #[macro_use] extern crate rust_util; +use std::time::Duration; + use clap::{App, Arg}; -use tokio_tungstenite::connect_async; +use futures_util::{SinkExt, StreamExt}; +use futures_util::stream::SplitSink; +use rust_util::util_msg::{clear_lastline, flush_stdout}; use rust_util::XResult; -use futures_util::{StreamExt, SinkExt}; -use tokio_tungstenite::tungstenite::Message; -use tokio::sync::mpsc::channel; -use url::Url; use rustyline::Editor; use rustyline::error::ReadlineError; -use std::time::Duration; -use rust_util::util_msg::{clear_lastline, flush_stdout}; +use tokio::net::TcpStream; +use tokio::runtime::Runtime; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::tungstenite::{Error, Message}; +use url::Url; #[tokio::main] async fn main() { @@ -41,22 +45,74 @@ async fn main() { async fn connect_to_and_loop(url: &Url, enable_ping_pong: bool) -> XResult<()> { let (ws_stream, _) = connect_async(url).await?; - let (mut ws_write, mut ws_read) = ws_stream.split(); + let (ws_write, mut ws_read) = ws_stream.split(); - let (sender, mut receiver) = channel(2); + let (mut sender, receiver) = channel(2); + // receiver from `receiver`, then send to `ws_write` + spawn_loop_write_to_ws(ws_write, receiver); + // read from `console`, then send to `sender` + spawn_loop_console(sender.clone()); + + if !enable_ping_pong { + wrap_output(|| debugging!("Ping/Pong is disabled")); + } else { + wrap_output(|| debugging!("Ping/Pong is enabled")); + spawn_loop_ping_pong(sender.clone()); + } + + loop { + let next_message: Option> = ws_read.next().await; + match next_message { + None => return process_ws_end(), + Some(Err(e)) => return simple_error!("WebSocket got error: {}", e), + Some(Ok(message)) => process_message(&mut sender, message).await, + } + } +} + +fn process_ws_end() -> XResult<()> { + information!("WebSocket ended."); + return Ok(()); +} + +async fn process_message(sender: &mut Sender, message: Message) { + match message { + Message::Ping(msg) => { + wrap_output(|| debugging!("Received ping message: {:?}", msg)); + let r = sender.send(Message::Pong(msg)).await; + r.ok(); + } + Message::Pong(msg) => { + wrap_output(|| debugging!("Received pong message: {:?}", msg)); + } + Message::Close(close_frame) => { + wrap_output(|| information!("WebSocket closed: {:?}", close_frame)); + } + Message::Binary(msg) => { + wrap_output(|| information!("Received binary message: {:?}", msg)); + } + Message::Text(msg) => { + wrap_output(|| information!("Received text message: {}", msg)); + } + } +} + +fn spawn_loop_ping_pong(sender: Sender) { tokio::spawn(async move { - while let Some(msg) = receiver.recv().await { - if let Err(e) = ws_write.send(msg).await { - failure!("Send message to server failed: {}", e); - ws_write.close().await.ok(); - break; + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + wrap_output(|| debugging!("Send ping message")); + if let Err(e) = sender.send(Message::Ping(vec![])).await { + wrap_output(|| debugging!("Send ping message failed: {}", e)); + return; } } }); +} - let rt = tokio::runtime::Runtime::new().unwrap(); - let cloned_sender = sender.clone(); +fn spawn_loop_console(sender: Sender) { + let rt = Runtime::new().unwrap(); let _t = std::thread::spawn(move || { let mut rl = Editor::<()>::new(); loop { @@ -65,7 +121,7 @@ async fn connect_to_and_loop(url: &Url, enable_ping_pong: bool) -> XResult<()> { Ok(line) => { if !line.is_empty() { rl.add_history_entry(line.as_str()); - let cloned_sender = cloned_sender.clone(); + let cloned_sender = sender.clone(); rt.spawn(async move { if let Err(e) = cloned_sender.send(Message::Text(line)).await { failure!("Send message failed: {}", e); @@ -88,60 +144,21 @@ async fn connect_to_and_loop(url: &Url, enable_ping_pong: bool) -> XResult<()> { } } }); - - if !enable_ping_pong { - wrap_output(|| debugging!("Ping/Pong is disabled")); - } else { - wrap_output(|| debugging!("Ping/Pong is enabled")); - let time_cloned_sender = sender.clone(); - tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - wrap_output(|| debugging!("Send ping message")); - if let Err(e) = time_cloned_sender.send(Message::Ping(vec![])).await { - wrap_output(|| debugging!("Send ping message failed: {}", e)); - return; - } - } - }); - } - - loop { - let next = ws_read.next().await; - match next { - None => { - information!("WebSocket ended."); - return Ok(()); - } - Some(Err(e)) => { - return simple_error!("WebSocket got error: {}", e); - } - Some(Ok(message)) => { - match message { - Message::Ping(msg) => { - wrap_output(|| debugging!("Received ping message: {:?}", msg)); - let r = sender.send(Message::Pong(msg)).await; - r.ok(); - } - Message::Pong(msg) => { - wrap_output(|| debugging!("Received pong message: {:?}", msg)); - } - Message::Close(close_frame) => { - wrap_output(|| information!("WebSocket closed: {:?}", close_frame)); - } - Message::Binary(msg) => { - wrap_output(|| information!("Received binary message: {:?}", msg)); - } - Message::Text(msg) => { - wrap_output(|| information!("Received text message: {}", msg)); - } - } - } - } - } } -fn wrap_output(f: F) where F: Fn() -> () { +fn spawn_loop_write_to_ws(mut ws_write: SplitSink>, Message>, mut receiver: Receiver) { + tokio::spawn(async move { + while let Some(msg) = receiver.recv().await { + if let Err(e) = ws_write.send(msg).await { + failure!("Send message to server failed: {}", e); + ws_write.close().await.ok(); + break; + } + } + }); +} + +fn wrap_output(f: F) where F: Fn() { clear_lastline(); f(); print!("ws $ ");