Skip to content

Commit

Permalink
Add --n-parallel
Browse files Browse the repository at this point in the history
To connect multiple proxies in parallel
  • Loading branch information
sorz committed Dec 11, 2017
1 parent 30d2ec9 commit 2e3aed0
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 28 deletions.
11 changes: 10 additions & 1 deletion src/cli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,16 @@ args:
long: remote-dns
help: >
try to obtain domain name from TLS SNI, and sent it to remote
proxy server.
proxy server. Only apply for port number 443.
- n-parallel:
long: n-parallel
value_name: N
takes_value: true
help: >
connect and send application data to N proxies in parallel, use
the first proxy that return valid data. Currently only support
TLS as application layer. Must turn on --remote-dns otherwise it
will be ignored.
- log-level:
long: log-level
takes_value: true
Expand Down
108 changes: 84 additions & 24 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ pub struct NewClientWithData {
left: TcpStream,
src: SocketAddr,
dest: Destination,
pending_data: Vec<u8>,
pending_data: Box<[u8]>,
is_tls: bool,
list: Arc<ServerList>,
handle: Handle,
}
Expand All @@ -46,7 +47,8 @@ pub struct ConnectedClient {
}

pub trait Connectable {
fn connect_server(self) -> Box<Future<Item=ConnectedClient, Error=()>>;
fn connect_server(self, n_parallel: usize)
-> Box<Future<Item=ConnectedClient, Error=()>>;
}

