Skip to content

Commit

Permalink
scylla-cql: Implement NewSerializedValues
Browse files Browse the repository at this point in the history
This struct is very similar to SerializedValues, but does not support
named values (those are no longer necessary after the serialization
refactor) and uses the new serialization interface, guaranteeing better
type safety.
  • Loading branch information
Lorak-mmk committed Nov 23, 2023
1 parent 550ce4f commit 728d8f7
Showing 1 changed file with 210 additions and 4 deletions.
214 changes: 210 additions & 4 deletions scylla-cql/src/types/serialize/row.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use std::{collections::HashMap, sync::Arc};

use bytes::BufMut;
use thiserror::Error;

use crate::frame::response::result::PreparedMetadata;
use crate::frame::value::ValueList;
use crate::frame::frame_errors::ParseError;
use crate::frame::response::result::{ColumnType, PreparedMetadata};
use crate::frame::types;
use crate::frame::value::SerializedValues;
use crate::frame::value::{SerializeValuesError, ValueList};
use crate::frame::{response::result::ColumnSpec, types::RawValue};
use crate::types::serialize::BufBackedRowWriter;

use super::{CellWriter, RowWriter, SerializationError};
use super::value::SerializeCql;
use super::{BufBackedCellWriter, CellWriter, RowWriter, SerializationError};

/// Contains information needed to serialize a row.
pub struct RowSerializationContext<'a> {
Expand Down Expand Up @@ -125,13 +131,150 @@ pub enum ValueListToSerializeRowAdapterError {
NoBindMarkerWithName { name: String },
}

/// A container for already serialized values. It is not aware of the types of contained values,
/// it is basically a byte buffer in the format expected by the CQL protocol.
/// Usually there is no need for a user of a driver to use this struct, it is mostly internal.
/// The exception are APIs like `ClusterData::compute_token` / `ClusterData::get_endpoints`.
/// Allows adding new values to the buffer and iterating over the content.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct NewSerializedValues {
    // Serialized values laid out back-to-back, each as a 4-byte big-endian
    // length prefix followed by the value's contents (CQL `[bytes]` encoding).
    serialized_values: Vec<u8>,
    // Number of values stored in `serialized_values`; tracked separately
    // because the buffer itself does not carry a count.
    element_count: u16,
}

impl NewSerializedValues {
    /// Creates a new, empty list of serialized values.
    pub const fn new() -> Self {
        NewSerializedValues {
            serialized_values: Vec::new(),
            element_count: 0,
        }
    }

    /// Serializes a whole row using the given [`SerializeRow`] implementation
    /// and returns the resulting byte buffer together with the value count.
    pub fn from_serializable<T: SerializeRow>(
        ctx: &RowSerializationContext,
        row: &T,
    ) -> Result<Self, SerializationError> {
        let mut data = Vec::new();
        let element_count = {
            let mut writer = BufBackedRowWriter::new(&mut data);
            T::preliminary_type_check(ctx)?;
            row.serialize(ctx, &mut writer)?;
            writer.value_count()
        };

        Ok(NewSerializedValues {
            serialized_values: data,
            element_count,
        })
    }

    /// Returns `true` if the list contains no values.
    pub fn is_empty(&self) -> bool {
        self.element_count() == 0
    }

    /// Returns an iterator over the contained values, decoded lazily
    /// from the underlying buffer.
    pub fn iter(&self) -> impl Iterator<Item = RawValue> {
        NewSerializedValuesIterator {
            serialized_values: &self.serialized_values,
        }
    }

    /// Returns the number of values in the list.
    pub fn element_count(&self) -> u16 {
        // The count is maintained eagerly (incremented in `add_value`,
        // taken from the row writer in `from_serializable`), so there is
        // no need to parse the buffer here.
        self.element_count
    }

    /// Returns the size (in bytes) of the serialized values buffer.
    pub fn buffer_size(&self) -> usize {
        self.serialized_values.len()
    }

    /// Writes the value count followed by the serialized values,
    /// in the format expected by the CQL protocol.
    pub(crate) fn write_to_request(&self, buf: &mut impl BufMut) {
        buf.put_u16(self.element_count);
        buf.put(self.serialized_values.as_slice())
    }

    /// Serializes value and appends it to the list.
    ///
    /// Fails if the value does not pass the type check, fails to serialize,
    /// or if the list already holds the maximum number of values (`u16::MAX`).
    /// On failure, the list is left unchanged.
    pub fn add_value<T: SerializeCql>(
        &mut self,
        val: &T,
        typ: &ColumnType,
    ) -> Result<(), SerializationError> {
        // The CQL protocol encodes the value count as an unsigned 16-bit
        // integer, so more than u16::MAX values cannot be represented.
        if self.element_count() == u16::MAX {
            return Err(SerializationError(Arc::new(
                SerializeValuesError::TooManyValues,
            )));
        }

        T::preliminary_type_check(typ)?;

        let len_before_serialize: usize = self.serialized_values.len();

        let writer = BufBackedCellWriter::new(&mut self.serialized_values);
        if let Err(e) = val.serialize(typ, writer) {
            // Roll back any partially written bytes. The writer only appends
            // to the buffer, so truncating to the previous length restores
            // the exact previous contents.
            self.serialized_values.truncate(len_before_serialize);
            Err(e)
        } else {
            self.element_count += 1;
            Ok(())
        }
    }

