From 4f78dd5d2f348532a84279be56406f7b3db7c631 Mon Sep 17 00:00:00 2001 From: jspeis Date: Tue, 24 Sep 2019 12:42:37 -0400 Subject: [PATCH 1/5] exports try_opt for reusability --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 856adf5b..faad8894 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -202,6 +202,7 @@ macro_rules! row { }; } +#[macro_export] macro_rules! try_opt { ($expr:expr) => { match $expr { From 7db22ccfe85cadd1d89d7b3dd793001dc2e23547 Mon Sep 17 00:00:00 2001 From: jspeis Date: Tue, 24 Sep 2019 12:48:57 -0400 Subject: [PATCH 2/5] adds timeout parameters for queries and query blocks --- src/types/options.rs | 26 +++++++++++++++++++++++++- src/types/query_result/mod.rs | 20 +++++++++++++++++++- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/types/options.rs b/src/types/options.rs index 85eb05df..18079775 100644 --- a/src/types/options.rs +++ b/src/types/options.rs @@ -167,6 +167,12 @@ pub struct Options { /// Timeout for connection (defaults to `500 ms`) pub(crate) connection_timeout: Duration, + + /// Timeout for queries (defaults to `1800000 ms`) + pub(crate) query_timeout: Duration, + + /// Timeout for each block in a query (defaults to `180000 ms`) + pub(crate) query_block_timeout: Duration, } impl Default for Options { @@ -186,6 +192,8 @@ impl Default for Options { retry_timeout: Duration::from_secs(5), ping_timeout: Duration::from_millis(500), connection_timeout: Duration::from_millis(500), + query_timeout: Duration::from_millis(180000), + query_block_timeout: Duration::from_millis(180000), } } } @@ -289,6 +297,16 @@ impl Options { /// Timeout for connection (defaults to `500 ms`). => connection_timeout: Duration } + + property! { + /// Timeout for query (defaults to `180,000 ms`). + => query_timeout: Duration + } + + property! { + /// Timeout for each block in a query (defaults to `180,000 ms`). + => query_block_timeout: Duration + } } impl FromStr for Options { @@ -357,7 +375,13 @@ where "ping_timeout" => options.ping_timeout = parse_param(key, value, parse_duration)?, "connection_timeout" => { options.connection_timeout = parse_param(key, value, parse_duration)? - } + }, + "query_timeout" => { + options.query_timeout = parse_param(key, value, parse_duration)? + }, + "query_block_timeout" => { + options.query_block_timeout = parse_param(key, value, parse_duration)? + }, "compression" => options.compression = parse_param(key, value, parse_compression)?, _ => return Err(UrlError::UnknownParameter { param: key.into() }), }; diff --git a/src/types/query_result/mod.rs b/src/types/query_result/mod.rs index 8224a2ca..1e5e60de 100644 --- a/src/types/query_result/mod.rs +++ b/src/types/query_result/mod.rs @@ -10,6 +10,7 @@ use crate::{ Rows, }, ClientHandle, + try_opt, }; use self::{either::Either, fold_block::FoldBlock}; @@ -18,6 +19,16 @@ mod either; mod fold_block; mod stream_blocks; + +macro_rules! try_opt_stream { + ($expr:expr) => { + match $expr { + Ok(val) => val, + Err(err) => return Box::new(stream::once(Err(err))), + } + }; +} + /// Result of a query or statement execution. pub struct QueryResult { pub(crate) client: ClientHandle, @@ -65,6 +76,8 @@ impl QueryResult { /// Fetch data from table. It returns a block that contains all rows. pub fn fetch_all(self) -> BoxFuture<(ClientHandle, Block)> { + let timeout = try_opt!(self.client.context.options.get()).query_timeout; + wrap_future( self.fold_blocks(Vec::new(), |mut blocks, block| { if !block.is_empty() { @@ -72,7 +85,9 @@ impl QueryResult { } Ok(blocks) }) - .map(|(h, blocks)| (h, Block::concat(blocks.as_slice()))), + .timeout(timeout) + .map_err(Error::from) + .map(|(h, blocks)| (h, Block::concat(blocks.as_slice()))) ) } @@ -163,6 +178,7 @@ impl QueryResult { /// ``` pub fn stream_blocks(self) -> BoxStream { let query = self.query; + let timeout = try_opt_stream!(self.client.context.options.get()).query_block_timeout; self.client.wrap_stream(move |mut c| { info!("[send query] {}", query.get_sql()); @@ -180,6 +196,8 @@ impl QueryResult { context, pool, ) + .timeout(timeout) + .map_err(Error::from) }) } From 7f5ff24c4e9428186b6141107d892c4bb107081f Mon Sep 17 00:00:00 2001 From: jspeis Date: Tue, 24 Sep 2019 16:47:03 -0400 Subject: [PATCH 3/5] adds basic test case for query timeout --- src/pool/mod.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/pool/mod.rs b/src/pool/mod.rs index 4ce040b2..9175a868 100644 --- a/src/pool/mod.rs +++ b/src/pool/mod.rs @@ -467,4 +467,19 @@ mod test { run(done).unwrap(); } + + #[test] + fn test_query_timeout() { + let test_db_url = format!("{}{}", DATABASE_URL.as_str(), "&query_timeout=10ms"); + let pool = Pool::new(test_db_url.to_string()); + + let done = pool.get_handle() + .and_then(|c| c.query("SELECT sleep(1)").fetch_all()); + + + run(done).unwrap_err(); + + let info = pool.info(); + assert_eq!(info.ongoing, 0); + } } From a4f9ee74ebcb4952e16d6d48356cfaf4420fa333 Mon Sep 17 00:00:00 2001 From: jspeis Date: Wed, 25 Sep 2019 06:17:06 -0400 Subject: [PATCH 4/5] alters test case --- src/pool/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pool/mod.rs b/src/pool/mod.rs index 9175a868..306d7c19 100644 --- a/src/pool/mod.rs +++ b/src/pool/mod.rs @@ -470,11 +470,11 @@ mod test { #[test] fn test_query_timeout() { - let test_db_url = format!("{}{}", DATABASE_URL.as_str(), "&query_timeout=10ms"); + let test_db_url = format!("{}{}", DATABASE_URL.as_str(), "&query_timeout=5ms"); let pool = Pool::new(test_db_url.to_string()); let done = pool.get_handle() - .and_then(|c| c.query("SELECT sleep(1)").fetch_all()); + .and_then(|c| c.query("SELECT sleep(10)").fetch_all()); run(done).unwrap_err(); From 8333ea9d87e9835b788bb2cede317438a9391f1e Mon Sep 17 00:00:00 2001 From: jspeis Date: Wed, 25 Sep 2019 06:17:21 -0400 Subject: [PATCH 5/5] improves formatting of numbers --- src/types/options.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/types/options.rs b/src/types/options.rs index 18079775..6d841840 100644 --- a/src/types/options.rs +++ b/src/types/options.rs @@ -168,10 +168,10 @@ pub struct Options { /// Timeout for connection (defaults to `500 ms`) pub(crate) connection_timeout: Duration, - /// Timeout for queries (defaults to `1800000 ms`) + /// Timeout for queries (defaults to `180,000 ms`) pub(crate) query_timeout: Duration, - /// Timeout for each block in a query (defaults to `180000 ms`) + /// Timeout for each block in a query (defaults to `180,000 ms`) pub(crate) query_block_timeout: Duration, } @@ -192,8 +192,8 @@ impl Default for Options { retry_timeout: Duration::from_secs(5), ping_timeout: Duration::from_millis(500), connection_timeout: Duration::from_millis(500), - query_timeout: Duration::from_millis(180000), - query_block_timeout: Duration::from_millis(180000), + query_timeout: Duration::from_millis(180_000), + query_block_timeout: Duration::from_millis(180_000), } } }