Skip to content

Commit

Permalink
Merge pull request suharev7#50 from jspeis/master
Browse files Browse the repository at this point in the history
Adding client side timeout options for queries
  • Loading branch information
suharev7 authored Sep 25, 2019
2 parents 324f9cd + 8333ea9 commit 96577d4
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ macro_rules! row {
};
}

#[macro_export]
macro_rules! try_opt {
($expr:expr) => {
match $expr {
Expand Down
15 changes: 15 additions & 0 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,4 +467,19 @@ mod test {

run(done).unwrap();
}

#[test]
fn test_query_timeout() {
let test_db_url = format!("{}{}", DATABASE_URL.as_str(), "&query_timeout=5ms");
let pool = Pool::new(test_db_url.to_string());

let done = pool.get_handle()
.and_then(|c| c.query("SELECT sleep(10)").fetch_all());


run(done).unwrap_err();

let info = pool.info();
assert_eq!(info.ongoing, 0);
}
}
26 changes: 25 additions & 1 deletion src/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ pub struct Options {

/// Timeout for connection (defaults to `500 ms`)
pub(crate) connection_timeout: Duration,

/// Timeout for queries (defaults to `180,000 ms`)
pub(crate) query_timeout: Duration,

/// Timeout for each block in a query (defaults to `180,000 ms`)
pub(crate) query_block_timeout: Duration,
}

impl Default for Options {
Expand All @@ -186,6 +192,8 @@ impl Default for Options {
retry_timeout: Duration::from_secs(5),
ping_timeout: Duration::from_millis(500),
connection_timeout: Duration::from_millis(500),
query_timeout: Duration::from_millis(180_000),
query_block_timeout: Duration::from_millis(180_000),
}
}
}
Expand Down Expand Up @@ -289,6 +297,16 @@ impl Options {
/// Timeout for connection (defaults to `500 ms`).
=> connection_timeout: Duration
}

property! {
/// Timeout for query (defaults to `180,000 ms`).
=> query_timeout: Duration
}

property! {
/// Timeout for each block in a query (defaults to `180,000 ms`).
=> query_block_timeout: Duration
}
}

impl FromStr for Options {
Expand Down Expand Up @@ -357,7 +375,13 @@ where
"ping_timeout" => options.ping_timeout = parse_param(key, value, parse_duration)?,
"connection_timeout" => {
options.connection_timeout = parse_param(key, value, parse_duration)?
}
},
"query_timeout" => {
options.query_timeout = parse_param(key, value, parse_duration)?
},
"query_block_timeout" => {
options.query_block_timeout = parse_param(key, value, parse_duration)?
},
"compression" => options.compression = parse_param(key, value, parse_compression)?,
_ => return Err(UrlError::UnknownParameter { param: key.into() }),
};
Expand Down
20 changes: 19 additions & 1 deletion src/types/query_result/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
Rows,
},
ClientHandle,
try_opt,
};

use self::{either::Either, fold_block::FoldBlock};
Expand All @@ -18,6 +19,16 @@ mod either;
mod fold_block;
mod stream_blocks;


macro_rules! try_opt_stream {
($expr:expr) => {
match $expr {
Ok(val) => val,
Err(err) => return Box::new(stream::once(Err(err))),
}
};
}

/// Result of a query or statement execution.
pub struct QueryResult {
pub(crate) client: ClientHandle,
Expand Down Expand Up @@ -65,14 +76,18 @@ impl QueryResult {

/// Fetch data from table. It returns a block that contains all rows.
pub fn fetch_all(self) -> BoxFuture<(ClientHandle, Block)> {
let timeout = try_opt!(self.client.context.options.get()).query_timeout;

wrap_future(
self.fold_blocks(Vec::new(), |mut blocks, block| {
if !block.is_empty() {
blocks.push(block);
}
Ok(blocks)
})
.map(|(h, blocks)| (h, Block::concat(blocks.as_slice()))),
.timeout(timeout)
.map_err(Error::from)
.map(|(h, blocks)| (h, Block::concat(blocks.as_slice())))
)
}

Expand Down Expand Up @@ -163,6 +178,7 @@ impl QueryResult {
/// ```
pub fn stream_blocks(self) -> BoxStream<Block> {
let query = self.query;
let timeout = try_opt_stream!(self.client.context.options.get()).query_block_timeout;

self.client.wrap_stream(move |mut c| {
info!("[send query] {}", query.get_sql());
Expand All @@ -180,6 +196,8 @@ impl QueryResult {
context,
pool,
)
.timeout(timeout)
.map_err(Error::from)
})
}

Expand Down

0 comments on commit 96577d4

Please sign in to comment.