From 70945d70fa2e429f21ba264a87e11fbe84ff725a Mon Sep 17 00:00:00 2001 From: smoczy123 Date: Mon, 13 Jan 2025 12:29:50 +0100 Subject: [PATCH 1/6] policies/timestamp_generator: Added MonotonicTimestampGenerator Added TimestampGenerator trait and MonotonicTimestampGenerator based on c++ driver's implementation --- scylla/src/policies/mod.rs | 1 + scylla/src/policies/timestamp_generator.rs | 147 +++++++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100644 scylla/src/policies/timestamp_generator.rs diff --git a/scylla/src/policies/mod.rs b/scylla/src/policies/mod.rs index 6e8bd3985e..db6c330903 100644 --- a/scylla/src/policies/mod.rs +++ b/scylla/src/policies/mod.rs @@ -20,3 +20,4 @@ pub mod host_filter; pub mod load_balancing; pub mod retry; pub mod speculative_execution; +pub mod timestamp_generator; diff --git a/scylla/src/policies/timestamp_generator.rs b/scylla/src/policies/timestamp_generator.rs new file mode 100644 index 0000000000..58f6dc9c06 --- /dev/null +++ b/scylla/src/policies/timestamp_generator.rs @@ -0,0 +1,147 @@ +use std::{ + sync::atomic::AtomicI64, + time::{SystemTime, UNIX_EPOCH}, +}; + +use std::sync::atomic::Ordering; +use std::sync::Mutex; +use tokio::time::{Duration, Instant}; +use tracing::warn; + +/// Trait used to represent a timestamp generator +pub trait TimestampGenerator: Send + Sync { + /// This generates a new timestamp + fn next_timestamp(&self) -> i64; +} + +/// Basic timestamp generator. Provides no guarantees, if system clock returns +/// time before UNIX epoch it panics. +#[derive(Default)] +pub struct SimpleTimestampGenerator {} + +impl SimpleTimestampGenerator { + pub fn new() -> Self { + SimpleTimestampGenerator {} + } +} + +impl TimestampGenerator for SimpleTimestampGenerator { + fn next_timestamp(&self) -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_micros() as i64 + } +} + +/// Warning configuration for MonotonicTimestampGenerator +struct MonotonicTimestampGeneratorWarningsCfg { + warning_threshold: Duration, + warning_interval: Duration, +} + +/// Monotonic timestamp generator. Guarantees monotonicity of timestamps. +/// If system clock will not provide an increased timestamp, then the timestamp will +/// be artificially increased. If the config is provided and the clock skew is bigger than +/// warning_threshold (by default 1 second), then the user will be warned about +/// the skew repeatedly, with warning_interval provided in the settings (by default 1 second). +/// Remember that this generator only guarantees monotonicity within one instance of this struct! +/// If you create multiple instances the monotonicity guarantee becomes void. +pub struct MonotonicTimestampGenerator { + last: AtomicI64, + last_warning: Mutex, + config: Option, +} + +impl MonotonicTimestampGenerator { + /// Creates a new monotonic timestamp generator with default settings + pub fn new() -> Self { + MonotonicTimestampGenerator { + last: AtomicI64::new(0), + last_warning: Mutex::new(Instant::now()), + config: Some(MonotonicTimestampGeneratorWarningsCfg { + warning_threshold: Duration::from_secs(1), + warning_interval: Duration::from_secs(1), + }), + } + } + + pub fn with_warning_times( + mut self, + warning_threshold: Duration, + warning_interval: Duration, + ) -> Self { + self.config = Some(MonotonicTimestampGeneratorWarningsCfg { + warning_threshold, + warning_interval, + }); + self + } + + pub fn without_warnings(mut self) -> Self { + self.config = None; + self + } + + // This is guaranteed to return a monotonic timestamp. If clock skew is detected + // then this method will increment the last timestamp. + fn compute_next(&self, last: i64) -> i64 { + let current = SystemTime::now().duration_since(UNIX_EPOCH); + if let Ok(cur_time) = current { + // We have generated a valid timestamp + let u_cur = cur_time.as_micros() as i64; + if u_cur > last { + // We have generated a valid, monotonic timestamp + return u_cur; + } else if let Some(cfg) = self.config.as_ref() { + // We have detected clock skew, we will increment the last timestamp, and check if we should warn the user + if last - u_cur > cfg.warning_threshold.as_micros() as i64 { + // We have detected a clock skew bigger than the threshold, we check if we warned the user recently + let mut last_warn = self.last_warning.lock().unwrap(); + let now = Instant::now(); + if now >= last_warn.checked_add(cfg.warning_interval).unwrap() { + // We have not warned the user recently, we will warn the user + *last_warn = now; + drop(last_warn); + warn!( + "Clock skew detected. The current time ({}) was {} \ + microseconds behind the last generated timestamp ({}). \ + The next generated timestamp will be artificially incremented \ + to guarantee monotonicity.", + u_cur, + last - u_cur, + last + ) + } + } + } + } else { + // We have generated a timestamp before UNIX epoch, we will warn the user and increment the last timestamp + warn!("Clock skew detected. The current time was behind UNIX epoch."); + } + + last + 1 + } +} + +impl Default for MonotonicTimestampGenerator { + fn default() -> Self { + Self::new() + } +} + +impl TimestampGenerator for MonotonicTimestampGenerator { + fn next_timestamp(&self) -> i64 { + loop { + let last = self.last.load(Ordering::SeqCst); + let cur = self.compute_next(last); + if self + .last + .compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + return cur; + } + } + } +} From f3fd0afffc3eec8a9e9a8cebc79eebc1b09079f8 Mon Sep 17 00:00:00 2001 From: smoczy123 Date: Mon, 13 Jan 2025 12:34:23 +0100 Subject: [PATCH 2/6] client/session: Added timestamp generator to SessionConfig Also added an ability to set it through Session Builder --- scylla/src/client/session.rs | 6 ++++++ scylla/src/client/session_builder.rs | 23 +++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/scylla/src/client/session.rs b/scylla/src/client/session.rs index 2ba11889b7..3492794926 100644 --- a/scylla/src/client/session.rs +++ b/scylla/src/client/session.rs @@ -30,6 +30,7 @@ use crate::policies::host_filter::HostFilter; use crate::policies::load_balancing::{self, RoutingInfo}; use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession}; use crate::policies::speculative_execution; +use crate::policies::timestamp_generator::TimestampGenerator; use crate::prepared_statement::PreparedStatement; use crate::query::Query; #[allow(deprecated)] @@ -180,6 +181,10 @@ pub struct SessionConfig { /// Generally, this options is best left as default (false). pub disallow_shard_aware_port: bool, + /// Timestamp generator used for generating timestamps on the client-side + /// If None, server-side timestamps are used. + pub timestamp_generator: Option>, + /// If empty, fetch all keyspaces pub keyspaces_to_fetch: Vec, @@ -292,6 +297,7 @@ impl SessionConfig { connect_timeout: Duration::from_secs(5), connection_pool_size: Default::default(), disallow_shard_aware_port: false, + timestamp_generator: None, keyspaces_to_fetch: Vec::new(), fetch_schema_metadata: true, keepalive_interval: Some(Duration::from_secs(30)), diff --git a/scylla/src/client/session_builder.rs b/scylla/src/client/session_builder.rs index ee68e70ea6..cc543f3b19 100644 --- a/scylla/src/client/session_builder.rs +++ b/scylla/src/client/session_builder.rs @@ -14,6 +14,7 @@ use crate::cloud::{CloudConfig, CloudConfigError}; use crate::errors::NewSessionError; use crate::policies::address_translator::AddressTranslator; use crate::policies::host_filter::HostFilter; +use crate::policies::timestamp_generator::TimestampGenerator; use crate::statement::Consistency; #[cfg(feature = "ssl")] use openssl::ssl::SslContext; @@ -684,6 +685,28 @@ impl GenericSessionBuilder { self } + /// Set the timestamp generator that will generate timestamps on the client-side. + /// + /// # Example + /// ``` + /// # use scylla::client::session::Session; + /// # use scylla::client::session_builder::SessionBuilder; + /// # use scylla::policies::timestamp_generator::SimpleTimestampGenerator; + /// # use std::sync::Arc; + /// # async fn example() -> Result<(), Box> { + /// let session: Session = SessionBuilder::new() + /// .known_node("127.0.0.1:9042") + /// .timestamp_generator(Arc::new(SimpleTimestampGenerator::new())) + /// .build() + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn timestamp_generator(mut self, timestamp_generator: Arc) -> Self { + self.config.timestamp_generator = Some(timestamp_generator); + self + } + /// Set the keyspaces to be fetched, to retrieve their strategy, and schema metadata if enabled /// No keyspaces, the default value, means all the keyspaces will be fetched. /// From cde5f7fb6e0a40167bbb89de307b9851bbaff0b9 Mon Sep 17 00:00:00 2001 From: smoczy123 Date: Mon, 13 Jan 2025 12:38:24 +0100 Subject: [PATCH 3/6] network/connection: Added timestamp generator to ConnectionConfig The timestamp generator in ConnectionConfig is set in Session::Connect() --- scylla/src/client/session.rs | 1 + scylla/src/network/connection.rs | 3 +++ 2 files changed, 4 insertions(+) diff --git a/scylla/src/client/session.rs b/scylla/src/client/session.rs index 3492794926..e774ddffbb 100644 --- a/scylla/src/client/session.rs +++ b/scylla/src/client/session.rs @@ -987,6 +987,7 @@ where compression: config.compression, tcp_nodelay: config.tcp_nodelay, tcp_keepalive_interval: config.tcp_keepalive_interval, + timestamp_generator: config.timestamp_generator, #[cfg(feature = "ssl")] ssl_config: config.ssl_context.map(SslConfig::new_with_global_context), authenticator: config.authenticator.clone(), diff --git a/scylla/src/network/connection.rs b/scylla/src/network/connection.rs index bb9dc20b34..c7c1f17200 100644 --- a/scylla/src/network/connection.rs +++ b/scylla/src/network/connection.rs @@ -22,6 +22,7 @@ use crate::frame::{ FrameParams, SerializedRequest, }; use crate::policies::address_translator::AddressTranslator; +use crate::policies::timestamp_generator::TimestampGenerator; use crate::query::Query; use crate::response::query_result::QueryResult; use crate::response::{ @@ -324,6 +325,7 @@ pub(crate) struct ConnectionConfig { pub(crate) compression: Option, pub(crate) tcp_nodelay: bool, pub(crate) tcp_keepalive_interval: Option, + pub(crate) timestamp_generator: Option>, #[cfg(feature = "ssl")] pub(crate) ssl_config: Option, pub(crate) connect_timeout: std::time::Duration, @@ -349,6 +351,7 @@ impl Default for ConnectionConfig { compression: None, tcp_nodelay: true, tcp_keepalive_interval: None, + timestamp_generator: None, event_sender: None, #[cfg(feature = "ssl")] ssl_config: None, From 234a6fa73ccd44e3b2490e20768e59a2fc8b44c3 Mon Sep 17 00:00:00 2001 From: smoczy123 Date: Wed, 4 Dec 2024 23:21:30 +0100 Subject: [PATCH 4/6] network/connection: Added logic for setting a timestamp Generated timestamp is only set if user did not provide one --- scylla/src/network/connection.rs | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/scylla/src/network/connection.rs b/scylla/src/network/connection.rs index c7c1f17200..dc0485f4d4 100644 --- a/scylla/src/network/connection.rs +++ b/scylla/src/network/connection.rs @@ -856,6 +856,14 @@ impl Connection { page_size: Option, paging_state: PagingState, ) -> Result { + let get_timestamp_from_gen = || { + self.config + .timestamp_generator + .as_ref() + .map(|gen| gen.next_timestamp()) + }; + let timestamp = query.get_timestamp().or_else(get_timestamp_from_gen); + let query_frame = query::Query { contents: Cow::Borrowed(&query.contents), parameters: query::QueryParameters { @@ -865,7 +873,7 @@ impl Connection { page_size: page_size.map(Into::into), paging_state, skip_metadata: false, - timestamp: query.get_timestamp(), + timestamp, }, }; @@ -918,6 +926,16 @@ impl Connection { page_size: Option, paging_state: PagingState, ) -> Result { + let get_timestamp_from_gen = || { + self.config + .timestamp_generator + .as_ref() + .map(|gen| gen.next_timestamp()) + }; + let timestamp = prepared_statement + .get_timestamp() + .or_else(get_timestamp_from_gen); + let execute_frame = execute::Execute { id: prepared_statement.get_id().to_owned(), parameters: query::QueryParameters { @@ -925,7 +943,7 @@ impl Connection { serial_consistency, values: Cow::Borrowed(values), page_size: page_size.map(Into::into), - timestamp: prepared_statement.get_timestamp(), + timestamp, skip_metadata: prepared_statement.get_use_cached_result_metadata(), paging_state, }, @@ -1060,13 +1078,21 @@ impl Connection { let values = RawBatchValuesAdapter::new(values, contexts); + let get_timestamp_from_gen = || { + self.config + .timestamp_generator + .as_ref() + .map(|gen| gen.next_timestamp()) + }; + let timestamp = batch.get_timestamp().or_else(get_timestamp_from_gen); + let batch_frame = batch::Batch { statements: Cow::Borrowed(&batch.statements), values, batch_type: batch.get_type(), consistency, serial_consistency, - timestamp: batch.get_timestamp(), + timestamp, }; loop { From ca3258cf306da52eb455604653b30f9d93526335 Mon Sep 17 00:00:00 2001 From: smoczy123 Date: Mon, 13 Jan 2025 12:40:28 +0100 Subject: [PATCH 5/6] client/session_test: Added tests for MonotonicTimestampGenerator --- scylla/src/client/session_test.rs | 86 +++++++++++++++++++++- scylla/src/policies/timestamp_generator.rs | 53 +++++++++++++ 2 files changed, 137 insertions(+), 2 deletions(-) diff --git a/scylla/src/client/session_test.rs b/scylla/src/client/session_test.rs index 5f7e6f90cf..196c2861ae 100644 --- a/scylla/src/client/session_test.rs +++ b/scylla/src/client/session_test.rs @@ -3,7 +3,7 @@ use super::execution_profile::ExecutionProfile; use super::session::Session; use super::session_builder::SessionBuilder; use crate as scylla; -use crate::batch::{Batch, BatchStatement}; +use crate::batch::{Batch, BatchStatement, BatchType}; use crate::cluster::metadata::Strategy::NetworkTopologyStrategy; use crate::cluster::metadata::{CollectionType, ColumnKind, CqlType, NativeType, UserDefinedType}; use crate::deserialize::DeserializeOwnedValue; @@ -32,7 +32,7 @@ use scylla_cql::types::serialize::value::SerializeValue; use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeSet, HashSet}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tokio::net::TcpListener; use uuid::Uuid; @@ -1327,6 +1327,88 @@ async fn test_timestamp() { assert_eq!(results, expected_results); } +#[tokio::test] +async fn test_timestamp_generator() { + use crate::policies::timestamp_generator::TimestampGenerator; + use rand::random; + + setup_tracing(); + struct LocalTimestampGenerator { + generated_timestamps: Arc>>, + } + + impl TimestampGenerator for LocalTimestampGenerator { + fn next_timestamp(&self) -> i64 { + let timestamp = random::().abs(); + self.generated_timestamps.lock().unwrap().insert(timestamp); + timestamp + } + } + + let timestamps = Arc::new(Mutex::new(HashSet::new())); + let generator = LocalTimestampGenerator { + generated_timestamps: timestamps.clone(), + }; + + let session = create_new_session_builder() + .timestamp_generator(Arc::new(generator)) + .build() + .await + .unwrap(); + let ks = unique_keyspace_name(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); + session + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)", + ks + )) + .await + .unwrap(); + + let prepared = session + .prepare(format!( + "INSERT INTO {}.t_generator (a, b) VALUES (1, 1)", + ks + )) + .await + .unwrap(); + session.execute_unpaged(&prepared, []).await.unwrap(); + + let unprepared = Query::new(format!( + "INSERT INTO {}.t_generator (a, b) VALUES (2, 2)", + ks + )); + session.query_unpaged(unprepared, []).await.unwrap(); + + let mut batch = Batch::new(BatchType::Unlogged); + let stmt = session + .prepare(format!( + "INSERT INTO {}.t_generator (a, b) VALUES (3, 3)", + ks + )) + .await + .unwrap(); + batch.append_statement(stmt); + session.batch(&batch, &((),)).await.unwrap(); + + let query_rows_result = session + .query_unpaged( + format!("SELECT a, b, WRITETIME(b) FROM {}.t_generator", ks), + &[], + ) + .await + .unwrap() + .into_rows_result() + .unwrap(); + + let timestamps_locked = timestamps.lock().unwrap(); + assert!(query_rows_result + .rows::<(i32, i32, i64)>() + .unwrap() + .map(|row_result| row_result.unwrap()) + .all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime))); +} + #[ignore = "works on remote Scylla instances only (local ones are too fast)"] #[tokio::test] async fn test_request_timeout() { diff --git a/scylla/src/policies/timestamp_generator.rs b/scylla/src/policies/timestamp_generator.rs index 58f6dc9c06..b35f3ad1be 100644 --- a/scylla/src/policies/timestamp_generator.rs +++ b/scylla/src/policies/timestamp_generator.rs @@ -145,3 +145,56 @@ impl TimestampGenerator for MonotonicTimestampGenerator { } } } + +#[test] +fn monotonic_timestamp_generator_is_monotonic() { + const NUMBER_OF_ITERATIONS: u32 = 1000; + + let mut prev = None; + let mut cur; + let generator = MonotonicTimestampGenerator::new(); + for _ in 0..NUMBER_OF_ITERATIONS { + cur = generator.next_timestamp(); + if let Some(prev_val) = prev { + assert!(cur > prev_val); + } + prev = Some(cur); + } +} + +#[test] +fn monotonic_timestamp_generator_is_monotonic_with_concurrency() { + use std::collections::HashSet; + use std::sync::Arc; + + const NUMBER_OF_ITERATIONS: usize = 1000; + const NUMBER_OF_THREADS: usize = 10; + let generator = Arc::new(MonotonicTimestampGenerator::new()); + let timestamps_sets: Vec<_> = std::thread::scope(|s| { + (0..NUMBER_OF_THREADS) + .map(|_| { + s.spawn(|| { + let timestamps: Vec = (0..NUMBER_OF_ITERATIONS) + .map(|_| generator.next_timestamp()) + .collect(); + assert!(timestamps.windows(2).all(|w| w[0] < w[1])); + let timestamps_set: HashSet = HashSet::from_iter(timestamps); + assert_eq!( + timestamps_set.len(), + NUMBER_OF_ITERATIONS, + "Colliding values in a single thread" + ); + timestamps_set + }) + }) + .map(|handle| handle.join().unwrap()) + .collect() + }); + + let full_set: HashSet = timestamps_sets.iter().flatten().copied().collect(); + assert_eq!( + full_set.len(), + NUMBER_OF_ITERATIONS * NUMBER_OF_THREADS, + "Colliding values between threads" + ); +} From 8b0e125facb2ff9408148dcac04c7eaad90079dc Mon Sep 17 00:00:00 2001 From: smoczy123 Date: Wed, 11 Dec 2024 22:50:14 +0100 Subject: [PATCH 6/6] docs: Added timestamp generator docs --- docs/source/SUMMARY.md | 1 + docs/source/queries/queries.md | 10 ++-- docs/source/queries/timestamp-generators.md | 54 +++++++++++++++++++++ 3 files changed, 61 insertions(+), 4 deletions(-) create mode 100644 docs/source/queries/timestamp-generators.md diff --git a/docs/source/SUMMARY.md b/docs/source/SUMMARY.md index c5f65dc92c..3093d5f355 100644 --- a/docs/source/SUMMARY.md +++ b/docs/source/SUMMARY.md @@ -27,6 +27,7 @@ - [USE keyspace](queries/usekeyspace.md) - [Schema agreement](queries/schema-agreement.md) - [Query timeouts](queries/timeouts.md) + - [Timestamp generators](queries/timestamp-generators.md) - [Execution profiles](execution-profiles/execution-profiles.md) - [Creating a profile and setting it](execution-profiles/create-and-use.md) diff --git a/docs/source/queries/queries.md b/docs/source/queries/queries.md index 128be32aa1..4683b6f2b9 100644 --- a/docs/source/queries/queries.md +++ b/docs/source/queries/queries.md @@ -3,7 +3,7 @@ Driver supports all kinds of statements supported by ScyllaDB. The following tables aim to bridge between DB concepts and driver's API. They include recommendations on which API to use in what cases. -## Kinds of CQL statements (from the CQL protocol point of view): +## Kinds of CQL statements (from the CQL protocol point of view) | Kind of CQL statement | Single | Batch | |-----------------------|---------------------|------------------------------------------| @@ -59,7 +59,7 @@ This is **NOT** strictly related to content of the CQL query string. | Load balancing | advanced if prepared, else primitive | advanced if prepared **and ALL** statements in the batch target the same partition, else primitive | | Suitable operations | most of operations | - a list of operations that needs to be executed atomically (batch LightWeight Transaction)
- a batch of operations targetting the same partition (as an advanced optimisation) | -## CQL statements - operations (based on what the CQL string contains): +## CQL statements - operations (based on what the CQL string contains) | CQL data manipulation statement | Recommended statement kind | Recommended Session operation | |------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------| @@ -86,9 +86,10 @@ This is **NOT** strictly related to content of the CQL query string. For more detailed comparison and more best practices, see [doc page about paging](paged.md). -### Queries are fully asynchronous - you can run as many of them in parallel as you wish. +### Queries are fully asynchronous - you can run as many of them in parallel as you wish + +## `USE KEYSPACE` -## `USE KEYSPACE`: There is a special functionality to enable [USE keyspace](usekeyspace.md). ```{eval-rst} @@ -106,4 +107,5 @@ There is a special functionality to enable [USE keyspace](usekeyspace.md). schema-agreement lwt timeouts + timestamp-generators ``` diff --git a/docs/source/queries/timestamp-generators.md b/docs/source/queries/timestamp-generators.md new file mode 100644 index 0000000000..b218c42f5e --- /dev/null +++ b/docs/source/queries/timestamp-generators.md @@ -0,0 +1,54 @@ +# Timestamp generators + +If you want to generate timestamps on the client side you can provide +a TimestampGenerator to a SessionBuilder when creating a Session. Then +every executed statement will have attached a new timestamp generated +by the provided TimestampGenerator. +Timestamps are set according to precendence: + +1. ```USING TIMESTAMP``` in the query itself +2. Manually using ```set_timestamp``` on the query +3. Timestamp generated by the generator + +## Simple Timestamp Generator + +Most basic client-side timestamp generator. Generates timestamp +based on system clock. Provides no guarantees and panics when the system clock +provides timestamp before the unix epoch. + +## Monotonic Timestamp Generator + +Client-side timestamp generator. Guarantees monotonic timestamps +based on the system clock, with automatic timestamp incrementation +if the system clock timestamp would not be monotonic. If the clock skew +exceeds `warning_threshold` of the generator (can be changed with `with_warning_times`, 1s by default) +user will be warned with timestamp generation with `warning_interval` cooldown period +(can be changed with `with_warning_times`, 1s by default) to not spam the user. If user does not want to +be warned about the clock skew, the warnings can be turned off with `without_warnings` function. + +``` rust +# extern crate scylla; +# use std::error::Error; +# async fn check_only_compiles() -> Result<(), Box> { +use scylla::client::session::Session; +use scylla::client::session_builder::SessionBuilder; +use scylla::policies::timestamp_generator::MonotonicTimestampGenerator; +use scylla::query::Query; +use std::sync::Arc; + +let session: Session = SessionBuilder::new() + .known_node("127.0.0.1:9042") + .timestamp_generator(Arc::new(MonotonicTimestampGenerator::new())) + .build() + .await?; + +// This query will have a timestamp generated +// by the monotonic timestamp generator +let my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)"); +let to_insert: i32 = 12345; +session.query_unpaged(my_query, (to_insert,)).await?; +# Ok(()) +# } +``` + +