use std::fs; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use futures_util::StreamExt; use quinn::{Endpoint, ServerConfig}; use rust_util::XResult; use tokio::time::sleep; use crate::config::ListenConfig; use crate::io_util; pub async fn run(listen_config: &ListenConfig) -> XResult<()> { let cert_pem = opt_result!(fs::read_to_string(&listen_config.cert_pem_file), "Read cert pem file: {}, failed: {}", &listen_config.cert_pem_file); let key_pem_file = opt_value_result!(&listen_config.key_pem_file, "key_pem_file in config is required in server mode"); let key_pem = opt_result!(fs::read_to_string(&key_pem_file), "Read key pem file :{}, failed: {}", &key_pem_file); let cert_bytes = opt_result!(pem::parse(&cert_pem), "Parse cert pem failed: {}").contents; let key_bytes = opt_result!(pem::parse(&key_pem), "Parse key pem failed: {}").contents; let priv_key = rustls::PrivateKey(key_bytes); let cert_chain = vec![rustls::Certificate(cert_bytes)]; let mut server_config = opt_result!(ServerConfig::with_single_cert(cert_chain, priv_key), "Create server config failed: {}"); Arc::get_mut(&mut server_config.transport) .unwrap() .max_concurrent_uni_streams(0_u8.into()); information!("Listen: {}", &listen_config.listen); let listen_addr: SocketAddr = opt_result!(listen_config.listen.parse(), "Parse listen address: {} failed: {}", &listen_config.listen); let (_endpoint, mut incoming) = opt_result!(Endpoint::server(server_config, listen_addr), "Listen server failed: {}"); let proxy_address = listen_config.proxy_address.clone(); loop { let connection = match incoming.next().await { Some(connection) => connection, None => { warning!("Create connection is None"); sleep(Duration::from_secs(3)).await; continue; } }; let connection = match connection.await { Ok(connection) => connection, Err(e) => { warning!("Create connection failed: {}", e); sleep(Duration::from_secs(3)).await; continue; } }; let quinn::NewConnection { connection, mut bi_streams, .. } = connection; let proxy_address = proxy_address.clone(); tokio::spawn(async move { information!("Connection accepted from {}", connection.remote_address()); let remote_addr = format!("{}", connection.remote_address()); let local_addr = connection.local_ip().map(|ip| format!("{}", ip)).unwrap_or_else(|| "".to_string()); loop { match bi_streams.next().await { None => { information!("Connection ended"); break; } Some(Err(e)) => { information!("Connection ended: {}", e); break; } Some(Ok((send, recv))) => { let remote_addr = remote_addr.clone(); let local_addr = local_addr.clone(); let proxy_address = proxy_address.clone(); tokio::spawn(async move { information!("Stream opened from {:?}", &remote_addr); let conn_count = format!("{}", rust_util::util_time::get_current_millis()); if let Err(e) = io_util::transfer_for_server_to_remote(recv, send, &remote_addr, &local_addr, proxy_address, conn_count).await { failure!("Server - Client error: {}", e); } }); } } } }); } }