use std::fs; use std::net::SocketAddr; use rust_util::{util_time, XResult}; use s2n_quic::{Client, Connection}; use s2n_quic::client::Connect; use tokio::net::TcpListener; use tokio::sync::mpsc::channel; use crate::config::ListenConfig; use crate::io_util; pub async fn run(listen_config: &ListenConfig) -> XResult<()> { let (inbound_stream_channel_sender, mut inbound_stream_channel_receiver) = channel(16); let listener = match TcpListener::bind(&listen_config.listen).await { Ok(listener) => listener, Err(e) => return simple_error!("Listen: {} failed: {}", listen_config.listen, e), }; information!("Start listen: {}", &listen_config.listen); tokio::spawn(async move { while let Ok((inbound, _)) = listener.accept().await { information!("Receive connection: {}", inbound.peer_addr().map(|addr| format!("{}", addr)).unwrap_or_else(|_| "n/a".to_string())); // if is_in_peer_addr_matches(&inbound, &allow_ips, sender_tx.clone()) { // } else { // } if let Err(e) = inbound_stream_channel_sender.send((util_time::get_current_millis(), inbound)).await { failure!("Send tcp stream to channel failed: {}", e); } } }); information!("Start connect to server, proxy address: {}, with server name: {:?}", listen_config.proxy_address, listen_config.proxy_server_name); let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file), "Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file); let client = Client::builder() .with_tls(cert_pem.as_str())? .with_io("0.0.0.0:0")? .start()?; let addr: SocketAddr = listen_config.proxy_address.parse()?; let server_name = opt_value_result!(&listen_config.proxy_server_name, "proxy_server_name in config is require in client mode"); let mut connection_opt: Option = None; loop { let (client_stream_time, client_stream) = match inbound_stream_channel_receiver.recv().await { Some(time_and_stream) => time_and_stream, None => { information!("End client to server proxy"); return Ok(()); } }; information!("Get connection: {} - {}", client_stream_time, client_stream.peer_addr().unwrap()); let time = util_time::get_current_millis(); if (time as i128 - client_stream_time as i128).abs() > 3_000 { warning!("Connection is more than 3 seconds, abandon connection"); continue; } if let None = connection_opt { let connect = Connect::new(addr).with_server_name(server_name.as_str()); let mut connection = match client.connect(connect).await { Ok(connection) => connection, Err(e) => { failure!("Connect to server failed: {}", e); continue; } }; connection.keep_alive(true).ok(); connection_opt = Some(connection); } let connection = connection_opt.as_mut().unwrap(); let server_stream = match connection.open_bidirectional_stream().await { Ok(stream) => stream, Err(e) => { failure!("Open stream in connection to server failed: {}", e); connection_opt = None; continue; } }; let connection_id = connection.id(); tokio::spawn(async move { let conn_count = format!("{}-{}-{}", util_time::get_current_millis(), connection_id, server_stream.id()); if let Err(e) = io_util::transfer_for_client_to_server(client_stream, server_stream, conn_count).await { failure!("Client - Server error: {}", e); } }); } }