Skip to content

Commit

Permalink
client/session_test: Added tests for MonotonicTimestampGenerator
Browse files Browse the repository at this point in the history
  • Loading branch information
smoczy123 committed Jan 13, 2025
1 parent fef794f commit ef93421
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 2 deletions.
83 changes: 81 additions & 2 deletions scylla/src/client/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::execution_profile::ExecutionProfile;
use super::session::Session;
use super::session_builder::SessionBuilder;
use crate as scylla;
use crate::batch::{Batch, BatchStatement};
use crate::batch::{Batch, BatchStatement, BatchType};
use crate::cluster::metadata::Strategy::NetworkTopologyStrategy;
use crate::cluster::metadata::{CollectionType, ColumnKind, CqlType, NativeType, UserDefinedType};
use crate::deserialize::DeserializeOwnedValue;
Expand Down Expand Up @@ -32,7 +32,7 @@ use scylla_cql::types::serialize::value::SerializeValue;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeSet, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
use uuid::Uuid;

Expand Down Expand Up @@ -1327,6 +1327,85 @@ async fn test_timestamp() {
assert_eq!(results, expected_results);
}

#[tokio::test]
async fn test_timestamp_generator() {
use crate::policies::timestamp_generator::TimestampGenerator;
use rand::random;

setup_tracing();
struct LocalTimestampGenerator {
generated_timestamps: Arc<Mutex<HashSet<i64>>>,
}

impl TimestampGenerator for LocalTimestampGenerator {
fn next_timestamp(&self) -> i64 {
let timestamp = random::<i64>().abs();
self.generated_timestamps.lock().unwrap().insert(timestamp);
timestamp
}
}

let timestamps = Arc::new(Mutex::new(HashSet::new()));
let generator = LocalTimestampGenerator {
generated_timestamps: timestamps.clone(),
};

let session = create_new_session_builder()
.timestamp_generator(Arc::new(generator))
.build()
.await
.unwrap();
let ks = unique_keyspace_name();
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
session
.ddl(format!(
"CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)",
ks
))
.await
.unwrap();
let prepared = session
.prepare(format!(
"INSERT INTO {}.t_generator (a, b) VALUES (1, 1)",
ks
))
.await
.unwrap();
session.execute_unpaged(&prepared, []).await.unwrap();
let unprepared = Query::new(format!(
"INSERT INTO {}.t_generator (a, b) VALUES (2, 2)",
ks
));
session.query_unpaged(unprepared, []).await.unwrap();
let mut batch = Batch::new(BatchType::Unlogged);
let stmt = session
.prepare(format!(
"INSERT INTO {}.t_generator (a, b) VALUES (3, 3)",
ks
))
.await
.unwrap();
batch.append_statement(stmt);
session.batch(&batch, &((),)).await.unwrap();

let query_rows_result = session
.query_unpaged(
format!("SELECT a, b, WRITETIME(b) FROM {}.t_generator", ks),
&[],
)
.await
.unwrap()
.into_rows_result()
.unwrap();

let timestamps_locked = timestamps.lock().unwrap();
assert!(query_rows_result
.rows::<(i32, i32, i64)>()
.unwrap()
.map(|row_result| row_result.unwrap())
.all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime)));
}

#[ignore = "works on remote Scylla instances only (local ones are too fast)"]
#[tokio::test]
async fn test_request_timeout() {
Expand Down
53 changes: 53 additions & 0 deletions scylla/src/policies/timestamp_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,56 @@ impl TimestampGenerator for MonotonicTimestampGenerator {
}
}
}

#[test]
fn monotonic_timestamp_generator_is_monotonic() {
const NUMBER_OF_ITERATIONS: u32 = 1000;

let mut prev = None;
let mut cur;
let generator = MonotonicTimestampGenerator::new();
for _ in 0..NUMBER_OF_ITERATIONS {
cur = generator.next_timestamp();
if let Some(prev_val) = prev {
assert!(cur > prev_val);
}
prev = Some(cur);
}
}

#[test]
fn monotonic_timestamp_generator_is_monotonic_with_concurrency() {
use std::collections::HashSet;
use std::sync::Arc;

const NUMBER_OF_ITERATIONS: usize = 1000;
const NUMBER_OF_THREADS: usize = 10;
let generator = Arc::new(MonotonicTimestampGenerator::new());
let timestamps_sets: Vec<_> = std::thread::scope(|s| {
(0..NUMBER_OF_THREADS)
.map(|_| {
s.spawn(|| {
let timestamps: Vec<i64> = (0..NUMBER_OF_ITERATIONS)
.map(|_| generator.next_timestamp())
.collect();
assert!(timestamps.windows(2).all(|w| w[0] < w[1]));
let timestamps_set: HashSet<i64> = HashSet::from_iter(timestamps);
assert_eq!(
timestamps_set.len(),
NUMBER_OF_ITERATIONS,
"Colliding values in a single thread"
);
timestamps_set
})
})
.map(|handle| handle.join().unwrap())
.collect()
});

let full_set: HashSet<i64> = timestamps_sets.iter().flatten().copied().collect();
assert_eq!(
full_set.len(),
NUMBER_OF_ITERATIONS * NUMBER_OF_THREADS,
"Colliding values between threads"
);
}

0 comments on commit ef93421

Please sign in to comment.