Skip to content

Commit

Permalink
Public API: Use NewSerializedValues / SerializeRow
Browse files Browse the repository at this point in the history
This is one of the main steps of serialization refactor: using new trait
in public API. Thanks to our implementations for this new trait
migrating should not be a huge issue for users - in many cases being
fully compatible.

One change missing from this commit is Batch support, as this requires
more changes (which will also be more breaking and require more
migration effort), so it will be implemented separately.
  • Loading branch information
Lorak-mmk committed Nov 23, 2023
1 parent 728d8f7 commit 0adab94
Show file tree
Hide file tree
Showing 13 changed files with 260 additions and 179 deletions.
25 changes: 18 additions & 7 deletions scylla-cql/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ use std::borrow::Cow;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

use scylla_cql::frame::request::SerializableRequest;
use scylla_cql::frame::value::SerializedValues;
use scylla_cql::frame::value::ValueList;
use scylla_cql::frame::response::result::ColumnType;
use scylla_cql::frame::{request::query, Compression, SerializedRequest};
use scylla_cql::types::serialize::row::NewSerializedValues;

fn make_query<'a>(contents: &'a str, values: &'a SerializedValues) -> query::Query<'a> {
fn make_query(contents: &str, values: NewSerializedValues) -> query::Query<'_> {
query::Query {
contents: Cow::Borrowed(contents),
parameters: query::QueryParameters {
consistency: scylla_cql::Consistency::LocalQuorum,
serial_consistency: None,
values: Cow::Borrowed(values),
values: Cow::Owned(values),
page_size: None,
paging_state: None,
timestamp: None,
Expand All @@ -22,13 +22,24 @@ fn make_query<'a>(contents: &'a str, values: &'a SerializedValues) -> query::Que
}

fn serialized_request_make_bench(c: &mut Criterion) {
let mut values = NewSerializedValues::new();
let mut group = c.benchmark_group("LZ4Compression.SerializedRequest");
let query_args = [
("INSERT foo INTO ks.table_name (?)", &(1234,).serialized().unwrap()),
("INSERT foo, bar, baz INTO ks.table_name (?, ?, ?)", &(1234, "a value", "i am storing a string").serialized().unwrap()),
("INSERT foo INTO ks.table_name (?)", {
values.add_value(&1234, &ColumnType::Int).unwrap();
values.clone()
}),
("INSERT foo, bar, baz INTO ks.table_name (?, ?, ?)", {
values.add_value(&"a value", &ColumnType::Text).unwrap();
values.add_value(&"i am storing a string", &ColumnType::Text).unwrap();
values.clone()
}),
(
"INSERT foo, bar, baz, boop, blah INTO longer_keyspace.a_big_table_name (?, ?, ?, ?, 1000)",
&(1234, "a value", "i am storing a string", "dc0c8cd7-d954-47c1-8722-a857941c43fb").serialized().unwrap()
{
values.add_value(&"dc0c8cd7-d954-47c1-8722-a857941c43fb", &ColumnType::Text).unwrap();
values.clone()
}
),
];
let queries = query_args.map(|(q, v)| make_query(q, v));
Expand Down
36 changes: 26 additions & 10 deletions scylla-cql/src/frame/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ mod tests {
query::{Query, QueryParameters},
DeserializableRequest, SerializableRequest,
},
response::result::ColumnType,
types::{self, SerialConsistency},
value::SerializedValues,
},
types::serialize::row::NewSerializedValues,
Consistency,
};

Expand All @@ -129,8 +130,8 @@ mod tests {
page_size: Some(323),
paging_state: Some(vec![2, 1, 3, 7].into()),
values: {
let mut vals = SerializedValues::new();
vals.add_value(&2137).unwrap();
let mut vals = NewSerializedValues::new();
vals.add_value(&2137, &ColumnType::Int).unwrap();
Cow::Owned(vals)
},
};
Expand All @@ -156,9 +157,9 @@ mod tests {
page_size: None,
paging_state: None,
values: {
let mut vals = SerializedValues::new();
vals.add_named_value("the_answer", &42).unwrap();
vals.add_named_value("really?", &2137).unwrap();
let mut vals = NewSerializedValues::new();
vals.add_value(&42, &ColumnType::Int).unwrap();
vals.add_value(&2137, &ColumnType::Int).unwrap();
Cow::Owned(vals)
},
};
Expand Down Expand Up @@ -189,8 +190,18 @@ mod tests {

// Not execute's values, because named values are not supported in batches.
values: vec![
query.parameters.values.deref().clone(),
query.parameters.values.deref().clone(),
query
.parameters
.values
.deref()
.clone()
.into_old_serialized_values(),
query
.parameters
.values
.deref()
.clone()
.into_old_serialized_values(),
],
};
{
Expand All @@ -212,7 +223,7 @@ mod tests {
timestamp: None,
page_size: None,
paging_state: None,
values: Cow::Owned(SerializedValues::new()),
values: Cow::Owned(NewSerializedValues::new()),
};
let query = Query {
contents: contents.clone(),
Expand Down Expand Up @@ -261,7 +272,12 @@ mod tests {
serial_consistency: None,
timestamp: None,

values: vec![query.parameters.values.deref().clone()],
values: vec![query
.parameters
.values
.deref()
.clone()
.into_old_serialized_values()],
};
{
let mut buf = Vec::new();
Expand Down
24 changes: 14 additions & 10 deletions scylla-cql/src/frame/request/query.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::borrow::Cow;

use crate::frame::{frame_errors::ParseError, types::SerialConsistency};
use crate::{
frame::{frame_errors::ParseError, types::SerialConsistency},
types::serialize::row::NewSerializedValues,
};
use bytes::{Buf, BufMut, Bytes};

use crate::{
frame::request::{RequestOpcode, SerializableRequest},
frame::types,
frame::value::SerializedValues,
};

use super::DeserializableRequest;
Expand Down Expand Up @@ -61,7 +63,7 @@ pub struct QueryParameters<'a> {
pub timestamp: Option<i64>,
pub page_size: Option<i32>,
pub paging_state: Option<Bytes>,
pub values: Cow<'a, SerializedValues>,
pub values: Cow<'a, NewSerializedValues>,
}

impl Default for QueryParameters<'_> {
Expand All @@ -72,7 +74,7 @@ impl Default for QueryParameters<'_> {
timestamp: None,
page_size: None,
paging_state: None,
values: Cow::Borrowed(SerializedValues::EMPTY),
values: Cow::Owned(NewSerializedValues::new()),
}
}
}
Expand Down Expand Up @@ -102,10 +104,6 @@ impl QueryParameters<'_> {
flags |= FLAG_WITH_DEFAULT_TIMESTAMP;
}

