diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs
index 5240ae952..c2d63d89a 100644
--- a/mountpoint-s3-client/src/object_client.rs
+++ b/mountpoint-s3-client/src/object_client.rs
@@ -170,7 +170,7 @@ where
 pub type ObjectClientResult<T, E, S> = Result<T, ObjectClientError<E, S>>;
 
 /// Errors returned by a [`get_object`](ObjectClient::get_object) request
-#[derive(Debug, Error, PartialEq, Eq)]
+#[derive(Clone, Debug, Error, PartialEq, Eq)]
 #[non_exhaustive]
 pub enum GetObjectError {
     #[error("The bucket does not exist")]
diff --git a/mountpoint-s3-client/src/s3_crt_client/get_object.rs b/mountpoint-s3-client/src/s3_crt_client/get_object.rs
index f2be3c0df..36a61da3e 100644
--- a/mountpoint-s3-client/src/s3_crt_client/get_object.rs
+++ b/mountpoint-s3-client/src/s3_crt_client/get_object.rs
@@ -30,6 +30,8 @@ pub enum ObjectHeadersError {
     UnknownError,
     #[error("requested object checksums, but did not specify it in the request")]
     DidNotRequestChecksums,
+    #[error("the underlying get object request failed: {0}")]
+    GetObjectError(GetObjectError),
 }
 
 impl S3CrtClient {
@@ -114,11 +116,14 @@ impl S3CrtClient {
                 let _ = sender.unbounded_send(Ok((offset, data.into())));
             },
             move |result| {
-                // FIXME - Ideally we'd include a reason why we failed here.
-                object_headers_setter_on_finish.or_set(Err(ObjectHeadersError::UnknownError));
                 if result.is_err() {
-                    Err(parse_get_object_error(result).map(ObjectClientError::ServiceError))
+                    let err = parse_get_object_error(result);
+                    object_headers_setter_on_finish.or_set(Err(err
+                        .clone()
+                        .map_or(ObjectHeadersError::UnknownError, ObjectHeadersError::GetObjectError)));
+                    Err(err.map(ObjectClientError::ServiceError))
                 } else {
+                    object_headers_setter_on_finish.or_set(Err(ObjectHeadersError::UnknownError));
                     Ok(())
                 }
             },
@@ -173,7 +178,10 @@ impl S3GetObjectRequest {
                 self.headers.get().await
             }
         }
-        .map_err(|_| ObjectClientError::ClientError(S3RequestError::RequestCanceled))
+        .map_err(|err| match err {
+            ObjectHeadersError::GetObjectError(err) => ObjectClientError::ServiceError(err),
+            _ => ObjectClientError::ClientError(S3RequestError::InternalError(Box::new(err))),
+        })
     }
 }
 
diff --git a/mountpoint-s3/src/data_cache/express_data_cache.rs b/mountpoint-s3/src/data_cache/express_data_cache.rs
index 46ce485e4..e2bee70d8 100644
--- a/mountpoint-s3/src/data_cache/express_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/express_data_cache.rs
@@ -1,7 +1,7 @@
+use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
 use crate::object::ObjectId;
 use std::collections::HashMap;
-
-use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
+use std::time::Instant;
 
 use async_trait::async_trait;
 use base64ct::{Base64, Encoding};
@@ -99,6 +99,30 @@ where
 
         Ok(())
     }
+
+    fn handle_get_object_err(
+        &self,
+        err: ObjectClientError<GetObjectError, Client::ClientError>,
+    ) -> DataCacheResult<Option<ChecksummedBytes>> {
+        // Error cases only. This can never return actual bytes.
+        match err {
+            ObjectClientError::ServiceError(GetObjectError::NoSuchKey) => {
+                metrics::counter!("express_data_cache.block_hit").increment(0);
+                Ok(None)
+            }
+            e => {
+                self.emit_failure_metric("get_object_failure", MetricsType::Read);
+                Err(DataCacheError::IoFailure(e.into()))
+            }
+        }
+    }
+
+    fn emit_failure_metric(&self, reason: &'static str, type_: MetricsType) {
+        if matches!(type_, MetricsType::Read) {
+            metrics::counter!("express_data_cache.block_hit").increment(0);
+        }
+        metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => type_.as_str()).increment(1);
+    }
 }
 
 #[async_trait]
