#[macro_use] extern crate rust_util; use clap::{App, Arg}; use tokio_tungstenite::connect_async; use rust_util::XResult; use futures_util::{StreamExt, SinkExt}; use tokio_tungstenite::tungstenite::Message; use tokio::sync::mpsc::channel; use url::Url; #[tokio::main] async fn main() { let matches = App::new("") .arg(Arg::with_name("connect").short("c").long("connect").takes_value(true).help("WebSocket URL")) .get_matches(); let connect = matches.value_of("connect").unwrap_or_else(|| { failure_and_exit!("Arg --connect is required."); }); let url = Url::parse(connect).unwrap_or_else(|e| { failure_and_exit!("Parse connect url: {}, failed: {}", connect, e); }); let r = connect_to_and_loop(&url).await; match r { Ok(_) => success!("Success"), Err(e) => failure!("Error: {}", e), } } async fn connect_to_and_loop(url: &Url) -> XResult<()> { let (ws_stream, _) = connect_async(url).await?; let (mut ws_write, mut ws_read) = ws_stream.split(); let (sender, mut receiver) = channel(2); tokio::spawn(async move { while let Some(msg) = receiver.recv().await { if let Err(e) = ws_write.send(msg).await { failure!("Send message to server failed: {}", e); ws_write.close().await.ok(); break; } } }); loop { let next = ws_read.next().await; match next { None => { information!("WebSocket ended."); return Ok(()); } Some(Err(e)) => { return simple_error!("WebSocket got error: {}", e); } Some(Ok(message)) => { match message { Message::Ping(msg) => { information!("Received ping message: {:?}", msg); let r = sender.send(Message::Pong(msg)).await; r.ok(); } Message::Pong(msg) => { information!("Received pong message: {:?}", msg); } Message::Close(close_frame) => { information!("WebSocket closed: {:?}", close_frame); } Message::Binary(msg) => { information!("Received binary message: {:?}", msg); } Message::Text(msg) => { information!("Received text message: {}", msg); } } } } } }