Compare commits
5 Commits
master
...
fix_tcp_co
| Author | SHA1 | Date | |
|---|---|---|---|
|
b1a8533f1f
|
|||
|
45468be3bf
|
|||
|
fdb09cb94f
|
|||
|
3c79836afc
|
|||
|
6f03f7f6f0
|
@@ -15,10 +15,11 @@ socks4 = []
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
tokio = { version = "1.17.0", features = ["io-util", "net", "time"] }
|
tokio = { version = "1.17.0", features = ["io-util", "net", "time", "macros"] }
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
tokio-stream = "0.1.8"
|
tokio-stream = "0.1.8"
|
||||||
|
futures = "0.3"
|
||||||
|
|
||||||
# Dependencies for examples and tests
|
# Dependencies for examples and tests
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
|||||||
148
src/server.rs
148
src/server.rs
@@ -8,6 +8,7 @@ use crate::{consts, AuthenticationMethod, ReplyError, Result, SocksError};
|
|||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use std::io::{Error, ErrorKind};
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
use std::net::{SocketAddr, ToSocketAddrs as StdToSocketAddrs};
|
use std::net::{SocketAddr, ToSocketAddrs as StdToSocketAddrs};
|
||||||
@@ -19,8 +20,9 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
|||||||
use tokio::net::UdpSocket;
|
use tokio::net::UdpSocket;
|
||||||
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs as AsyncToSocketAddrs};
|
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs as AsyncToSocketAddrs};
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
use tokio::try_join;
|
use tokio::{select, time, try_join};
|
||||||
use tokio_stream::Stream;
|
use tokio_stream::Stream;
|
||||||
|
use futures::future::try_join;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
@@ -140,7 +142,7 @@ impl Socks5Server {
|
|||||||
/// `Incoming` implements [`futures::stream::Stream`].
|
/// `Incoming` implements [`futures::stream::Stream`].
|
||||||
pub struct Incoming<'a>(
|
pub struct Incoming<'a>(
|
||||||
&'a Socks5Server,
|
&'a Socks5Server,
|
||||||
Option<Pin<Box<dyn Future<Output = io::Result<(TcpStream, SocketAddr)>> + Send + Sync + 'a>>>,
|
Option<Pin<Box<dyn Future<Output=io::Result<(TcpStream, SocketAddr)>> + Send + Sync + 'a>>>,
|
||||||
);
|
);
|
||||||
|
|
||||||
/// Iterator for each incoming stream connection
|
/// Iterator for each incoming stream connection
|
||||||
@@ -559,16 +561,16 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Socks5Socket<T> {
|
|||||||
Err(e) => match e.kind() {
|
Err(e) => match e.kind() {
|
||||||
// Match other TCP errors with ReplyError
|
// Match other TCP errors with ReplyError
|
||||||
io::ErrorKind::ConnectionRefused => {
|
io::ErrorKind::ConnectionRefused => {
|
||||||
return Err(ReplyError::ConnectionRefused.into())
|
return Err(ReplyError::ConnectionRefused.into());
|
||||||
}
|
}
|
||||||
io::ErrorKind::ConnectionAborted => {
|
io::ErrorKind::ConnectionAborted => {
|
||||||
return Err(ReplyError::ConnectionNotAllowed.into())
|
return Err(ReplyError::ConnectionNotAllowed.into());
|
||||||
}
|
}
|
||||||
io::ErrorKind::ConnectionReset => {
|
io::ErrorKind::ConnectionReset => {
|
||||||
return Err(ReplyError::ConnectionNotAllowed.into())
|
return Err(ReplyError::ConnectionNotAllowed.into());
|
||||||
}
|
}
|
||||||
io::ErrorKind::NotConnected => {
|
io::ErrorKind::NotConnected => {
|
||||||
return Err(ReplyError::NetworkUnreachable.into())
|
return Err(ReplyError::NetworkUnreachable.into());
|
||||||
}
|
}
|
||||||
_ => return Err(e.into()), // #[error("General failure")] ?
|
_ => return Err(e.into()), // #[error("General failure")] ?
|
||||||
},
|
},
|
||||||
@@ -640,17 +642,106 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Socks5Socket<T> {
|
|||||||
|
|
||||||
/// Copy data between two peers
|
/// Copy data between two peers
|
||||||
/// Using 2 different generators, because they could be different structs with same traits.
|
/// Using 2 different generators, because they could be different structs with same traits.
|
||||||
async fn transfer<I, O>(mut inbound: I, mut outbound: O) -> Result<()>
|
async fn transfer<I, O>(inbound: I, outbound: O) -> Result<()>
|
||||||
where
|
where
|
||||||
I: AsyncRead + AsyncWrite + Unpin,
|
I: AsyncRead + AsyncWrite + Unpin,
|
||||||
O: AsyncRead + AsyncWrite + Unpin,
|
O: AsyncRead + AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
|
// match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
|
||||||
Ok(res) => info!("transfer closed ({}, {})", res.0, res.1),
|
// Ok(res) => info!("transfer closed ({}, {})", res.0, res.1),
|
||||||
Err(err) => error!("transfer error: {:?}", err),
|
// Err(err) => error!("transfer error: {:?}", err),
|
||||||
};
|
// };
|
||||||
|
// Ok(())
|
||||||
|
|
||||||
Ok(())
|
// if let (Ok(ref in_peer_addr), Ok(ref in_local_addr), Ok(ref out_local_addr), Ok(ref out_peer_addr))
|
||||||
|
// = (inbound.peer_addr(), inbound.local_addr(), outbound.local_addr(), outbound.peer_addr()) {
|
||||||
|
// information!("[conn {}] New tcp connection: {} -> [{} * {}] -> {}",
|
||||||
|
// &conn_count, in_peer_addr, in_local_addr, out_local_addr, out_peer_addr);
|
||||||
|
// }
|
||||||
|
|
||||||
|
let (mut ri, mut wi) = tokio::io::split(inbound);
|
||||||
|
let (mut ro, mut wo) = tokio::io::split(outbound);
|
||||||
|
// IO copy timeout 6 HOURS
|
||||||
|
let tcp_io_copy_timeout = Duration::from_secs(6 * 3600);
|
||||||
|
let shutdown_tcp_timeout = Duration::from_secs(60);
|
||||||
|
let (client_to_server_tx, client_to_server_rx) = tokio::sync::oneshot::channel::<bool>();
|
||||||
|
let (server_to_client_tx, server_to_client_rx) = tokio::sync::oneshot::channel::<bool>();
|
||||||
|
|
||||||
|
let client_to_server = async move {
|
||||||
|
// let copy_result = time::timeout(tcp_io_copy_timeout, tokio::io::copy(&mut ri, &mut wo));
|
||||||
|
let r = select! {
|
||||||
|
_timeout = time::sleep(tcp_io_copy_timeout) => {
|
||||||
|
error!("TCP copy client -> server timeout");
|
||||||
|
Err(Error::new(ErrorKind::TimedOut, "timeout"))
|
||||||
|
}
|
||||||
|
_tcp_break = client_to_server_rx => {
|
||||||
|
error!("TCP copy client -> server shutdown");
|
||||||
|
Err(Error::new(ErrorKind::BrokenPipe, "shutdown"))
|
||||||
|
}
|
||||||
|
data_copy_result = copy_data(&mut ri, &mut wo) => {
|
||||||
|
match data_copy_result {
|
||||||
|
Err(e) => {
|
||||||
|
error!("TCP copy client -> server error: {}", e);
|
||||||
|
Err(e)
|
||||||
|
},
|
||||||
|
Ok(r) => {
|
||||||
|
info!("TCP copy client -> server success: {} byte(s)", r);
|
||||||
|
Ok(r)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
info!("Close client to server connection");
|
||||||
|
match time::timeout(shutdown_tcp_timeout, wo.shutdown()).await {
|
||||||
|
Err(e) => warn!("TCP close client -> server timeout: {}", e),
|
||||||
|
Ok(Err(e)) => warn!("TCP close client -> server error: {}", e),
|
||||||
|
_ => {},
|
||||||
|
}
|
||||||
|
time::sleep(Duration::from_secs(2)).await;
|
||||||
|
let _ = server_to_client_tx.send(true);
|
||||||
|
r
|
||||||
|
};
|
||||||
|
let server_to_client = async move {
|
||||||
|
// let copy_result = time::timeout(tcp_io_copy_timeout, tokio::io::copy(&mut ro, &mut wi));
|
||||||
|
let r = select! {
|
||||||
|
_timeout = time::sleep(tcp_io_copy_timeout) => {
|
||||||
|
error!("TCP copy server -> client timeout");
|
||||||
|
Err(Error::new(ErrorKind::TimedOut, "timeout"))
|
||||||
|
}
|
||||||
|
_tcp_break = server_to_client_rx => {
|
||||||
|
error!("TCP copy server -> client shutdown");
|
||||||
|
Err(Error::new(ErrorKind::BrokenPipe, "shutdown"))
|
||||||
|
}
|
||||||
|
data_copy_result = copy_data(&mut ro, &mut wi) => {
|
||||||
|
match data_copy_result {
|
||||||
|
Err(e) => {
|
||||||
|
error!("TCP copy server -> client error: {}", e);
|
||||||
|
Err(e)
|
||||||
|
},
|
||||||
|
Ok(r) => {
|
||||||
|
info!("TCP copy server -> client success: {} byte(s)", r);
|
||||||
|
Ok(r)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
info!("Close server to client connection");
|
||||||
|
match time::timeout(shutdown_tcp_timeout, wi.shutdown()).await {
|
||||||
|
Err(e) => warn!("TCP close server -> client timeout: {}", e),
|
||||||
|
Ok(Err(e)) => warn!("TCP close server -> client error: {}", e),
|
||||||
|
_ => {},
|
||||||
|
}
|
||||||
|
time::sleep(Duration::from_secs(2)).await;
|
||||||
|
let _ = client_to_server_tx.send(true);
|
||||||
|
r
|
||||||
|
};
|
||||||
|
match try_join(client_to_server, server_to_client).await {
|
||||||
|
Err(e) => Err(SocksError::Io(e)),
|
||||||
|
Ok((upstream_bytes, downstream_bytes)) => {
|
||||||
|
info!("Finished, proxy-in: {} bytes, proxy-out: {} bytes", upstream_bytes, downstream_bytes);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_udp_request(inbound: &UdpSocket, outbound: &UdpSocket) -> Result<()> {
|
async fn handle_udp_request(inbound: &UdpSocket, outbound: &UdpSocket) -> Result<()> {
|
||||||
@@ -708,8 +799,8 @@ async fn transfer_udp(inbound: UdpSocket) -> Result<()> {
|
|||||||
|
|
||||||
/// Allow us to read directly from the struct
|
/// Allow us to read directly from the struct
|
||||||
impl<T> AsyncRead for Socks5Socket<T>
|
impl<T> AsyncRead for Socks5Socket<T>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
@@ -722,8 +813,8 @@ where
|
|||||||
|
|
||||||
/// Allow us to write directly into the struct
|
/// Allow us to write directly into the struct
|
||||||
impl<T> AsyncWrite for Socks5Socket<T>
|
impl<T> AsyncWrite for Socks5Socket<T>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
fn poll_write(
|
fn poll_write(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
@@ -775,6 +866,25 @@ fn new_reply(error: &ReplyError, sock_addr: SocketAddr) -> Vec<u8> {
|
|||||||
reply
|
reply
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn copy_data<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> tokio::io::Result<u64>
|
||||||
|
where R: AsyncRead + Unpin + ?Sized,
|
||||||
|
W: AsyncWrite + Unpin + ?Sized {
|
||||||
|
let mut total_copied_bytes = 0_u64;
|
||||||
|
let mut buff = vec![0; 1024 * 4];
|
||||||
|
loop {
|
||||||
|
match reader.read(&mut buff).await {
|
||||||
|
Ok(0) => return Ok(total_copied_bytes),
|
||||||
|
Ok(n) => match writer.write_all(&buff[..n]).await {
|
||||||
|
Ok(_) => {
|
||||||
|
total_copied_bytes += n as u64;
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use crate::server::Socks5Server;
|
use crate::server::Socks5Server;
|
||||||
|
|||||||
Reference in New Issue
Block a user