@@ -113,11 +137,15 @@ where
         block_offset: u64,
         object_size: usize,
     ) -> DataCacheResult<Option<ChecksummedBytes>> {
+        let start = Instant::now();
+
         if object_size > self.config.max_object_size {
+            self.emit_failure_metric("over_max_object_size", MetricsType::Read);
             return Ok(None);
         }
 
         if block_offset != block_idx * self.config.block_size {
+            self.emit_failure_metric("invalid_block_offset", MetricsType::Read);
             return Err(DataCacheError::InvalidBlockOffset);
         }
 
@@ -132,24 +160,31 @@ where
             .await
         {
             Ok(result) => result,
-            Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
-            Err(e) => return Err(e.into()),
+            Err(err) => return self.handle_get_object_err(err),
         };
 
-        let object_metadata = result
-            .get_object_metadata()
-            .await
-            .map_err(|err| DataCacheError::IoFailure(err.into()))?;
+        let object_metadata = match result.get_object_metadata().await {
+            Ok(metadata) => metadata,
+            Err(err) => return self.handle_get_object_err(err),
+        };
 
-        let checksum = result
-            .get_object_checksum()
-            .await
-            .map_err(|err| DataCacheError::IoFailure(err.into()))?;
-        let crc32c = crc32c_from_base64(&checksum.checksum_crc32c.ok_or(DataCacheError::BlockChecksumMissing)?)
-            .map_err(|err| DataCacheError::IoFailure(err.into()))?;
+        let checksum = match result.get_object_checksum().await {
+            Ok(checksum) => checksum,
+            Err(err) => return self.handle_get_object_err(err),
+        };
+        let crc32c_b64 = checksum.checksum_crc32c.ok_or_else(|| {
+            self.emit_failure_metric("block_checksum_missing", MetricsType::Read);
+            DataCacheError::BlockChecksumMissing
+        })?;
+        let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|err| {
+            self.emit_failure_metric("block_checksum_unparsable", MetricsType::Read);
+            DataCacheError::IoFailure(err.into())
+        })?;
 
         let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c);
-        block_metadata.validate_object_metadata(&object_metadata)?;
+        block_metadata
+            .validate_object_metadata(&object_metadata)
+            .inspect_err(|_| self.emit_failure_metric("invalid_block_metadata", MetricsType::Read))?;
 
         pin_mut!(result);
         // Guarantee that the request will start even in case of `initial_read_window == 0`.
@@ -161,6 +196,7 @@ where
             match chunk {
                 Ok((offset, body)) => {
                     if offset != buffer.len() as u64 {
+                        self.emit_failure_metric("invalid_block_offset", MetricsType::Read);
                         return Err(DataCacheError::InvalidBlockOffset);
                     }
                     buffer.extend_from_slice(&body);
@@ -168,11 +204,16 @@ where
                     // Ensure the flow-control window is large enough.
                     result.as_mut().increment_read_window(self.config.block_size as usize);
                 }
-                Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
-                Err(e) => return Err(e.into()),
+                Err(e) => {
+                    return self.handle_get_object_err(e);
+                }
             }
         }
         let buffer = buffer.freeze();
+
+        metrics::counter!("express_data_cache.block_hit").increment(1);
+        metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(buffer.len() as u64);
+        metrics::histogram!("express_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64);
 
         Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
     }
@@ -184,18 +225,25 @@ where
         bytes: ChecksummedBytes,
         object_size: usize,
     ) -> DataCacheResult<()> {
+        let start = Instant::now();
         if object_size > self.config.max_object_size {
+            self.emit_failure_metric("over_max_object_size", MetricsType::Write);
             return Ok(());
         }
 
         if block_offset != block_idx * self.config.block_size {
+            self.emit_failure_metric("invalid_block_metadata", MetricsType::Write);
             return Err(DataCacheError::InvalidBlockOffset);
         }
 
         let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);
 
-        let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?;
+        let (data, checksum) = bytes.into_inner().map_err(|_| {
+            self.emit_failure_metric("invalid_block_content", MetricsType::Write);
+            DataCacheError::InvalidBlockContent
+        })?;
         let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum);
+        let data_length = data.len() as u64;
 
         self.client
             .put_object_single(
@@ -205,8 +253,13 @@ where
                 data,
             )
             .in_current_span()
-            .await?;
+            .await
+            .inspect_err(|_| {
+                self.emit_failure_metric("put_object_failure", MetricsType::Write);
+            })?;
 
+        metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(data_length);
+        metrics::histogram!("express_data_cache.write_duration_us").record(start.elapsed().as_micros() as f64);
         Ok(())
     }
 
@@ -326,6 +379,20 @@ impl BlockMetadata {
     }
 }
 
+enum MetricsType {
+    Read,
+    Write,
+}
+
+impl MetricsType {
+    fn as_str(&self) -> &'static str {
+        match self {
+            MetricsType::Read => "read",
+            MetricsType::Write => "write",
+        }
+    }
+}
+
 /// Get the prefix for objects we'll be creating in S3
 pub fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
     hex::encode(
@@ -509,6 +576,7 @@ mod tests {
         let data = ChecksummedBytes::new("Foo".into());
         let data_2 = ChecksummedBytes::new("Bar".into());
         let cache_key = ObjectId::new("a".into(), ETag::for_tests());
+        let cache_key_non_existent = ObjectId::new("does-not-exist".into(), ETag::for_tests());
 
         // Setup cache
         let object_key = get_s3_key(
@@ -577,6 +645,13 @@ mod tests {
             .await
             .expect_err("cache should return error if object metadata doesn't match data");
         assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));
+
+        // Get data that's not been written yet
+        let result = cache
+            .get_block(&cache_key_non_existent, 0, 0, data.len())
+            .await
+            .expect("cache should return None if data is not present");
+        assert_eq!(result, None);
     }
 
     #[tokio::test]