diff --git a/mountpoint-s3/src/data_cache/express_data_cache.rs b/mountpoint-s3/src/data_cache/express_data_cache.rs index fe6301ce3..a767f75ae 100644 --- a/mountpoint-s3/src/data_cache/express_data_cache.rs +++ b/mountpoint-s3/src/data_cache/express_data_cache.rs @@ -1,7 +1,7 @@ +use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult}; use crate::object::ObjectId; use std::collections::HashMap; - -use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult}; +use std::time::Instant; use async_trait::async_trait; use base64ct::{Base64, Encoding}; @@ -111,11 +111,16 @@ where block_offset: u64, object_size: usize, ) -> DataCacheResult> { + let start = Instant::now(); + if object_size > self.config.max_object_size { + metrics::counter!("express_data_cache.block_hit").increment(0); + metrics::counter!("express_data_cache.over_max_object_size", "type" => "read").increment(1); return Ok(None); } if block_offset != block_idx * self.config.block_size { + emit_failure_metric_read("invalid_block_offset"); return Err(DataCacheError::InvalidBlockOffset); } @@ -130,8 +135,14 @@ where .await { Ok(result) => result, - Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None), - Err(e) => return Err(e.into()), + Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => { + metrics::counter!("express_data_cache.block_hit").increment(0); + return Ok(None); + } + Err(e) => { + emit_failure_metric_read("get_object_failure"); + return Err(DataCacheError::IoFailure(e.into())); + } }; pin_mut!(result); @@ -144,6 +155,7 @@ where match chunk { Ok((offset, body)) => { if offset != buffer.len() as u64 { + emit_failure_metric_read("invalid_block_offset"); return Err(DataCacheError::InvalidBlockOffset); } buffer.extend_from_slice(&body); @@ -151,26 +163,44 @@ where // Ensure the flow-control window is large enough. result.as_mut().increment_read_window(self.config.block_size as usize); } - Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None), - Err(e) => return Err(e.into()), + Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => { + metrics::counter!("express_data_cache.block_hit").increment(0); + return Ok(None); + } + Err(e) => { + emit_failure_metric_read("get_object_failure"); + return Err(DataCacheError::IoFailure(e.into())); + } } } let buffer = buffer.freeze(); - let object_metadata = result - .get_object_metadata() - .await - .map_err(|err| DataCacheError::IoFailure(err.into()))?; + let object_metadata = result.get_object_metadata().await.map_err(|err| { + emit_failure_metric_read("invalid_block_metadata"); + DataCacheError::IoFailure(err.into()) + })?; - let checksum = result - .get_object_checksum() - .await - .map_err(|err| DataCacheError::IoFailure(err.into()))?; - let crc32c = crc32c_from_base64(&checksum.checksum_crc32c.ok_or(DataCacheError::BlockChecksumMissing)?) - .map_err(|err| DataCacheError::IoFailure(err.into()))?; + let checksum = result.get_object_checksum().await.map_err(|err| { + emit_failure_metric_read("invalid_block_checksum"); + DataCacheError::IoFailure(err.into()) + })?; + let crc32c_b64 = checksum.checksum_crc32c.ok_or_else(|| { + emit_failure_metric_read("missing_block_checksum"); + DataCacheError::BlockChecksumMissing + })?; + let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|err| { + emit_failure_metric_read("unparsable_block_checksum"); + DataCacheError::IoFailure(err.into()) + })?; let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c); - block_metadata.validate_object_metadata(&object_metadata)?; + block_metadata + .validate_object_metadata(&object_metadata) + .inspect_err(|_| emit_failure_metric_read("invalid_block_metadata"))?; + + metrics::counter!("express_data_cache.block_hit").increment(1); + metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(buffer.len() as u64); + metrics::histogram!("express_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64); Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c))) } @@ -183,18 +213,25 @@ where bytes: ChecksummedBytes, object_size: usize, ) -> DataCacheResult<()> { + let start = Instant::now(); if object_size > self.config.max_object_size { + metrics::counter!("express_data_cache.over_max_object_size", "type" => "write").increment(1); return Ok(()); } if block_offset != block_idx * self.config.block_size { + emit_failure_metric_write("invalid_block_metadata"); return Err(DataCacheError::InvalidBlockOffset); } let object_key = get_s3_key(&self.prefix, &cache_key, block_idx); - let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?; + let (data, checksum) = bytes.into_inner().map_err(|_| { + emit_failure_metric_write("invalid_block_content"); + DataCacheError::InvalidBlockContent + })?; let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum); + let data_length = data.len() as u64; self.client .put_object_single( @@ -204,8 +241,13 @@ where data, ) .in_current_span() - .await?; + .await + .inspect_err(|_| { + emit_failure_metric_write("put_object_failure"); + })?; + metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(data_length); + metrics::histogram!("express_data_cache.write_duration_us").record(start.elapsed().as_micros() as f64); Ok(()) } @@ -214,6 +256,17 @@ where } } +#[inline] +fn emit_failure_metric_read(reason: &'static str) { + metrics::counter!("express_data_cache.block_hit").increment(0); + metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "read").increment(1); +} + +#[inline] +fn emit_failure_metric_write(reason: &'static str) { + metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "write").increment(1); +} + /// Metadata about the cached object to ensure that the object we've retrieved is the one we were /// wanting to get (and avoid collisions with the key). /// On miss, bypass the cache and go to the main data source. @@ -508,6 +561,7 @@ mod tests { let data = ChecksummedBytes::new("Foo".into()); let data_2 = ChecksummedBytes::new("Bar".into()); let cache_key = ObjectId::new("a".into(), ETag::for_tests()); + let cache_key_non_existent = ObjectId::new("does-not-exist".into(), ETag::for_tests()); // Setup cache let object_key = get_s3_key( @@ -576,6 +630,13 @@ mod tests { .await .expect_err("cache should return error if object metadata doesn't match data"); assert!(matches!(err, DataCacheError::InvalidBlockHeader(_))); + + // Get data that's not been written yet + let result = cache + .get_block(&cache_key_non_existent, 0, 0, data.len()) + .await + .expect("cache should return None if data is not present"); + assert_eq!(result, None); } #[tokio::test]