From baf9712df1ebfc3a5ed8504e1ab3372baf94b0ac Mon Sep 17 00:00:00 2001 From: Robert Morrison Date: Mon, 30 Sep 2024 06:43:29 -0700 Subject: [PATCH] fix: pass pool_timer to hyper_util to enable the idle cleanup task (#2434) * fix: pass pool_timer to hyper_util to enable the idle cleanup task * tests: integration test for pool idle timeout --- src/async_impl/client.rs | 2 +- tests/client.rs | 21 +++++++++++++++++++++ tests/support/server.rs | 23 +++++++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 9a34f3fb6..095adf4d8 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -726,8 +726,8 @@ impl ClientBuilder { } } - #[cfg(not(target_arch = "wasm32"))] builder.timer(hyper_util::rt::TokioTimer::new()); + builder.pool_timer(hyper_util::rt::TokioTimer::new()); builder.pool_idle_timeout(config.pool_idle_timeout); builder.pool_max_idle_per_host(config.pool_max_idle_per_host); connector.set_keepalive(config.tcp_keepalive); diff --git a/tests/client.rs b/tests/client.rs index 18aaf4e99..51fb9dfa0 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -572,3 +572,24 @@ async fn highly_concurrent_requests_to_slow_http2_server_with_low_max_concurrent server.shutdown().await; } + +#[tokio::test] +async fn close_connection_after_idle_timeout() { + let mut server = server::http(move |_| async move { http::Response::default() }); + + let client = reqwest::Client::builder() + .pool_idle_timeout(std::time::Duration::from_secs(1)) + .build() + .unwrap(); + + let url = format!("http://{}", server.addr()); + + client.get(&url).send().await.unwrap(); + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + assert!(server + .events() + .iter() + .any(|e| matches!(e, server::Event::ConnectionClosed))); +} diff --git a/tests/support/server.rs b/tests/support/server.rs index 43742b60e..29835ead1 100644 --- a/tests/support/server.rs +++ b/tests/support/server.rs @@ -12,13 +12,27 @@ use tokio::sync::oneshot; pub struct Server { addr: net::SocketAddr, panic_rx: std_mpsc::Receiver<()>, + events_rx: std_mpsc::Receiver, shutdown_tx: Option>, } +#[non_exhaustive] +pub enum Event { + ConnectionClosed, +} + impl Server { pub fn addr(&self) -> net::SocketAddr { self.addr } + + pub fn events(&mut self) -> Vec { + let mut events = Vec::new(); + while let Ok(event) = self.events_rx.try_recv() { + events.push(event); + } + events + } } impl Drop for Server { @@ -67,6 +81,7 @@ where let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); let (panic_tx, panic_rx) = std_mpsc::channel(); + let (events_tx, events_rx) = std_mpsc::channel(); let tname = format!( "test({})-support-server", test_name, @@ -92,8 +107,10 @@ where async move { Ok::<_, Infallible>(fut.await) } }); let builder = builder.clone(); + let events_tx = events_tx.clone(); tokio::spawn(async move { let _ = builder.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(io), svc).await; + let _ = events_tx.send(Event::ConnectionClosed); }); } } @@ -105,6 +122,7 @@ where Server { addr, panic_rx, + events_rx, shutdown_tx: Some(shutdown_tx), } }) @@ -152,6 +170,7 @@ where let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); let (panic_tx, panic_rx) = std_mpsc::channel(); + let (events_tx, events_rx) = std_mpsc::channel(); let tname = format!( "test({})-support-server", test_name, @@ -169,9 +188,11 @@ where Some(accepted) = endpoint.accept() => { let conn = accepted.await.expect("accepted"); let mut h3_conn = h3::server::Connection::new(h3_quinn::Connection::new(conn)).await.unwrap(); + let events_tx = events_tx.clone(); let func = func.clone(); tokio::spawn(async move { while let Ok(Some((req, stream))) = h3_conn.accept().await { + let events_tx = events_tx.clone(); let func = func.clone(); tokio::spawn(async move { let (mut tx, rx) = stream.split(); @@ -198,6 +219,7 @@ where } } tx.finish().await.unwrap(); + events_tx.send(Event::ConnectionClosed).unwrap(); }); } }); @@ -211,6 +233,7 @@ where Server { addr, panic_rx, + events_rx, shutdown_tx: Some(shutdown_tx), } })