From 04ad8ab668130a63a0d90e14c722d9037745cbba Mon Sep 17 00:00:00 2001 From: Hatter Jiang Date: Thu, 25 Aug 2022 23:05:00 +0800 Subject: [PATCH] feat: v0.3.0 use quinn, now works --- Cargo.lock | 3 +- Cargo.toml | 3 +- src/client.rs | 129 +++++++++++++++++++++++++++---------------------- src/io_util.rs | 55 ++++++++++----------- src/server.rs | 98 +++++++++++++++++++++++++++---------- 5 files changed, 176 insertions(+), 112 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0704434..0f5176f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -718,12 +718,13 @@ dependencies = [ [[package]] name = "simple-rust-http3-proxy" -version = "0.2.1" +version = "0.3.0" dependencies = [ "clap", "deser-hjson", "futures", "futures-util", + "pem", "quinn", "rcgen", "rust_util", diff --git a/Cargo.toml b/Cargo.toml index e5fceae..a784010 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "simple-rust-http3-proxy" -version = "0.2.1" +version = "0.3.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -14,6 +14,7 @@ futures = "0.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" deser-hjson = "1.0" +pem = "1.0" rcgen = "0.8.11" quinn = "0.8.5" rustls = { version = "0.20.3", default-features = false, features = ["quic"] } diff --git a/src/client.rs b/src/client.rs index 6c48970..d3d17a1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,8 @@ +use std::error::Error; use std::fs; use std::net::SocketAddr; +use quinn::{ClientConfig, Endpoint}; use rust_util::{util_time, XResult}; use tokio::net::TcpListener; use tokio::sync::mpsc::channel; @@ -30,61 +32,74 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> { information!("Start connect to server, proxy address: {}, with server name: {:?}", listen_config.proxy_address, listen_config.proxy_server_name); - // let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file), - // "Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file); - // let client = Client::builder() - // .with_tls(cert_pem.as_str())? - // .with_io("0.0.0.0:0")? - // .start()?; - // let addr: SocketAddr = listen_config.proxy_address.parse()?; - // let server_name = opt_value_result!(&listen_config.proxy_server_name, - // "proxy_server_name in config is require in client mode"); - // - // let mut connection_opt: Option = None; - // loop { - // let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await { - // Some(time_and_stream) => time_and_stream, - // None => { - // information!("End client to server proxy"); - // return Ok(()); - // } - // }; - // information!("Get connection: {} - {}", client_stream_time, client_stream.peer_addr().unwrap()); - // let time = util_time::get_current_millis(); - // if (time as i128 - client_stream_time as i128).abs() > 3_000 { - // warning!("Connection is more than 3 seconds, abandon connection"); - // continue; - // } - // - // if let None = connection_opt { - // let connect = Connect::new(addr).with_server_name(server_name.as_str()); - // let mut connection = match client.connect(connect).await { - // Ok(connection) => connection, - // Err(e) => { - // failure!("Connect to server failed: {}", e); - // continue; - // } - // }; - // connection.keep_alive(true).ok(); - // connection_opt = Some(connection); - // } - // - // let connection = connection_opt.as_mut().unwrap(); - // let server_stream = match connection.open_bidirectional_stream().await { - // Ok(stream) => stream, - // Err(e) => { - // failure!("Open stream in connection to server failed: {}", e); - // connection_opt = None; - // continue; - // } - // }; - // let connection_id = connection.id(); - // tokio::spawn(async move { - // let conn_count = format!("{}-{}-{}", util_time::get_current_millis(), connection_id, server_stream.id()); - // if let Err(e) = io_util::transfer_for_client_to_server(client_stream, server_stream, conn_count).await { - // failure!("Client - Server error: {}", e); - // } - // }); - // } - Ok(()) + let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file), + "Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file); + let cert_bytes = opt_result!(pem::parse(&cert_pem), "Parse cert pem failed: {}").contents; + + let addr: SocketAddr = listen_config.proxy_address.parse()?; + let server_name = opt_value_result!(&listen_config.proxy_server_name, + "proxy_server_name in config is require in client mode"); + + let mut endpoint_opt: Option = None; + loop { + let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await { + Some(time_and_stream) => time_and_stream, + None => { + information!("End client to server proxy"); + return Ok(()); + } + }; + information!("Get connection: {} - {}", client_stream_time, client_stream.peer_addr().unwrap()); + let time = util_time::get_current_millis(); + if (time as i128 - client_stream_time as i128).abs() > 3_000 { + warning!("Connection is more than 3 seconds, abandon connection"); + continue; + } + + if let None = endpoint_opt { + let endpoint = match make_client_endpoint("0.0.0.0:0".parse().unwrap(), &[&cert_bytes]) { + Ok(client) => client, + Err(e) => { + failure!("Make client endpoint failed: {}", e); + continue; + } + }; + endpoint_opt = Some(endpoint); + } + + let endpoint = endpoint_opt.as_mut().unwrap(); + + let connect = endpoint.connect(addr, &server_name).unwrap(); + let quinn::NewConnection { connection, .. } = connect.await.unwrap(); + let (send, recv) = connection.open_bi().await.unwrap(); + let remote_addr = format!("{}", connection.remote_address()); + let local_addr = connection.local_ip().map(|ip| format!("{}", ip)).unwrap_or_else(|| "".to_string()); + tokio::spawn(async move { + let remote_addr = remote_addr.clone(); + let local_addr = local_addr.clone(); + let conn_count = format!("{}", util_time::get_current_millis()); + if let Err(e) = io_util::transfer_for_client_to_server(client_stream, recv, send, &remote_addr, &local_addr, conn_count).await { + failure!("Client - Server error: {}", e); + } + }); + } +} + +fn make_client_endpoint( + bind_addr: SocketAddr, + server_certs: &[&[u8]], +) -> Result> { + let client_cfg = configure_client(server_certs)?; + let mut endpoint = Endpoint::client(bind_addr)?; + endpoint.set_default_client_config(client_cfg); + Ok(endpoint) +} + +fn configure_client(server_certs: &[&[u8]]) -> Result> { + let mut certs = rustls::RootCertStore::empty(); + for cert in server_certs { + certs.add(&rustls::Certificate(cert.to_vec()))?; + } + + Ok(ClientConfig::with_root_certificates(certs)) } \ No newline at end of file diff --git a/src/io_util.rs b/src/io_util.rs index c794fc6..3c7448e 100644 --- a/src/io_util.rs +++ b/src/io_util.rs @@ -2,6 +2,7 @@ use std::io::{Error, ErrorKind}; use std::time::Duration; use futures::future::try_join; +use quinn::{RecvStream, SendStream}; use rust_util::util_msg; use rust_util::util_msg::MessageType; use tokio::{select, time}; @@ -14,33 +15,33 @@ enum StreamDirection { Down, } -// pub async fn transfer_for_server_to_remote(inbound: BidirectionalStream, proxy_addr: String, conn_count: String) -> Result<(), String> { -// let mut outbound = match TcpStream::connect(&proxy_addr).await { -// Ok(outbound) => outbound, -// Err(e) => { -// return Err(format!("[conn {}] Failed to connect to: {}, err: {}", &conn_count, &proxy_addr, e)); -// } -// }; -// if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr)) -// = (inbound.connection().remote_addr(), inbound.connection().local_addr(), outbound.local_addr(), outbound.peer_addr()) { -// let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr); -// information!("[conn {}] New server-remote tcp connection: {}", &conn_count, peer); -// } -// let (mut ri, mut wi) = inbound.split(); -// let (mut ro, mut wo) = outbound.split(); -// inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await -// } -// -// pub async fn transfer_for_client_to_server(mut inbound: TcpStream, outbound: BidirectionalStream, conn_count: String) -> Result<(), String> { -// if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr)) -// = (inbound.peer_addr(), inbound.local_addr(), outbound.connection().local_addr(), outbound.connection().remote_addr()) { -// let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr); -// information!("[conn {}] New client-server tcp connection: {}", &conn_count, peer); -// } -// let (mut ri, mut wi) = inbound.split(); -// let (mut ro, mut wo) = outbound.split(); -// inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await -// } +pub async fn transfer_for_server_to_remote(recv: RecvStream, send: SendStream, remote_addr: &str, local_addr: &str, proxy_addr: String, conn_count: String) -> Result<(), String> { + let mut outbound = match TcpStream::connect(&proxy_addr).await { + Ok(outbound) => outbound, + Err(e) => { + return Err(format!("[conn {}] Failed to connect to: {}, err: {}", &conn_count, &proxy_addr, e)); + } + }; + if let (in_peer_addr, in_local_addr, Ok(ref out_local_addr), Ok(ref out_peer_addr)) + = (remote_addr, local_addr, outbound.local_addr(), outbound.peer_addr()) { + let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr); + information!("[conn {}] New server-remote tcp connection: {}", &conn_count, peer); + } + let (mut ri, mut wi) = (recv, send); + let (mut ro, mut wo) = outbound.split(); + inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await +} + +pub async fn transfer_for_client_to_server(mut inbound: TcpStream, recv: RecvStream, send: SendStream, remote_addr: &str, local_addr: &str, conn_count: String) -> Result<(), String> { + if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), out_local_addr, out_peer_addr) + = (inbound.peer_addr(), inbound.local_addr(), local_addr, remote_addr) { + let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr); + information!("[conn {}] New client-server tcp connection: {}", &conn_count, peer); + } + let (mut ri, mut wi) = inbound.split(); + let (mut ro, mut wo) = (recv, send); + inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await +} async fn inner_transfer<'a, R1, W1, R2, W2>(mut ri: &'a mut R1, mut wi: &'a mut W1, mut ro: &'a mut R2, mut wo: &'a mut W2, conn_count: String) -> Result<(), String> where R1: AsyncRead + Unpin + ?Sized, W1: AsyncWrite + Unpin + ?Sized, diff --git a/src/server.rs b/src/server.rs index 18ee994..f61c647 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,12 @@ use std::fs; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use futures_util::StreamExt; +use quinn::{Endpoint, ServerConfig}; use rust_util::XResult; +use tokio::time::sleep; use crate::config::ListenConfig; use crate::io_util; @@ -13,31 +19,71 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> { let key_pem = opt_result!(fs::read_to_string(&key_pem_file), "Read key pem file :{}, failed: {}", &key_pem_file); - // let mut server = Server::builder() - // .with_tls((cert_pem.as_str(), key_pem.as_str()))? - // .with_io(listen_config.listen.as_str())? - // .start()?; - information!("Listen: {}", &listen_config.listen); - // while let Some(mut connection) = server.accept().await { - // // spawn a new task for the connection - // let proxy_address = listen_config.proxy_address.clone(); - // tokio::spawn(async move { - // information!("Connection accepted from {:?}", connection.remote_addr()); - // while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await { - // // spawn a new task for the stream - // let connection_id = connection.id(); - // let proxy_address = proxy_address.clone(); - // tokio::spawn(async move { - // information!("Stream opened from {:?}", stream.connection().remote_addr()); - // let conn_count = format!("{}-{}-{}", rust_util::util_time::get_current_millis(), connection_id, stream.id()); - // if let Err(e) = io_util::transfer_for_server_to_remote(stream, proxy_address, conn_count).await { - // failure!("Server - Client error: {}", e); - // } - // }); - // } - // println!("Connection closed from {:?}", connection.remote_addr()); - // }); - // } + let cert_bytes = opt_result!(pem::parse(&cert_pem), "Parse cert pem failed: {}").contents; + let key_bytes = opt_result!(pem::parse(&key_pem), "Parse key pem failed: {}").contents; - Ok(()) + let priv_key = rustls::PrivateKey(key_bytes); + let cert_chain = vec![rustls::Certificate(cert_bytes)]; + + let mut server_config = opt_result!(ServerConfig::with_single_cert(cert_chain, priv_key), "Create server config failed: {}"); + Arc::get_mut(&mut server_config.transport) + .unwrap() + .max_concurrent_uni_streams(0_u8.into()); + information!("Listen: {}", &listen_config.listen); + let listen_addr: SocketAddr = opt_result!(listen_config.listen.parse(), "Parse listen address: {} failed: {}", &listen_config.listen); + let (_endpoint, mut incoming) = opt_result!(Endpoint::server(server_config, listen_addr), "Listen server failed: {}"); + let proxy_address = listen_config.proxy_address.clone(); + loop { + let connection = match incoming.next().await { + Some(connection) => connection, + None => { + warning!("Create connection is None"); + sleep(Duration::from_secs(3)).await; + continue; + } + }; + let connection = match connection.await { + Ok(connection) => connection, + Err(e) => { + warning!("Create connection failed: {}", e); + sleep(Duration::from_secs(3)).await; + continue; + } + }; + let quinn::NewConnection { + connection, + mut bi_streams, + .. + } = connection; + let proxy_address = proxy_address.clone(); + tokio::spawn(async move { + information!("Connection accepted from {}", connection.remote_address()); + let remote_addr = format!("{}", connection.remote_address()); + let local_addr = connection.local_ip().map(|ip| format!("{}", ip)).unwrap_or_else(|| "".to_string()); + loop { + match bi_streams.next().await { + None => { + information!("Connection ended"); + break; + } + Some(Err(e)) => { + information!("Connection ended: {}", e); + break; + } + Some(Ok((send, recv))) => { + let remote_addr = remote_addr.clone(); + let local_addr = local_addr.clone(); + let proxy_address = proxy_address.clone(); + tokio::spawn(async move { + information!("Stream opened from {:?}", &remote_addr); + let conn_count = format!("{}", rust_util::util_time::get_current_millis()); + if let Err(e) = io_util::transfer_for_server_to_remote(recv, send, &remote_addr, &local_addr, proxy_address, conn_count).await { + failure!("Server - Client error: {}", e); + } + }); + } + } + } + }); + } }