Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement timestamp generators #1128

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- [USE keyspace](queries/usekeyspace.md)
- [Schema agreement](queries/schema-agreement.md)
- [Query timeouts](queries/timeouts.md)
- [Timestamp generators](queries/timestamp-generators.md)

- [Execution profiles](execution-profiles/execution-profiles.md)
- [Creating a profile and setting it](execution-profiles/create-and-use.md)
Expand Down
10 changes: 6 additions & 4 deletions docs/source/queries/queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Driver supports all kinds of statements supported by ScyllaDB. The following tables aim to bridge between DB concepts and driver's API.
They include recommendations on which API to use in what cases.

## Kinds of CQL statements (from the CQL protocol point of view):
## Kinds of CQL statements (from the CQL protocol point of view)

| Kind of CQL statement | Single | Batch |
|-----------------------|---------------------|------------------------------------------|
smoczy123 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -59,7 +59,7 @@ This is **NOT** strictly related to content of the CQL query string.
| Load balancing | advanced if prepared, else primitive | advanced if prepared **and ALL** statements in the batch target the same partition, else primitive |
| Suitable operations | most of operations | - a list of operations that needs to be executed atomically (batch LightWeight Transaction)</br> - a batch of operations targetting the same partition (as an advanced optimisation) |

## CQL statements - operations (based on what the CQL string contains):
## CQL statements - operations (based on what the CQL string contains)

| CQL data manipulation statement | Recommended statement kind | Recommended Session operation |
|------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
Expand All @@ -86,9 +86,10 @@ This is **NOT** strictly related to content of the CQL query string.

For more detailed comparison and more best practices, see [doc page about paging](paged.md).

### Queries are fully asynchronous - you can run as many of them in parallel as you wish.
### Queries are fully asynchronous - you can run as many of them in parallel as you wish

## `USE KEYSPACE`

## `USE KEYSPACE`:
There is a special functionality to enable [USE keyspace](usekeyspace.md).
wprzytula marked this conversation as resolved.
Show resolved Hide resolved

```{eval-rst}
Expand All @@ -106,4 +107,5 @@ There is a special functionality to enable [USE keyspace](usekeyspace.md).
schema-agreement
lwt
timeouts
timestamp-generators
```
54 changes: 54 additions & 0 deletions docs/source/queries/timestamp-generators.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Timestamp generators

If you want to generate timestamps on the client side you can provide
a TimestampGenerator to a SessionBuilder when creating a Session. Then
every executed statement will have attached a new timestamp generated
by the provided TimestampGenerator.
Timestamps are set according to precendence:

1. ```USING TIMESTAMP``` in the query itself
2. Manually using ```set_timestamp``` on the query
3. Timestamp generated by the generator

## Simple Timestamp Generator

Most basic client-side timestamp generator. Generates timestamp
based on system clock. Provides no guarantees and panics when the system clock
provides timestamp before the unix epoch.

## Monotonic Timestamp Generator

Client-side timestamp generator. Guarantees monotonic timestamps
based on the system clock, with automatic timestamp incrementation
if the system clock timestamp would not be monotonic. If the clock skew
exceeds `warning_threshold` of the generator (can be changed with `with_warning_times`, 1s by default)
user will be warned with timestamp generation with `warning_interval` cooldown period
(can be changed with `with_warning_times`, 1s by default) to not spam the user. If user does not want to
be warned about the clock skew, the warnings can be turned off with `without_warnings` function.

``` rust
# extern crate scylla;
# use std::error::Error;
# async fn check_only_compiles() -> Result<(), Box<dyn std::error::Error>> {
use scylla::client::session::Session;
use scylla::client::session_builder::SessionBuilder;
use scylla::policies::timestamp_generator::MonotonicTimestampGenerator;
use scylla::query::Query;
use std::sync::Arc;

let session: Session = SessionBuilder::new()
.known_node("127.0.0.1:9042")
.timestamp_generator(Arc::new(MonotonicTimestampGenerator::new()))
.build()
.await?;

// This query will have a timestamp generated
// by the monotonic timestamp generator
let my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)");
let to_insert: i32 = 12345;
session.query_unpaged(my_query, (to_insert,)).await?;
# Ok(())
# }
```


