From b9a422b39e866c93e3bde52eb427339ba33e2600 Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Wed, 18 Dec 2024 03:36:29 +0200 Subject: [PATCH] Directly implement ComputeAggregation (#2425) Co-authored-by: Cijo Thomas Co-authored-by: Cijo Thomas --- .../src/metrics/internal/aggregate.rs | 69 +++++++------------ .../metrics/internal/exponential_histogram.rs | 45 ++++++++---- .../src/metrics/internal/histogram.rs | 44 +++++++++--- .../src/metrics/internal/last_value.rs | 27 ++++++-- .../src/metrics/internal/precomputed_sum.rs | 22 +++++- opentelemetry-sdk/src/metrics/internal/sum.rs | 23 +++++-- opentelemetry-sdk/src/metrics/pipeline.rs | 2 +- 7 files changed, 150 insertions(+), 82 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index edfc50d9d1..64d50c1e1c 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -100,11 +100,7 @@ impl Default for AggregateTimeInitiator { /// Builds aggregate functions pub(crate) struct AggregateBuilder { /// The temporality used for the returned aggregate functions. - /// - /// If this is not provided, a default of cumulative will be used (except for the - /// last-value aggregate function where delta is the only appropriate - /// temporality). - temporality: Option, + temporality: Temporality, /// The attribute filter the aggregate function will use on the input of /// measurements. @@ -116,7 +112,7 @@ pub(crate) struct AggregateBuilder { type Filter = Arc bool + Send + Sync>; impl AggregateBuilder { - pub(crate) fn new(temporality: Option, filter: Option) -> Self { + pub(crate) fn new(temporality: Temporality, filter: Option) -> Self { AggregateBuilder { temporality, filter, @@ -140,16 +136,12 @@ impl AggregateBuilder { /// Builds a last-value aggregate function input and output. pub(crate) fn last_value(&self) -> (impl Measure, impl ComputeAggregation) { - let lv = Arc::new(LastValue::new()); + let lv = Arc::new(LastValue::new(self.temporality)); let agg_lv = Arc::clone(&lv); - let t = self.temporality; ( self.filter(move |n, a: &[KeyValue]| lv.measure(n, a)), - move |dest: Option<&mut dyn Aggregation>| match t { - Some(Temporality::Delta) => agg_lv.delta(dest), - _ => agg_lv.cumulative(dest), - }, + agg_lv, ) } @@ -158,31 +150,23 @@ impl AggregateBuilder { &self, monotonic: bool, ) -> (impl Measure, impl ComputeAggregation) { - let s = Arc::new(PrecomputedSum::new(monotonic)); + let s = Arc::new(PrecomputedSum::new(self.temporality, monotonic)); let agg_sum = Arc::clone(&s); - let t = self.temporality; ( self.filter(move |n, a: &[KeyValue]| s.measure(n, a)), - move |dest: Option<&mut dyn Aggregation>| match t { - Some(Temporality::Delta) => agg_sum.delta(dest), - _ => agg_sum.cumulative(dest), - }, + agg_sum, ) } /// Builds a sum aggregate function input and output. pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure, impl ComputeAggregation) { - let s = Arc::new(Sum::new(monotonic)); + let s = Arc::new(Sum::new(self.temporality, monotonic)); let agg_sum = Arc::clone(&s); - let t = self.temporality; ( self.filter(move |n, a: &[KeyValue]| s.measure(n, a)), - move |dest: Option<&mut dyn Aggregation>| match t { - Some(Temporality::Delta) => agg_sum.delta(dest), - _ => agg_sum.cumulative(dest), - }, + agg_sum, ) } @@ -193,17 +177,15 @@ impl AggregateBuilder { record_min_max: bool, record_sum: bool, ) -> (impl Measure, impl ComputeAggregation) { - let h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum)); + let h = Arc::new(Histogram::new( + self.temporality, + boundaries, + record_min_max, + record_sum, + )); let agg_h = Arc::clone(&h); - let t = self.temporality; - ( - self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), - move |dest: Option<&mut dyn Aggregation>| match t { - Some(Temporality::Delta) => agg_h.delta(dest), - _ => agg_h.cumulative(dest), - }, - ) + (self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h) } /// Builds an exponential histogram aggregate function input and output. @@ -215,21 +197,15 @@ impl AggregateBuilder { record_sum: bool, ) -> (impl Measure, impl ComputeAggregation) { let h = Arc::new(ExpoHistogram::new( + self.temporality, max_size, max_scale, record_min_max, record_sum, )); let agg_h = Arc::clone(&h); - let t = self.temporality; - ( - self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), - move |dest: Option<&mut dyn Aggregation>| match t { - Some(Temporality::Delta) => agg_h.delta(dest), - _ => agg_h.cumulative(dest), - }, - ) + (self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h) } } @@ -245,7 +221,8 @@ mod tests { #[test] fn last_value_aggregation() { - let (measure, agg) = AggregateBuilder::::new(None, None).last_value(); + let (measure, agg) = + AggregateBuilder::::new(Temporality::Cumulative, None).last_value(); let mut a = Gauge { data_points: vec![GaugeDataPoint { attributes: vec![KeyValue::new("a", 1)], @@ -271,7 +248,7 @@ mod tests { fn precomputed_sum_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { let (measure, agg) = - AggregateBuilder::::new(Some(temporality), None).precomputed_sum(true); + AggregateBuilder::::new(temporality, None).precomputed_sum(true); let mut a = Sum { data_points: vec![ SumDataPoint { @@ -312,7 +289,7 @@ mod tests { #[test] fn sum_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(Some(temporality), None).sum(true); + let (measure, agg) = AggregateBuilder::::new(temporality, None).sum(true); let mut a = Sum { data_points: vec![ SumDataPoint { @@ -353,7 +330,7 @@ mod tests { #[test] fn explicit_bucket_histogram_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(Some(temporality), None) + let (measure, agg) = AggregateBuilder::::new(temporality, None) .explicit_bucket_histogram(vec![1.0], true, true); let mut a = Histogram { data_points: vec![HistogramDataPoint { @@ -396,7 +373,7 @@ mod tests { #[test] fn exponential_histogram_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(Some(temporality), None) + let (measure, agg) = AggregateBuilder::::new(temporality, None) .exponential_bucket_histogram(4, 20, true, true); let mut a = ExponentialHistogram { data_points: vec![ExponentialHistogramDataPoint { diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index 42f3794ad1..86dc5def19 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -1,4 +1,9 @@ -use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex}; +use std::{ + f64::consts::LOG2_E, + mem::replace, + ops::DerefMut, + sync::{Arc, Mutex}, +}; use opentelemetry::{otel_debug, KeyValue}; use std::sync::OnceLock; @@ -8,7 +13,7 @@ use crate::metrics::{ Temporality, }; -use super::{aggregate::AggregateTimeInitiator, Aggregator, Number, ValueMap}; +use super::{aggregate::AggregateTimeInitiator, Aggregator, ComputeAggregation, Number, ValueMap}; pub(crate) const EXPO_MAX_SCALE: i8 = 20; pub(crate) const EXPO_MIN_SCALE: i8 = -10; @@ -351,6 +356,7 @@ struct BucketConfig { pub(crate) struct ExpoHistogram { value_map: ValueMap>>, init_time: AggregateTimeInitiator, + temporality: Temporality, record_sum: bool, record_min_max: bool, } @@ -358,6 +364,7 @@ pub(crate) struct ExpoHistogram { impl ExpoHistogram { /// Create a new exponential histogram. pub(crate) fn new( + temporality: Temporality, max_size: u32, max_scale: i8, record_min_max: bool, @@ -368,9 +375,10 @@ impl ExpoHistogram { max_size: max_size as i32, max_scale, }), + init_time: AggregateTimeInitiator::default(), + temporality, record_sum, record_min_max, - init_time: AggregateTimeInitiator::default(), } } @@ -385,10 +393,7 @@ impl ExpoHistogram { self.value_map.measure(value, attrs); } - pub(crate) fn delta( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option>) { + fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { let time = self.init_time.delta(); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); @@ -442,7 +447,7 @@ impl ExpoHistogram { (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } - pub(crate) fn cumulative( + fn cumulative( &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { @@ -500,6 +505,18 @@ impl ExpoHistogram { } } +impl ComputeAggregation for Arc> +where + T: Number, +{ + fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + match self.temporality { + Temporality::Delta => self.delta(dest), + _ => self.cumulative(dest), + } + } +} + #[cfg(test)] mod tests { use std::{ops::Neg, time::SystemTime}; @@ -665,7 +682,7 @@ mod tests { ]; for test in test_cases { - let h = ExpoHistogram::new(4, 20, true, true); + let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true); for v in test.values { h.measure(v, &[]); } @@ -714,7 +731,7 @@ mod tests { ]; for test in test_cases { - let h = ExpoHistogram::new(4, 20, true, true); + let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true); for v in test.values { h.measure(v, &[]); } @@ -1241,7 +1258,7 @@ mod tests { name: "Delta Single", build: Box::new(move || { box_val( - AggregateBuilder::new(Some(Temporality::Delta), None) + AggregateBuilder::new(Temporality::Delta, None) .exponential_bucket_histogram( max_size, max_scale, @@ -1284,7 +1301,7 @@ mod tests { name: "Cumulative Single", build: Box::new(move || { box_val( - internal::AggregateBuilder::new(Some(Temporality::Cumulative), None) + internal::AggregateBuilder::new(Temporality::Cumulative, None) .exponential_bucket_histogram( max_size, max_scale, @@ -1327,7 +1344,7 @@ mod tests { name: "Delta Multiple", build: Box::new(move || { box_val( - internal::AggregateBuilder::new(Some(Temporality::Delta), None) + internal::AggregateBuilder::new(Temporality::Delta, None) .exponential_bucket_histogram( max_size, max_scale, @@ -1373,7 +1390,7 @@ mod tests { name: "Cumulative Multiple ", build: Box::new(move || { box_val( - internal::AggregateBuilder::new(Some(Temporality::Cumulative), None) + internal::AggregateBuilder::new(Temporality::Cumulative, None) .exponential_bucket_histogram( max_size, max_scale, diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index f535566ecf..89987f8488 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -1,5 +1,6 @@ use std::mem::replace; use std::ops::DerefMut; +use std::sync::Arc; use std::sync::Mutex; use crate::metrics::data::HistogramDataPoint; @@ -8,6 +9,7 @@ use crate::metrics::Temporality; use opentelemetry::KeyValue; use super::aggregate::AggregateTimeInitiator; +use super::ComputeAggregation; use super::ValueMap; use super::{Aggregator, Number}; @@ -68,15 +70,21 @@ impl Buckets { /// buckets. pub(crate) struct Histogram { value_map: ValueMap>>, + init_time: AggregateTimeInitiator, + temporality: Temporality, bounds: Vec, record_min_max: bool, record_sum: bool, - init_time: AggregateTimeInitiator, } impl Histogram { #[allow(unused_mut)] - pub(crate) fn new(mut bounds: Vec, record_min_max: bool, record_sum: bool) -> Self { + pub(crate) fn new( + temporality: Temporality, + mut bounds: Vec, + record_min_max: bool, + record_sum: bool, + ) -> Self { #[cfg(feature = "spec_unstable_metrics_views")] { // TODO: When views are used, validate this upfront @@ -87,10 +95,11 @@ impl Histogram { let buckets_count = bounds.len() + 1; Histogram { value_map: ValueMap::new(buckets_count), + init_time: AggregateTimeInitiator::default(), + temporality, bounds, record_min_max, record_sum, - init_time: AggregateTimeInitiator::default(), } } @@ -106,11 +115,9 @@ impl Histogram { self.value_map.measure((measurement, index), attrs); } - pub(crate) fn delta( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option>) { + fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { let time = self.init_time.delta(); + let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { @@ -157,7 +164,7 @@ impl Histogram { (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } - pub(crate) fn cumulative( + fn cumulative( &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { @@ -209,17 +216,34 @@ impl Histogram { } } +impl ComputeAggregation for Arc> +where + T: Number, +{ + fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + match self.temporality { + Temporality::Delta => self.delta(dest), + _ => self.cumulative(dest), + } + } +} + #[cfg(test)] mod tests { use super::*; #[test] fn check_buckets_are_selected_correctly() { - let hist = Histogram::::new(vec![1.0, 3.0, 6.0], false, false); + let hist = Arc::new(Histogram::::new( + Temporality::Cumulative, + vec![1.0, 3.0, 6.0], + false, + false, + )); for v in 1..11 { hist.measure(v, &[]); } - let (count, dp) = hist.cumulative(None); + let (count, dp) = ComputeAggregation::call(&hist, None); let dp = dp.unwrap(); let dp = dp.as_any().downcast_ref::>().unwrap(); assert_eq!(count, 1); diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index b140fdbc40..7209e4dd69 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -1,9 +1,14 @@ -use crate::metrics::data::{self, Aggregation, GaugeDataPoint}; +use std::sync::Arc; + +use crate::metrics::{ + data::{self, Aggregation, GaugeDataPoint}, + Temporality, +}; use opentelemetry::KeyValue; use super::{ - aggregate::AggregateTimeInitiator, Aggregator, AtomicTracker, AtomicallyUpdate, Number, - ValueMap, + aggregate::AggregateTimeInitiator, Aggregator, AtomicTracker, AtomicallyUpdate, + ComputeAggregation, Number, ValueMap, }; /// this is reused by PrecomputedSum @@ -42,13 +47,15 @@ where pub(crate) struct LastValue { value_map: ValueMap>, init_time: AggregateTimeInitiator, + temporality: Temporality, } impl LastValue { - pub(crate) fn new() -> Self { + pub(crate) fn new(temporality: Temporality) -> Self { LastValue { value_map: ValueMap::new(()), init_time: AggregateTimeInitiator::default(), + temporality, } } @@ -123,3 +130,15 @@ impl LastValue { ) } } + +impl ComputeAggregation for Arc> +where + T: Number, +{ + fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + match self.temporality { + Temporality::Delta => self.delta(dest), + _ => self.cumulative(dest), + } + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index 7b67147011..a4d408c817 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -4,23 +4,27 @@ use crate::metrics::data::{self, Aggregation, SumDataPoint}; use crate::metrics::Temporality; use super::aggregate::AggregateTimeInitiator; +use super::ComputeAggregation; use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; +use std::sync::Arc; use std::{collections::HashMap, sync::Mutex}; /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum { value_map: ValueMap>, - monotonic: bool, init_time: AggregateTimeInitiator, + temporality: Temporality, + monotonic: bool, reported: Mutex, T>>, } impl PrecomputedSum { - pub(crate) fn new(monotonic: bool) -> Self { + pub(crate) fn new(temporality: Temporality, monotonic: bool) -> Self { PrecomputedSum { value_map: ValueMap::new(()), - monotonic, init_time: AggregateTimeInitiator::default(), + temporality, + monotonic, reported: Mutex::new(Default::default()), } } @@ -118,3 +122,15 @@ impl PrecomputedSum { ) } } + +impl ComputeAggregation for Arc> +where + T: Number, +{ + fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + match self.temporality { + Temporality::Delta => self.delta(dest), + _ => self.cumulative(dest), + } + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 5f51be79c2..b65e2f7f4d 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::vec; use crate::metrics::data::{self, Aggregation, SumDataPoint}; @@ -5,7 +6,7 @@ use crate::metrics::Temporality; use opentelemetry::KeyValue; use super::aggregate::AggregateTimeInitiator; -use super::{Aggregator, AtomicTracker, Number}; +use super::{Aggregator, AtomicTracker, ComputeAggregation, Number}; use super::{AtomicallyUpdate, ValueMap}; struct Increment @@ -42,8 +43,9 @@ where /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum { value_map: ValueMap>, - monotonic: bool, init_time: AggregateTimeInitiator, + temporality: Temporality, + monotonic: bool, } impl Sum { @@ -52,11 +54,12 @@ impl Sum { /// /// Each sum is scoped by attributes and the aggregation cycle the measurements /// were made in. - pub(crate) fn new(monotonic: bool) -> Self { + pub(crate) fn new(temporality: Temporality, monotonic: bool) -> Self { Sum { value_map: ValueMap::new(()), - monotonic, + temporality, init_time: AggregateTimeInitiator::default(), + monotonic, } } @@ -138,3 +141,15 @@ impl Sum { ) } } + +impl ComputeAggregation for Arc> +where + T: Number, +{ + fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + match self.temporality { + Temporality::Delta => self.delta(dest), + _ => self.cumulative(dest), + } + } +} diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 8fb94c289f..5ba4bba75f 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -384,7 +384,7 @@ where .clone() .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>); - let b = AggregateBuilder::new(Some(self.pipeline.reader.temporality(kind)), filter); + let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter); let (m, ca) = match aggregate_fn(b, &agg, kind) { Ok(Some((m, ca))) => (m, ca), other => return other.map(|fs| fs.map(|(m, _)| m)), // Drop aggregator or error