Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to express data cache #1146

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 80 additions & 19 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
use crate::object::ObjectId;
use std::collections::HashMap;

use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
use std::time::Instant;

use async_trait::async_trait;
use base64ct::{Base64, Encoding};
Expand Down Expand Up @@ -111,11 +111,16 @@ where
block_offset: u64,
object_size: usize,
) -> DataCacheResult<Option<ChecksummedBytes>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many points of failure.
Shall we factor out the logic into an inner method and only handle the metrics here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully, we won't need handle_get_object_err anymore.

Copy link
Contributor Author

@muddyfish muddyfish Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not planning on refactoring into an inner method, as that makes the reasons harder to propagate. Did remove the handle_get_object_err function though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative could be to still split into an inner function which records the failure (and reason), but move the hit/miss metric to the wrapper.

Happy to review the approach later, though.

let start = Instant::now();

if object_size > self.config.max_object_size {
metrics::counter!("express_data_cache.block_hit").increment(0);
metrics::counter!("express_data_cache.over_max_object_size", "type" => "read").increment(1);
return Ok(None);
}

if block_offset != block_idx * self.config.block_size {
emit_failure_metric_read("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
}

Expand All @@ -130,8 +135,14 @@ where
.await
{
Ok(result) => result,
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
Err(e) => return Err(e.into()),
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
return Ok(None);
}
Err(e) => {
emit_failure_metric_read("get_object_failure");
return Err(DataCacheError::IoFailure(e.into()));
}
};

pin_mut!(result);
Expand All @@ -144,33 +155,52 @@ where
match chunk {
Ok((offset, body)) => {
if offset != buffer.len() as u64 {
emit_failure_metric_read("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
}
buffer.extend_from_slice(&body);

// Ensure the flow-control window is large enough.
result.as_mut().increment_read_window(self.config.block_size as usize);
}
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
Err(e) => return Err(e.into()),
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
return Ok(None);
}
Err(e) => {
emit_failure_metric_read("get_object_failure");
return Err(DataCacheError::IoFailure(e.into()));
}
}
}
let buffer = buffer.freeze();

let object_metadata = result
.get_object_metadata()
.await
.map_err(|err| DataCacheError::IoFailure(err.into()))?;
let object_metadata = result.get_object_metadata().await.map_err(|err| {
emit_failure_metric_read("invalid_block_metadata");
DataCacheError::IoFailure(err.into())
})?;

let checksum = result
.get_object_checksum()
.await
.map_err(|err| DataCacheError::IoFailure(err.into()))?;
let crc32c = crc32c_from_base64(&checksum.checksum_crc32c.ok_or(DataCacheError::BlockChecksumMissing)?)
.map_err(|err| DataCacheError::IoFailure(err.into()))?;
let checksum = result.get_object_checksum().await.map_err(|err| {
emit_failure_metric_read("invalid_block_checksum");
DataCacheError::IoFailure(err.into())
})?;
let crc32c_b64 = checksum.checksum_crc32c.ok_or_else(|| {
emit_failure_metric_read("missing_block_checksum");
DataCacheError::BlockChecksumMissing
})?;
let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|err| {
emit_failure_metric_read("unparsable_block_checksum");
DataCacheError::IoFailure(err.into())
})?;

let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c);
block_metadata.validate_object_metadata(&object_metadata)?;
block_metadata
.validate_object_metadata(&object_metadata)
.inspect_err(|_| emit_failure_metric_read("invalid_block_metadata"))?;

metrics::counter!("express_data_cache.block_hit").increment(1);
metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(buffer.len() as u64);
metrics::histogram!("express_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64);

Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
}
Expand All @@ -183,18 +213,25 @@ where
bytes: ChecksummedBytes,
object_size: usize,
) -> DataCacheResult<()> {
passaro marked this conversation as resolved.
Show resolved Hide resolved
let start = Instant::now();
if object_size > self.config.max_object_size {
metrics::counter!("express_data_cache.over_max_object_size", "type" => "write").increment(1);
return Ok(());
}

if block_offset != block_idx * self.config.block_size {
emit_failure_metric_write("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
}

let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);

let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?;
let (data, checksum) = bytes.into_inner().map_err(|_| {
emit_failure_metric_write("invalid_block_content");
DataCacheError::InvalidBlockContent
})?;
let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum);
let data_length = data.len() as u64;

self.client
.put_object_single(
Expand All @@ -204,8 +241,13 @@ where
data,
)
.in_current_span()
.await?;
.await
.inspect_err(|_| {
emit_failure_metric_write("put_object_failure");
})?;

metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(data_length);
metrics::histogram!("express_data_cache.write_duration_us").record(start.elapsed().as_micros() as f64);
Ok(())
}

Expand All @@ -214,6 +256,17 @@ where
}
}

#[inline]
fn emit_failure_metric_read(reason: &'static str) {
metrics::counter!("express_data_cache.block_hit").increment(0);
metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "read").increment(1);
}

#[inline]
fn emit_failure_metric_write(reason: &'static str) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, non-blocking: we could just inline

metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "write").increment(1);
}

/// Metadata about the cached object to ensure that the object we've retrieved is the one we were
/// wanting to get (and avoid collisions with the key).
/// On miss, bypass the cache and go to the main data source.
Expand Down Expand Up @@ -508,6 +561,7 @@ mod tests {
let data = ChecksummedBytes::new("Foo".into());
let data_2 = ChecksummedBytes::new("Bar".into());
let cache_key = ObjectId::new("a".into(), ETag::for_tests());
let cache_key_non_existent = ObjectId::new("does-not-exist".into(), ETag::for_tests());

// Setup cache
let object_key = get_s3_key(
Expand Down Expand Up @@ -576,6 +630,13 @@ mod tests {
.await
.expect_err("cache should return error if object metadata doesn't match data");
assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

// Get data that's not been written yet
let result = cache
.get_block(&cache_key_non_existent, 0, 0, data.len())
.await
.expect("cache should return None if data is not present");
assert_eq!(result, None);
}

#[tokio::test]
Expand Down
Loading