7 changes: 7 additions & 0 deletions scylla/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::policies::host_filter::HostFilter;
use crate::policies::load_balancing::{self, RoutingInfo};
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
use crate::policies::speculative_execution;
use crate::policies::timestamp_generator::TimestampGenerator;
use crate::prepared_statement::PreparedStatement;
use crate::query::Query;
#[allow(deprecated)]
Expand Down Expand Up @@ -180,6 +181,10 @@ pub struct SessionConfig {
/// Generally, this options is best left as default (false).
pub disallow_shard_aware_port: bool,

/// Timestamp generator used for generating timestamps on the client-side
/// If None, server-side timestamps are used.
pub timestamp_generator: Option<Arc<dyn TimestampGenerator>>,

/// If empty, fetch all keyspaces
pub keyspaces_to_fetch: Vec<String>,

Expand Down Expand Up @@ -292,6 +297,7 @@ impl SessionConfig {
connect_timeout: Duration::from_secs(5),
connection_pool_size: Default::default(),
disallow_shard_aware_port: false,
timestamp_generator: None,
keyspaces_to_fetch: Vec::new(),
fetch_schema_metadata: true,
keepalive_interval: Some(Duration::from_secs(30)),
Expand Down Expand Up @@ -981,6 +987,7 @@ where
compression: config.compression,
tcp_nodelay: config.tcp_nodelay,
tcp_keepalive_interval: config.tcp_keepalive_interval,
timestamp_generator: config.timestamp_generator,
#[cfg(feature = "ssl")]
ssl_config: config.ssl_context.map(SslConfig::new_with_global_context),
authenticator: config.authenticator.clone(),
Expand Down
23 changes: 23 additions & 0 deletions scylla/src/client/session_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::cloud::{CloudConfig, CloudConfigError};
use crate::errors::NewSessionError;
use crate::policies::address_translator::AddressTranslator;
use crate::policies::host_filter::HostFilter;
use crate::policies::timestamp_generator::TimestampGenerator;
use crate::statement::Consistency;
#[cfg(feature = "ssl")]
use openssl::ssl::SslContext;
Expand Down Expand Up @@ -684,6 +685,28 @@ impl<K: SessionBuilderKind> GenericSessionBuilder<K> {
self
}

/// Set the timestamp generator that will generate timestamps on the client-side.
///
/// # Example
/// ```
/// # use scylla::client::session::Session;
/// # use scylla::client::session_builder::SessionBuilder;
/// # use scylla::policies::timestamp_generator::SimpleTimestampGenerator;
/// # use std::sync::Arc;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let session: Session = SessionBuilder::new()
/// .known_node("127.0.0.1:9042")
/// .timestamp_generator(Arc::new(SimpleTimestampGenerator::new()))
/// .build()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn timestamp_generator(mut self, timestamp_generator: Arc<dyn TimestampGenerator>) -> Self {
self.config.timestamp_generator = Some(timestamp_generator);
self
}
Lorak-mmk marked this conversation as resolved.
Show resolved Hide resolved

/// Set the keyspaces to be fetched, to retrieve their strategy, and schema metadata if enabled
/// No keyspaces, the default value, means all the keyspaces will be fetched.
///
Expand Down
86 changes: 84 additions & 2 deletions scylla/src/client/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::execution_profile::ExecutionProfile;
use super::session::Session;
use super::session_builder::SessionBuilder;
use crate as scylla;
use crate::batch::{Batch, BatchStatement};
use crate::batch::{Batch, BatchStatement, BatchType};
use crate::cluster::metadata::Strategy::NetworkTopologyStrategy;
use crate::cluster::metadata::{CollectionType, ColumnKind, CqlType, NativeType, UserDefinedType};
use crate::deserialize::DeserializeOwnedValue;
Expand Down Expand Up @@ -32,7 +32,7 @@ use scylla_cql::types::serialize::value::SerializeValue;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeSet, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
use uuid::Uuid;

Expand Down Expand Up @@ -1327,6 +1327,88 @@ async fn test_timestamp() {
assert_eq!(results, expected_results);
}

#[tokio::test]
async fn test_timestamp_generator() {
use crate::policies::timestamp_generator::TimestampGenerator;
use rand::random;

setup_tracing();
struct LocalTimestampGenerator {
generated_timestamps: Arc<Mutex<HashSet<i64>>>,
}

impl TimestampGenerator for LocalTimestampGenerator {
fn next_timestamp(&self) -> i64 {
let timestamp = random::<i64>().abs();
self.generated_timestamps.lock().unwrap().insert(timestamp);
timestamp
}
}

let timestamps = Arc::new(Mutex::new(HashSet::new()));
let generator = LocalTimestampGenerator {
generated_timestamps: timestamps.clone(),
};

let session = create_new_session_builder()
.timestamp_generator(Arc::new(generator))
.build()
.await
.unwrap();
let ks = unique_keyspace_name();
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
session
.ddl(format!(
"CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)",
ks
))
.await
.unwrap();

let prepared = session
.prepare(format!(
"INSERT INTO {}.t_generator (a, b) VALUES (1, 1)",
ks
))
.await
.unwrap();
session.execute_unpaged(&prepared, []).await.unwrap();

let unprepared = Query::new(format!(
"INSERT INTO {}.t_generator (a, b) VALUES (2, 2)",
ks
));
session.query_unpaged(unprepared, []).await.unwrap();

let mut batch = Batch::new(BatchType::Unlogged);
let stmt = session
.prepare(format!(
"INSERT INTO {}.t_generator (a, b) VALUES (3, 3)",
ks
))
.await
.unwrap();
batch.append_statement(stmt);
session.batch(&batch, &((),)).await.unwrap();

let query_rows_result = session
.query_unpaged(
format!("SELECT a, b, WRITETIME(b) FROM {}.t_generator", ks),
&[],
)
.await
.unwrap()
.into_rows_result()
.unwrap();

let timestamps_locked = timestamps.lock().unwrap();
assert!(query_rows_result
.rows::<(i32, i32, i64)>()
.unwrap()
.map(|row_result| row_result.unwrap())
.all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime)));
}

