Implement timestamp generators #1128
Do other drivers (besides cpp-driver) have timestamp generators? If so, how do they implement them?
I'm asking because a cpp-compatible implementation does not have to reside in the Rust driver: cpp-rust-driver can implement it in its own codebase. We should think about which implementation(s) we want to provide to our users, not what cpp-rust-driver needs here.
Did you test how long a call to `MonotonicTimestampGenerator::next_timestamp()` takes? The clock has microsecond resolution, and a microsecond is a lot of time. If we call the generator often (as in your unit test), it may always take the `last + 1` branch, because we are still in the same microsecond as the previous call. In that case, after many calls the returned value may be far in the future compared to the system clock. With multiple clients, that may cause issues.
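To make the drift concern concrete, here is a minimal sketch, not the PR's code. `SketchGenerator` and `drift_after` are hypothetical names, and the clock reading is passed in as a parameter (an assumption made only so the effect is reproducible) instead of being read from `SystemTime`.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Hypothetical stand-in for the generator under review. The clock value is
/// injected so that the "same microsecond" case can be reproduced on demand.
struct SketchGenerator {
    last: AtomicI64,
}

impl SketchGenerator {
    fn new() -> Self {
        Self { last: AtomicI64::new(0) }
    }

    /// `now_micros` plays the role of the system clock reading.
    fn next_timestamp(&self, now_micros: i64) -> i64 {
        loop {
            let last = self.last.load(Ordering::SeqCst);
            // Use the clock if it moved forward, otherwise fall back to last + 1.
            let cur = if now_micros > last { now_micros } else { last + 1 };
            if self
                .last
                .compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
            {
                return cur;
            }
        }
    }
}

/// Calls the generator `calls` times while the clock stays at `frozen_now`
/// and returns how far ahead of the clock the last timestamp ended up.
fn drift_after(calls: u32, frozen_now: i64) -> i64 {
    let generator = SketchGenerator::new();
    let mut last = 0;
    for _ in 0..calls {
        last = generator.next_timestamp(frozen_now);
    }
    last - frozen_now
}
```

With the clock frozen, every call after the first takes the `last + 1` branch, so `n` calls leave the generator `n - 1` microseconds ahead of the clock. Whether this matters in practice depends on how the real call rate compares to one call per microsecond.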
I've also checked Java and Python drivers, they implement it in the exact same way, taking the microsecond time since epoch. Unfortunately, |
- I don't think the connection is the right layer to use the generator; IMO it fits better in the session layer. Look into the `session.rs` file, at functions like `execute` or `run_query` (cc: @wprzytula).
- Nit: the first lines of the commit messages are too long, which makes GitHub render them incorrectly. Avoid any lines longer than 70 characters, especially the first line (which should ideally have at most 50 characters).
- The `MonotonicTimestampGenerator` struct could use more explanation in its doc comment. Please describe what it guarantees and how it behaves (errors, drifting, etc.).
- On that front, the documentation book should also be updated with info about timestamp generators. It should be either a new file in the `queries` folder or a new folder. @wprzytula @muzarski wdyt is the better place?
Ideally, we could have a new directory with a file for each generator. However, if it turns out in multiple very short |
My first thought was to put the timestamp generation in the session layer, however the functions I've changed in |
For normal queries modifying |
async fn next_timestamp(&self) -> i64 {
    loop {
        let last = self.last.load(Ordering::SeqCst);
        let cur = self.compute_next(last).await;
        if self
            .last
            .compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            return cur;
        }
    }
}
🤔 When the client is under high load, won't this approach be a problem?
We should benchmark this. @muzarski, could you help @smoczy123 with that?
The alternative approach would be to put `last` under a mutex, avoiding the retries. An added benefit is that you can store an `Instant` under the `Mutex`, which would simplify the code in `compute_next`.
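For comparison with the compare-exchange loop, a mutex-based variant could look roughly like the sketch below. This is not code from the PR: `MutexTimestampGenerator` is a hypothetical name, the warning logic is left out, and a clock reading before the epoch is simply treated as 0.

```rust
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical mutex-based generator; name and fields are illustrative only.
struct MutexTimestampGenerator {
    last: Mutex<i64>,
}

impl MutexTimestampGenerator {
    fn new() -> Self {
        Self { last: Mutex::new(0) }
    }

    fn next_timestamp(&self) -> i64 {
        // Microseconds since the epoch; a clock before the epoch maps to 0
        // (an assumption of this sketch, not behaviour taken from the PR).
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_micros() as i64)
            .unwrap_or(0);
        // The lock serializes callers, so there is no retry loop.
        let mut last = self.last.lock().unwrap();
        *last = if now > *last { now } else { *last + 1 };
        *last
    }
}
```

The trade-off is the usual one: the mutex removes retries but makes callers wait for each other, so only a benchmark under realistic load can say which variant is faster.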
I prepared a branch that uses rust-driver version from this PR: https://github.com/muzarski/cql-stress/tree/timestamp-gen.
@smoczy123, you can set the timestamp generator for the c-s frontend in `src/bin/cql-stress-cassandra-stress/main.rs`, in the `prepare_run` function. The `Session` object is created in this function.
Commands for some simple workloads:
cql-stress-cassandra-stress write n=1000000 -pop seq=1..1000000 -rate threads=20 -node <ip addresses>
cql-stress-cassandra-stress read n=1000000 -pop seq=1..1000000 -rate threads=20 -node <ip addresses>
The first one inserts 1M rows into the database, while the latter reads the rows and validates them. You can play around with the run parameters and options. You can also try running multiple loads simultaneously to simulate a multi-client scenario.
To run Scylla locally, see https://hub.docker.com/r/scylladb/scylla/. Then replace `<ip addresses>` in the commands above with your Scylla nodes' IPs (a comma-delimited list).
If you stumble upon any problems, feel free to ping me.
I ran write and read workloads with different numbers of threads in both single-client and multi-client scenarios and saw no measurable difference in speed between using a monotonic timestamp generator and not using one. It seems that this does not cause latency issues.
All of your comments should be addressed now, I've left two conversations open as I'm not sure about those two |
impl TimestampGenerator for MonotonicTimestampGenerator {
    fn next_timestamp(&self) -> i64 {
        loop {
            let last = self.last.load(Ordering::SeqCst);
            let cur = self.compute_next(last);
            if self
                .last
                .compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
            {
                return cur;
            }
        }
    }
🌱 When we merge this PR, we need to open an issue to benchmark which implementation is faster under high load: current one, or simply using a mutex. No need to do this now because the implementation can be changed later.
    fn compute_next(&self, last: i64) -> i64 {
        let current = SystemTime::now().duration_since(UNIX_EPOCH);
        if let Ok(cur_time) = current {
            let u_cur = cur_time.as_micros() as i64;
            if u_cur > last {
                return u_cur;
            } else if let Some(cfg) = self.config.as_ref() {
                if last - u_cur > cfg.warning_threshold.as_micros() as i64 {
                    let mut last_warn = self.last_warning.lock().unwrap();
                    let now = Instant::now();
                    if now >= last_warn.checked_add(cfg.warning_interval).unwrap() {
                        *last_warn = now;
                        drop(last_warn);
                        warn!(
                            "Clock skew detected. The current time ({}) was {} \
                            microseconds behind the last generated timestamp ({}). \
                            The next generated timestamp will be artificially incremented \
                            to guarantee monotonicity.",
                            u_cur,
                            last - u_cur,
                            last
                        )
                    }
                }
            }
        } else {
            warn!("Clock skew detected. The current time was behind UNIX epoch.");
        }

        last + 1
    }
}
🔧 In general I'd prefer time operations to use SystemTime / Duration / Instant when possible. This makes the code easier to follow (no need to remember which var is a duration and which is a timestamp) and less error prone. By refactoring this I noticed that there is actually no need to handle the case of clock being behind the unix epoch in most cases.
Here are my proposed changes (which assume that we use AtomicU64 instead of AtomicI64):
// This is guaranteed to return a monotonic timestamp. If clock skew is detected
// then this method will increment the last timestamp.
fn compute_next(&self, last: SystemTime) -> SystemTime {
    let current = SystemTime::now();
    if current > last {
        return current;
    }
    if let Some(cfg) = self.config.as_ref() {
        // Panic safety: in the previous branch we return if "last" is earlier.
        let skew = last.duration_since(current).unwrap();
        if skew > cfg.warning_threshold {
            let mut last_warn = self.last_warning.lock().unwrap();
            let now = Instant::now();
            //
            if now >= last_warn.checked_add(cfg.warning_interval).unwrap() {
                *last_warn = now;
                drop(last_warn);
                // Generated timestamps can only increase, so "last" can't be behind the epoch.
                warn!(
                    "Clock skew detected. The current time ({:?}) was {:?} \
                    microseconds behind the last generated timestamp ({:?}). \
                    The next generated timestamp will be artificially incremented \
                    to guarantee monotonicity.",
                    current, skew, last
                )
            }
        }
    }
    // Timestamps so far into the future that we can't represent them with
    // SystemTime is not the case we need to consider.
    last.checked_add(Duration::from_micros(1)).unwrap()
}

fn next_timestamp(&self) -> i64 {
    loop {
        let last_micros = self.last.load(Ordering::SeqCst);
        // Panic safety: the microseconds value is either 0 or was created from
        // a system time, so it is safe to turn it back into system time.
        let last_time = UNIX_EPOCH
            .checked_add(Duration::from_micros(last_micros))
            .unwrap();
        let cur = self.compute_next(last_time);
        // Initial value is 0, which will work here, and values from "compute_next"
        // can never decrease, so it is not possible for "cur" to be behind epoch.
        // Lossy conversion to u64 is fine - we don't need to care about timestamps
        // so far into the future.
        let cur_micros = cur.duration_since(UNIX_EPOCH).unwrap().as_micros() as u64;
        if self
            .last
            .compare_exchange(last_micros, cur_micros, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            // Again, no need to consider so big timestamps.
            return cur_micros as i64;
        }
    }
}
I changed `compute_next` to operate on typed time-related types and moved the integer conversions to `next_timestamp`. I changed the warning to use the `Debug` impls for the types. Notice that we no longer need to separately handle the case of the clock being behind the epoch.
Comparison on SystemTime operates at a nanosecond level, so this does not generate monotonic timestamps. We would have to get the actual timestamp to compare it properly.
Ah, but then we lose the panic safety, because `current` may be later than `last`, just not late enough to generate a new timestamp.
> Comparison on SystemTime operates at a nanosecond level, so this does not generate monotonic timestamps. We would have to get the actual timestamp to compare it properly.
Ah, I did not think about that, sorry.
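The two objections above can be reproduced with plain `std::time` values. The sketch below is only an illustration. All three helper names are hypothetical, and `to_micros` assumes its input is not before the epoch.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Truncates a `SystemTime` to whole microseconds since the epoch.
/// Assumes the time is not before the epoch (illustration only).
fn to_micros(t: SystemTime) -> u64 {
    t.duration_since(UNIX_EPOCH).unwrap().as_micros() as u64
}

/// True when `current` is later than `last` as a `SystemTime`, yet both
/// truncate to the same microsecond timestamp.
fn collides_after_truncation(last: SystemTime, current: SystemTime) -> bool {
    current > last && to_micros(current) == to_micros(last)
}

/// True when `last.duration_since(current)` returns `Err`, i.e. when the
/// `unwrap()` in the skew branch would panic.
fn skew_unwrap_would_panic(last: SystemTime, current: SystemTime) -> bool {
    last.duration_since(current).is_err()
}
```

For example, with `last` at 1 s + 100 ns and `current` 500 ns later, the `SystemTime` comparison says the clock advanced, but both values map to timestamp 1_000_000. If the comparison is instead done on the truncated values, the code reaches the skew branch with `current` later than `last`, and `last.duration_since(current)` is an `Err`.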
/// guarantee becomes void.
pub struct MonotonicTimestampGenerator {
    last: AtomicI64,
    last_warning: Mutex<Instant>,
    config: Option<MonotonicTimestampGeneratorCfg>,
}
🔧 Why do you store i64 instead of u64? I don't think we intend to store times before epoch, right?
I thought it would be more elegant, as IIRC timestamps are i64
Yeah I forgot to delete this comment.
Neither type is perfect here. The protocol uses i64 but forbids negative values, another artifact of this protocol being designed with only Java in mind 🤦
Actually, the AtomicU64 variant has the advantage that `Duration::from_micros` accepts u64, so I think it is the better choice in this case.
The trait should still use i64 - because that is what the protocol uses (even if it forbids negative values).
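A small sketch of how the two integer types could meet at the boundary, assuming u64 microseconds are stored internally and i64 is returned from the trait. Both helper names are hypothetical. Unlike the lossy `as` casts in the proposal above, this version uses checked conversions, which is a choice of this sketch and not something the review asks for.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Turns stored microseconds (u64, as `Duration::from_micros` expects) back
/// into a `SystemTime`. Returns `None` if the result is not representable.
fn micros_to_time(micros: u64) -> Option<SystemTime> {
    UNIX_EPOCH.checked_add(Duration::from_micros(micros))
}

/// Converts a `SystemTime` into the i64 microsecond value the protocol uses.
/// Returns `None` for times before the epoch or values that do not fit in i64.
fn time_to_protocol_timestamp(t: SystemTime) -> Option<i64> {
    let micros = t.duration_since(UNIX_EPOCH).ok()?.as_micros();
    i64::try_from(micros).ok()
}
```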
💯 Overall, looks good!
Looks much better. One question: I remember there was an example added in this PR (in the `examples/` folder). Did you remove it, or do I remember it wrong?
I don't think there ever was an example, you might have misremembered the code from the timestamp-generators.md as one |
LGTM! Small nit and a question
Added TimestampGenerator trait and MonotonicTimestampGenerator based on c++ driver's implementation
Also added an ability to set it through Session Builder
The timestamp generator in ConnectionConfig is set in Session::Connect()
Generated timestamp is only set if user did not provide one
Thank you for the contribution!
Looks good to me now, thanks
To achieve parity with cpp-driver we need to implement client-side timestamp generators.
This pull request adds a TimestampGenerator trait and a MonotonicTimestampGenerator that implements it,
together with an extension to SessionBuilder that provides an ability to set a TimestampGenerator in Session
and use it to generate timestamps.
Fixes #1032
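As a rough illustration of the pieces described above, the sketch below defines a local trait with the `next_timestamp(&self) -> i64` shape seen in the review, plus the "only set if the user did not provide one" rule from the commit message. It does not use the driver's API: `WallClockGenerator` and `effective_timestamp` are hypothetical names, and the real wiring through `SessionBuilder` is not shown.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Local copy of the trait shape shown in the review; not the driver's trait.
trait TimestampGenerator {
    fn next_timestamp(&self) -> i64;
}

/// Hypothetical generator that returns wall-clock microseconds and makes
/// no monotonicity guarantee.
struct WallClockGenerator;

impl TimestampGenerator for WallClockGenerator {
    fn next_timestamp(&self) -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_micros() as i64)
            .unwrap_or(0)
    }
}

/// Hypothetical helper: a timestamp supplied by the user wins; otherwise the
/// generator (if one is configured) is asked for a value.
fn effective_timestamp(
    user: Option<i64>,
    generator: Option<&dyn TimestampGenerator>,
) -> Option<i64> {
    user.or_else(|| generator.map(|g| g.next_timestamp()))
}
```

With no user timestamp and no generator configured, the helper returns `None`, which corresponds to leaving the timestamp for the server to assign.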