Skip to content

Commit

Permalink
Add metrics to express data cache (#1146)
Browse files Browse the repository at this point in the history
## Description of change

Adds metrics to express data cache
Fixes a bug where a cache miss was reported as an error
rather than as a miss

Relevant issues: N/A

## Does this change impact existing behavior?

Adds metrics; no user-facing functionality changes.

## Does this change need a changelog entry in any of the crates?

No

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

Signed-off-by: Simon Beal <[email protected]>
  • Loading branch information
muddyfish authored Nov 20, 2024
1 parent 021da95 commit f7b4524
Showing 1 changed file with 80 additions and 19 deletions.
99 changes: 80 additions & 19 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
use crate::object::ObjectId;
use std::collections::HashMap;

use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
use std::time::Instant;

use async_trait::async_trait;
use base64ct::{Base64, Encoding};
Expand Down Expand Up @@ -111,11 +111,16 @@ where
block_offset: u64,
object_size: usize,
) -> DataCacheResult<Option<ChecksummedBytes>> {
let start = Instant::now();

if object_size > self.config.max_object_size {
metrics::counter!("express_data_cache.block_hit").increment(0);
metrics::counter!("express_data_cache.over_max_object_size", "type" => "read").increment(1);
return Ok(None);
}

if block_offset != block_idx * self.config.block_size {
emit_failure_metric_read("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
}

Expand All @@ -130,8 +135,14 @@ where
.await
{
Ok(result) => result,
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
Err(e) => return Err(e.into()),
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
return Ok(None);
}
Err(e) => {
emit_failure_metric_read("get_object_failure");
return Err(DataCacheError::IoFailure(e.into()));
}
};

pin_mut!(result);
Expand All @@ -144,33 +155,52 @@ where
match chunk {
Ok((offset, body)) => {
if offset != buffer.len() as u64 {
emit_failure_metric_read("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
}
buffer.extend_from_slice(&body);

// Ensure the flow-control window is large enough.
result.as_mut().increment_read_window(self.config.block_size as usize);
}
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
Err(e) => return Err(e.into()),
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
return Ok(None);
}
Err(e) => {
emit_failure_metric_read("get_object_failure");
return Err(DataCacheError::IoFailure(e.into()));
}
}
}
let buffer = buffer.freeze();

let object_metadata = result
.get_object_metadata()
.await
.map_err(|err| DataCacheError::IoFailure(err.into()))?;
let object_metadata = result.get_object_metadata().await.map_err(|err| {
emit_failure_metric_read("invalid_block_metadata");
DataCacheError::IoFailure(err.into())
})?;

let checksum = result
.get_object_checksum()
.await
.map_err(|err| DataCacheError::IoFailure(err.into()))?;
let crc32c = crc32c_from_base64(&checksum.checksum_crc32c.ok_or(DataCacheError::BlockChecksumMissing)?)
.map_err(|err| DataCacheError::IoFailure(err.into()))?;
let checksum = result.get_object_checksum().await.map_err(|err| {
emit_failure_metric_read("invalid_block_checksum");
DataCacheError::IoFailure(err.into())
})?;
let crc32c_b64 = checksum.checksum_crc32c.ok_or_else(|| {
emit_failure_metric_read("missing_block_checksum");
DataCacheError::BlockChecksumMissing
})?;
let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|err| {
emit_failure_metric_read("unparsable_block_checksum");
DataCacheError::IoFailure(err.into())
})?;

let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c);
block_metadata.validate_object_metadata(&object_metadata)?;
block_metadata
.validate_object_metadata(&object_metadata)
.inspect_err(|_| emit_failure_metric_read("invalid_block_metadata"))?;

metrics::counter!("express_data_cache.block_hit").increment(1);
metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(buffer.len() as u64);
metrics::histogram!("express_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64);

Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
}
Expand All @@ -183,18 +213,25 @@ where
bytes: ChecksummedBytes,
object_size: usize,
) -> DataCacheResult<()> {
let start = Instant::now();
if object_size > self.config.max_object_size {
metrics::counter!("express_data_cache.over_max_object_size", "type" => "write").increment(1);
return Ok(());
}

if block_offset != block_idx * self.config.block_size {
emit_failure_metric_write("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
}

let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);

let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?;
let (data, checksum) = bytes.into_inner().map_err(|_| {
emit_failure_metric_write("invalid_block_content");
DataCacheError::InvalidBlockContent
})?;
let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum);
let data_length = data.len() as u64;

self.client
.put_object_single(
Expand All @@ -204,8 +241,13 @@ where
data,
)
.in_current_span()
.await?;
.await
.inspect_err(|_| {
emit_failure_metric_write("put_object_failure");
})?;

metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(data_length);
metrics::histogram!("express_data_cache.write_duration_us").record(start.elapsed().as_micros() as f64);
Ok(())
}

Expand All @@ -214,6 +256,17 @@ where
}
}

#[inline]
fn emit_failure_metric_read(reason: &'static str) {
metrics::counter!("express_data_cache.block_hit").increment(0);
metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "read").increment(1);
}

#[inline]
fn emit_failure_metric_write(reason: &'static str) {
metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "write").increment(1);
}

/// Metadata about the cached object to ensure that the object we've retrieved is the one we were
/// wanting to get (and avoid collisions with the key).
/// On miss, bypass the cache and go to the main data source.
Expand Down Expand Up @@ -508,6 +561,7 @@ mod tests {
let data = ChecksummedBytes::new("Foo".into());
let data_2 = ChecksummedBytes::new("Bar".into());
let cache_key = ObjectId::new("a".into(), ETag::for_tests());
let cache_key_non_existent = ObjectId::new("does-not-exist".into(), ETag::for_tests());

// Setup cache
let object_key = get_s3_key(
Expand Down Expand Up @@ -576,6 +630,13 @@ mod tests {
.await
.expect_err("cache should return error if object metadata doesn't match data");
assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

// Get data that's not been written yet
let result = cache
.get_block(&cache_key_non_existent, 0, 0, data.len())
.await
.expect("cache should return None if data is not present");
assert_eq!(result, None);
}

#[tokio::test]
Expand Down

0 comments on commit f7b4524

Please sign in to comment.