From 144ed41e35bddb4e8cca67127868fab63ab69044 Mon Sep 17 00:00:00 2001 From: Hatter Jiang Date: Sat, 3 Jul 2021 19:10:33 +0800 Subject: [PATCH] feat: add warp_output --- src/main.rs | 53 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/src/main.rs b/src/main.rs index 878e72e..bfdeb97 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,30 +11,33 @@ use url::Url; use rustyline::Editor; use rustyline::error::ReadlineError; use std::time::Duration; +use rust_util::util_msg::{clear_lastline, flush_stdout}; #[tokio::main] async fn main() { let matches = App::new("") .arg(Arg::with_name("connect").short("c").long("connect").takes_value(true).help("WebSocket URL")) + .arg(Arg::with_name("disable-ping-pong").long("disable-ping-pong").help("Disable ping/pong")) .get_matches(); let connect = matches.value_of("connect").unwrap_or_else(|| { failure_and_exit!("Arg --connect is required."); }); + let enable_ping_pong = !matches.is_present("disable-ping-pong"); let url = Url::parse(connect).unwrap_or_else(|e| { failure_and_exit!("Parse connect url: {}, failed: {}", connect, e); }); debugging!("Connecting to: {}", url); - let r = connect_to_and_loop(&url).await; + let r = connect_to_and_loop(&url, enable_ping_pong).await; match r { Ok(_) => success!("Success"), Err(e) => failure!("Error: {}", e), } } -async fn connect_to_and_loop(url: &Url) -> XResult<()> { +async fn connect_to_and_loop(url: &Url, enable_ping_pong: bool) -> XResult<()> { let (ws_stream, _) = connect_async(url).await?; let (mut ws_write, mut ws_read) = ws_stream.split(); @@ -60,7 +63,6 @@ async fn connect_to_and_loop(url: &Url) -> XResult<()> { Ok(line) => { if !line.is_empty() { rl.add_history_entry(line.as_str()); - println!("Input line: {}", line); let cloned_sender = cloned_sender.clone(); rt.spawn(async move { if let Err(e) = cloned_sender.send(Message::Text(line)).await { @@ -85,17 +87,22 @@ async fn connect_to_and_loop(url: &Url) -> XResult<()> { } }); - let time_cloned_sender = sender.clone(); - tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - debugging!("Send ping message"); - if let Err(e) = time_cloned_sender.send(Message::Ping(vec![])).await { - debugging!("Send ping message failed: {}", e); - return; + if !enable_ping_pong { + wrap_output(|| debugging!("Ping/Pong is disabled")); + } else { + wrap_output(|| debugging!("Ping/Pong is enabled")); + let time_cloned_sender = sender.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + wrap_output(|| debugging!("Send ping message")); + if let Err(e) = time_cloned_sender.send(Message::Ping(vec![])).await { + wrap_output(|| debugging!("Send ping message failed: {}", e)); + return; + } } - } - }); + }); + } loop { let next = ws_read.next().await; @@ -110,27 +117,31 @@ async fn connect_to_and_loop(url: &Url) -> XResult<()> { Some(Ok(message)) => { match message { Message::Ping(msg) => { - debugging!("Received ping message: {:?}", msg); + wrap_output(|| debugging!("Received ping message: {:?}", msg)); let r = sender.send(Message::Pong(msg)).await; r.ok(); } Message::Pong(msg) => { - debugging!("Received pong message: {:?}", msg); + wrap_output(|| debugging!("Received pong message: {:?}", msg)); } Message::Close(close_frame) => { - information!("WebSocket closed: {:?}", close_frame); + wrap_output(|| information!("WebSocket closed: {:?}", close_frame)); } Message::Binary(msg) => { - information!("Received binary message: {:?}", msg); + wrap_output(|| information!("Received binary message: {:?}", msg)); } Message::Text(msg) => { - information!("Received text message: {}", msg); + wrap_output(|| information!("Received text message: {}", msg)); } } } } } - - // t.join().unwrap(); - // Ok(()) +} + +fn wrap_output(f: F) where F: Fn() -> () { + clear_lastline(); + f(); + print!("ws $ "); + flush_stdout(); }