diff --git a/client.json b/client.json index b7b60b2..aa7e172 100644 --- a/client.json +++ b/client.json @@ -1,6 +1,6 @@ { "listen": "127.0.0.1:2001", "cert_pem_file": "cert.pem", - "proxy_address": "47.52.7.223:1443", + "proxy_address": "127.0.0.1:2002", "proxy_server_name": "localhost" } diff --git a/src/client.rs b/src/client.rs index 177ae88..64228c8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,7 @@ use std::fs; use std::net::SocketAddr; use rust_util::{util_time, XResult}; -use s2n_quic::Client; +use s2n_quic::{Client, Connection}; use s2n_quic::client::Connect; use tokio::net::TcpListener; use tokio::sync::mpsc::channel; @@ -42,6 +42,7 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> { let server_name = opt_value_result!(&listen_config.proxy_server_name, "proxy_server_name in config is require in client mode"); + let mut connection_opt: Option = None; loop { let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await { Some(time_and_stream) => time_and_stream, @@ -51,34 +52,37 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> { } }; information!("Get connection: {} - {}", client_stream_time, client_stream.peer_addr().unwrap()); - let addr = addr.clone(); - let client = client.clone(); - let server_name = server_name.clone(); - tokio::spawn(async move { + let time = util_time::get_current_millis(); + if (time as i128 - client_stream_time as i128).abs() > 3_000 { + warning!("Connection is more than 3 second, abandon connection"); + continue; + } + + if let None = connection_opt { let connect = Connect::new(addr).with_server_name(server_name.as_str()); let mut connection = match client.connect(connect).await { Ok(connection) => connection, Err(e) => { failure!("Connect to server failed: {}", e); - return; + continue; } }; connection.keep_alive(true).ok(); + connection_opt = Some(connection); + } - let time = util_time::get_current_millis(); - if (time as i128 - client_stream_time as i128).abs() > 3_000 { - warning!("Connection is more than 3 second, abandon connection"); - return; + let connection = connection_opt.as_mut().unwrap(); + let server_stream = match connection.open_bidirectional_stream().await { + Ok(stream) => stream, + Err(e) => { + failure!("Open stream in connection to server failed: {}", e); + connection_opt = None; + continue; } - - let server_stream = match connection.open_bidirectional_stream().await { - Ok(stream) => stream, - Err(e) => { - failure!("Open stream in connection to server failed: {}", e); - return; - } - }; - let conn_count = format!("{}", util_time::get_current_millis()); + }; + let connection_id = connection.id(); + tokio::spawn(async move { + let conn_count = format!("{}-{}-{}", util_time::get_current_millis(), connection_id, server_stream.id()); if let Err(e) = io_util::transfer_for_client_to_server(client_stream, server_stream, conn_count).await { failure!("Client - Server error: {}", e); } diff --git a/src/io_util.rs b/src/io_util.rs index e706d78..ed9db4d 100644 --- a/src/io_util.rs +++ b/src/io_util.rs @@ -2,11 +2,14 @@ use std::io::{Error, ErrorKind}; use std::time::Duration; use futures::future::try_join; +use rust_util::util_msg; +use rust_util::util_msg::MessageType; use s2n_quic::stream::BidirectionalStream; use tokio::{select, time}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; +#[derive(Debug)] enum StreamDirection { Up, Down, @@ -143,7 +146,7 @@ async fn inner_transfer<'a, R1, W1, R2, W2>(mut ri: &'a mut R1, mut wi: &'a mut // } async fn copy_data<'a, R, W>( - reader: &'a mut R, writer: &'a mut W, _direction: StreamDirection, _conn_count: &str) -> tokio::io::Result + reader: &'a mut R, writer: &'a mut W, direction: StreamDirection, conn_count: &str) -> tokio::io::Result where R: AsyncRead + Unpin + ?Sized, W: AsyncWrite + Unpin + ?Sized { let mut total_copied_bytes = 0_u64; @@ -151,11 +154,17 @@ async fn copy_data<'a, R, W>( loop { match reader.read(&mut buff).await { Ok(0) => return Ok(total_copied_bytes), - Ok(n) => match writer.write_all(&buff[..n]).await { - Ok(_) => { - total_copied_bytes += n as u64; + Ok(n) => { + util_msg::when(MessageType::DEBUG, || { + debugging!("[conn {}] Direction: {:?}, {} bytes: {:02x?}", conn_count, direction, n, &buff[..n]); + debugging!("[conn {}] Direction: {:?}, {} string: {}", conn_count, direction, n, String::from_utf8_lossy(&buff[..n]).to_string()); + }); + match writer.write_all(&buff[..n]).await { + Ok(_) => { + total_copied_bytes += n as u64; + } + Err(e) => return Err(e), } - Err(e) => return Err(e), } Err(e) => return Err(e), } diff --git a/src/server.rs b/src/server.rs index 85d4379..2a774e7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -26,10 +26,11 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> { information!("Connection accepted from {:?}", connection.remote_addr()); while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await { // spawn a new task for the stream + let connection_id = connection.id(); let proxy_address = proxy_address.clone(); tokio::spawn(async move { information!("Stream opened from {:?}", stream.connection().remote_addr()); - let conn_count = format!("{}", rust_util::util_time::get_current_millis()); + let conn_count = format!("{}-{}-{}", rust_util::util_time::get_current_millis(), connection_id, stream.id()); if let Err(e) = io_util::transfer_for_server_to_remote(stream, proxy_address, conn_count).await { failure!("Server - Client error: {}", e); }