#[macro_use] extern crate rust_util; use clap::{App, Arg}; use tokio_tungstenite::connect_async; use rust_util::XResult; use futures_util::{StreamExt, SinkExt}; use tokio_tungstenite::tungstenite::Message; use tokio::sync::mpsc::channel; use url::Url; use rustyline::Editor; use rustyline::error::ReadlineError; use std::time::Duration; use rust_util::util_msg::{clear_lastline, flush_stdout}; #[tokio::main] async fn main() { let matches = App::new("") .arg(Arg::with_name("connect").short("c").long("connect").takes_value(true).help("WebSocket URL")) .arg(Arg::with_name("disable-ping-pong").long("disable-ping-pong").help("Disable ping/pong")) .get_matches(); let connect = matches.value_of("connect").unwrap_or_else(|| { failure_and_exit!("Arg --connect is required."); }); let enable_ping_pong = !matches.is_present("disable-ping-pong"); let url = Url::parse(connect).unwrap_or_else(|e| { failure_and_exit!("Parse connect url: {}, failed: {}", connect, e); }); debugging!("Connecting to: {}", url); let r = connect_to_and_loop(&url, enable_ping_pong).await; match r { Ok(_) => success!("Success"), Err(e) => failure!("Error: {}", e), } } async fn connect_to_and_loop(url: &Url, enable_ping_pong: bool) -> XResult<()> { let (ws_stream, _) = connect_async(url).await?; let (mut ws_write, mut ws_read) = ws_stream.split(); let (sender, mut receiver) = channel(2); tokio::spawn(async move { while let Some(msg) = receiver.recv().await { if let Err(e) = ws_write.send(msg).await { failure!("Send message to server failed: {}", e); ws_write.close().await.ok(); break; } } }); let rt = tokio::runtime::Runtime::new().unwrap(); let cloned_sender = sender.clone(); let _t = std::thread::spawn(move || { let mut rl = Editor::<()>::new(); loop { let readline = rl.readline("ws $ "); match readline { Ok(line) => { if !line.is_empty() { rl.add_history_entry(line.as_str()); let cloned_sender = cloned_sender.clone(); rt.spawn(async move { if let Err(e) = cloned_sender.send(Message::Text(line)).await { failure!("Send message failed: {}", e); } }); } } Err(ReadlineError::Interrupted) => { information!("Press Ctrl+C"); std::process::exit(1); } Err(ReadlineError::Eof) => { println!("CTRL-D"); break; } Err(err) => { println!("Error: {:?}", err); break; } } } }); if !enable_ping_pong { wrap_output(|| debugging!("Ping/Pong is disabled")); } else { wrap_output(|| debugging!("Ping/Pong is enabled")); let time_cloned_sender = sender.clone(); tokio::spawn(async move { loop { tokio::time::sleep(Duration::from_secs(10)).await; wrap_output(|| debugging!("Send ping message")); if let Err(e) = time_cloned_sender.send(Message::Ping(vec![])).await { wrap_output(|| debugging!("Send ping message failed: {}", e)); return; } } }); } loop { let next = ws_read.next().await; match next { None => { information!("WebSocket ended."); return Ok(()); } Some(Err(e)) => { return simple_error!("WebSocket got error: {}", e); } Some(Ok(message)) => { match message { Message::Ping(msg) => { wrap_output(|| debugging!("Received ping message: {:?}", msg)); let r = sender.send(Message::Pong(msg)).await; r.ok(); } Message::Pong(msg) => { wrap_output(|| debugging!("Received pong message: {:?}", msg)); } Message::Close(close_frame) => { wrap_output(|| information!("WebSocket closed: {:?}", close_frame)); } Message::Binary(msg) => { wrap_output(|| information!("Received binary message: {:?}", msg)); } Message::Text(msg) => { wrap_output(|| information!("Received text message: {}", msg)); } } } } } } fn wrap_output(f: F) where F: Fn() -> () { clear_lastline(); f(); print!("ws $ "); flush_stdout(); }