Skip to content

Commit

Permalink
Add metrics to express data cache
Browse files Browse the repository at this point in the history
Add test case to ensure the empty cache case returns Ok(None) rather than Err(...)

Signed-off-by: Simon Beal <[email protected]>
  • Loading branch information
muddyfish committed Nov 20, 2024
1 parent 9d26b3c commit 31877fb
Showing 1 changed file with 80 additions and 19 deletions.
99 changes: 80 additions & 19 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
use crate::object::ObjectId;
use std::collections::HashMap;

use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
use std::time::Instant;

use async_trait::async_trait;
use base64ct::{Base64, Encoding};
Expand Down Expand Up @@ -111,11 +111,16 @@ where
block_offset: u64,
object_size: usize,
) -> DataCacheResult<Option<ChecksummedBytes>> {
let start = Instant::now();

if object_size > self.config.max_object_size {
metrics::counter!("express_data_cache.block_hit").increment(0);
metrics::counter!("express_data_cache.over_max_object_size", "type" => "read").increment(1);
return Ok(None);
}

if block_offset != block_idx * self.config.block_size {
emit_failure_metric_read("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
}

Expand All @@ -130,8 +135,14 @@ where
.await
{
Ok(result) => result,
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
Err(e) => return Err(e.into()),
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
return Ok(None);
}
Err(e) => {
emit_failure_metric_read("get_object_failure");
return Err(DataCacheError::IoFailure(e.into()));
}
};

pin_mut!(result);
Expand All @@ -144,33 +155,52 @@ where
match chunk {
Ok((offset, body)) => {
if offset != buffer.len() as u64 {
emit_failure_metric_read("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
}
buffer.extend_from_slice(&body);

// Ensure the flow-control window is large enough.
result.as_mut().increment_read_window(self.config.block_size as usize);
}
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
Err(e) => return Err(e.into()),
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
return Ok(None);
}
Err(e) => {
emit_failure_metric_read("get_object_failure");
return Err(DataCacheError::IoFailure(e.into()));
}
}
}
let buffer = buffer.freeze();

let object_metadata = result
.get_object_metadata()
.await
.map_err(|err| DataCacheError::IoFailure(err.into()))?;
let object_metadata = result.get_object_metadata().await.map_err(|err| {
emit_failure_metric_read("invalid_block_metadata");
DataCacheError::IoFailure(err.into())
})?;

let checksum = result
.get_object_checksum()
.await
.map_err(|err| DataCacheError::IoFailure(err.into()))?;
let crc32c = crc32c_from_base64(&checksum.checksum_crc32c.ok_or(DataCacheError::BlockChecksumMissing)?)
.map_err(|err| DataCacheError::IoFailure(err.into()))?;
let checksum = result.get_object_checksum().await.map_err(|err| {
emit_failure_metric_read("invalid_block_checksum");
DataCacheError::IoFailure(err.into())
})?;
let crc32c_b64 = checksum.checksum_crc32c.ok_or_else(|| {
emit_failure_metric_read("missing_block_checksum");
DataCacheError::BlockChecksumMissing
})?;
let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|err| {
emit_failure_metric_read("unparsable_block_checksum");
DataCacheError::IoFailure(err.into())
})?;

let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c);
block_metadata.validate_object_metadata(&object_metadata)?;
block_metadata
.validate_object_metadata(&object_metadata)
.inspect_err(|_| emit_failure_metric_read("invalid_block_metadata"))?;

metrics::counter!("express_data_cache.block_hit").increment(1);
metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(buffer.len() as u64);
metrics::histogram!("express_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64);

Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
}
Expand All @@ -183,18 +213,25 @@ where
bytes: ChecksummedBytes,
object_size: usize,
) -> DataCacheResult<()> {
let start = Instant::now();
if object_size > self.config.max_object_size {
metrics::counter!("express_data_cache.over_max_object_size", "type" => "write").increment(1);
return Ok(());
}

if block_offset != block_idx * self.config.block_size {
emit_failure_metric_write("invalid_block_metadata");
return Err(DataCacheError::InvalidBlockOffset);
}

let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);

let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?;
let (data, checksum) = bytes.into_inner().map_err(|_| {
emit_failure_metric_write("invalid_block_content");
DataCacheError::InvalidBlockContent
})?;
let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum);
let data_length = data.len() as u64;

self.client
.put_object_single(
Expand All @@ -204,8 +241,13 @@ where
data,
)
.in_current_span()
.await?;
.await
.inspect_err(|_| {
emit_failure_metric_write("put_object_failure");
})?;

metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(data_length);
metrics::histogram!("express_data_cache.write_duration_us").record(start.elapsed().as_micros() as f64);
Ok(())
}

Expand All @@ -214,6 +256,17 @@ where
}
}

/// Record a failed cache read in metrics: tags the error with its reason and
/// touches the hit counter so failed reads still count toward the hit-rate denominator.
#[inline]
fn emit_failure_metric_read(reason: &'static str) {
    // Increment by 0: registers the read attempt as a non-hit without inflating hits.
    let hit_counter = metrics::counter!("express_data_cache.block_hit");
    hit_counter.increment(0);
    let error_counter =
        metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "read");
    error_counter.increment(1);
}

/// Record a failed cache write in metrics, tagged with the failure reason.
/// Unlike the read path there is no hit counter to touch for writes.
#[inline]
fn emit_failure_metric_write(reason: &'static str) {
    let error_counter =
        metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "write");
    error_counter.increment(1);
}

/// Metadata about the cached object to ensure that the object we've retrieved is the one we were
/// wanting to get (and avoid collisions with the key).
/// On miss, bypass the cache and go to the main data source.
Expand Down Expand Up @@ -508,6 +561,7 @@ mod tests {
let data = ChecksummedBytes::new("Foo".into());
let data_2 = ChecksummedBytes::new("Bar".into());
let cache_key = ObjectId::new("a".into(), ETag::for_tests());
let cache_key_non_existent = ObjectId::new("does-not-exist".into(), ETag::for_tests());

// Setup cache
let object_key = get_s3_key(
Expand Down Expand Up @@ -576,6 +630,13 @@ mod tests {
.await
.expect_err("cache should return error if object metadata doesn't match data");
assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

// Get data that's not been written yet
let result = cache
.get_block(&cache_key_non_existent, 0, 0, data.len())
.await
.expect("cache should return None if data is not present");
assert_eq!(result, None);
}

#[tokio::test]
Expand Down

0 comments on commit 31877fb

Please sign in to comment.