diff --git a/src/lib.rs b/src/lib.rs index 856adf5b..faad8894 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -202,6 +202,7 @@ macro_rules! row { }; } +#[macro_export] macro_rules! try_opt { ($expr:expr) => { match $expr { diff --git a/src/pool/mod.rs b/src/pool/mod.rs index 4ce040b2..306d7c19 100644 --- a/src/pool/mod.rs +++ b/src/pool/mod.rs @@ -467,4 +467,19 @@ mod test { run(done).unwrap(); } + + #[test] + fn test_query_timeout() { + let test_db_url = format!("{}{}", DATABASE_URL.as_str(), "&query_timeout=5ms"); + let pool = Pool::new(test_db_url.to_string()); + + let done = pool.get_handle() + .and_then(|c| c.query("SELECT sleep(10)").fetch_all()); + + + run(done).unwrap_err(); + + let info = pool.info(); + assert_eq!(info.ongoing, 0); + } } diff --git a/src/types/options.rs b/src/types/options.rs index 85eb05df..6d841840 100644 --- a/src/types/options.rs +++ b/src/types/options.rs @@ -167,6 +167,12 @@ pub struct Options { /// Timeout for connection (defaults to `500 ms`) pub(crate) connection_timeout: Duration, + + /// Timeout for queries (defaults to `180,000 ms`) + pub(crate) query_timeout: Duration, + + /// Timeout for each block in a query (defaults to `180,000 ms`) + pub(crate) query_block_timeout: Duration, } impl Default for Options { @@ -186,6 +192,8 @@ impl Default for Options { retry_timeout: Duration::from_secs(5), ping_timeout: Duration::from_millis(500), connection_timeout: Duration::from_millis(500), + query_timeout: Duration::from_millis(180_000), + query_block_timeout: Duration::from_millis(180_000), } } } @@ -289,6 +297,16 @@ impl Options { /// Timeout for connection (defaults to `500 ms`). => connection_timeout: Duration } + + property! { + /// Timeout for query (defaults to `180,000 ms`). + => query_timeout: Duration + } + + property! { + /// Timeout for each block in a query (defaults to `180,000 ms`). + => query_block_timeout: Duration + } } impl FromStr for Options { @@ -357,7 +375,13 @@ where "ping_timeout" => options.ping_timeout = parse_param(key, value, parse_duration)?, "connection_timeout" => { options.connection_timeout = parse_param(key, value, parse_duration)? - } + }, + "query_timeout" => { + options.query_timeout = parse_param(key, value, parse_duration)? + }, + "query_block_timeout" => { + options.query_block_timeout = parse_param(key, value, parse_duration)? + }, "compression" => options.compression = parse_param(key, value, parse_compression)?, _ => return Err(UrlError::UnknownParameter { param: key.into() }), }; diff --git a/src/types/query_result/mod.rs b/src/types/query_result/mod.rs index 8224a2ca..1e5e60de 100644 --- a/src/types/query_result/mod.rs +++ b/src/types/query_result/mod.rs @@ -10,6 +10,7 @@ use crate::{ Rows, }, ClientHandle, + try_opt, }; use self::{either::Either, fold_block::FoldBlock}; @@ -18,6 +19,16 @@ mod either; mod fold_block; mod stream_blocks; + +macro_rules! try_opt_stream { + ($expr:expr) => { + match $expr { + Ok(val) => val, + Err(err) => return Box::new(stream::once(Err(err))), + } + }; +} + /// Result of a query or statement execution. pub struct QueryResult { pub(crate) client: ClientHandle, @@ -65,6 +76,8 @@ impl QueryResult { /// Fetch data from table. It returns a block that contains all rows. pub fn fetch_all(self) -> BoxFuture<(ClientHandle, Block)> { + let timeout = try_opt!(self.client.context.options.get()).query_timeout; + wrap_future( self.fold_blocks(Vec::new(), |mut blocks, block| { if !block.is_empty() { @@ -72,7 +85,9 @@ impl QueryResult { } Ok(blocks) }) - .map(|(h, blocks)| (h, Block::concat(blocks.as_slice()))), + .timeout(timeout) + .map_err(Error::from) + .map(|(h, blocks)| (h, Block::concat(blocks.as_slice()))) ) } @@ -163,6 +178,7 @@ impl QueryResult { /// ``` pub fn stream_blocks(self) -> BoxStream { let query = self.query; + let timeout = try_opt_stream!(self.client.context.options.get()).query_block_timeout; self.client.wrap_stream(move |mut c| { info!("[send query] {}", query.get_sql()); @@ -180,6 +196,8 @@ impl QueryResult { context, pool, ) + .timeout(timeout) + .map_err(Error::from) }) }