Skip to content

Commit

Permalink
Upgrade to Tokio 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
sorz committed Dec 24, 2020
1 parent 891690d commit ba14064
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 39 deletions.
70 changes: 49 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ categories = ["command-line-utilities"]

[dependencies]
rand = "0.8"
tokio = { version = "0.3", features = ["full"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
net2 = "0.2"
clap = { version = "2", features = ["yaml", "color"] }
log = "0.4"
Expand All @@ -22,7 +23,7 @@ serde_json = "1.0"
serde_derive = "1.0"
serde_with = "1.6"
rust-ini = "0.16"
hyper = { git = "https://github.com/hyperium/hyper", rev = "3b3077d", optional = true, features = [
hyper = { version = "0.14", optional = true, features = [
"http1",
"server",
"stream",
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod client;
pub mod monitor;
pub mod proxy;
pub mod stream;
#[cfg(all(feature = "systemd", target_os = "linux"))]
pub mod systemd;
#[cfg(target_os = "linux")]
Expand Down
11 changes: 6 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use moproxy::{
client::{Connectable, NewClient},
monitor::{Monitor, ServerList},
proxy::{ProxyProto, ProxyServer, UserPassAuthCredential},
stream::{TcpListenerStream, UnixListenerStream},
};

trait FromOptionStr<E, T: FromStr<Err = E>> {
Expand Down Expand Up @@ -144,7 +145,7 @@ async fn main() {
if http_addr.starts_with('/') {
let sock = web::AutoRemoveFile::new(&http_addr);
let listener = UnixListener::bind(&sock).expect("fail to bind web server");
let serv = web::run_server(listener, monitor.clone());
let serv = web::run_server(UnixListenerStream(listener), monitor.clone());
tokio::spawn(serv);
sock_file = Some(sock);
}
Expand All @@ -153,7 +154,7 @@ async fn main() {
let listener = TcpListener::bind(&http_addr)
.await
.expect("fail to bind web server");
let serv = web::run_server(listener, monitor.clone());
let serv = web::run_server(TcpListenerStream(listener), monitor.clone());
tokio::spawn(serv);
}
}
Expand All @@ -171,7 +172,7 @@ async fn main() {
let mut signals = signal(SignalKind::hangup()).expect("cannot catch signal");
#[cfg(unix)]
tokio::spawn(async move {
while signals.next().await.is_some() {
while signals.recv().await.is_some() {
#[cfg(all(feature = "systemd", target_os = "linux"))]
systemd::notify_realoding();

Expand Down Expand Up @@ -225,7 +226,7 @@ async fn main() {
check tcp_allowed_congestion_control?",
);
}
listeners.push(listener);
listeners.push(TcpListenerStream(listener));
}

// Watchdog
Expand Down Expand Up @@ -429,7 +430,7 @@ impl ServerListCfg {
let credential =
match (props.get("http username"), props.get("http password")) {
(None, None) => None,
(Some(user), _) if user.contains(":") => {
(Some(user), _) if user.contains(':') => {
return Err("semicolon (:) in http username")
}
(user, pass) => Some(UserPassAuthCredential::new(
Expand Down
2 changes: 1 addition & 1 deletion src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ async fn alive_test(server: &ProxyServer) -> io::Result<Duration> {
let result = timeout(server.max_wait(), async {
let mut stream = server.connect(&test_dns, Some(request)).await?;
stream.read_exact(&mut buf).await?;
stream.shutdown(Shutdown::Both)
stream.into_std()?.shutdown(Shutdown::Both)
})
.await;

Expand Down
9 changes: 5 additions & 4 deletions src/proxy/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{
cmp, fmt,
future::Future,
io,
net::Shutdown,
ops::Neg,
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -224,9 +223,11 @@ impl BiPipe {

// flush and does half close if seen eof
if reader.read_eof {
try_poll!(Pin::new(&mut writer.stream).poll_flush(cx));
if let Err(err) = writer.stream.shutdown(Shutdown::Write) {
debug!("fail to shutdown: {}", err);
// shutdown implies flush
match Pin::new(&mut writer.stream).poll_shutdown(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(err)) => debug!("fail to shutdown: {}", err),
}
reader.all_done = true;
return Poll::Ready(Ok(()));
Expand Down
27 changes: 27 additions & 0 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use futures_core::{ready, Stream};
use std::{
io::Result,
task::{Context, Poll},
};
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};

macro_rules! impl_stream {
($name:ident : $listener:ty => $stream:ty) => {
pub struct $name(pub $listener);

impl Stream for $name {
type Item = Result<$stream>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let (stream, _) = ready!(self.0.poll_accept(cx))?;
Poll::Ready(Some(Ok(stream)))
}
}
};
}

impl_stream!(TcpListenerStream: TcpListener => TcpStream);
impl_stream!(UnixListenerStream: UnixListener => UnixStream);
7 changes: 1 addition & 6 deletions src/web/rich.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@ impl ResourceBundle {
}

pub fn get(&self, path: &str) -> Option<(&'static str, Vec<u8>)> {
let name = if path.starts_with('/') {
&path[1..]
} else {
&path
};

let name = path.strip_prefix('/').unwrap_or(path);
let mut zip = self.zip.lock();
let mut file = zip.by_name(name).ok()?;
if !file.is_file() {
Expand Down

0 comments on commit ba14064

Please sign in to comment.