Skip to content

Commit

Permalink
Directly implement ComputeAggregation (#2425)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <[email protected]>
Co-authored-by: Cijo Thomas <[email protected]>
  • Loading branch information
3 people authored Dec 18, 2024
1 parent 551760b commit b9a422b
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 82 deletions.
69 changes: 23 additions & 46 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,7 @@ impl Default for AggregateTimeInitiator {
/// Builds aggregate functions
pub(crate) struct AggregateBuilder<T> {
/// The temporality used for the returned aggregate functions.
///
/// If this is not provided, a default of cumulative will be used (except for the
/// last-value aggregate function where delta is the only appropriate
/// temporality).
temporality: Option<Temporality>,
temporality: Temporality,

/// The attribute filter the aggregate function will use on the input of
/// measurements.
Expand All @@ -116,7 +112,7 @@ pub(crate) struct AggregateBuilder<T> {
type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;

impl<T: Number> AggregateBuilder<T> {
pub(crate) fn new(temporality: Option<Temporality>, filter: Option<Filter>) -> Self {
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
AggregateBuilder {
temporality,
filter,
Expand All @@ -140,16 +136,12 @@ impl<T: Number> AggregateBuilder<T> {

/// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
let lv = Arc::new(LastValue::new());
let lv = Arc::new(LastValue::new(self.temporality));
let agg_lv = Arc::clone(&lv);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| lv.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_lv.delta(dest),
_ => agg_lv.cumulative(dest),
},
agg_lv,
)
}

Expand All @@ -158,31 +150,23 @@ impl<T: Number> AggregateBuilder<T> {
&self,
monotonic: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(PrecomputedSum::new(monotonic));
let s = Arc::new(PrecomputedSum::new(self.temporality, monotonic));
let agg_sum = Arc::clone(&s);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_sum.delta(dest),
_ => agg_sum.cumulative(dest),
},
agg_sum,
)
}

/// Builds a sum aggregate function input and output.
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(Sum::new(monotonic));
let s = Arc::new(Sum::new(self.temporality, monotonic));
let agg_sum = Arc::clone(&s);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_sum.delta(dest),
_ => agg_sum.cumulative(dest),
},
agg_sum,
)
}

Expand All @@ -193,17 +177,15 @@ impl<T: Number> AggregateBuilder<T> {
record_min_max: bool,
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum));
let h = Arc::new(Histogram::new(
self.temporality,
boundaries,
record_min_max,
record_sum,
));
let agg_h = Arc::clone(&h);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_h.delta(dest),
_ => agg_h.cumulative(dest),
},
)
(self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h)
}

/// Builds an exponential histogram aggregate function input and output.
Expand All @@ -215,21 +197,15 @@ impl<T: Number> AggregateBuilder<T> {
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(ExpoHistogram::new(
self.temporality,
max_size,
max_scale,
record_min_max,
record_sum,
));
let agg_h = Arc::clone(&h);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_h.delta(dest),
_ => agg_h.cumulative(dest),
},
)
(self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h)
}
}

