Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preallocate and keep memory for HashMap in Metric aggregation #2343

Merged
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::{
precomputed_sum::PrecomputedSum, sum::Sum, Number,
};

const STREAM_CARDINALITY_LIMIT: u32 = 2000;
pub(crate) const STREAM_CARDINALITY_LIMIT: u32 = 2000;
cijothomas marked this conversation as resolved.
Show resolved Hide resolved

/// Checks whether aggregator has hit cardinality limit for metric streams
pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
Expand Down
37 changes: 29 additions & 8 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

use core::fmt;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::mem::swap;
use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

use aggregate::is_under_cardinality_limit;
use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -51,6 +51,11 @@
{
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,

/// Used by collect exclusively. The data type must match the one used in
/// `trackers` to allow mem::swap.
trackers_for_collect: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,

/// Number of different attribute set stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
Expand All @@ -67,7 +72,14 @@
{
fn new(config: A::InitConfig) -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
trackers: RwLock::new(HashMap::with_capacity(
1 + STREAM_CARDINALITY_LIMIT as usize,
)),
// TODO: For cumulative, this is not required, so avoid this
// pre-allocation.
trackers_for_collect: RwLock::new(HashMap::with_capacity(
1 + STREAM_CARDINALITY_LIMIT as usize,
)),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: A::create(&config),
count: AtomicUsize::new(0),
Expand Down Expand Up @@ -170,16 +182,25 @@
));
}

let trackers = match self.trackers.write() {
Ok(mut trackers) => {
let mut trackers = if let Ok(mut trackers_guard) = self.trackers.write() {
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
if let Ok(mut trackers_for_collect_guard) = self.trackers_for_collect.write() {
swap(
trackers_guard.deref_mut(),
trackers_for_collect_guard.deref_mut(),
);
self.count.store(0, Ordering::SeqCst);
take(trackers.deref_mut())
trackers_for_collect_guard
} else {
otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned");
return;

Check warning on line 195 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L194-L195

Added lines #L194 - L195 were not covered by tests
}
Err(_) => todo!(),
} else {
otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned");
return;

Check warning on line 199 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L198-L199

Added lines #L198 - L199 were not covered by tests
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.into_iter() {
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
}
Expand Down
2 changes: 1 addition & 1 deletion stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ rand = { version = "0.8.4", features = ["small_rng"] }
tracing = { workspace = true, features = ["std"]}
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
num-format = "0.4.4"
sysinfo = { version = "0.30.12", optional = true }
sysinfo = { version = "0.32.0", optional = true }
cijothomas marked this conversation as resolved.
Show resolved Hide resolved

[features]
stats = ["sysinfo"]
Loading