    /// Creates value list from the request frame
    pub(crate) fn new_from_frame(buf: &mut &[u8]) -> Result<Self, ParseError> {
        let values_num = types::read_short(buf)?;
        let values_beg = *buf;
        // Walk over all values once to validate them and to learn how many
        // bytes of the frame they occupy.
        for _ in 0..values_num {
            let _serialized = types::read_value(buf)?;
        }

        let values_len_in_buf = values_beg.len() - buf.len();
        let values_in_frame = &values_beg[0..values_len_in_buf];
        Ok(NewSerializedValues {
            serialized_values: values_in_frame.to_vec(),
            element_count: values_num,
        })
    }

    // Temporary function, to be removed when we implement new batching API (right now it is needed in frame::request::mod.rs tests)
    #[allow(dead_code)]
    pub(crate) fn into_old_serialized_values(self) -> SerializedValues {
        let mut frame = Vec::new();
        self.write_to_request(&mut frame);
        SerializedValues::new_from_frame(&mut frame.as_slice(), false).unwrap()
    }
}

impl Default for NewSerializedValues {
fn default() -> Self {
Self::new()
}
}

/// Iterator over the values stored in a `NewSerializedValues` buffer,
/// yielding them as `RawValue`s.
#[derive(Clone, Copy)]
pub struct NewSerializedValuesIterator<'a> {
    // The not-yet-consumed tail of the buffer; shrinks from the front
    // as values are decoded.
    serialized_values: &'a [u8],
}

impl<'a> Iterator for NewSerializedValuesIterator<'a> {
    type Item = RawValue<'a>;

    /// Decodes and returns the next value, advancing past it in the buffer.
    /// Panics if the buffer contents are not valid CQL-encoded values.
    fn next(&mut self) -> Option<Self::Item> {
        match self.serialized_values {
            // Buffer exhausted — iteration is over.
            [] => None,
            _ => Some(
                types::read_value(&mut self.serialized_values).expect("badly encoded value"),
            ),
        }
    }
}

#[cfg(test)]
mod tests {
use crate::frame::response::result::{ColumnSpec, ColumnType, TableSpec};
use crate::frame::types::RawValue;
use crate::frame::value::{MaybeUnset, SerializedValues, ValueList};
use crate::types::serialize::BufBackedRowWriter;

use super::{RowSerializationContext, SerializeRow};
use super::{NewSerializedValues, RowSerializationContext, SerializeRow};

fn col_spec(name: &str, typ: ColumnType) -> ColumnSpec {
ColumnSpec {
Expand Down Expand Up @@ -202,4 +345,67 @@ mod tests {
// Skip the value count
assert_eq!(&sorted_row_data[2..], unsorted_row_data);
}

#[test]
fn test_empty_serialized_values() {
    // A freshly constructed list holds no values and no bytes.
    let empty = NewSerializedValues::new();
    assert_eq!(empty.element_count(), 0);
    assert_eq!(empty.buffer_size(), 0);
    assert_eq!(empty.iter().count(), 0);
    assert!(empty.is_empty());
}

#[test]
fn test_serialized_values_content() {
    let mut values = NewSerializedValues::new();
    values.add_value(&1234i32, &ColumnType::Int).unwrap();
    values.add_value(&"abcdefg", &ColumnType::Ascii).unwrap();

    let mut request = Vec::new();
    values.write_to_request(&mut request);

    // Build the expected frame piece by piece.
    let mut expected: Vec<u8> = vec![0, 2]; // element count
    expected.extend_from_slice(&[0, 0, 0, 4]); // size of int
    expected.extend_from_slice(&[0, 0, 4, 210]); // content of int (1234)
    expected.extend_from_slice(&[0, 0, 0, 7]); // size of string
    expected.extend_from_slice(b"abcdefg"); // content of string ('abcdefg')

    assert_eq!(request, expected);
}

#[test]
fn test_serialized_values_iter() {
    let mut values = NewSerializedValues::new();
    values.add_value(&1234i32, &ColumnType::Int).unwrap();
    values.add_value(&"abcdefg", &ColumnType::Ascii).unwrap();

    // Collect the whole iterator and compare in one go.
    let decoded: Vec<RawValue> = values.iter().collect();
    assert_eq!(
        decoded,
        vec![
            RawValue::Value(&[0, 0, 4, 210]),
            RawValue::Value(&[97, 98, 99, 100, 101, 102, 103]),
        ]
    );
}

#[test]
fn test_serialized_values_max_capacity() {
    let mut values = NewSerializedValues::new();
    // Fill the list up to the maximum capacity of u16::MAX elements.
    for _ in 0..u16::MAX {
        values
            .add_value(&123456789i64, &ColumnType::BigInt)
            .unwrap();
    }

    // Adding this value should fail, we reached max capacity
    assert!(values
        .add_value(&123456789i64, &ColumnType::BigInt)
        .is_err());

    assert_eq!(values.iter().count(), usize::from(u16::MAX));
    let expected = RawValue::Value(&[0, 0, 0, 0, 0x07, 0x5b, 0xcd, 0x15]);
    assert!(values.iter().all(|v| v == expected))
}
}

0 comments on commit 728d8f7

Please sign in to comment.