Expand All @@ -245,7 +221,8 @@ mod tests {

#[test]
fn last_value_aggregation() {
let (measure, agg) = AggregateBuilder::<u64>::new(None, None).last_value();
let (measure, agg) =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value();
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
Expand All @@ -271,7 +248,7 @@ mod tests {
fn precomputed_sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) =
AggregateBuilder::<u64>::new(Some(temporality), None).precomputed_sum(true);
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
Expand Down Expand Up @@ -312,7 +289,7 @@ mod tests {
#[test]
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None).sum(true);
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None).sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
Expand Down Expand Up @@ -353,7 +330,7 @@ mod tests {
#[test]
fn explicit_bucket_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
.explicit_bucket_histogram(vec![1.0], true, true);
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
Expand Down Expand Up @@ -396,7 +373,7 @@ mod tests {
#[test]
fn exponential_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
.exponential_bucket_histogram(4, 20, true, true);
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
Expand Down
45 changes: 31 additions & 14 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex};
use std::{
f64::consts::LOG2_E,
mem::replace,
ops::DerefMut,
sync::{Arc, Mutex},
};

use opentelemetry::{otel_debug, KeyValue};
use std::sync::OnceLock;
Expand All @@ -8,7 +13,7 @@ use crate::metrics::{
Temporality,
};

use super::{aggregate::AggregateTimeInitiator, Aggregator, Number, ValueMap};
use super::{aggregate::AggregateTimeInitiator, Aggregator, ComputeAggregation, Number, ValueMap};

pub(crate) const EXPO_MAX_SCALE: i8 = 20;
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
Expand Down Expand Up @@ -351,13 +356,15 @@ struct BucketConfig {
pub(crate) struct ExpoHistogram<T: Number> {
value_map: ValueMap<Mutex<ExpoHistogramDataPoint<T>>>,
init_time: AggregateTimeInitiator,
temporality: Temporality,
record_sum: bool,
record_min_max: bool,
}

impl<T: Number> ExpoHistogram<T> {
/// Create a new exponential histogram.
pub(crate) fn new(
temporality: Temporality,
max_size: u32,
max_scale: i8,
record_min_max: bool,
Expand All @@ -368,9 +375,10 @@ impl<T: Number> ExpoHistogram<T> {
max_size: max_size as i32,
max_scale,
}),
init_time: AggregateTimeInitiator::default(),
temporality,
record_sum,
record_min_max,
init_time: AggregateTimeInitiator::default(),
}
}

Expand All @@ -385,10 +393,7 @@ impl<T: Number> ExpoHistogram<T> {
self.value_map.measure(value, attrs);
}

pub(crate) fn delta(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.delta();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
Expand Down Expand Up @@ -442,7 +447,7 @@ impl<T: Number> ExpoHistogram<T> {
(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}

pub(crate) fn cumulative(
fn cumulative(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
Expand Down Expand Up @@ -500,6 +505,18 @@ impl<T: Number> ExpoHistogram<T> {
}
}

impl<T> ComputeAggregation for Arc<ExpoHistogram<T>>
where
T: Number,
{
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
match self.temporality {
Temporality::Delta => self.delta(dest),
_ => self.cumulative(dest),
}
}
}

#[cfg(test)]
mod tests {
use std::{ops::Neg, time::SystemTime};
Expand Down Expand Up @@ -665,7 +682,7 @@ mod tests {
];

for test in test_cases {
let h = ExpoHistogram::new(4, 20, true, true);
let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true);
for v in test.values {
h.measure(v, &[]);
}
Expand Down Expand Up @@ -714,7 +731,7 @@ mod tests {
];

for test in test_cases {
let h = ExpoHistogram::new(4, 20, true, true);
let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true);
for v in test.values {
h.measure(v, &[]);
}
Expand Down Expand Up @@ -1241,7 +1258,7 @@ mod tests {
name: "Delta Single",
build: Box::new(move || {
box_val(
AggregateBuilder::new(Some(Temporality::Delta), None)
AggregateBuilder::new(Temporality::Delta, None)
.exponential_bucket_histogram(
max_size,
max_scale,
Expand Down Expand Up @@ -1284,7 +1301,7 @@ mod tests {
name: "Cumulative Single",
build: Box::new(move || {
box_val(
internal::AggregateBuilder::new(Some(Temporality::Cumulative), None)
internal::AggregateBuilder::new(Temporality::Cumulative, None)
.exponential_bucket_histogram(
max_size,
max_scale,
Expand Down Expand Up @@ -1327,7 +1344,7 @@ mod tests {
name: "Delta Multiple",
build: Box::new(move || {
box_val(
internal::AggregateBuilder::new(Some(Temporality::Delta), None)
internal::AggregateBuilder::new(Temporality::Delta, None)
.exponential_bucket_histogram(
max_size,
max_scale,
Expand Down Expand Up @@ -1373,7 +1390,7 @@ mod tests {
name: "Cumulative Multiple ",
build: Box::new(move || {
box_val(
internal::AggregateBuilder::new(Some(Temporality::Cumulative), None)
internal::AggregateBuilder::new(Temporality::Cumulative, None)
.exponential_bucket_histogram(
max_size,
max_scale,
Expand Down
Loading

0 comments on commit b9a422b

Please sign in to comment.