impl NewClient {
Expand All @@ -72,26 +74,32 @@ impl NewClient {
let data = read(left, vec![0u8; 768])
.map_err(|err| warn!("fail to read hello from client: {}", err));
let result = timer.timeout(data, wait).map(move |(left, data, len)| {
match tls::parse_client_hello(&data[..len]) {
Err(err) => info!("fail to parse hello: {}", err),
Ok(TlsClientHello { server_name: None, .. } ) =>
debug!("not SNI found in client hello"),
let is_tls = match tls::parse_client_hello(&data[..len]) {
Err(err) => {
info!("fail to parse hello: {}", err);
false
},
Ok(TlsClientHello { server_name: None, .. } ) => {
debug!("not SNI found in client hello");
true
},
Ok(TlsClientHello { server_name: Some(name), .. } ) => {
debug!("SNI found: {}", name);
dest = (name, dest.port).into();
true
},
};
NewClientWithData {
left, src, dest, list, handle,
pending_data: data[..len].to_vec(),
left, src, dest, list, handle, is_tls,
pending_data: data[..len].to_vec().into_boxed_slice(),
}
}).map_err(|_| info!("no tls request received before timeout"));
Box::new(result)
}
}

impl Connectable for NewClient {
fn connect_server(self)
fn connect_server(self, _n_parallel: usize)
-> Box<Future<Item=ConnectedClient, Error=()>> {
let NewClient { left, src, dest, list, handle } = self;
let infos = list.get_infos().clone();
Expand All @@ -107,27 +115,50 @@ impl Connectable for NewClient {
}

impl Connectable for NewClientWithData {
fn connect_server(self)
fn connect_server(self, n_parallel: usize)
-> Box<Future<Item=ConnectedClient, Error=()>> {
let NewClientWithData {
left, src, dest, list, handle, pending_data } = self;
left, src, dest, list, handle, pending_data, is_tls } = self;
let infos = list.get_infos().clone();
let seq = try_connect_seq(dest.clone(), infos, handle.clone())
.and_then(move |(right, info)| {
write_all(right, pending_data)
.map(move |(right, _)| (right, info))
.map_err(|err| warn!("fail to write: {}", err))
}).map(move |(right, info)| {
info!("{} => {} via {}", src, dest, info.server.tag);
ConnectedClient {
left, right, src, dest, proxy: info, list, handle
}
}).map_err(|_| warn!("all proxy server down"));
Box::new(seq)
let n_parallel = if is_tls {
cmp::min(infos.len(), n_parallel)
} else {
0
};
let (infos_par, infos_seq) = infos.split_at(
cmp::min(infos.len(), n_parallel));
let conn_par = try_connect_par(dest.clone(), infos_par.to_vec(),
pending_data.clone(), handle.clone());
let conn_seq = try_connect_seq(dest.clone(), infos_seq.to_vec(),
handle.clone());
let conn = conn_par.then(move |result| match result {
Ok((right, info, data)) => {
let write = write_all(left, data)
.map(move |(left, _)| (left, right, info))
.map_err(|err| warn!("fail to write client: {}", err));
Box::new(write) as Box<Future<Item=_, Error=()>>
},
Err(_) => {
let seq = conn_seq.and_then(move |(right, info)| {
write_all(right, pending_data)
.map(move |(right, _)| (left, right, info))
.map_err(|err| warn!("fail to write: {}", err))
});
Box::new(seq)
},
});
let client = conn.map(move |(left, right, info)| {
info!("{} => {} via {}", src, dest, info.server.tag);
ConnectedClient {
left, right, src, dest, proxy: info, list, handle
}
}).map_err(|_| warn!("all proxy server down"));
Box::new(client)
}
}

fn try_connect_seq(dest: Destination, servers: Vec<ServerInfo>, handle: Handle)
fn try_connect_seq(dest: Destination, servers: Vec<ServerInfo>,
handle: Handle)
-> Box<Future<Item=(TcpStream, ServerInfo), Error=()>> {
let timer = Timer::default();
let try_all = stream::iter_ok(servers).for_each(move |info| {
Expand All @@ -153,6 +184,35 @@ fn try_connect_seq(dest: Destination, servers: Vec<ServerInfo>, handle: Handle)
Box::new(try_all)
}

fn try_connect_par(dest: Destination, servers: Vec<ServerInfo>,
pending_data: Box<[u8]>, handle: Handle)
-> Box<Future<Item=(TcpStream, ServerInfo, Box<[u8]>), Error=()>> {
let timer = Timer::default();
let conns: Vec<_> = servers.into_iter().map(move |info| {
let right = info.server.connect(dest.clone(), &handle);
let wait = Duration::from_secs(5);
let tag = info.server.tag.clone();
let data_copy = pending_data.clone();
timer.timeout(right, wait).and_then(move |right| {
write_all(right, data_copy)
}).and_then(|(right, buf)| {
read(right, buf)
}).map(|(right, buf, len)| {
// TODO: verify server hello
(right, info, buf[..len].to_vec().into_boxed_slice())
}).map_err(move |err| {
warn!("fail to connect {}: {}", tag, err);
})
}).collect();
if conns.is_empty() {
Box::new(future::err(()))
} else {
debug!("try to connect {} servers in parallel", conns.len());
Box::new(future::select_ok(conns)
.map(|(result, _)| result))
}
}

impl ConnectedClient {
pub fn serve(self) -> Box<Future<Item=(), Error=()>> {
let ConnectedClient { left, right, dest,
Expand Down
9 changes: 6 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ fn main() {
.expect("missing probe secs").parse()
.expect("not a vaild probe secs");
let remote_dns = args.is_present("remote-dns");
let n_parallel = args.value_of("n-parallel")
.map(|v| v.parse().expect("not a valid number"))
.unwrap_or(0 as usize);

let servers = parse_servers(&args);
if servers.len() == 0 {
Expand Down Expand Up @@ -87,11 +90,11 @@ fn main() {
sock, servers.clone(), handle.clone());
let conn = client.and_then(move |client|
if remote_dns && client.dest.port == 443 {
Box::new(client.retrive_dest().and_then(|client| {
client.connect_server()
Box::new(client.retrive_dest().and_then(move |client| {
client.connect_server(n_parallel)
}))
} else {
client.connect_server()
client.connect_server(0)
});
let serv = conn.and_then(|client| client.serve());
handle.spawn(serv);
Expand Down

0 comments on commit 2e3aed0

Please sign in to comment.