From f65898bef54e409c067f64c091ffe43f13925549 Mon Sep 17 00:00:00 2001 From: Hatter Jiang Date: Sun, 21 Aug 2022 15:00:33 +0800 Subject: [PATCH] feat: works --- cert.pem | 11 ++++++ client.json | 6 ++++ key.pem | 6 ++++ server.json | 6 ++++ src/client.rs | 99 +++++++++++++++++++++++++++++++++++++++++---------- src/main.rs | 17 +++++---- src/server.rs | 3 +- 7 files changed, 123 insertions(+), 25 deletions(-) create mode 100644 cert.pem create mode 100644 client.json create mode 100644 key.pem create mode 100644 server.json diff --git a/cert.pem b/cert.pem new file mode 100644 index 0000000..43703f0 --- /dev/null +++ b/cert.pem @@ -0,0 +1,11 @@ +-----BEGIN CERTIFICATE----- +MIIBeDCCAR6gAwIBAgIBKjAKBggqhkjOPQQDAjAwMRgwFgYDVQQKDA9DcmFiIHdp +ZGdpdHMgU0UxFDASBgNVBAMMC01hc3RlciBDZXJ0MCIYDzE5NzUwMTAxMDAwMDAw +WhgPNDA5NjAxMDEwMDAwMDBaMDAxGDAWBgNVBAoMD0NyYWIgd2lkZ2l0cyBTRTEU +MBIGA1UEAwwLTWFzdGVyIENlcnQwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAQb +bVPayLOdbKxXB4yB4Vx3Kf2Z89vsUvhmiICsjncRwBEKkP+GjTg1bSEloLvzuha9 +3u78xp2/1ZaeqtVwYgJMoyUwIzAhBgNVHREEGjAYggtxbGF3cy5xbGF3c4IJbG9j +YWxob3N0MAoGCCqGSM49BAMCA0gAMEUCIDrxPoQBu9G/g54f3TKYXj8bO2fdkPD1 +PMO712Y3e0eNAiEA9mt1NW6TDPVf+xmUA/swi8gnhlusV2Y1sB4qhDCPr9c= +-----END CERTIFICATE----- + diff --git a/client.json b/client.json new file mode 100644 index 0000000..31017b4 --- /dev/null +++ b/client.json @@ -0,0 +1,6 @@ +{ + "listen": "127.0.0.1:2001", + "cert_pem_file": "cert.pem", + "proxy_address": "127.0.0.1:2002", + "proxy_server_name": "localhost" +} \ No newline at end of file diff --git a/key.pem b/key.pem new file mode 100644 index 0000000..e547b01 --- /dev/null +++ b/key.pem @@ -0,0 +1,6 @@ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgtZAp9paqkz1vzQSp +tw52t+ZiSKAuJRfB5JnvA6q7+CKhRANCAAQbbVPayLOdbKxXB4yB4Vx3Kf2Z89vs +UvhmiICsjncRwBEKkP+GjTg1bSEloLvzuha93u78xp2/1ZaeqtVwYgJM +-----END PRIVATE KEY----- + diff --git a/server.json b/server.json new file mode 100644 index 0000000..a76d800 --- /dev/null +++ b/server.json @@ -0,0 +1,6 @@ +{ + "listen": "127.0.0.1:2002", + "cert_pem_file": "cert.pem", + "key_pem_file": "key.pem", + "proxy_address": "127.0.0.1:1080" +} \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index 3e123c1..835f71a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,33 +1,96 @@ use std::fs; use std::net::SocketAddr; -use rust_util::XResult; +use std::time::Duration; + +use rust_util::{util_time, XResult}; use s2n_quic::Client; use s2n_quic::client::Connect; +use tokio::net::TcpListener; +use tokio::sync::mpsc::channel; +use tokio::time::sleep; use crate::config::ListenConfig; +use crate::io_util; pub async fn run(listen_config: &ListenConfig) -> XResult<()> { + let (inbound_stream_channel_sender, mut inbound_stream_channel_receiver) = channel(16); + + let listener = match TcpListener::bind(&listen_config.listen).await { + Ok(listener) => listener, + Err(e) => return simple_error!("Listen: {} failed: {}", listen_config.listen, e), + }; + information!("Start listen: {}", &listen_config.listen); + tokio::spawn(async move { + while let Ok((inbound, _)) = listener.accept().await { + information!("Receive connection: {}", inbound.peer_addr().map(|addr| format!("{}", addr)).unwrap_or_else(|_| "n/a".to_string())); + // if is_in_peer_addr_matches(&inbound, &allow_ips, sender_tx.clone()) { + // } else { + // } + if let Err(e) = inbound_stream_channel_sender.send((util_time::get_current_millis(), inbound)).await { + failure!("Send tcp stream to channel failed: {}", e); + } + } + }); + + information!("Start connect to server"); let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file), "Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file); let client = Client::builder() .with_tls(cert_pem.as_str())? .with_io(listen_config.listen.as_str())? .start()?; - let addr: SocketAddr = "127.0.0.1:4433".parse()?; - let connect = Connect::new(addr).with_server_name("localhost"); - let mut connection = client.connect(connect).await?; - // ensure the connection doesn't time out with inactivity - connection.keep_alive(true)?; - // open a new stream and split the receiving and sending sides - let stream = connection.open_bidirectional_stream().await?; - let (mut receive_stream, mut send_stream) = stream.split(); - // spawn a task that copies responses from the server to stdout - tokio::spawn(async move { - let mut stdout = tokio::io::stdout(); - let _ = tokio::io::copy(&mut receive_stream, &mut stdout).await; - }); - // copy data from stdin and send it to the server - let mut stdin = tokio::io::stdin(); - tokio::io::copy(&mut stdin, &mut send_stream).await?; - Ok(()) + let addr: SocketAddr = listen_config.proxy_address.parse()?; + let server_name = opt_value_result!(&listen_config.proxy_server_name, + "proxy_server_name in config is require in client mode"); + + let mut connect_count = 0_usize; + 're_connect: loop { + let connect = Connect::new(addr).with_server_name(server_name.as_str()); + let mut connection = match client.connect(connect).await { + Ok(connection) => connection, + Err(e) => { + connect_count += 1; + failure!("Connect to server failed: {}, count: {}", e, connect_count); + sleep(Duration::from_secs(match connect_count { + 0..=3 => 1, + 4..=10 => 2, + 11..=30 => 3, + _ => 5, + })).await; + continue 're_connect; + } + }; + connect_count = 0; + opt_result!(connection.keep_alive(true), "Keep alive connection failed: {}"); + + 're_stream: loop { + let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await { + Some(time_and_stream) => time_and_stream, + None => { + information!("End client to server proxy"); + return Ok(()); + } + }; + + let time = util_time::get_current_millis(); + if (time as i128 - client_stream_time as i128).abs() > 3_000 { + warning!("Connection is more than 3 second, abandon connection"); + continue 're_stream; + } + + let server_stream = match connection.open_bidirectional_stream().await { + Ok(stream) => stream, + Err(e) => { + failure!("Connect to server failed: {}", e); + continue 're_connect; + } + }; + tokio::spawn(async move { + let conn_count = format!("{}", util_time::get_current_millis()); + if let Err(e) = io_util::transfer_for_client_to_server(client_stream, server_stream, conn_count).await { + failure!("Client - Server error: {}", e); + } + }); + } + } } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 96681ed..c7ae3bf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,18 @@ #[macro_use] extern crate rust_util; +use std::fs; + +use clap::{App, Arg}; +use rust_util::XResult; + +use crate::config::ListenConfig; + mod config; mod io_util; mod client; mod server; -use clap::{App, Arg}; -use rust_util::XResult; -use crate::config::ListenConfig; - #[tokio::main] async fn main() -> XResult<()> { let matches = App::new("Simple Rust HTTP/3 Proxy") @@ -19,7 +22,7 @@ async fn main() -> XResult<()> { .arg(Arg::with_name("verbose").short("v").long("verbose").help("Verbose")) .arg(Arg::with_name("server").short("s").long("server").help("Run server mode")) .arg(Arg::with_name("client").short("c").long("client").help("Run client mode")) - .arg(Arg::with_name("config").short("c").long("config").takes_value(true).help("HTTP/3 listen config file")) + .arg(Arg::with_name("config").long("config").takes_value(true).help("HTTP/3 listen config file")) .get_matches(); if matches.is_present("version") { information!("{} v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")); @@ -37,7 +40,9 @@ async fn main() -> XResult<()> { failure_and_exit!("Must run in server on client mode") } - let listen_config: ListenConfig = deser_hjson::from_str("").unwrap(); + let config_file = opt_value_result!(matches.value_of("config"), "--config is required"); + let config_json = opt_result!(fs::read_to_string(config_file), "Read config: {} failed: {}", config_file); + let listen_config: ListenConfig = opt_result!(deser_hjson::from_str(&config_json), "Parse config: {} failed: {}", config_file); if client_mode { client::run(&listen_config).await diff --git a/src/server.rs b/src/server.rs index 8cf4e42..85d4379 100644 --- a/src/server.rs +++ b/src/server.rs @@ -18,6 +18,7 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> { .with_tls((cert_pem.as_str(), key_pem.as_str()))? .with_io(listen_config.listen.as_str())? .start()?; + information!("Listen: {}", &listen_config.listen); while let Some(mut connection) = server.accept().await { // spawn a new task for the connection let proxy_address = listen_config.proxy_address.clone(); @@ -28,7 +29,7 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> { let proxy_address = proxy_address.clone(); tokio::spawn(async move { information!("Stream opened from {:?}", stream.connection().remote_addr()); - let conn_count = format!("{}", 1); + let conn_count = format!("{}", rust_util::util_time::get_current_millis()); if let Err(e) = io_util::transfer_for_server_to_remote(stream, proxy_address, conn_count).await { failure!("Server - Client error: {}", e); }