feat: works

This commit is contained in:
2022-08-21 15:00:33 +08:00
parent 4723298aec
commit f65898bef5
7 changed files with 123 additions and 25 deletions

View File

@@ -1,33 +1,96 @@
use std::fs;
use std::net::SocketAddr;
use rust_util::XResult;
use std::time::Duration;
use rust_util::{util_time, XResult};
use s2n_quic::Client;
use s2n_quic::client::Connect;
use tokio::net::TcpListener;
use tokio::sync::mpsc::channel;
use tokio::time::sleep;
use crate::config::ListenConfig;
use crate::io_util;
pub async fn run(listen_config: &ListenConfig) -> XResult<()> {
let (inbound_stream_channel_sender, mut inbound_stream_channel_receiver) = channel(16);
let listener = match TcpListener::bind(&listen_config.listen).await {
Ok(listener) => listener,
Err(e) => return simple_error!("Listen: {} failed: {}", listen_config.listen, e),
};
information!("Start listen: {}", &listen_config.listen);
tokio::spawn(async move {
while let Ok((inbound, _)) = listener.accept().await {
information!("Receive connection: {}", inbound.peer_addr().map(|addr| format!("{}", addr)).unwrap_or_else(|_| "n/a".to_string()));
// if is_in_peer_addr_matches(&inbound, &allow_ips, sender_tx.clone()) {
// } else {
// }
if let Err(e) = inbound_stream_channel_sender.send((util_time::get_current_millis(), inbound)).await {
failure!("Send tcp stream to channel failed: {}", e);
}
}
});
information!("Start connect to server");
let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file),
"Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file);
let client = Client::builder()
.with_tls(cert_pem.as_str())?
.with_io(listen_config.listen.as_str())?
.start()?;
let addr: SocketAddr = "127.0.0.1:4433".parse()?;
let connect = Connect::new(addr).with_server_name("localhost");
let mut connection = client.connect(connect).await?;
// ensure the connection doesn't time out with inactivity
connection.keep_alive(true)?;
// open a new stream and split the receiving and sending sides
let stream = connection.open_bidirectional_stream().await?;
let (mut receive_stream, mut send_stream) = stream.split();
// spawn a task that copies responses from the server to stdout
tokio::spawn(async move {
let mut stdout = tokio::io::stdout();
let _ = tokio::io::copy(&mut receive_stream, &mut stdout).await;
});
// copy data from stdin and send it to the server
let mut stdin = tokio::io::stdin();
tokio::io::copy(&mut stdin, &mut send_stream).await?;
Ok(())
let addr: SocketAddr = listen_config.proxy_address.parse()?;
let server_name = opt_value_result!(&listen_config.proxy_server_name,
"proxy_server_name in config is require in client mode");
let mut connect_count = 0_usize;
're_connect: loop {
let connect = Connect::new(addr).with_server_name(server_name.as_str());
let mut connection = match client.connect(connect).await {
Ok(connection) => connection,
Err(e) => {
connect_count += 1;
failure!("Connect to server failed: {}, count: {}", e, connect_count);
sleep(Duration::from_secs(match connect_count {
0..=3 => 1,
4..=10 => 2,
11..=30 => 3,
_ => 5,
})).await;
continue 're_connect;
}
};
connect_count = 0;
opt_result!(connection.keep_alive(true), "Keep alive connection failed: {}");
're_stream: loop {
let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await {
Some(time_and_stream) => time_and_stream,
None => {
information!("End client to server proxy");
return Ok(());
}
};
let time = util_time::get_current_millis();
if (time as i128 - client_stream_time as i128).abs() > 3_000 {
warning!("Connection is more than 3 second, abandon connection");
continue 're_stream;
}
let server_stream = match connection.open_bidirectional_stream().await {
Ok(stream) => stream,
Err(e) => {
failure!("Connect to server failed: {}", e);
continue 're_connect;
}
};
tokio::spawn(async move {
let conn_count = format!("{}", util_time::get_current_millis());
if let Err(e) = io_util::transfer_for_client_to_server(client_stream, server_stream, conn_count).await {
failure!("Client - Server error: {}", e);
}
});
}
}
}