Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
8706733610
|
|||
|
47d2937c45
|
|||
|
9fbc498615
|
|||
| 3037464c03 | |||
|
04ad8ab668
|
|||
|
bec4467b54
|
946
Cargo.lock
generated
946
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,19 +1,23 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "simple-rust-http3-proxy"
|
name = "simple-rust-http3-proxy"
|
||||||
version = "0.2.1"
|
version = "0.3.1"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
s2n-quic = { version = "1.9", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"] }
|
futures-util = "0.3.14"
|
||||||
clap = "2.33"
|
clap = "2.33"
|
||||||
rust_util = "0.6"
|
rust_util = "0.6"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
deser-hjson = "1.0"
|
deser-hjson = "1.0"
|
||||||
|
pem = "1.0"
|
||||||
|
rcgen = "0.8.11"
|
||||||
|
quinn = "0.8.5"
|
||||||
|
rustls = { version = "0.20.3", default-features = false, features = ["quic"] }
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
codegen-units = 1
|
codegen-units = 1
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
|
use std::error::Error;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use quinn::{ClientConfig, Endpoint};
|
||||||
use rust_util::{util_time, XResult};
|
use rust_util::{util_time, XResult};
|
||||||
use s2n_quic::{Client, Connection};
|
|
||||||
use s2n_quic::client::Connect;
|
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tokio::sync::mpsc::channel;
|
use tokio::sync::mpsc::channel;
|
||||||
|
|
||||||
@@ -21,9 +21,7 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> {
|
|||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Ok((inbound, _)) = listener.accept().await {
|
while let Ok((inbound, _)) = listener.accept().await {
|
||||||
information!("Receive connection: {}", inbound.peer_addr().map(|addr| format!("{}", addr)).unwrap_or_else(|_| "n/a".to_string()));
|
information!("Receive connection: {}", inbound.peer_addr().map(|addr| format!("{}", addr)).unwrap_or_else(|_| "n/a".to_string()));
|
||||||
// if is_in_peer_addr_matches(&inbound, &allow_ips, sender_tx.clone()) {
|
// TODO if is_in_peer_addr_matches(&inbound, &allow_ips) {
|
||||||
// } else {
|
|
||||||
// }
|
|
||||||
if let Err(e) = inbound_stream_channel_sender.send((util_time::get_current_millis(), inbound)).await {
|
if let Err(e) = inbound_stream_channel_sender.send((util_time::get_current_millis(), inbound)).await {
|
||||||
failure!("Send tcp stream to channel failed: {}", e);
|
failure!("Send tcp stream to channel failed: {}", e);
|
||||||
}
|
}
|
||||||
@@ -34,15 +32,13 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> {
|
|||||||
listen_config.proxy_address, listen_config.proxy_server_name);
|
listen_config.proxy_address, listen_config.proxy_server_name);
|
||||||
let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file),
|
let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file),
|
||||||
"Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file);
|
"Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file);
|
||||||
let client = Client::builder()
|
let cert_bytes = opt_result!(pem::parse(&cert_pem), "Parse cert pem failed: {}").contents;
|
||||||
.with_tls(cert_pem.as_str())?
|
|
||||||
.with_io("0.0.0.0:0")?
|
|
||||||
.start()?;
|
|
||||||
let addr: SocketAddr = listen_config.proxy_address.parse()?;
|
let addr: SocketAddr = listen_config.proxy_address.parse()?;
|
||||||
let server_name = opt_value_result!(&listen_config.proxy_server_name,
|
let server_name = opt_value_result!(&listen_config.proxy_server_name,
|
||||||
"proxy_server_name in config is require in client mode");
|
"proxy_server_name in config is require in client mode");
|
||||||
|
|
||||||
let mut connection_opt: Option<Connection> = None;
|
let mut endpoint_opt: Option<Endpoint> = None;
|
||||||
loop {
|
loop {
|
||||||
let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await {
|
let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await {
|
||||||
Some(time_and_stream) => time_and_stream,
|
Some(time_and_stream) => time_and_stream,
|
||||||
@@ -58,34 +54,67 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let None = connection_opt {
|
if let None = endpoint_opt {
|
||||||
let connect = Connect::new(addr).with_server_name(server_name.as_str());
|
let endpoint = match make_client_endpoint("0.0.0.0:0".parse().unwrap(), &[&cert_bytes]) {
|
||||||
let mut connection = match client.connect(connect).await {
|
Ok(client) => client,
|
||||||
Ok(connection) => connection,
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
failure!("Connect to server failed: {}", e);
|
failure!("Make client endpoint failed: {}", e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
connection.keep_alive(true).ok();
|
endpoint_opt = Some(endpoint);
|
||||||
connection_opt = Some(connection);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let connection = connection_opt.as_mut().unwrap();
|
let endpoint = endpoint_opt.as_mut().unwrap();
|
||||||
let server_stream = match connection.open_bidirectional_stream().await {
|
|
||||||
Ok(stream) => stream,
|
let connect = match endpoint.connect(addr, &server_name) {
|
||||||
|
Ok(connect) => connect,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
failure!("Open stream in connection to server failed: {}", e);
|
failure!("Connect failed: {:?}", e);
|
||||||
connection_opt = None;
|
endpoint_opt = None;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let connection_id = connection.id();
|
let quinn::NewConnection { connection, .. } = match connect.await {
|
||||||
|
Ok(connection) => connection,
|
||||||
|
Err(e) => {
|
||||||
|
failure!("Connect failed: {:?}", e);
|
||||||
|
endpoint_opt = None;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let (send, recv) = match connection.open_bi().await {
|
||||||
|
Ok(stream) => stream,
|
||||||
|
Err(e) => {
|
||||||
|
failure!("Connect failed: {:?}", e);
|
||||||
|
endpoint_opt = None;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let remote_addr = format!("{}", connection.remote_address());
|
||||||
|
let local_addr = connection.local_ip().map(|ip| format!("{}", ip)).unwrap_or_else(|| "".to_string());
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let conn_count = format!("{}-{}-{}", util_time::get_current_millis(), connection_id, server_stream.id());
|
let remote_addr = remote_addr.clone();
|
||||||
if let Err(e) = io_util::transfer_for_client_to_server(client_stream, server_stream, conn_count).await {
|
let local_addr = local_addr.clone();
|
||||||
|
let conn_count = format!("{}", util_time::get_current_millis());
|
||||||
|
if let Err(e) = io_util::transfer_for_client_to_server(client_stream, recv, send, &remote_addr, &local_addr, conn_count).await {
|
||||||
failure!("Client - Server error: {}", e);
|
failure!("Client - Server error: {}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_client_endpoint(bind_addr: SocketAddr, server_certs: &[&[u8]]) -> Result<Endpoint, Box<dyn Error>> {
|
||||||
|
let client_cfg = configure_client(server_certs)?;
|
||||||
|
let mut endpoint = Endpoint::client(bind_addr)?;
|
||||||
|
endpoint.set_default_client_config(client_cfg);
|
||||||
|
Ok(endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn configure_client(server_certs: &[&[u8]]) -> Result<ClientConfig, Box<dyn Error>> {
|
||||||
|
let mut certs = rustls::RootCertStore::empty();
|
||||||
|
for cert in server_certs {
|
||||||
|
certs.add(&rustls::Certificate(cert.to_vec()))?;
|
||||||
|
}
|
||||||
|
Ok(ClientConfig::with_root_certificates(certs))
|
||||||
}
|
}
|
||||||
@@ -2,9 +2,9 @@ use std::io::{Error, ErrorKind};
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures::future::try_join;
|
use futures::future::try_join;
|
||||||
|
use quinn::{RecvStream, SendStream};
|
||||||
use rust_util::util_msg;
|
use rust_util::util_msg;
|
||||||
use rust_util::util_msg::MessageType;
|
use rust_util::util_msg::MessageType;
|
||||||
use s2n_quic::stream::BidirectionalStream;
|
|
||||||
use tokio::{select, time};
|
use tokio::{select, time};
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
@@ -15,31 +15,31 @@ enum StreamDirection {
|
|||||||
Down,
|
Down,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn transfer_for_server_to_remote(inbound: BidirectionalStream, proxy_addr: String, conn_count: String) -> Result<(), String> {
|
pub async fn transfer_for_server_to_remote(recv: RecvStream, send: SendStream, remote_addr: &str, local_addr: &str, proxy_addr: String, conn_count: String) -> Result<(), String> {
|
||||||
let mut outbound = match TcpStream::connect(&proxy_addr).await {
|
let mut outbound = match TcpStream::connect(&proxy_addr).await {
|
||||||
Ok(outbound) => outbound,
|
Ok(outbound) => outbound,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return Err(format!("[conn {}] Failed to connect to: {}, err: {}", &conn_count, &proxy_addr, e));
|
return Err(format!("[conn {}] Failed to connect to: {}, err: {}", &conn_count, &proxy_addr, e));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr))
|
if let (in_peer_addr, in_local_addr, Ok(ref out_local_addr), Ok(ref out_peer_addr))
|
||||||
= (inbound.connection().remote_addr(), inbound.connection().local_addr(), outbound.local_addr(), outbound.peer_addr()) {
|
= (remote_addr, local_addr, outbound.local_addr(), outbound.peer_addr()) {
|
||||||
let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr);
|
let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr);
|
||||||
information!("[conn {}] New server-remote tcp connection: {}", &conn_count, peer);
|
information!("[conn {}] New server-remote tcp connection: {}", &conn_count, peer);
|
||||||
}
|
}
|
||||||
let (mut ri, mut wi) = inbound.split();
|
let (mut ri, mut wi) = (recv, send);
|
||||||
let (mut ro, mut wo) = outbound.split();
|
let (mut ro, mut wo) = outbound.split();
|
||||||
inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await
|
inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn transfer_for_client_to_server(mut inbound: TcpStream, outbound: BidirectionalStream, conn_count: String) -> Result<(), String> {
|
pub async fn transfer_for_client_to_server(mut inbound: TcpStream, recv: RecvStream, send: SendStream, remote_addr: &str, local_addr: &str, conn_count: String) -> Result<(), String> {
|
||||||
if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr))
|
if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), out_local_addr, out_peer_addr)
|
||||||
= (inbound.peer_addr(), inbound.local_addr(), outbound.connection().local_addr(), outbound.connection().remote_addr()) {
|
= (inbound.peer_addr(), inbound.local_addr(), local_addr, remote_addr) {
|
||||||
let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr);
|
let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr);
|
||||||
information!("[conn {}] New client-server tcp connection: {}", &conn_count, peer);
|
information!("[conn {}] New client-server tcp connection: {}", &conn_count, peer);
|
||||||
}
|
}
|
||||||
let (mut ri, mut wi) = inbound.split();
|
let (mut ri, mut wi) = inbound.split();
|
||||||
let (mut ro, mut wo) = outbound.split();
|
let (mut ro, mut wo) = (recv, send);
|
||||||
inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await
|
inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ async fn main() -> XResult<()> {
|
|||||||
failure_and_exit!("Cannot run in both server and client mode");
|
failure_and_exit!("Cannot run in both server and client mode");
|
||||||
}
|
}
|
||||||
if !server_mode && !client_mode {
|
if !server_mode && !client_mode {
|
||||||
failure_and_exit!("Must run in server on client mode")
|
failure_and_exit!("Must run in server or client mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
let config_file = opt_value_result!(matches.value_of("config"), "--config is required");
|
let config_file = opt_value_result!(matches.value_of("config"), "--config is required");
|
||||||
|
|||||||
@@ -1,7 +1,12 @@
|
|||||||
use std::fs;
|
use std::fs;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use quinn::{Endpoint, ServerConfig};
|
||||||
use rust_util::XResult;
|
use rust_util::XResult;
|
||||||
use s2n_quic::Server;
|
use tokio::time::sleep;
|
||||||
|
|
||||||
use crate::config::ListenConfig;
|
use crate::config::ListenConfig;
|
||||||
use crate::io_util;
|
use crate::io_util;
|
||||||
@@ -14,31 +19,70 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> {
|
|||||||
let key_pem = opt_result!(fs::read_to_string(&key_pem_file),
|
let key_pem = opt_result!(fs::read_to_string(&key_pem_file),
|
||||||
"Read key pem file :{}, failed: {}", &key_pem_file);
|
"Read key pem file :{}, failed: {}", &key_pem_file);
|
||||||
|
|
||||||
let mut server = Server::builder()
|
let cert_bytes = opt_result!(pem::parse(&cert_pem), "Parse cert pem failed: {}").contents;
|
||||||
.with_tls((cert_pem.as_str(), key_pem.as_str()))?
|
let key_bytes = opt_result!(pem::parse(&key_pem), "Parse key pem failed: {}").contents;
|
||||||
.with_io(listen_config.listen.as_str())?
|
|
||||||
.start()?;
|
let priv_key = rustls::PrivateKey(key_bytes);
|
||||||
|
let cert_chain = vec![rustls::Certificate(cert_bytes)];
|
||||||
|
|
||||||
|
let mut server_config = opt_result!(ServerConfig::with_single_cert(cert_chain, priv_key), "Create server config failed: {}");
|
||||||
|
Arc::get_mut(&mut server_config.transport).unwrap()
|
||||||
|
.max_concurrent_uni_streams(0_u8.into());
|
||||||
information!("Listen: {}", &listen_config.listen);
|
information!("Listen: {}", &listen_config.listen);
|
||||||
while let Some(mut connection) = server.accept().await {
|
let listen_addr: SocketAddr = opt_result!(listen_config.listen.parse(), "Parse listen address: {} failed: {}", &listen_config.listen);
|
||||||
// spawn a new task for the connection
|
let (_endpoint, mut incoming) = opt_result!(Endpoint::server(server_config, listen_addr), "Listen server failed: {}");
|
||||||
let proxy_address = listen_config.proxy_address.clone();
|
let proxy_address = listen_config.proxy_address.clone();
|
||||||
tokio::spawn(async move {
|
loop {
|
||||||
information!("Connection accepted from {:?}", connection.remote_addr());
|
let connection = match incoming.next().await {
|
||||||
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
|
Some(connection) => connection,
|
||||||
// spawn a new task for the stream
|
None => {
|
||||||
let connection_id = connection.id();
|
warning!("Create connection is None");
|
||||||
let proxy_address = proxy_address.clone();
|
sleep(Duration::from_secs(3)).await;
|
||||||
tokio::spawn(async move {
|
continue;
|
||||||
information!("Stream opened from {:?}", stream.connection().remote_addr());
|
}
|
||||||
let conn_count = format!("{}-{}-{}", rust_util::util_time::get_current_millis(), connection_id, stream.id());
|
};
|
||||||
if let Err(e) = io_util::transfer_for_server_to_remote(stream, proxy_address, conn_count).await {
|
let connection = match connection.await {
|
||||||
failure!("Server - Client error: {}", e);
|
Ok(connection) => connection,
|
||||||
}
|
Err(e) => {
|
||||||
});
|
warning!("Create connection failed: {:?}", e);
|
||||||
|
sleep(Duration::from_secs(3)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let quinn::NewConnection {
|
||||||
|
connection,
|
||||||
|
mut bi_streams,
|
||||||
|
..
|
||||||
|
} = connection;
|
||||||
|
let proxy_address = proxy_address.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
information!("Connection accepted from {}", connection.remote_address());
|
||||||
|
let remote_addr = format!("{}", connection.remote_address());
|
||||||
|
let local_addr = connection.local_ip().map(|ip| format!("{}", ip)).unwrap_or_else(|| "".to_string());
|
||||||
|
loop {
|
||||||
|
match bi_streams.next().await {
|
||||||
|
None => {
|
||||||
|
information!("Connection ended");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
information!("Connection ended: {:?}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Some(Ok((send, recv))) => {
|
||||||
|
let remote_addr = remote_addr.clone();
|
||||||
|
let local_addr = local_addr.clone();
|
||||||
|
let proxy_address = proxy_address.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
information!("Stream opened from {:?}", &remote_addr);
|
||||||
|
let conn_count = format!("{}", rust_util::util_time::get_current_millis());
|
||||||
|
if let Err(e) = io_util::transfer_for_server_to_remote(recv, send, &remote_addr, &local_addr, proxy_address, conn_count).await {
|
||||||
|
failure!("Server - Client error: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
println!("Connection closed from {:?}", connection.remote_addr());
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user