feat: pending implement quic qinn

This commit is contained in:
2022-08-25 01:40:58 +08:00
parent 87ff030ff0
commit bec4467b54
5 changed files with 319 additions and 848 deletions

943
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -7,13 +7,16 @@ edition = "2021"
[dependencies] [dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
s2n-quic = { version = "1.9", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"] } futures-util = "0.3.14"
clap = "2.33" clap = "2.33"
rust_util = "0.6" rust_util = "0.6"
futures = "0.3" futures = "0.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
deser-hjson = "1.0" deser-hjson = "1.0"
rcgen = "0.8.11"
quinn = "0.8.5"
rustls = { version = "0.20.3", default-features = false, features = ["quic"] }
[profile.release] [profile.release]
codegen-units = 1 codegen-units = 1

View File

@@ -2,8 +2,6 @@ use std::fs;
use std::net::SocketAddr; use std::net::SocketAddr;
use rust_util::{util_time, XResult}; use rust_util::{util_time, XResult};
use s2n_quic::{Client, Connection};
use s2n_quic::client::Connect;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::mpsc::channel; use tokio::sync::mpsc::channel;
@@ -32,60 +30,61 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> {
information!("Start connect to server, proxy address: {}, with server name: {:?}", information!("Start connect to server, proxy address: {}, with server name: {:?}",
listen_config.proxy_address, listen_config.proxy_server_name); listen_config.proxy_address, listen_config.proxy_server_name);
let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file), // let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file),
"Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file); // "Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file);
let client = Client::builder() // let client = Client::builder()
.with_tls(cert_pem.as_str())? // .with_tls(cert_pem.as_str())?
.with_io("0.0.0.0:0")? // .with_io("0.0.0.0:0")?
.start()?; // .start()?;
let addr: SocketAddr = listen_config.proxy_address.parse()?; // let addr: SocketAddr = listen_config.proxy_address.parse()?;
let server_name = opt_value_result!(&listen_config.proxy_server_name, // let server_name = opt_value_result!(&listen_config.proxy_server_name,
"proxy_server_name in config is require in client mode"); // "proxy_server_name in config is require in client mode");
//
let mut connection_opt: Option<Connection> = None; // let mut connection_opt: Option<Connection> = None;
loop { // loop {
let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await { // let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await {
Some(time_and_stream) => time_and_stream, // Some(time_and_stream) => time_and_stream,
None => { // None => {
information!("End client to server proxy"); // information!("End client to server proxy");
return Ok(()); // return Ok(());
} // }
}; // };
information!("Get connection: {} - {}", client_stream_time, client_stream.peer_addr().unwrap()); // information!("Get connection: {} - {}", client_stream_time, client_stream.peer_addr().unwrap());
let time = util_time::get_current_millis(); // let time = util_time::get_current_millis();
if (time as i128 - client_stream_time as i128).abs() > 3_000 { // if (time as i128 - client_stream_time as i128).abs() > 3_000 {
warning!("Connection is more than 3 seconds, abandon connection"); // warning!("Connection is more than 3 seconds, abandon connection");
continue; // continue;
} // }
//
if let None = connection_opt { // if let None = connection_opt {
let connect = Connect::new(addr).with_server_name(server_name.as_str()); // let connect = Connect::new(addr).with_server_name(server_name.as_str());
let mut connection = match client.connect(connect).await { // let mut connection = match client.connect(connect).await {
Ok(connection) => connection, // Ok(connection) => connection,
Err(e) => { // Err(e) => {
failure!("Connect to server failed: {}", e); // failure!("Connect to server failed: {}", e);
continue; // continue;
} // }
}; // };
connection.keep_alive(true).ok(); // connection.keep_alive(true).ok();
connection_opt = Some(connection); // connection_opt = Some(connection);
} // }
//
let connection = connection_opt.as_mut().unwrap(); // let connection = connection_opt.as_mut().unwrap();
let server_stream = match connection.open_bidirectional_stream().await { // let server_stream = match connection.open_bidirectional_stream().await {
Ok(stream) => stream, // Ok(stream) => stream,
Err(e) => { // Err(e) => {
failure!("Open stream in connection to server failed: {}", e); // failure!("Open stream in connection to server failed: {}", e);
connection_opt = None; // connection_opt = None;
continue; // continue;
} // }
}; // };
let connection_id = connection.id(); // let connection_id = connection.id();
tokio::spawn(async move { // tokio::spawn(async move {
let conn_count = format!("{}-{}-{}", util_time::get_current_millis(), connection_id, server_stream.id()); // let conn_count = format!("{}-{}-{}", util_time::get_current_millis(), connection_id, server_stream.id());
if let Err(e) = io_util::transfer_for_client_to_server(client_stream, server_stream, conn_count).await { // if let Err(e) = io_util::transfer_for_client_to_server(client_stream, server_stream, conn_count).await {
failure!("Client - Server error: {}", e); // failure!("Client - Server error: {}", e);
} // }
}); // });
} // }
Ok(())
} }

