feat: pending test

This commit is contained in:
2022-08-23 02:30:10 +08:00
parent 9e9301d4d7
commit 67e1d2f495
4 changed files with 40 additions and 26 deletions

View File

@@ -1,6 +1,6 @@
{ {
"listen": "127.0.0.1:2001", "listen": "127.0.0.1:2001",
"cert_pem_file": "cert.pem", "cert_pem_file": "cert.pem",
"proxy_address": "47.52.7.223:1443", "proxy_address": "127.0.0.1:2002",
"proxy_server_name": "localhost" "proxy_server_name": "localhost"
} }

View File

@@ -2,7 +2,7 @@ use std::fs;
use std::net::SocketAddr; use std::net::SocketAddr;
use rust_util::{util_time, XResult}; use rust_util::{util_time, XResult};
use s2n_quic::Client; use s2n_quic::{Client, Connection};
use s2n_quic::client::Connect; use s2n_quic::client::Connect;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::mpsc::channel; use tokio::sync::mpsc::channel;
@@ -42,6 +42,7 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> {
let server_name = opt_value_result!(&listen_config.proxy_server_name, let server_name = opt_value_result!(&listen_config.proxy_server_name,
"proxy_server_name in config is require in client mode"); "proxy_server_name in config is require in client mode");
let mut connection_opt: Option<Connection> = None;
loop { loop {
let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await { let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await {
Some(time_and_stream) => time_and_stream, Some(time_and_stream) => time_and_stream,
@@ -51,34 +52,37 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> {
} }
}; };
information!("Get connection: {} - {}", client_stream_time, client_stream.peer_addr().unwrap()); information!("Get connection: {} - {}", client_stream_time, client_stream.peer_addr().unwrap());
let addr = addr.clone(); let time = util_time::get_current_millis();
let client = client.clone(); if (time as i128 - client_stream_time as i128).abs() > 3_000 {
let server_name = server_name.clone(); warning!("Connection is more than 3 second, abandon connection");
tokio::spawn(async move { continue;
}
if let None = connection_opt {
let connect = Connect::new(addr).with_server_name(server_name.as_str()); let connect = Connect::new(addr).with_server_name(server_name.as_str());
let mut connection = match client.connect(connect).await { let mut connection = match client.connect(connect).await {
Ok(connection) => connection, Ok(connection) => connection,
Err(e) => { Err(e) => {
failure!("Connect to server failed: {}", e); failure!("Connect to server failed: {}", e);
return; continue;
} }
}; };
connection.keep_alive(true).ok(); connection.keep_alive(true).ok();
connection_opt = Some(connection);
}
let time = util_time::get_current_millis(); let connection = connection_opt.as_mut().unwrap();
if (time as i128 - client_stream_time as i128).abs() > 3_000 { let server_stream = match connection.open_bidirectional_stream().await {
warning!("Connection is more than 3 second, abandon connection"); Ok(stream) => stream,
return; Err(e) => {
failure!("Open stream in connection to server failed: {}", e);
connection_opt = None;
continue;
} }
};
let server_stream = match connection.open_bidirectional_stream().await { let connection_id = connection.id();
Ok(stream) => stream, tokio::spawn(async move {
Err(e) => { let conn_count = format!("{}-{}-{}", util_time::get_current_millis(), connection_id, server_stream.id());
failure!("Open stream in connection to server failed: {}", e);
return;
}
};
let conn_count = format!("{}", util_time::get_current_millis());
if let Err(e) = io_util::transfer_for_client_to_server(client_stream, server_stream, conn_count).await { if let Err(e) = io_util::transfer_for_client_to_server(client_stream, server_stream, conn_count).await {
failure!("Client - Server error: {}", e); failure!("Client - Server error: {}", e);
} }

View File

@@ -2,11 +2,14 @@ use std::io::{Error, ErrorKind};
use std::time::Duration; use std::time::Duration;
use futures::future::try_join; use futures::future::try_join;
use rust_util::util_msg;
use rust_util::util_msg::MessageType;
use s2n_quic::stream::BidirectionalStream; use s2n_quic::stream::BidirectionalStream;
use tokio::{select, time}; use tokio::{select, time};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream; use tokio::net::TcpStream;
#[derive(Debug)]
enum StreamDirection { enum StreamDirection {
Up, Up,
Down, Down,
@@ -143,7 +146,7 @@ async fn inner_transfer<'a, R1, W1, R2, W2>(mut ri: &'a mut R1, mut wi: &'a mut
// } // }
async fn copy_data<'a, R, W>( async fn copy_data<'a, R, W>(
reader: &'a mut R, writer: &'a mut W, _direction: StreamDirection, _conn_count: &str) -> tokio::io::Result<u64> reader: &'a mut R, writer: &'a mut W, direction: StreamDirection, conn_count: &str) -> tokio::io::Result<u64>
where R: AsyncRead + Unpin + ?Sized, where R: AsyncRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized { W: AsyncWrite + Unpin + ?Sized {
let mut total_copied_bytes = 0_u64; let mut total_copied_bytes = 0_u64;
@@ -151,11 +154,17 @@ async fn copy_data<'a, R, W>(
loop { loop {
match reader.read(&mut buff).await { match reader.read(&mut buff).await {
Ok(0) => return Ok(total_copied_bytes), Ok(0) => return Ok(total_copied_bytes),
Ok(n) => match writer.write_all(&buff[..n]).await { Ok(n) => {
Ok(_) => { util_msg::when(MessageType::DEBUG, || {
total_copied_bytes += n as u64; debugging!("[conn {}] Direction: {:?}, {} bytes: {:02x?}", conn_count, direction, n, &buff[..n]);
debugging!("[conn {}] Direction: {:?}, {} string: {}", conn_count, direction, n, String::from_utf8_lossy(&buff[..n]).to_string());
});
match writer.write_all(&buff[..n]).await {
Ok(_) => {
total_copied_bytes += n as u64;
}
Err(e) => return Err(e),
} }
Err(e) => return Err(e),
} }
Err(e) => return Err(e), Err(e) => return Err(e),
} }

View File

@@ -26,10 +26,11 @@ pub async fn run(listen_config: &ListenConfig) -> XResult<()> {
information!("Connection accepted from {:?}", connection.remote_addr()); information!("Connection accepted from {:?}", connection.remote_addr());
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await { while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
// spawn a new task for the stream // spawn a new task for the stream
let connection_id = connection.id();
let proxy_address = proxy_address.clone(); let proxy_address = proxy_address.clone();
tokio::spawn(async move { tokio::spawn(async move {
information!("Stream opened from {:?}", stream.connection().remote_addr()); information!("Stream opened from {:?}", stream.connection().remote_addr());
let conn_count = format!("{}", rust_util::util_time::get_current_millis()); let conn_count = format!("{}-{}-{}", rust_util::util_time::get_current_millis(), connection_id, stream.id());
if let Err(e) = io_util::transfer_for_server_to_remote(stream, proxy_address, conn_count).await { if let Err(e) = io_util::transfer_for_server_to_remote(stream, proxy_address, conn_count).await {
failure!("Server - Client error: {}", e); failure!("Server - Client error: {}", e);
} }