Add metrics to express data cache
Add a test case to ensure the empty-cache case returns `Ok(None)` rather than `Err(...)`

Signed-off-by: Simon Beal <[email protected]>
muddyfish committed Nov 19, 2024
1 parent 378a56c commit 339d2ce
Showing 3 changed files with 107 additions and 24 deletions.
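
The diffs below share one convention worth noting up front: every cache lookup touches the `express_data_cache.block_hit` counter exactly once, incrementing it by 1 on a hit and by 0 on a miss or failure, with failures additionally counted in `express_data_cache.block_err` under `reason` and `type` labels. Here is a minimal sketch of that pattern, reusing the `metrics` crate macros exactly as the diff calls them; the `record_read` helper and its arguments are illustrative, not part of this commit:

```rust
use std::time::Instant;

// Illustrative sketch of the metrics convention; only the metric names
// and label keys below come from this commit.
fn record_read(start: Instant, outcome: Result<u64, &'static str>) {
    match outcome {
        // Hit: count it, plus the bytes served from the cache.
        Ok(bytes) => {
            metrics::counter!("express_data_cache.block_hit").increment(1);
            metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(bytes);
        }
        // Miss or failure: touch block_hit with an increment of 0 so the
        // hit rate stays derivable, and record the reason as a label.
        Err(reason) => {
            metrics::counter!("express_data_cache.block_hit").increment(0);
            metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "read").increment(1);
        }
    }
    metrics::histogram!("express_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64);
}
```
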
mountpoint-s3-client/src/object_client.rs (1 addition, 1 deletion)

```diff
@@ -170,7 +170,7 @@ where
 pub type ObjectClientResult<T, S, C> = Result<T, ObjectClientError<S, C>>;
 
 /// Errors returned by a [`get_object`](ObjectClient::get_object) request
-#[derive(Debug, Error, PartialEq, Eq)]
+#[derive(Clone, Debug, Error, PartialEq, Eq)]
 #[non_exhaustive]
 pub enum GetObjectError {
     #[error("The bucket does not exist")]
```

mountpoint-s3-client/src/s3_crt_client/get_object.rs (12 additions, 4 deletions)

```diff
@@ -30,6 +30,8 @@ pub enum ObjectHeadersError {
     UnknownError,
     #[error("requested object checksums, but did not specify it in the request")]
     DidNotRequestChecksums,
+    #[error("the underlying get object request failed: {0}")]
+    GetObjectError(GetObjectError),
 }
 
 impl S3CrtClient {
```


```diff
@@ -114,11 +116,14 @@ impl S3CrtClient {
                 let _ = sender.unbounded_send(Ok((offset, data.into())));
             },
             move |result| {
-                // FIXME - Ideally we'd include a reason why we failed here.
-                object_headers_setter_on_finish.or_set(Err(ObjectHeadersError::UnknownError));
                 if result.is_err() {
-                    Err(parse_get_object_error(result).map(ObjectClientError::ServiceError))
+                    let err = parse_get_object_error(result);
+                    object_headers_setter_on_finish.or_set(Err(err
+                        .clone()
+                        .map_or(ObjectHeadersError::UnknownError, ObjectHeadersError::GetObjectError)));
+                    Err(err.map(ObjectClientError::ServiceError))
                 } else {
+                    object_headers_setter_on_finish.or_set(Err(ObjectHeadersError::UnknownError));
                     Ok(())
                 }
             },
```


```diff
@@ -173,7 +178,10 @@ impl S3GetObjectRequest {
                 self.headers.get().await
             }
         }
-        .map_err(|_| ObjectClientError::ClientError(S3RequestError::RequestCanceled))
+        .map_err(|err| match err {
+            ObjectHeadersError::GetObjectError(err) => ObjectClientError::ServiceError(err),
+            _ => ObjectClientError::ClientError(S3RequestError::InternalError(Box::new(err))),
+        })
     }
 }
```

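
The net effect of the two client-side changes above: when the underlying GET fails, the headers slot now stores the actual `GetObjectError` instead of a generic `UnknownError`, and `get_object_metadata()` surfaces it as a `ServiceError`. A caller can therefore tell a missing key apart from a real failure. A hedged sketch of that classification follows; the import path mirrors how mountpoint-s3 consumes the client crate and should be treated as an assumption:

```rust
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};

/// Illustrative helper: classify a metadata error as a cache miss
/// (key not present) versus a genuine failure.
fn is_cache_miss<E>(err: &ObjectClientError<GetObjectError, E>) -> bool {
    matches!(err, ObjectClientError::ServiceError(GetObjectError::NoSuchKey))
}
```

The new `handle_get_object_err` in the cache below applies this same split, returning `Ok(None)` for `NoSuchKey` and an `IoFailure` for everything else.
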
mountpoint-s3/src/data_cache/express_data_cache.rs (94 additions, 19 deletions)

```diff
@@ -1,7 +1,7 @@
+use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
 use crate::object::ObjectId;
 use std::collections::HashMap;
-
-use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
+use std::time::Instant;
 
 use async_trait::async_trait;
 use base64ct::{Base64, Encoding};
```


```diff
@@ -99,6 +99,30 @@ where
 
         Ok(())
     }
+
+    fn handle_get_object_err(
+        &self,
+        err: ObjectClientError<GetObjectError, Client::ClientError>,
+    ) -> DataCacheResult<Option<ChecksummedBytes>> {
+        // Error cases only. This can never return actual bytes.
+        match err {
+            ObjectClientError::ServiceError(GetObjectError::NoSuchKey) => {
+                metrics::counter!("express_data_cache.block_hit").increment(0);
+                Ok(None)
+            }
+            e => {
+                self.emit_failure_metric("get_object_failure", MetricsType::Read);
+                Err(DataCacheError::IoFailure(e.into()))
+            }
+        }
+    }
+
+    fn emit_failure_metric(&self, reason: &'static str, type_: MetricsType) {
+        if matches!(type_, MetricsType::Read) {
+            metrics::counter!("express_data_cache.block_hit").increment(0);
+        }
+        metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => type_.as_str()).increment(1);
+    }
 }
 
 #[async_trait]
```


```diff
@@ -113,11 +137,15 @@ where
         block_offset: u64,
         object_size: usize,
     ) -> DataCacheResult<Option<ChecksummedBytes>> {
+        let start = Instant::now();
+
         if object_size > self.config.max_object_size {
+            self.emit_failure_metric("over_max_object_size", MetricsType::Read);
             return Ok(None);
         }
 
         if block_offset != block_idx * self.config.block_size {
+            self.emit_failure_metric("invalid_block_offset", MetricsType::Read);
             return Err(DataCacheError::InvalidBlockOffset);
         }
 
```


```diff
@@ -132,24 +160,31 @@ where
             .await
         {
             Ok(result) => result,
-            Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
-            Err(e) => return Err(e.into()),
+            Err(err) => return self.handle_get_object_err(err),
         };
 
-        let object_metadata = result
-            .get_object_metadata()
-            .await
-            .map_err(|err| DataCacheError::IoFailure(err.into()))?;
+        let object_metadata = match result.get_object_metadata().await {
+            Ok(metadata) => metadata,
+            Err(err) => return self.handle_get_object_err(err),
+        };
 
-        let checksum = result
-            .get_object_checksum()
-            .await
-            .map_err(|err| DataCacheError::IoFailure(err.into()))?;
-        let crc32c = crc32c_from_base64(&checksum.checksum_crc32c.ok_or(DataCacheError::BlockChecksumMissing)?)
-            .map_err(|err| DataCacheError::IoFailure(err.into()))?;
+        let checksum = match result.get_object_checksum().await {
+            Ok(checksum) => checksum,
+            Err(err) => return self.handle_get_object_err(err),
+        };
+        let crc32c_b64 = checksum.checksum_crc32c.ok_or_else(|| {
+            self.emit_failure_metric("block_checksum_missing", MetricsType::Read);
+            DataCacheError::BlockChecksumMissing
+        })?;
+        let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|err| {
+            self.emit_failure_metric("block_checksum_unparsable", MetricsType::Read);
+            DataCacheError::IoFailure(err.into())
+        })?;
 
         let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c);
-        block_metadata.validate_object_metadata(&object_metadata)?;
+        block_metadata
+            .validate_object_metadata(&object_metadata)
+            .inspect_err(|_| self.emit_failure_metric("invalid_block_metadata", MetricsType::Read))?;
 
         pin_mut!(result);
         // Guarantee that the request will start even in case of `initial_read_window == 0`.
```


```diff
@@ -161,18 +196,24 @@ where
             match chunk {
                 Ok((offset, body)) => {
                     if offset != buffer.len() as u64 {
+                        self.emit_failure_metric("invalid_block_offset", MetricsType::Read);
                         return Err(DataCacheError::InvalidBlockOffset);
                     }
                     buffer.extend_from_slice(&body);
 
                     // Ensure the flow-control window is large enough.
                     result.as_mut().increment_read_window(self.config.block_size as usize);
                 }
-                Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
-                Err(e) => return Err(e.into()),
+                Err(e) => {
+                    return self.handle_get_object_err(e);
+                }
             }
         }
         let buffer = buffer.freeze();
+
+        metrics::counter!("express_data_cache.block_hit").increment(1);
+        metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(buffer.len() as u64);
+        metrics::histogram!("express_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64);
         Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
     }
 
```


```diff
@@ -184,18 +225,25 @@ where
         bytes: ChecksummedBytes,
         object_size: usize,
     ) -> DataCacheResult<()> {
+        let start = Instant::now();
         if object_size > self.config.max_object_size {
+            self.emit_failure_metric("over_max_object_size", MetricsType::Write);
             return Ok(());
         }
 
         if block_offset != block_idx * self.config.block_size {
+            self.emit_failure_metric("invalid_block_offset", MetricsType::Write);
             return Err(DataCacheError::InvalidBlockOffset);
         }
 
         let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);
 
-        let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?;
+        let (data, checksum) = bytes.into_inner().map_err(|_| {
+            self.emit_failure_metric("invalid_block_content", MetricsType::Write);
+            DataCacheError::InvalidBlockContent
+        })?;
         let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum);
+        let data_length = data.len() as u64;
 
         self.client
             .put_object_single(
```


```diff
@@ -205,8 +253,13 @@ where
                 data,
             )
             .in_current_span()
-            .await?;
+            .await
+            .inspect_err(|_| {
+                self.emit_failure_metric("put_object_failure", MetricsType::Write);
+            })?;
 
+        metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(data_length);
+        metrics::histogram!("express_data_cache.write_duration_us").record(start.elapsed().as_micros() as f64);
         Ok(())
     }
 
```


```diff
@@ -326,6 +379,20 @@ impl BlockMetadata {
     }
 }
 
+enum MetricsType {
+    Read,
+    Write,
+}
+
+impl MetricsType {
+    fn as_str(&self) -> &'static str {
+        match self {
+            MetricsType::Read => "read",
+            MetricsType::Write => "write",
+        }
+    }
+}
+
 /// Get the prefix for objects we'll be creating in S3
 pub fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
     hex::encode(
```


```diff
@@ -509,6 +576,7 @@ mod tests {
         let data = ChecksummedBytes::new("Foo".into());
        let data_2 = ChecksummedBytes::new("Bar".into());
         let cache_key = ObjectId::new("a".into(), ETag::for_tests());
+        let cache_key_non_existent = ObjectId::new("does-not-exist".into(), ETag::for_tests());
 
         // Setup cache
         let object_key = get_s3_key(
```


```diff
@@ -577,6 +645,13 @@ mod tests {
             .await
             .expect_err("cache should return error if object metadata doesn't match data");
         assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));
+
+        // Get data that's not been written yet
+        let result = cache
+            .get_block(&cache_key_non_existent, 0, 0, data.len())
+            .await
+            .expect("cache should return None if data is not present");
+        assert_eq!(result, None);
     }
 
     #[tokio::test]
```

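
Because every `get_block` outcome updates `block_hit` exactly once, the new counters are straightforward to observe end to end. A sketch using the `metrics-util` debugging recorder; the recorder setup and the stand-in increments are assumptions, and only the metric name comes from this commit:

```rust
use metrics_util::debugging::{DebugValue, DebuggingRecorder};

fn main() {
    let recorder = DebuggingRecorder::new();
    let snapshotter = recorder.snapshotter();

    // Stand-ins for one hit and one miss; in the real code these
    // increments happen inside ExpressDataCache::get_block.
    metrics::with_local_recorder(&recorder, || {
        metrics::counter!("express_data_cache.block_hit").increment(1);
        metrics::counter!("express_data_cache.block_hit").increment(0);
    });

    for (key, _unit, _description, value) in snapshotter.snapshot().into_vec() {
        if let DebugValue::Counter(v) = value {
            // Prints: express_data_cache.block_hit = 1
            println!("{} = {}", key.key().name(), v);
        }
    }
}
```
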