View File

@@ -4,7 +4,6 @@ use std::time::Duration;
use futures::future::try_join; use futures::future::try_join;
use rust_util::util_msg; use rust_util::util_msg;
use rust_util::util_msg::MessageType; use rust_util::util_msg::MessageType;
use s2n_quic::stream::BidirectionalStream;
use tokio::{select, time}; use tokio::{select, time};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream; use tokio::net::TcpStream;
@@ -15,33 +14,33 @@ enum StreamDirection {
Down, Down,
} }
pub async fn transfer_for_server_to_remote(inbound: BidirectionalStream, proxy_addr: String, conn_count: String) -> Result<(), String> { // pub async fn transfer_for_server_to_remote(inbound: BidirectionalStream, proxy_addr: String, conn_count: String) -> Result<(), String> {
let mut outbound = match TcpStream::connect(&proxy_addr).await { // let mut outbound = match TcpStream::connect(&proxy_addr).await {
Ok(outbound) => outbound, // Ok(outbound) => outbound,
Err(e) => { // Err(e) => {
return Err(format!("[conn {}] Failed to connect to: {}, err: {}", &conn_count, &proxy_addr, e)); // return Err(format!("[conn {}] Failed to connect to: {}, err: {}", &conn_count, &proxy_addr, e));
} // }
}; // };
if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr)) // if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr))
= (inbound.connection().remote_addr(), inbound.connection().local_addr(), outbound.local_addr(), outbound.peer_addr()) { // = (inbound.connection().remote_addr(), inbound.connection().local_addr(), outbound.local_addr(), outbound.peer_addr()) {
let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr); // let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr);
information!("[conn {}] New server-remote tcp connection: {}", &conn_count, peer); // information!("[conn {}] New server-remote tcp connection: {}", &conn_count, peer);
} // }
let (mut ri, mut wi) = inbound.split(); // let (mut ri, mut wi) = inbound.split();
let (mut ro, mut wo) = outbound.split(); // let (mut ro, mut wo) = outbound.split();
inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await // inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await
} // }
//
pub async fn transfer_for_client_to_server(mut inbound: TcpStream, outbound: BidirectionalStream, conn_count: String) -> Result<(), String> { // pub async fn transfer_for_client_to_server(mut inbound: TcpStream, outbound: BidirectionalStream, conn_count: String) -> Result<(), String> {
if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr)) // if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr))
= (inbound.peer_addr(), inbound.local_addr(), outbound.connection().local_addr(), outbound.connection().remote_addr()) { // = (inbound.peer_addr(), inbound.local_addr(), outbound.connection().local_addr(), outbound.connection().remote_addr()) {
let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr); // let peer = format!("{} -> [{} * {}] -> {}", in_peer_addr, in_local_addr, out_local_addr, out_peer_addr);
information!("[conn {}] New client-server tcp connection: {}", &conn_count, peer); // information!("[conn {}] New client-server tcp connection: {}", &conn_count, peer);
} // }
let (mut ri, mut wi) = inbound.split(); // let (mut ri, mut wi) = inbound.split();
let (mut ro, mut wo) = outbound.split(); // let (mut ro, mut wo) = outbound.split();
inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await // inner_transfer(&mut ri, &mut wi, &mut ro, &mut wo, conn_count).await
} // }
async fn inner_transfer<'a, R1, W1, R2, W2>(mut ri: &'a mut R1, mut wi: &'a mut W1, mut ro: &'a mut R2, mut wo: &'a mut W2, conn_count: String) -> Result<(), String> async fn inner_transfer<'a, R1, W1, R2, W2>(mut ri: &'a mut R1, mut wi: &'a mut W1, mut ro: &'a mut R2, mut wo: &'a mut W2, conn_count: String) -> Result<(), String>
where R1: AsyncRead + Unpin + ?Sized, W1: AsyncWrite + Unpin + ?Sized, where R1: AsyncRead + Unpin + ?Sized, W1: AsyncWrite + Unpin + ?Sized,

