feat: v0.1.1

This commit is contained in:
2023-11-05 18:26:25 +08:00
parent 1bfe461a6b
commit a7980bd51a
3 changed files with 88 additions and 71 deletions

2
Cargo.lock generated
View File

@@ -1226,7 +1226,7 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "wscat-rs"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"clap",
"futures-util",

View File

@@ -1,6 +1,6 @@
[package]
name = "wscat-rs"
version = "0.1.0"
version = "0.1.1"
authors = ["Hatter Jiang <jht5945@gmail.com>"]
edition = "2018"

View File

@@ -1,17 +1,21 @@
#[macro_use]
extern crate rust_util;
use std::time::Duration;
use clap::{App, Arg};
use tokio_tungstenite::connect_async;
use futures_util::{SinkExt, StreamExt};
use futures_util::stream::SplitSink;
use rust_util::util_msg::{clear_lastline, flush_stdout};
use rust_util::XResult;
use futures_util::{StreamExt, SinkExt};
use tokio_tungstenite::tungstenite::Message;
use tokio::sync::mpsc::channel;
use url::Url;
use rustyline::Editor;
use rustyline::error::ReadlineError;
use std::time::Duration;
use rust_util::util_msg::{clear_lastline, flush_stdout};
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::tungstenite::{Error, Message};
use url::Url;
#[tokio::main]
async fn main() {
@@ -41,22 +45,74 @@ async fn main() {
async fn connect_to_and_loop(url: &Url, enable_ping_pong: bool) -> XResult<()> {
let (ws_stream, _) = connect_async(url).await?;
let (mut ws_write, mut ws_read) = ws_stream.split();
let (ws_write, mut ws_read) = ws_stream.split();
let (sender, mut receiver) = channel(2);
let (mut sender, receiver) = channel(2);
// receiver from `receiver`, then send to `ws_write`
spawn_loop_write_to_ws(ws_write, receiver);
// read from `console`, then send to `sender`
spawn_loop_console(sender.clone());
if !enable_ping_pong {
wrap_output(|| debugging!("Ping/Pong is disabled"));
} else {
wrap_output(|| debugging!("Ping/Pong is enabled"));
spawn_loop_ping_pong(sender.clone());
}
loop {
let next_message: Option<Result<Message, Error>> = ws_read.next().await;
match next_message {
None => return process_ws_end(),
Some(Err(e)) => return simple_error!("WebSocket got error: {}", e),
Some(Ok(message)) => process_message(&mut sender, message).await,
}
}
}
fn process_ws_end() -> XResult<()> {
information!("WebSocket ended.");
return Ok(());
}
async fn process_message(sender: &mut Sender<Message>, message: Message) {
match message {
Message::Ping(msg) => {
wrap_output(|| debugging!("Received ping message: {:?}", msg));
let r = sender.send(Message::Pong(msg)).await;
r.ok();
}
Message::Pong(msg) => {
wrap_output(|| debugging!("Received pong message: {:?}", msg));
}
Message::Close(close_frame) => {
wrap_output(|| information!("WebSocket closed: {:?}", close_frame));
}
Message::Binary(msg) => {
wrap_output(|| information!("Received binary message: {:?}", msg));
}
Message::Text(msg) => {
wrap_output(|| information!("Received text message: {}", msg));
}
}
}
fn spawn_loop_ping_pong(sender: Sender<Message>) {
tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
if let Err(e) = ws_write.send(msg).await {
failure!("Send message to server failed: {}", e);
ws_write.close().await.ok();
break;
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
wrap_output(|| debugging!("Send ping message"));
if let Err(e) = sender.send(Message::Ping(vec![])).await {
wrap_output(|| debugging!("Send ping message failed: {}", e));
return;
}
}
});
}
let rt = tokio::runtime::Runtime::new().unwrap();
let cloned_sender = sender.clone();
fn spawn_loop_console(sender: Sender<Message>) {
let rt = Runtime::new().unwrap();
let _t = std::thread::spawn(move || {
let mut rl = Editor::<()>::new();
loop {
@@ -65,7 +121,7 @@ async fn connect_to_and_loop(url: &Url, enable_ping_pong: bool) -> XResult<()> {
Ok(line) => {
if !line.is_empty() {
rl.add_history_entry(line.as_str());
let cloned_sender = cloned_sender.clone();
let cloned_sender = sender.clone();
rt.spawn(async move {
if let Err(e) = cloned_sender.send(Message::Text(line)).await {
failure!("Send message failed: {}", e);
@@ -88,60 +144,21 @@ async fn connect_to_and_loop(url: &Url, enable_ping_pong: bool) -> XResult<()> {
}
}
});
}
if !enable_ping_pong {
wrap_output(|| debugging!("Ping/Pong is disabled"));
} else {
wrap_output(|| debugging!("Ping/Pong is enabled"));
let time_cloned_sender = sender.clone();
fn spawn_loop_write_to_ws(mut ws_write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>, mut receiver: Receiver<Message>) {
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
wrap_output(|| debugging!("Send ping message"));
if let Err(e) = time_cloned_sender.send(Message::Ping(vec![])).await {
wrap_output(|| debugging!("Send ping message failed: {}", e));
return;
while let Some(msg) = receiver.recv().await {
if let Err(e) = ws_write.send(msg).await {
failure!("Send message to server failed: {}", e);
ws_write.close().await.ok();
break;
}
}
});
}
loop {
let next = ws_read.next().await;
match next {
None => {
information!("WebSocket ended.");
return Ok(());
}
Some(Err(e)) => {
return simple_error!("WebSocket got error: {}", e);
}
Some(Ok(message)) => {
match message {
Message::Ping(msg) => {
wrap_output(|| debugging!("Received ping message: {:?}", msg));
let r = sender.send(Message::Pong(msg)).await;
r.ok();
}
Message::Pong(msg) => {
wrap_output(|| debugging!("Received pong message: {:?}", msg));
}
Message::Close(close_frame) => {
wrap_output(|| information!("WebSocket closed: {:?}", close_frame));
}
Message::Binary(msg) => {
wrap_output(|| information!("Received binary message: {:?}", msg));
}
Message::Text(msg) => {
wrap_output(|| information!("Received text message: {}", msg));
}
}
}
}
}
}
fn wrap_output<F>(f: F) where F: Fn() -> () {
fn wrap_output<F>(f: F) where F: Fn() {
clear_lastline();
f();
print!("ws $ ");