From 4723298aeccb0e04b2310619ff09f6b8c03396a0 Mon Sep 17 00:00:00 2001 From: Hatter Jiang Date: Sun, 21 Aug 2022 11:06:11 +0800 Subject: [PATCH] feat: work in process --- Cargo.lock | 68 +++++++++++++++++++++ Cargo.toml | 4 ++ src/client.rs | 6 +- src/config.rs | 9 ++- src/io_util.rs | 163 +++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 11 +++- src/server.rs | 27 ++++---- 7 files changed, 269 insertions(+), 19 deletions(-) create mode 100644 src/io_util.rs diff --git a/Cargo.lock b/Cargo.lock index ab10c57..d558085 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -318,6 +318,15 @@ dependencies = [ "rand 0.7.3", ] +[[package]] +name = "deser-hjson" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f486ff51f3ecdf9364736375a4b358b6eb9f02555d5324fa4837c00b5aa23f5" +dependencies = [ + "serde", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -380,6 +389,7 @@ checksum = "ab30e97ab6aacfe635fad58f22c2bb06c8b685f7421eb1e064a729e2a5f481fa" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -402,6 +412,17 @@ version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115" +[[package]] +name = "futures-executor" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d11aa21b5b587a64682c0094c2bdd4df0076c5324961a40cc3abd7f37930528" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.23" @@ -525,6 +546,12 @@ dependencies = [ "memoffset 0.5.6", ] +[[package]] +name = "itoa" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" + [[package]] name = "jobserver" version = "0.1.24" @@ -1002,6 +1029,12 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8" +[[package]] +name = "ryu" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" + [[package]] name = "s2n-codec" version = "0.1.0" @@ -1186,6 +1219,37 @@ dependencies = [ "untrusted", ] +[[package]] +name = "serde" +version = "1.0.143" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53e8e5d5b70924f74ff5c6d64d9a5acd91422117c60f48c4e07855238a254553" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.143" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3d8e8de557aee63c26b85b947f5e59b690d0454c753f3adeb5cd7835ab88391" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38dd04e3c8279e75b31ef29dbdceebfe5ad89f4d0937213c53f7d49d01b3d5a7" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -1200,8 +1264,12 @@ name = "simple-rust-http3-proxy" version = "0.1.0" dependencies = [ "clap", + "deser-hjson", + "futures", "rust_util", "s2n-quic", + "serde", + "serde_json", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 93bb27f..226dbba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,10 @@ tokio = { version = "1", features = ["full"] } s2n-quic = "1.9" clap = "2.33" rust_util = "0.6" +futures = "0.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +deser-hjson = "1.0" [profile.release] codegen-units = 1 diff --git a/src/client.rs b/src/client.rs index ad22afe..3e123c1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,12 +6,12 @@ use s2n_quic::client::Connect; use crate::config::ListenConfig; -async fn run(listen_config: &ListenConfig) -> XResult<()> { +pub async fn run(listen_config: &ListenConfig) -> XResult<()> { let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file), - "Read cert pem file: {}, failed: {}", &listen_config.key_pem_file); + "Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file); let client = Client::builder() .with_tls(cert_pem.as_str())? - .with_io("0.0.0.0:0")? + .with_io(listen_config.listen.as_str())? .start()?; let addr: SocketAddr = "127.0.0.1:4433".parse()?; let connect = Connect::new(addr).with_server_name("localhost"); diff --git a/src/config.rs b/src/config.rs index a3dbcfe..1b350db 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,12 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct ListenConfig { + // client: 0.0.0.0:0 + // server: 127.0.0.1:4433 pub listen: String, - pub key_pem_file: String, pub cert_pem_file: String, + pub key_pem_file: Option, + pub proxy_address: String, + pub proxy_server_name: Option, } \ No newline at end of file diff --git a/src/io_util.rs b/src/io_util.rs new file mode 100644 index 0000000..e706d78 --- /dev/null +++ b/src/io_util.rs @@ -0,0 +1,163 @@ +use std::io::{Error, ErrorKind}; +use std::time::Duration; + +use futures::future::try_join; +use s2n_quic::stream::BidirectionalStream; +use tokio::{select, time}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::net::TcpStream; + +enum StreamDirection { + Up, + Down, +} + +pub async fn transfer_for_server_to_remote(inbound: BidirectionalStream, proxy_addr: String, conn_count: String) -> Result<(), String> { + let mut outbound = match TcpStream::connect(&proxy_addr).await { + Ok(outbound) => outbound, + Err(e) => { + return Err(format!("[conn {}] Failed to connect to: {}, err: {}", &conn_count, &proxy_addr, e)); + } + }; + if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr)) + = (inbound.connection().remote_addr(), inbound.connection().local_addr(), outbound.local_addr(), outbound.peer_addr()) { + let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr); + information!("[conn {}] New server-remote tcp connection: {}", &conn_count, peer); + } + let (mut ri, mut wi) = inbound.split(); + let (mut ro, mut wo) = outbound.split(); + inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await +} + +pub async fn transfer_for_client_to_server(mut inbound: TcpStream, outbound: BidirectionalStream, conn_count: String) -> Result<(), String> { + if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr)) + = (inbound.peer_addr(), inbound.local_addr(), outbound.connection().local_addr(), outbound.connection().remote_addr()) { + let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr); + information!("[conn {}] New client-server tcp connection: {}", &conn_count, peer); + } + let (mut ri, mut wi) = inbound.split(); + let (mut ro, mut wo) = outbound.split(); + inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await +} + +async fn inner_transfer<'a, R1, W1, R2, W2>(mut ri: &'a mut R1, mut wi: &'a mut W1, mut ro: &'a mut R2, mut wo: &'a mut W2, conn_count: String) -> Result<(), String> + where R1: AsyncRead + Unpin + ?Sized, W1: AsyncWrite + Unpin + ?Sized, + R2: AsyncRead + Unpin + ?Sized, W2: AsyncWrite + Unpin + ?Sized { + // IO copy timeout 6 HOURS + let tcp_io_copy_timeout = Duration::from_secs(6 * 3600); + let shutdown_tcp_timeout = Duration::from_secs(60); + let (client_to_server_tx, client_to_server_rx) = tokio::sync::oneshot::channel::(); + let (server_to_client_tx, server_to_client_rx) = tokio::sync::oneshot::channel::(); + let client_to_server = async { + let r = select! { + _timeout = time::sleep(tcp_io_copy_timeout) => { + failure!("[conn {}] TCP copy client -> server timeout", &conn_count); + Err(Error::new(ErrorKind::TimedOut, "timeout")) + } + _tcp_break = client_to_server_rx => { + failure!("[conn {}] TCP copy client -> server shutdown", &conn_count); + Err(Error::new(ErrorKind::BrokenPipe, "shutdown")) + } + data_copy_result = copy_data(&mut ri, &mut wo, StreamDirection::Up, &conn_count) => { + match data_copy_result { + Err(e) => { + failure!("[conn {}] TCP copy client -> server error: {}", &conn_count, e); + Err(e) + }, + Ok(r) => { + information!("[conn {}] TCP copy client -> server success: {} byte(s)", &conn_count, r); + Ok(r) + }, + } + } + }; + information!("[conn {}] Close client to server connection", &conn_count); + match time::timeout(shutdown_tcp_timeout, wo.shutdown()).await { + Err(e) => warning!("[conn {}] TCP close client -> server timeout: {}", &conn_count, e), + Ok(Err(e)) => warning!("[conn {}] TCP close client -> server error: {}", &conn_count, e), + _ => {} + } + time::sleep(Duration::from_secs(2)).await; + let _ = server_to_client_tx.send(true); + r + }; + let server_to_client = async { + let r = select! { + _timeout = time::sleep(tcp_io_copy_timeout) => { + failure!("[conn {}] TCP copy server -> client timeout", &conn_count); + Err(Error::new(ErrorKind::TimedOut, "timeout")) + } + _tcp_break = server_to_client_rx => { + failure!("[conn {}] TCP copy server -> client shutdown", &conn_count); + Err(Error::new(ErrorKind::BrokenPipe, "shutdown")) + } + data_copy_result = copy_data(&mut ro, &mut wi, StreamDirection::Down, &conn_count) => { + match data_copy_result { + Err(e) => { + failure!("[conn {}] TCP copy server -> client error: {}", &conn_count, e); + Err(e) + }, + Ok(r) => { + information!("[conn {}] TCP copy server -> client success: {} byte(s)", &conn_count, r); + Ok(r) + }, + } + } + }; + information!("[conn {}] Close server to client connection", &conn_count); + match time::timeout(shutdown_tcp_timeout, wi.shutdown()).await { + Err(e) => warning!("[conn {}] TCP close server -> client timeout: {}", &conn_count, e), + Ok(Err(e)) => warning!("[conn {}] TCP close server -> client error: {}", &conn_count, e), + _ => {} + } + time::sleep(Duration::from_secs(2)).await; + let _ = client_to_server_tx.send(true); + r + }; + let r = match try_join(client_to_server, server_to_client).await { + Err(e) => Err(format!("[conn {}] Failed try_join: {}", &conn_count, e)), + Ok((upstream_bytes, downstream_bytes)) => { + information!("[conn {}] Finished, proxy-in: {} bytes, proxy-out: {} bytes", + &conn_count, upstream_bytes, downstream_bytes + ); + Ok(()) + } + }; + r +} + +// fn is_in_peer_addr_matches(inbound: &TcpStream, allow_ips: &[IpAddressMask]) -> bool { +// if let Ok(ref in_peer_addr) = inbound.peer_addr() { +// if let Some(ip_filter) = &*GLOBAL_TEMP_IP_FILTER.read().unwrap() { +// if let Some(ip) = ip_filter.is_ip_address_matches(in_peer_addr) { +// return true; +// } +// } +// if !allow_ips.is_empty() { +// // ONLY if allow ips is not config, returns true +// // If default deny IPs, should config like: "allow_ips": ["127.0.0.1"] +// return allow_ips.iter().any(|ip| ip.is_matches(in_peer_addr)); +// } +// } +// true +// } + +async fn copy_data<'a, R, W>( + reader: &'a mut R, writer: &'a mut W, _direction: StreamDirection, _conn_count: &str) -> tokio::io::Result + where R: AsyncRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized { + let mut total_copied_bytes = 0_u64; + let mut buff = vec![0; 1024 * 4]; + loop { + match reader.read(&mut buff).await { + Ok(0) => return Ok(total_copied_bytes), + Ok(n) => match writer.write_all(&buff[..n]).await { + Ok(_) => { + total_copied_bytes += n as u64; + } + Err(e) => return Err(e), + } + Err(e) => return Err(e), + } + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index fa4dbd7..96681ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,11 +2,13 @@ extern crate rust_util; mod config; +mod io_util; mod client; mod server; use clap::{App, Arg}; use rust_util::XResult; +use crate::config::ListenConfig; #[tokio::main] async fn main() -> XResult<()> { @@ -35,8 +37,11 @@ async fn main() -> XResult<()> { failure_and_exit!("Must run in server on client mode") } + let listen_config: ListenConfig = deser_hjson::from_str("").unwrap(); - // TODO - - Ok(()) + if client_mode { + client::run(&listen_config).await + } else { + server::run(&listen_config).await + } } \ No newline at end of file diff --git a/src/server.rs b/src/server.rs index 744d1a1..8cf4e42 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,12 +4,15 @@ use rust_util::XResult; use s2n_quic::Server; use crate::config::ListenConfig; +use crate::io_util; -async fn run(listen_config: &ListenConfig) -> XResult<()> { +pub async fn run(listen_config: &ListenConfig) -> XResult<()> { let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file), - "Read cert pem file: {}, failed: {}", &listen_config.key_pem_file); - let key_pem = opt_result!(fs::read_to_string(&listen_config.key_pem_file), - "Read key pem file :{}, failed: {}", &listen_config.key_pem_file); + "Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file); + let key_pem_file = opt_value_result!(&listen_config.key_pem_file, + "key_pem_file in config is required in server mode"); + let key_pem = opt_result!(fs::read_to_string(&key_pem_file), + "Read key pem file :{}, failed: {}", &key_pem_file); let mut server = Server::builder() .with_tls((cert_pem.as_str(), key_pem.as_str()))? @@ -17,18 +20,18 @@ async fn run(listen_config: &ListenConfig) -> XResult<()> { .start()?; while let Some(mut connection) = server.accept().await { // spawn a new task for the connection + let proxy_address = listen_config.proxy_address.clone(); tokio::spawn(async move { information!("Connection accepted from {:?}", connection.remote_addr()); - while let Ok(Some(mut stream)) = connection.accept_bidirectional_stream().await { + while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await { // spawn a new task for the stream + let proxy_address = proxy_address.clone(); tokio::spawn(async move { information!("Stream opened from {:?}", stream.connection().remote_addr()); - // echo any data back to the stream - // while let Ok(Some(data)) = stream.receive().await { - // stream.send(data).await.expect("stream should be open"); - // } - // println!("Stream closed from {:?}", stream.connection().remote_addr()); - // TODO ... + let conn_count = format!("{}", 1); + if let Err(e) = io_util::transfer_for_server_to_remote(stream, proxy_address, conn_count).await { + failure!("Server - Client error: {}", e); + } }); } println!("Connection closed from {:?}", connection.remote_addr()); @@ -36,4 +39,4 @@ async fn run(listen_config: &ListenConfig) -> XResult<()> { } Ok(()) -} \ No newline at end of file +}