Compare commits

5 Commits

Author SHA1 Message Date
b1a8533f1f feat: tcp data copy 2022-06-19 11:14:33 +08:00
45468be3bf feat: add shutdown timeout 2022-06-18 13:10:50 +08:00
fdb09cb94f chore: fix comment 2022-06-11 16:34:15 +08:00
3c79836afc feat: update fast socks 2022-06-10 22:43:50 +08:00
6f03f7f6f0 fix: tcp not dropped 2022-06-09 01:29:17 +08:00
2 changed files with 131 additions and 20 deletions

View File

@@ -15,10 +15,11 @@ socks4 = []
[dependencies] [dependencies]
log = "0.4" log = "0.4"
tokio = { version = "1.17.0", features = ["io-util", "net", "time"] } tokio = { version = "1.17.0", features = ["io-util", "net", "time", "macros"] }
anyhow = "1.0" anyhow = "1.0"
thiserror = "1.0" thiserror = "1.0"
tokio-stream = "0.1.8" tokio-stream = "0.1.8"
futures = "0.3"
# Dependencies for examples and tests # Dependencies for examples and tests
[dev-dependencies] [dev-dependencies]

View File

@@ -8,6 +8,7 @@ use crate::{consts, AuthenticationMethod, ReplyError, Result, SocksError};
use anyhow::Context; use anyhow::Context;
use std::future::Future; use std::future::Future;
use std::io; use std::io;
use std::io::{Error, ErrorKind};
use std::net::IpAddr; use std::net::IpAddr;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use std::net::{SocketAddr, ToSocketAddrs as StdToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs as StdToSocketAddrs};
@@ -19,8 +20,9 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs as AsyncToSocketAddrs}; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs as AsyncToSocketAddrs};
use tokio::time::timeout; use tokio::time::timeout;
use tokio::try_join; use tokio::{select, time, try_join};
use tokio_stream::Stream; use tokio_stream::Stream;
use futures::future::try_join;
#[derive(Clone)] #[derive(Clone)]
pub struct Config { pub struct Config {
@@ -559,16 +561,16 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Socks5Socket<T> {
Err(e) => match e.kind() { Err(e) => match e.kind() {
// Match other TCP errors with ReplyError // Match other TCP errors with ReplyError
io::ErrorKind::ConnectionRefused => { io::ErrorKind::ConnectionRefused => {
return Err(ReplyError::ConnectionRefused.into()) return Err(ReplyError::ConnectionRefused.into());
} }
io::ErrorKind::ConnectionAborted => { io::ErrorKind::ConnectionAborted => {
return Err(ReplyError::ConnectionNotAllowed.into()) return Err(ReplyError::ConnectionNotAllowed.into());
} }
io::ErrorKind::ConnectionReset => { io::ErrorKind::ConnectionReset => {
return Err(ReplyError::ConnectionNotAllowed.into()) return Err(ReplyError::ConnectionNotAllowed.into());
} }
io::ErrorKind::NotConnected => { io::ErrorKind::NotConnected => {
return Err(ReplyError::NetworkUnreachable.into()) return Err(ReplyError::NetworkUnreachable.into());
} }
_ => return Err(e.into()), // #[error("General failure")] ? _ => return Err(e.into()), // #[error("General failure")] ?
}, },
@@ -640,18 +642,107 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Socks5Socket<T> {
/// Copy data between two peers /// Copy data between two peers
/// Using 2 different generators, because they could be different structs with same traits. /// Using 2 different generators, because they could be different structs with same traits.
async fn transfer<I, O>(mut inbound: I, mut outbound: O) -> Result<()> async fn transfer<I, O>(inbound: I, outbound: O) -> Result<()>
where where
I: AsyncRead + AsyncWrite + Unpin, I: AsyncRead + AsyncWrite + Unpin,
O: AsyncRead + AsyncWrite + Unpin, O: AsyncRead + AsyncWrite + Unpin,
{ {
match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await { // match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
Ok(res) => info!("transfer closed ({}, {})", res.0, res.1), // Ok(res) => info!("transfer closed ({}, {})", res.0, res.1),
Err(err) => error!("transfer error: {:?}", err), // Err(err) => error!("transfer error: {:?}", err),
}; // };
// Ok(())
// if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr))
// = (inbound.peer_addr(), inbound.local_addr(), outbound.local_addr(), outbound.peer_addr()) {
// information!("[conn {}] New tcp connection: {} -> [{} * {}] -> {}",
// &conn_count, in_peer_addr, in_local_addr, out_local_addr, out_peer_addr);
// }
let (mut ri, mut wi) = tokio::io::split(inbound);
let (mut ro, mut wo) = tokio::io::split(outbound);
// IO copy timeout 6 HOURS
let tcp_io_copy_timeout = Duration::from_secs(6 * 3600);
let shutdown_tcp_timeout = Duration::from_secs(60);
let (client_to_server_tx, client_to_server_rx) = tokio::sync::oneshot::channel::<bool>();
let (server_to_client_tx, server_to_client_rx) = tokio::sync::oneshot::channel::<bool>();
let client_to_server = async move {
// let copy_result = time::timeout(tcp_io_copy_timeout, tokio::io::copy(&mut ri, &mut wo));
let r = select! {
_timeout = time::sleep(tcp_io_copy_timeout) => {
error!("TCP copy client -> server timeout");
Err(Error::new(ErrorKind::TimedOut, "timeout"))
}
_tcp_break = client_to_server_rx => {
error!("TCP copy client -> server shutdown");
Err(Error::new(ErrorKind::BrokenPipe, "shutdown"))
}
data_copy_result = copy_data(&mut ri, &mut wo) => {
match data_copy_result {
Err(e) => {
error!("TCP copy client -> server error: {}", e);
Err(e)
},
Ok(r) => {
info!("TCP copy client -> server success: {} byte(s)", r);
Ok(r)
},
}
}
};
info!("Close client to server connection");
match time::timeout(shutdown_tcp_timeout, wo.shutdown()).await {
Err(e) => warn!("TCP close client -> server timeout: {}", e),
Ok(Err(e)) => warn!("TCP close client -> server error: {}", e),
_ => {},
}
time::sleep(Duration::from_secs(2)).await;
let _ = server_to_client_tx.send(true);
r
};
let server_to_client = async move {
// let copy_result = time::timeout(tcp_io_copy_timeout, tokio::io::copy(&mut ro, &mut wi));
let r = select! {
_timeout = time::sleep(tcp_io_copy_timeout) => {
error!("TCP copy server -> client timeout");
Err(Error::new(ErrorKind::TimedOut, "timeout"))
}
_tcp_break = server_to_client_rx => {
error!("TCP copy server -> client shutdown");
Err(Error::new(ErrorKind::BrokenPipe, "shutdown"))
}
data_copy_result = copy_data(&mut ro, &mut wi) => {
match data_copy_result {
Err(e) => {
error!("TCP copy server -> client error: {}", e);
Err(e)
},
Ok(r) => {
info!("TCP copy server -> client success: {} byte(s)", r);
Ok(r)
},
}
}
};
info!("Close server to client connection");
match time::timeout(shutdown_tcp_timeout, wi.shutdown()).await {
Err(e) => warn!("TCP close server -> client timeout: {}", e),
Ok(Err(e)) => warn!("TCP close server -> client error: {}", e),
_ => {},
}
time::sleep(Duration::from_secs(2)).await;
let _ = client_to_server_tx.send(true);
r
};
match try_join(client_to_server, server_to_client).await {
Err(e) => Err(SocksError::Io(e)),
Ok((upstream_bytes, downstream_bytes)) => {
info!("Finished, proxy-in: {} bytes, proxy-out: {} bytes", upstream_bytes, downstream_bytes);
Ok(()) Ok(())
} }
}
}
async fn handle_udp_request(inbound: &UdpSocket, outbound: &UdpSocket) -> Result<()> { async fn handle_udp_request(inbound: &UdpSocket, outbound: &UdpSocket) -> Result<()> {
let mut buf = vec![0u8; 0x10000]; let mut buf = vec![0u8; 0x10000];
@@ -775,6 +866,25 @@ fn new_reply(error: &ReplyError, sock_addr: SocketAddr) -> Vec<u8> {
reply reply
} }
async fn copy_data<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> tokio::io::Result<u64>
where R: AsyncRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized {
let mut total_copied_bytes = 0_u64;
let mut buff = vec![0; 1024 * 4];
loop {
match reader.read(&mut buff).await {
Ok(0) => return Ok(total_copied_bytes),
Ok(n) => match writer.write_all(&buff[..n]).await {
Ok(_) => {
total_copied_bytes += n as u64;
}
Err(e) => return Err(e),
}
Err(e) => return Err(e),
}
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::server::Socks5Server; use crate::server::Socks5Server;