feat: tcp data copy

This commit is contained in:
2022-06-19 11:14:33 +08:00
parent 45468be3bf
commit b1a8533f1f
2 changed files with 64 additions and 24 deletions

View File

@@ -15,7 +15,7 @@ socks4 = []
[dependencies] [dependencies]
log = "0.4" log = "0.4"
tokio = { version = "1.17.0", features = ["io-util", "net", "time"] } tokio = { version = "1.17.0", features = ["io-util", "net", "time", "macros"] }
anyhow = "1.0" anyhow = "1.0"
thiserror = "1.0" thiserror = "1.0"
tokio-stream = "0.1.8" tokio-stream = "0.1.8"

View File

@@ -8,6 +8,7 @@ use crate::{consts, AuthenticationMethod, ReplyError, Result, SocksError};
use anyhow::Context; use anyhow::Context;
use std::future::Future; use std::future::Future;
use std::io; use std::io;
use std::io::{Error, ErrorKind};
use std::net::IpAddr; use std::net::IpAddr;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use std::net::{SocketAddr, ToSocketAddrs as StdToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs as StdToSocketAddrs};
@@ -19,7 +20,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs as AsyncToSocketAddrs}; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs as AsyncToSocketAddrs};
use tokio::time::timeout; use tokio::time::timeout;
use tokio::{time, try_join}; use tokio::{select, time, try_join};
use tokio_stream::Stream; use tokio_stream::Stream;
use futures::future::try_join; use futures::future::try_join;
@@ -663,37 +664,76 @@ async fn transfer<I, O>(inbound: I, outbound: O) -> Result<()>
// IO copy timeout 6 HOURS // IO copy timeout 6 HOURS
let tcp_io_copy_timeout = Duration::from_secs(6 * 3600); let tcp_io_copy_timeout = Duration::from_secs(6 * 3600);
let shutdown_tcp_timeout = Duration::from_secs(60); let shutdown_tcp_timeout = Duration::from_secs(60);
let (client_to_server_tx, client_to_server_rx) = tokio::sync::oneshot::channel::<bool>();
let (server_to_client_tx, server_to_client_rx) = tokio::sync::oneshot::channel::<bool>();
let client_to_server = async move { let client_to_server = async move {
// let copy_result = time::timeout(tcp_io_copy_timeout, tokio::io::copy(&mut ri, &mut wo)); // let copy_result = time::timeout(tcp_io_copy_timeout, tokio::io::copy(&mut ri, &mut wo));
let copy_result = time::timeout( let r = select! {
tcp_io_copy_timeout, _timeout = time::sleep(tcp_io_copy_timeout) => {
copy_data(&mut ri, &mut wo), error!("TCP copy client -> server timeout");
); Err(Error::new(ErrorKind::TimedOut, "timeout"))
let r = copy_result.await;
time::timeout(shutdown_tcp_timeout, wo.shutdown()).await.ok();
match r {
Err(e) => {
error!("TCP copy timeout: {}", e);
Err(e.into())
} }
Ok(r) => r, _tcp_break = client_to_server_rx => {
error!("TCP copy client -> server shutdown");
Err(Error::new(ErrorKind::BrokenPipe, "shutdown"))
}
data_copy_result = copy_data(&mut ri, &mut wo) => {
match data_copy_result {
Err(e) => {
error!("TCP copy client -> server error: {}", e);
Err(e)
},
Ok(r) => {
info!("TCP copy client -> server success: {} byte(s)", r);
Ok(r)
},
}
}
};
info!("Close client to server connection");
match time::timeout(shutdown_tcp_timeout, wo.shutdown()).await {
Err(e) => warn!("TCP close client -> server timeout: {}", e),
Ok(Err(e)) => warn!("TCP close client -> server error: {}", e),
_ => {},
} }
time::sleep(Duration::from_secs(2)).await;
let _ = server_to_client_tx.send(true);
r
}; };
let server_to_client = async move { let server_to_client = async move {
// let copy_result = time::timeout(tcp_io_copy_timeout, tokio::io::copy(&mut ro, &mut wi)); // let copy_result = time::timeout(tcp_io_copy_timeout, tokio::io::copy(&mut ro, &mut wi));
let copy_result = time::timeout( let r = select! {
tcp_io_copy_timeout, _timeout = time::sleep(tcp_io_copy_timeout) => {
copy_data(&mut ro, &mut wi), error!("TCP copy server -> client timeout");
); Err(Error::new(ErrorKind::TimedOut, "timeout"))
let r = copy_result.await;
time::timeout(shutdown_tcp_timeout, wi.shutdown()).await.ok();
match r {
Err(e) => {
error!("TCP copy timeout: {}", e);
Err(e.into())
} }
Ok(r) => r, _tcp_break = server_to_client_rx => {
error!("TCP copy server -> client shutdown");
Err(Error::new(ErrorKind::BrokenPipe, "shutdown"))
}
data_copy_result = copy_data(&mut ro, &mut wi) => {
match data_copy_result {
Err(e) => {
error!("TCP copy server -> client error: {}", e);
Err(e)
},
Ok(r) => {
info!("TCP copy server -> client success: {} byte(s)", r);
Ok(r)
},
}
}
};
info!("Close server to client connection");
match time::timeout(shutdown_tcp_timeout, wi.shutdown()).await {
Err(e) => warn!("TCP close server -> client timeout: {}", e),
Ok(Err(e)) => warn!("TCP close server -> client error: {}", e),
_ => {},
} }
time::sleep(Duration::from_secs(2)).await;
let _ = client_to_server_tx.send(true);
r
}; };
match try_join(client_to_server, server_to_client).await { match try_join(client_to_server, server_to_client).await {
Err(e) => Err(SocksError::Io(e)), Err(e) => Err(SocksError::Io(e)),