From 2e3aed0921cec210e629325d7ca2f66e9990d21e Mon Sep 17 00:00:00 2001 From: sorz Date: Tue, 12 Dec 2017 02:11:26 +0800 Subject: [PATCH] Add --n-parallel To connect multiple proxies in parallel --- src/cli.yml | 11 ++++- src/client.rs | 108 +++++++++++++++++++++++++++++++++++++++----------- src/main.rs | 9 +++-- 3 files changed, 100 insertions(+), 28 deletions(-) diff --git a/src/cli.yml b/src/cli.yml index bef191b..01ecc2f 100644 --- a/src/cli.yml +++ b/src/cli.yml @@ -58,7 +58,16 @@ args: long: remote-dns help: > try to obtain domain name from TLS SNI, and sent it to remote - proxy server. + proxy server. Only apply for port number 443. + - n-parallel: + long: n-parallel + value_name: N + takes_value: true + help: > + connect and send application data to N proxies in parallel, use + the first proxy that return valid data. Currently only support + TLS as application layer. Must turn on --remote-dns otherwise it + will be ignored. - log-level: long: log-level takes_value: true diff --git a/src/client.rs b/src/client.rs index eb156b4..3b56a7b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -29,7 +29,8 @@ pub struct NewClientWithData { left: TcpStream, src: SocketAddr, dest: Destination, - pending_data: Vec, + pending_data: Box<[u8]>, + is_tls: bool, list: Arc, handle: Handle, } @@ -46,7 +47,8 @@ pub struct ConnectedClient { } pub trait Connectable { - fn connect_server(self) -> Box>; + fn connect_server(self, n_parallel: usize) + -> Box>; } impl NewClient { @@ -72,18 +74,24 @@ impl NewClient { let data = read(left, vec![0u8; 768]) .map_err(|err| warn!("fail to read hello from client: {}", err)); let result = timer.timeout(data, wait).map(move |(left, data, len)| { - match tls::parse_client_hello(&data[..len]) { - Err(err) => info!("fail to parse hello: {}", err), - Ok(TlsClientHello { server_name: None, .. } ) => - debug!("not SNI found in client hello"), + let is_tls = match tls::parse_client_hello(&data[..len]) { + Err(err) => { + info!("fail to parse hello: {}", err); + false + }, + Ok(TlsClientHello { server_name: None, .. } ) => { + debug!("not SNI found in client hello"); + true + }, Ok(TlsClientHello { server_name: Some(name), .. } ) => { debug!("SNI found: {}", name); dest = (name, dest.port).into(); + true }, }; NewClientWithData { - left, src, dest, list, handle, - pending_data: data[..len].to_vec(), + left, src, dest, list, handle, is_tls, + pending_data: data[..len].to_vec().into_boxed_slice(), } }).map_err(|_| info!("no tls request received before timeout")); Box::new(result) @@ -91,7 +99,7 @@ impl NewClient { } impl Connectable for NewClient { - fn connect_server(self) + fn connect_server(self, _n_parallel: usize) -> Box> { let NewClient { left, src, dest, list, handle } = self; let infos = list.get_infos().clone(); @@ -107,27 +115,50 @@ impl Connectable for NewClient { } impl Connectable for NewClientWithData { - fn connect_server(self) + fn connect_server(self, n_parallel: usize) -> Box> { let NewClientWithData { - left, src, dest, list, handle, pending_data } = self; + left, src, dest, list, handle, pending_data, is_tls } = self; let infos = list.get_infos().clone(); - let seq = try_connect_seq(dest.clone(), infos, handle.clone()) - .and_then(move |(right, info)| { - write_all(right, pending_data) - .map(move |(right, _)| (right, info)) - .map_err(|err| warn!("fail to write: {}", err)) - }).map(move |(right, info)| { - info!("{} => {} via {}", src, dest, info.server.tag); - ConnectedClient { - left, right, src, dest, proxy: info, list, handle - } - }).map_err(|_| warn!("all proxy server down")); - Box::new(seq) + let n_parallel = if is_tls { + cmp::min(infos.len(), n_parallel) + } else { + 0 + }; + let (infos_par, infos_seq) = infos.split_at( + cmp::min(infos.len(), n_parallel)); + let conn_par = try_connect_par(dest.clone(), infos_par.to_vec(), + pending_data.clone(), handle.clone()); + let conn_seq = try_connect_seq(dest.clone(), infos_seq.to_vec(), + handle.clone()); + let conn = conn_par.then(move |result| match result { + Ok((right, info, data)) => { + let write = write_all(left, data) + .map(move |(left, _)| (left, right, info)) + .map_err(|err| warn!("fail to write client: {}", err)); + Box::new(write) as Box> + }, + Err(_) => { + let seq = conn_seq.and_then(move |(right, info)| { + write_all(right, pending_data) + .map(move |(right, _)| (left, right, info)) + .map_err(|err| warn!("fail to write: {}", err)) + }); + Box::new(seq) + }, + }); + let client = conn.map(move |(left, right, info)| { + info!("{} => {} via {}", src, dest, info.server.tag); + ConnectedClient { + left, right, src, dest, proxy: info, list, handle + } + }).map_err(|_| warn!("all proxy server down")); + Box::new(client) } } -fn try_connect_seq(dest: Destination, servers: Vec, handle: Handle) +fn try_connect_seq(dest: Destination, servers: Vec, + handle: Handle) -> Box> { let timer = Timer::default(); let try_all = stream::iter_ok(servers).for_each(move |info| { @@ -153,6 +184,35 @@ fn try_connect_seq(dest: Destination, servers: Vec, handle: Handle) Box::new(try_all) } +fn try_connect_par(dest: Destination, servers: Vec, + pending_data: Box<[u8]>, handle: Handle) + -> Box), Error=()>> { + let timer = Timer::default(); + let conns: Vec<_> = servers.into_iter().map(move |info| { + let right = info.server.connect(dest.clone(), &handle); + let wait = Duration::from_secs(5); + let tag = info.server.tag.clone(); + let data_copy = pending_data.clone(); + timer.timeout(right, wait).and_then(move |right| { + write_all(right, data_copy) + }).and_then(|(right, buf)| { + read(right, buf) + }).map(|(right, buf, len)| { + // TODO: verify server hello + (right, info, buf[..len].to_vec().into_boxed_slice()) + }).map_err(move |err| { + warn!("fail to connect {}: {}", tag, err); + }) + }).collect(); + if conns.is_empty() { + Box::new(future::err(())) + } else { + debug!("try to connect {} servers in parallel", conns.len()); + Box::new(future::select_ok(conns) + .map(|(result, _)| result)) + } +} + impl ConnectedClient { pub fn serve(self) -> Box> { let ConnectedClient { left, right, dest, diff --git a/src/main.rs b/src/main.rs index 795e48c..f26b27d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -57,6 +57,9 @@ fn main() { .expect("missing probe secs").parse() .expect("not a vaild probe secs"); let remote_dns = args.is_present("remote-dns"); + let n_parallel = args.value_of("n-parallel") + .map(|v| v.parse().expect("not a valid number")) + .unwrap_or(0 as usize); let servers = parse_servers(&args); if servers.len() == 0 { @@ -87,11 +90,11 @@ fn main() { sock, servers.clone(), handle.clone()); let conn = client.and_then(move |client| if remote_dns && client.dest.port == 443 { - Box::new(client.retrive_dest().and_then(|client| { - client.connect_server() + Box::new(client.retrive_dest().and_then(move |client| { + client.connect_server(n_parallel) })) } else { - client.connect_server() + client.connect_server(0) }); let serv = conn.and_then(|client| client.serve()); handle.spawn(serv);