From 552f44ab1000165e18a72bc116d0167126e13482 Mon Sep 17 00:00:00 2001 From: "Hatter Jiang@Pixelbook" Date: Sun, 13 Sep 2020 10:01:25 +0800 Subject: [PATCH 1/4] chore: code style --- Cargo.lock | 12 ++++++------ src/main.rs | 38 ++++++++++++++++++++++---------------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f86c0a..aa18ac2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,9 +124,9 @@ checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" [[package]] name = "getrandom" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" +checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6" dependencies = [ "cfg-if", "libc", @@ -150,9 +150,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.76" +version = "0.2.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "755456fae044e6fa1ebbbd1b3e902ae19e73097ed4ed87bb79934a867c007bc3" +checksum = "f2f96b10ec2560088a8e76961b00d47107b3a625fecb76dedb29ee7ccbf98235" [[package]] name = "rand" @@ -232,9 +232,9 @@ dependencies = [ [[package]] name = "rust_util" -version = "0.6.7" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47a20a379d025a6f0da1bc84d6284137eea58c493484bf6912144a864f53c3de" +checksum = "c261320f663e65d0869f77036c454f627e82eecbd574946e6eb55a97d3dc490c" dependencies = [ "lazy_static", "libc", diff --git a/src/main.rs b/src/main.rs index cc10b82..2d6e6fc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,7 @@ use std::thread; use std::time::Duration; use clap::{ Arg, App }; +const LOCAL_ADDR: &str = "127.0.0.1"; const TIMEOUT: u64 = 3 * 60 * 100; //3 minutes fn main() { @@ -31,34 +32,37 @@ fn main() { .arg(Arg::with_name("allowed_list").short("A").long("allowed-list").takes_value(true).multiple(true).help("Allowed IP list, e.g. 127.0.0.1, 120.0.0.0/8")) .get_matches(); - let local_port: i32 = matches.value_of("local_port").unwrap().parse().unwrap(); - let remote_port: i32 = matches.value_of("remote_port").unwrap().parse().unwrap(); + let local_port: u16 = matches.value_of("local_port").unwrap().parse().unwrap(); + let remote_port: u16 = matches.value_of("remote_port").unwrap().parse().unwrap(); let remote_host = matches.value_of("host").unwrap(); let bind_addr = match matches.value_of("bind") { Some(addr) => addr.to_owned(), - None => "127.0.0.1".to_owned(), + None => LOCAL_ADDR.to_owned(), }; forward(&bind_addr, local_port, &remote_host, remote_port); } -fn forward(bind_addr: &str, local_port: i32, remote_host: &str, remote_port: i32) { +fn forward(bind_addr: &str, local_port: u16, remote_host: &str, remote_port: u16) { let local_addr = format!("{}:{}", bind_addr, local_port); + debugging!("Listen address and port: {}", local_addr); let local = UdpSocket::bind(&local_addr).unwrap_or_else(|_| panic!("Unable to bind to {}", &local_addr)); information!("Listening on {}", local.local_addr().unwrap()); let remote_addr = format!("{}:{}", remote_host, remote_port); - let responder = local.try_clone() - .unwrap_or_else(|_| panic!("Failed to clone primary listening address socket {}", local.local_addr().unwrap())); + let responder = local.try_clone().unwrap_or_else( + |_| panic!("Failed to clone primary listening address socket {}", local.local_addr().unwrap()) + ); let (main_sender, main_receiver) = channel::<(_, Vec)>(); thread::spawn(move || { debugging!("Started new thread to deal out responses to clients"); loop { let (dest, buf) = main_receiver.recv().unwrap(); let to_send = buf.as_slice(); - responder.send_to(to_send, dest) - .unwrap_or_else(|_| panic!("Failed to forward response from upstream server to client {}", dest)); + responder.send_to(to_send, dest).unwrap_or_else( + |_| panic!("Failed to forward response from upstream server to client {}", dest) + ); } }); @@ -94,10 +98,12 @@ fn forward(bind_addr: &str, local_port: i32, remote_host: &str, remote_port: i32 //connection to 0.0.0.0 in all cases. let temp_outgoing_addr = format!("0.0.0.0:{}", 1024 + rand::random::()); debugging!("Establishing new forwarder for client {} on {}", src_addr, &temp_outgoing_addr); - let upstream_send = UdpSocket::bind(&temp_outgoing_addr) - .unwrap_or_else(|_| panic!("Failed to bind to transient address {}", &temp_outgoing_addr)); - let upstream_recv = upstream_send.try_clone() - .unwrap_or_else(|_| panic!("Failed to clone client-specific connection to upstream!")); + let upstream_send = UdpSocket::bind(&temp_outgoing_addr).unwrap_or_else( + |_| panic!("Failed to bind to transient address {}", &temp_outgoing_addr) + ); + let upstream_recv = upstream_send.try_clone().unwrap_or_else( + |_| panic!("Failed to clone client-specific connection to upstream!") + ); let mut timeouts : u64 = 0; let timed_out = Arc::new(AtomicBool::new(false)); @@ -110,8 +116,7 @@ fn forward(bind_addr: &str, local_port: i32, remote_host: &str, remote_port: i32 match upstream_recv.recv_from(&mut from_upstream) { Ok((bytes_rcvd, _)) => { let to_send = from_upstream[..bytes_rcvd].to_vec(); - local_send_queue.send((src_addr, to_send)) - .expect("Failed to queue response from upstream server for forwarding!"); + local_send_queue.send((src_addr, to_send)).expect("Failed to queue response from upstream server for forwarding!"); }, Err(_) => { if local_timed_out.load(Ordering::Relaxed) { @@ -126,8 +131,9 @@ fn forward(bind_addr: &str, local_port: i32, remote_host: &str, remote_port: i32 loop { match receiver.recv_timeout(Duration::from_millis(TIMEOUT)) { Ok(from_client) => { - upstream_send.send_to(from_client.as_slice(), &remote_addr_copy) - .unwrap_or_else(|_| panic!("Failed to forward packet from client {} to upstream server!", src_addr)); + upstream_send.send_to(from_client.as_slice(), &remote_addr_copy).unwrap_or_else( + |_| panic!("Failed to forward packet from client {} to upstream server!", src_addr) + ); timeouts = 0; //reset timeout count }, Err(_) => { From 78da427e92bb49547e6bd66e5e5a795104d34f0b Mon Sep 17 00:00:00 2001 From: "Hatter Jiang@Pixelbook" Date: Sun, 13 Sep 2020 10:06:56 +0800 Subject: [PATCH 2/4] chore: code style --- src/main.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2d6e6fc..0b6721c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,7 +81,7 @@ fn forward(bind_addr: &str, local_port: u16, remote_host: &str, remote_port: u16 let client_id = format!("{}", src_addr); if remove_existing { - debugging!("Removing existing forwarder from map."); + debugging!("Removing existing forwarder from map: {}", client_id); client_map.remove(&client_id); } @@ -105,7 +105,7 @@ fn forward(bind_addr: &str, local_port: u16, remote_host: &str, remote_port: u16 |_| panic!("Failed to clone client-specific connection to upstream!") ); - let mut timeouts : u64 = 0; + let mut timeouts: u64 = 0; let timed_out = Arc::new(AtomicBool::new(false)); let local_timed_out = timed_out.clone(); @@ -123,7 +123,7 @@ fn forward(bind_addr: &str, local_port: u16, remote_host: &str, remote_port: u16 debugging!("Terminating forwarder thread for client {} due to timeout", src_addr); break; } - } + }, }; } }); @@ -143,7 +143,7 @@ fn forward(bind_addr: &str, local_port: u16, remote_host: &str, remote_port: u16 timed_out.store(true, Ordering::Relaxed); break; } - } + }, }; } }); @@ -161,7 +161,7 @@ fn forward(bind_addr: &str, local_port: u16, remote_host: &str, remote_port: u16 debugging!("New connection received from previously timed-out client {}", client_id); remove_existing = true; continue; - } + }, } } } From dfd53aae803ed00a784babf6d7740cf479dd1fb3 Mon Sep 17 00:00:00 2001 From: "Hatter Jiang@Pixelbook" Date: Sun, 13 Sep 2020 10:30:46 +0800 Subject: [PATCH 3/4] feat: add allowed ip mask group --- Cargo.lock | 4 ++-- src/main.rs | 19 ++++++++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa18ac2..bbee0f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,9 +232,9 @@ dependencies = [ [[package]] name = "rust_util" -version = "0.6.12" +version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c261320f663e65d0869f77036c454f627e82eecbd574946e6eb55a97d3dc490c" +checksum = "190763556e2faed2ba3120c04e3051001d84dbe556d397f030def64c1ae27fd2" dependencies = [ "lazy_static", "libc", diff --git a/src/main.rs b/src/main.rs index 0b6721c..1bd7046 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,7 @@ use std::sync::mpsc::channel; use std::thread; use std::time::Duration; use clap::{ Arg, App }; +use rust_util::util_net::IpAddressMaskGroup; const LOCAL_ADDR: &str = "127.0.0.1"; const TIMEOUT: u64 = 3 * 60 * 100; //3 minutes @@ -39,11 +40,20 @@ fn main() { Some(addr) => addr.to_owned(), None => LOCAL_ADDR.to_owned(), }; + let allowed_ip_address_mask_list = IpAddressMaskGroup::parse( + &matches.values_of("allowed_list") + .map(|l| l.map(|i| i.to_owned()).collect::>()) + .unwrap_or_else(|| vec![]) + ); + debugging!("Allowed ip address mask list count: {}", allowed_ip_address_mask_list.ip_address_mask_group.len()); + allowed_ip_address_mask_list.ip_address_mask_group.iter().for_each(|ip| { + debugging!("- {}", ip); + }); - forward(&bind_addr, local_port, &remote_host, remote_port); + forward(&bind_addr, local_port, &remote_host, remote_port, &allowed_ip_address_mask_list); } -fn forward(bind_addr: &str, local_port: u16, remote_host: &str, remote_port: u16) { +fn forward(bind_addr: &str, local_port: u16, remote_host: &str, remote_port: u16, allowed_ip_address_mask_list: &IpAddressMaskGroup) { let local_addr = format!("{}:{}", bind_addr, local_port); debugging!("Listen address and port: {}", local_addr); let local = UdpSocket::bind(&local_addr).unwrap_or_else(|_| panic!("Unable to bind to {}", &local_addr)); @@ -70,7 +80,10 @@ fn forward(bind_addr: &str, local_port: u16, remote_host: &str, remote_port: u16 let mut buf = [0; 64 * 1024]; loop { let (num_bytes, src_addr) = local.recv_from(&mut buf).expect("Didn't receive data"); - // TODO check src_addr ... + if !allowed_ip_address_mask_list.is_empty_or_matches(&src_addr) { + information!("Banned source address: {}", src_addr); + continue; + } //we create a new thread for each unique client let mut remove_existing = false; From 6a755179feef72fdf44d21d12b1cf646e8bdb805 Mon Sep 17 00:00:00 2001 From: "Hatter Jiang@Pixelbook" Date: Sun, 13 Sep 2020 10:40:32 +0800 Subject: [PATCH 4/4] chore: add logger level checl --- Cargo.lock | 4 ++-- src/main.rs | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bbee0f6..76fc48b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,9 +232,9 @@ dependencies = [ [[package]] name = "rust_util" -version = "0.6.13" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "190763556e2faed2ba3120c04e3051001d84dbe556d397f030def64c1ae27fd2" +checksum = "7222f977acb4264fb55f1aa7cf11e09c735fe961b369aef92eea670949628498" dependencies = [ "lazy_static", "libc", diff --git a/src/main.rs b/src/main.rs index 1bd7046..785b3c7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,7 @@ use std::sync::mpsc::channel; use std::thread; use std::time::Duration; use clap::{ Arg, App }; +use rust_util::util_msg::{ is_logger_level_enabled, MessageType }; use rust_util::util_net::IpAddressMaskGroup; const LOCAL_ADDR: &str = "127.0.0.1"; @@ -46,9 +47,11 @@ fn main() { .unwrap_or_else(|| vec![]) ); debugging!("Allowed ip address mask list count: {}", allowed_ip_address_mask_list.ip_address_mask_group.len()); - allowed_ip_address_mask_list.ip_address_mask_group.iter().for_each(|ip| { - debugging!("- {}", ip); - }); + if is_logger_level_enabled(MessageType::DEBUG) { + allowed_ip_address_mask_list.ip_address_mask_group.iter().for_each(|ip| { + debugging!("- {}", ip); + }); + } forward(&bind_addr, local_port, &remote_host, remote_port, &allowed_ip_address_mask_list); }