if self.values.has_names() {
flags |= FLAG_WITH_NAMES_FOR_VALUES;
}

buf.put_u8(flags);

if !self.values.is_empty() {
Expand Down Expand Up @@ -151,10 +149,16 @@ impl<'q> QueryParameters<'q> {
let default_timestamp_flag = (flags & FLAG_WITH_DEFAULT_TIMESTAMP) != 0;
let values_have_names_flag = (flags & FLAG_WITH_NAMES_FOR_VALUES) != 0;

if values_have_names_flag {
return Err(ParseError::BadIncomingData(
"Named values in frame are currently unsupported".to_string(),
));
}

let values = Cow::Owned(if values_flag {
SerializedValues::new_from_frame(buf, values_have_names_flag)?
NewSerializedValues::new_from_frame(buf)?
} else {
SerializedValues::new()
NewSerializedValues::new()
});

let page_size = page_size_flag.then(|| types::read_int(buf)).transpose()?;
Expand Down
60 changes: 43 additions & 17 deletions scylla/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use criterion::{criterion_group, criterion_main, Criterion};
use bytes::BytesMut;
use scylla::{
frame::types,
frame::value::ValueList,
transport::partitioner::{calculate_token_for_partition_key, Murmur3Partitioner},
};
use scylla_cql::{frame::response::result::ColumnType, types::serialize::row::NewSerializedValues};

fn types_benchmark(c: &mut Criterion) {
let mut buf = BytesMut::with_capacity(64);
Expand Down Expand Up @@ -40,23 +40,49 @@ fn types_benchmark(c: &mut Criterion) {
}

fn calculate_token_bench(c: &mut Criterion) {
let simple_pk = ("I'm prepared!!!",);
let serialized_simple_pk = simple_pk.serialized().unwrap().into_owned();
let simple_pk_long_column = (
17_i32,
16_i32,
String::from_iter(std::iter::repeat('.').take(2000)),
);
let serialized_simple_pk_long_column = simple_pk_long_column.serialized().unwrap().into_owned();
let mut serialized_simple_pk = NewSerializedValues::new();
serialized_simple_pk
.add_value(&"I'm prepared!!!", &ColumnType::Text)
.unwrap();

let complex_pk = (17_i32, 16_i32, "I'm prepared!!!");
let serialized_complex_pk = complex_pk.serialized().unwrap().into_owned();
let complex_pk_long_column = (
17_i32,
16_i32,
String::from_iter(std::iter::repeat('.').take(2000)),
);
let serialized_values_long_column = complex_pk_long_column.serialized().unwrap().into_owned();
let mut serialized_simple_pk_long_column = NewSerializedValues::new();
serialized_simple_pk_long_column
.add_value(&17_i32, &ColumnType::Int)
.unwrap();
serialized_simple_pk_long_column
.add_value(&16_i32, &ColumnType::Int)
.unwrap();
serialized_simple_pk_long_column
.add_value(
&String::from_iter(std::iter::repeat('.').take(2000)),
&ColumnType::Text,
)
.unwrap();

let mut serialized_complex_pk = NewSerializedValues::new();
serialized_complex_pk
.add_value(&17_i32, &ColumnType::Int)
.unwrap();
serialized_complex_pk
.add_value(&16_i32, &ColumnType::Int)
.unwrap();
serialized_complex_pk
.add_value(&"I'm prepared!!!", &ColumnType::Text)
.unwrap();

let mut serialized_values_long_column = NewSerializedValues::new();
serialized_values_long_column
.add_value(&17_i32, &ColumnType::Int)
.unwrap();
serialized_values_long_column
.add_value(&16_i32, &ColumnType::Int)
.unwrap();
serialized_values_long_column
.add_value(
&String::from_iter(std::iter::repeat('.').take(2000)),
&ColumnType::Text,
)
.unwrap();

c.bench_function("calculate_token_from_partition_key simple pk", |b| {
b.iter(|| calculate_token_for_partition_key(&serialized_simple_pk, &Murmur3Partitioner))
Expand Down
Loading

0 comments on commit 0adab94

Please sign in to comment.