#[ignore = "works on remote Scylla instances only (local ones are too fast)"]
#[tokio::test]
async fn test_request_timeout() {
Expand Down
35 changes: 32 additions & 3 deletions scylla/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::frame::{
FrameParams, SerializedRequest,
};
use crate::policies::address_translator::AddressTranslator;
use crate::policies::timestamp_generator::TimestampGenerator;
use crate::query::Query;
use crate::response::query_result::QueryResult;
use crate::response::{
Expand Down Expand Up @@ -324,6 +325,7 @@ pub(crate) struct ConnectionConfig {
pub(crate) compression: Option<Compression>,
pub(crate) tcp_nodelay: bool,
pub(crate) tcp_keepalive_interval: Option<Duration>,
pub(crate) timestamp_generator: Option<Arc<dyn TimestampGenerator>>,
#[cfg(feature = "ssl")]
pub(crate) ssl_config: Option<SslConfig>,
pub(crate) connect_timeout: std::time::Duration,
Expand All @@ -349,6 +351,7 @@ impl Default for ConnectionConfig {
compression: None,
tcp_nodelay: true,
tcp_keepalive_interval: None,
timestamp_generator: None,
event_sender: None,
#[cfg(feature = "ssl")]
ssl_config: None,
Expand Down Expand Up @@ -853,6 +856,14 @@ impl Connection {
page_size: Option<PageSize>,
paging_state: PagingState,
) -> Result<QueryResponse, RequestAttemptError> {
let get_timestamp_from_gen = || {
self.config
.timestamp_generator
.as_ref()
.map(|gen| gen.next_timestamp())
};
let timestamp = query.get_timestamp().or_else(get_timestamp_from_gen);

let query_frame = query::Query {
contents: Cow::Borrowed(&query.contents),
parameters: query::QueryParameters {
Expand All @@ -862,7 +873,7 @@ impl Connection {
page_size: page_size.map(Into::into),
paging_state,
skip_metadata: false,
timestamp: query.get_timestamp(),
timestamp,
},
};

Expand Down Expand Up @@ -915,14 +926,24 @@ impl Connection {
page_size: Option<PageSize>,
paging_state: PagingState,
) -> Result<QueryResponse, RequestAttemptError> {
let get_timestamp_from_gen = || {
self.config
.timestamp_generator
.as_ref()
.map(|gen| gen.next_timestamp())
};
let timestamp = prepared_statement
.get_timestamp()
.or_else(get_timestamp_from_gen);

let execute_frame = execute::Execute {
id: prepared_statement.get_id().to_owned(),
parameters: query::QueryParameters {
consistency,
serial_consistency,
values: Cow::Borrowed(values),
page_size: page_size.map(Into::into),
timestamp: prepared_statement.get_timestamp(),
timestamp,
skip_metadata: prepared_statement.get_use_cached_result_metadata(),
paging_state,
},
Expand Down Expand Up @@ -1057,13 +1078,21 @@ impl Connection {

let values = RawBatchValuesAdapter::new(values, contexts);

let get_timestamp_from_gen = || {
self.config
.timestamp_generator
.as_ref()
.map(|gen| gen.next_timestamp())
};
let timestamp = batch.get_timestamp().or_else(get_timestamp_from_gen);

let batch_frame = batch::Batch {
statements: Cow::Borrowed(&batch.statements),
values,
batch_type: batch.get_type(),
consistency,
serial_consistency,
timestamp: batch.get_timestamp(),
timestamp,
};

loop {
Expand Down
1 change: 1 addition & 0 deletions scylla/src/policies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ pub mod host_filter;
pub mod load_balancing;
pub mod retry;
pub mod speculative_execution;
pub mod timestamp_generator;
Loading
Loading