View File

@@ -1,7 +1,6 @@
use std::fs; use std::fs;
use rust_util::XResult; use rust_util::XResult;
use s2n_quic::Server;
use crate::config::ListenConfig; use crate::config::ListenConfig;
use crate::io_util; use crate::io_util;
@@ -14,31 +13,31 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> {
let key_pem = opt_result!(fs::read_to_string(&key_pem_file), let key_pem = opt_result!(fs::read_to_string(&key_pem_file),
"Read key pem file :{}, failed: {}", &key_pem_file); "Read key pem file :{}, failed: {}", &key_pem_file);
let mut server = Server::builder() // let mut server = Server::builder()
.with_tls((cert_pem.as_str(), key_pem.as_str()))? // .with_tls((cert_pem.as_str(), key_pem.as_str()))?
.with_io(listen_config.listen.as_str())? // .with_io(listen_config.listen.as_str())?
.start()?; // .start()?;
information!("Listen: {}", &listen_config.listen); information!("Listen: {}", &listen_config.listen);
while let Some(mut connection) = server.accept().await { // while let Some(mut connection) = server.accept().await {
// spawn a new task for the connection // // spawn a new task for the connection
let proxy_address = listen_config.proxy_address.clone(); // let proxy_address = listen_config.proxy_address.clone();
tokio::spawn(async move { // tokio::spawn(async move {
information!("Connection accepted from {:?}", connection.remote_addr()); // information!("Connection accepted from {:?}", connection.remote_addr());
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await { // while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
// spawn a new task for the stream // // spawn a new task for the stream
let connection_id = connection.id(); // let connection_id = connection.id();
let proxy_address = proxy_address.clone(); // let proxy_address = proxy_address.clone();
tokio::spawn(async move { // tokio::spawn(async move {
information!("Stream opened from {:?}", stream.connection().remote_addr()); // information!("Stream opened from {:?}", stream.connection().remote_addr());
let conn_count = format!("{}-{}-{}", rust_util::util_time::get_current_millis(), connection_id, stream.id()); // let conn_count = format!("{}-{}-{}", rust_util::util_time::get_current_millis(), connection_id, stream.id());
if let Err(e) = io_util::transfer_for_server_to_remote(stream, proxy_address, conn_count).await { // if let Err(e) = io_util::transfer_for_server_to_remote(stream, proxy_address, conn_count).await {
failure!("Server - Client error: {}", e); // failure!("Server - Client error: {}", e);
} // }
}); // });
} // }
println!("Connection closed from {:?}", connection.remote_addr()); // println!("Connection closed from {:?}", connection.remote_addr());
}); // });
} // }
Ok(()) Ok(())
} }