[AMLII-2019] Max samples per context for Histogram, Distribution and Timing metrics (Experimental Feature) (#863)

This experimental feature allows the user to limit the number of samples per context for histogram, distribution, and timing metrics.

This can be enabled with the statsd_max_samples_per_context flag. When enabled, up to n samples are kept per context for Histogram, Distribution, and Timing metrics while aggregation is enabled. The default value is 0, which means no limit.
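As a usage sketch (parameter names come from this diff; the host, port, and cap value are illustrative), the feature is enabled at client initialization:

from datadog import initialize, statsd

# Experimental: keep at most 10 samples per histogram/distribution/timing
# context between flushes. Aggregation must remain enabled for the cap
# to take effect.
initialize(
    statsd_host="localhost",
    statsd_port=8125,
    statsd_disable_aggregation=False,
    statsd_max_samples_per_context=10,
)

statsd.histogram("uploaded.file.size", 1445)
statsd.timing("query.response.time", 1234)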
andrewqian2001datadog authored Jan 27, 2025
1 parent 5482cf4 commit 830c7b3
Showing 8 changed files with 337 additions and 38 deletions.
9 changes: 7 additions & 2 deletions datadog/__init__.py
@@ -43,6 +43,7 @@ def initialize(
statsd_use_default_route=False, # type: bool
statsd_socket_path=None, # type: Optional[str]
statsd_namespace=None, # type: Optional[str]
statsd_max_samples_per_context=0, # type: Optional[int]
statsd_constant_tags=None, # type: Optional[List[str]]
return_raw_response=False, # type: bool
hostname_from_config=True, # type: bool
@@ -82,8 +83,12 @@ def initialize(
(default: True).
:type statsd_disable_aggregation: boolean
:param statsd_max_samples_per_context: Set the max samples per context for Histogram,
    Distribution, and Timing metrics. Use with statsd_disable_aggregation set to False.
:type statsd_max_samples_per_context: int
:param statsd_aggregation_flush_interval: If aggregation is enabled, set the flush interval for
aggregation/buffering
aggregation/buffering (This feature is experimental)
(default: 0.3 seconds)
:type statsd_aggregation_flush_interval: float
@@ -142,7 +147,7 @@ def initialize(
if statsd_disable_aggregation:
statsd.disable_aggregation()
else:
statsd.enable_aggregation(statsd_aggregation_flush_interval)
statsd.enable_aggregation(statsd_aggregation_flush_interval, statsd_max_samples_per_context)
statsd.disable_buffering = statsd_disable_buffering
api._return_raw_response = return_raw_response

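initialize() forwards the flag through the enable_aggregation call shown above; a hedged sketch of applying the same cap directly to an existing client (interval and cap values illustrative):

from datadog import statsd

# enable_aggregation returns early if aggregation is already on, so
# toggle it off first to apply a new cap.
statsd.disable_aggregation()
statsd.enable_aggregation(0.3, 10)  # flush every 0.3 s, 10 samples per context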
50 changes: 49 additions & 1 deletion datadog/dogstatsd/aggregator.py
@@ -4,16 +4,28 @@
GaugeMetric,
SetMetric,
)
from datadog.dogstatsd.max_sample_metric import (
HistogramMetric,
DistributionMetric,
TimingMetric
)
from datadog.dogstatsd.metric_types import MetricType
from datadog.dogstatsd.max_sample_metric_context import MaxSampleMetricContexts


class Aggregator(object):
def __init__(self):
def __init__(self, max_samples_per_context=0):
self.max_samples_per_context = max_samples_per_context
self.metrics_map = {
MetricType.COUNT: {},
MetricType.GAUGE: {},
MetricType.SET: {},
}
self.max_sample_metric_map = {
MetricType.HISTOGRAM: MaxSampleMetricContexts(HistogramMetric),
MetricType.DISTRIBUTION: MaxSampleMetricContexts(DistributionMetric),
MetricType.TIMING: MaxSampleMetricContexts(TimingMetric)
}
self._locks = {
MetricType.COUNT: threading.RLock(),
MetricType.GAUGE: threading.RLock(),
@@ -28,6 +40,18 @@ def flush_aggregated_metrics(self):
self.metrics_map[metric_type] = {}
for metric in current_metrics.values():
metrics.extend(metric.get_data() if isinstance(metric, SetMetric) else [metric])

return metrics

def set_max_samples_per_context(self, max_samples_per_context=0):
self.max_samples_per_context = max_samples_per_context

def flush_aggregated_sampled_metrics(self):
metrics = []
        for metric_context in self.max_sample_metric_map.values():
            for metric_list in metric_context.flush():
                metrics.extend(metric_list)
return metrics

def get_context(self, name, tags):
@@ -60,3 +84,27 @@ def add_metric(
self.metrics_map[metric_type][context] = metric_class(
name, value, tags, rate, timestamp
)

def histogram(self, name, value, tags, rate):
return self.add_max_sample_metric(
MetricType.HISTOGRAM, name, value, tags, rate
)

def distribution(self, name, value, tags, rate):
return self.add_max_sample_metric(
MetricType.DISTRIBUTION, name, value, tags, rate
)

def timing(self, name, value, tags, rate):
return self.add_max_sample_metric(
MetricType.TIMING, name, value, tags, rate
)

def add_max_sample_metric(
self, metric_type, name, value, tags, rate
):
if rate is None:
rate = 1
context_key = self.get_context(name, tags)
metric_context = self.max_sample_metric_map[metric_type]
return metric_context.sample(name, value, tags, rate, context_key, self.max_samples_per_context)
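A hedged sketch of the aggregator-level flow these new methods add; the Aggregator is normally driven by DogStatsd rather than used directly, and the metric name and tags here are illustrative:

from datadog.dogstatsd.aggregator import Aggregator

agg = Aggregator(max_samples_per_context=2)
for v in (1, 2, 3, 4):
    # Routed through add_max_sample_metric into the per-context reservoir.
    agg.histogram("app.payload.size", v, ["env:dev"], 1)

# At most 2 of the 4 samples survive for this context; each flushed
# sample carries the adjusted rate stored/total (here 2/4) so counts
# can be rescaled downstream.
for m in agg.flush_aggregated_sampled_metrics():
    print(m.name, m.value, m.rate)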
43 changes: 32 additions & 11 deletions datadog/dogstatsd/base.py
@@ -160,6 +160,7 @@ def __init__(
telemetry_port=None, # type: Union[str, int]
telemetry_socket_path=None, # type: Text
max_buffer_len=0, # type: int
max_metric_samples_per_context=0, # type: int
container_id=None, # type: Optional[Text]
origin_detection_enabled=True, # type: bool
socket_timeout=0, # type: Optional[float]
@@ -236,9 +237,14 @@ def __init__(
it overrides the default value.
:type flush_interval: float
:disable_aggregation: If true, metrics (Count, Gauge, Set) are no longered aggregated by the client
:disable_aggregation: If true, metrics (Count, Gauge, Set) are no longer aggregated by the client
:type disable_aggregation: bool
:max_metric_samples_per_context: Sets the maximum number of samples per context for Histogram,
    Distribution, and Timing metrics (default 0, meaning unlimited). This feature should be
    used alongside aggregation and is experimental.
:type max_metric_samples_per_context: int
:disable_buffering: If set, metrics are no longer buffered by the client and
all data is sent synchronously to the server
:type disable_buffering: bool
@@ -450,7 +456,7 @@ def __init__(
self._flush_interval = flush_interval
self._flush_thread = None
self._flush_thread_stop = threading.Event()
self.aggregator = Aggregator()
self.aggregator = Aggregator(max_metric_samples_per_context)
# Indicates if the process is about to fork, so we shouldn't start any new threads yet.
self._forking = False

@@ -643,10 +649,11 @@ def disable_aggregation(self):
self._stop_flush_thread()
log.debug("Statsd aggregation is disabled")

def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, max_samples_per_context=0):
with self._config_lock:
if not self._disable_aggregation:
return
self.aggregator.set_max_samples_per_context(max_samples_per_context)
self._disable_aggregation = False
self._flush_interval = flush_interval
if self._disable_buffering:
@@ -826,6 +833,10 @@ def flush_aggregated_metrics(self):
for m in metrics:
self._report(m.name, m.metric_type, m.value, m.tags, m.rate, m.timestamp)

sampled_metrics = self.aggregator.flush_aggregated_sampled_metrics()
for m in sampled_metrics:
self._report(m.name, m.metric_type, m.value, m.tags, m.rate, m.timestamp, False)

def gauge(
self,
metric, # type: Text
@@ -960,7 +971,10 @@ def histogram(
>>> statsd.histogram("uploaded.file.size", 1445)
>>> statsd.histogram("album.photo.count", 26, tags=["gender:female"])
"""
self._report(metric, "h", value, tags, sample_rate)
if not self._disable_aggregation and self.aggregator.max_samples_per_context != 0:
self.aggregator.histogram(metric, value, tags, sample_rate)
else:
self._report(metric, "h", value, tags, sample_rate)

def distribution(
self,
@@ -975,7 +989,10 @@ def distribution(
>>> statsd.distribution("uploaded.file.size", 1445)
>>> statsd.distribution("album.photo.count", 26, tags=["gender:female"])
"""
self._report(metric, "d", value, tags, sample_rate)
if not self._disable_aggregation and self.aggregator.max_samples_per_context != 0:
self.aggregator.distribution(metric, value, tags, sample_rate)
else:
self._report(metric, "d", value, tags, sample_rate)

def timing(
self,
@@ -989,7 +1006,10 @@ def timing(
>>> statsd.timing("query.response.time", 1234)
"""
self._report(metric, "ms", value, tags, sample_rate)
if not self._disable_aggregation and self.aggregator.max_samples_per_context != 0:
self.aggregator.timing(metric, value, tags, sample_rate)
else:
self._report(metric, "ms", value, tags, sample_rate)

def timed(self, metric=None, tags=None, sample_rate=None, use_ms=None):
"""
@@ -1093,7 +1113,7 @@ def _serialize_metric(
("|T" + text(timestamp)) if timestamp > 0 else "",
)

def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0, sampling=True):
"""
Create a metric packet and send it.
@@ -1109,11 +1129,12 @@ def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
if self._telemetry:
self.metrics_count += 1

if sample_rate is None:
sample_rate = self.default_sample_rate
if sampling:
if sample_rate is None:
sample_rate = self.default_sample_rate

if sample_rate != 1 and random() > sample_rate:
return
if sample_rate != 1 and random() > sample_rate:
return
# timestamps (protocol v1.3) only allowed on gauges and counts
allows_timestamp = metric_type == MetricType.GAUGE or metric_type == MetricType.COUNT

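Putting the client-side pieces together, a hedged sketch using the constructor parameter added above (host, values, and metric names illustrative):

from datadog.dogstatsd.base import DogStatsd

client = DogStatsd(
    host="localhost",
    port=8125,
    disable_aggregation=False,
    max_metric_samples_per_context=5,  # new experimental knob
)

# With the cap active, these calls land in the aggregator's reservoirs
# instead of being reported immediately.
for ms in (12, 48, 7, 95, 33, 61):
    client.timing("db.query.ms", ms)

# The flush emits at most 5 samples for this context and passes
# sampling=False to _report so the adjusted rate is not re-applied.
client.flush_aggregated_metrics()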
64 changes: 64 additions & 0 deletions datadog/dogstatsd/max_sample_metric.py
@@ -0,0 +1,64 @@
import random
from datadog.dogstatsd.metric_types import MetricType
from datadog.dogstatsd.metrics import MetricAggregator
from threading import Lock


class MaxSampleMetric(object):
def __init__(self, name, tags, metric_type, specified_rate=1.0, max_metric_samples=0):
self.name = name
self.tags = tags
self.lock = Lock()
self.metric_type = metric_type
self.max_metric_samples = max_metric_samples
self.specified_rate = specified_rate
self.data = [None] * max_metric_samples if max_metric_samples > 0 else []
self.stored_metric_samples = 0
self.total_metric_samples = 0

def sample(self, value):
if self.max_metric_samples == 0:
self.data.append(value)
else:
self.data[self.stored_metric_samples] = value
self.stored_metric_samples += 1
self.total_metric_samples += 1

def maybe_keep_sample_work_unsafe(self, value):
if self.max_metric_samples > 0:
self.total_metric_samples += 1
if self.stored_metric_samples < self.max_metric_samples:
self.data[self.stored_metric_samples] = value
self.stored_metric_samples += 1
else:
i = random.randint(0, self.total_metric_samples - 1)
if i < self.max_metric_samples:
self.data[i] = value
else:
self.sample(value)

def skip_sample(self):
self.total_metric_samples += 1

    def flush(self):
        with self.lock:
            rate = self.stored_metric_samples / self.total_metric_samples
            return [
                MetricAggregator(self.name, self.tags, rate, self.metric_type, self.data[i])
                for i in range(self.stored_metric_samples)
            ]


class HistogramMetric(MaxSampleMetric):
def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
super(HistogramMetric, self).__init__(name, tags, MetricType.HISTOGRAM, rate, max_metric_samples)


class DistributionMetric(MaxSampleMetric):
def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
super(DistributionMetric, self).__init__(name, tags, MetricType.DISTRIBUTION, rate, max_metric_samples)


class TimingMetric(MaxSampleMetric):
def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
super(TimingMetric, self).__init__(name, tags, MetricType.TIMING, rate, max_metric_samples)
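To make the reservoir behavior concrete, a hedged single-threaded sketch of the new class (in production the per-metric lock in MaxSampleMetricContexts guards these calls; the metric name is illustrative):

from datadog.dogstatsd.max_sample_metric import HistogramMetric

# Reservoir of size 3: every value counts toward the total, but only 3
# uniformly chosen values are retained.
m = HistogramMetric("request.latency", None, max_metric_samples=3)
for v in range(10):
    m.maybe_keep_sample_work_unsafe(v)

for agg in m.flush():
    # rate = stored/total = 3/10, letting the server rescale counts.
    print(agg.value, agg.rate)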
40 changes: 40 additions & 0 deletions datadog/dogstatsd/max_sample_metric_context.py
@@ -0,0 +1,40 @@
from threading import Lock
import random


class MaxSampleMetricContexts:
def __init__(self, max_sample_metric_type):
self.lock = Lock()
self.values = {}
self.max_sample_metric_type = max_sample_metric_type

    def flush(self):
        """Flush the metrics and reset the stored values."""
        metrics = []
with self.lock:
temp = self.values
self.values = {}
for _, metric in temp.items():
metrics.append(metric.flush())
return metrics

def sample(self, name, value, tags, rate, context_key, max_samples_per_context):
"""Sample a metric and store it if it meets the criteria."""
keeping_sample = self.should_sample(rate)
with self.lock:
if context_key not in self.values:
# Create a new metric if it doesn't exist
                self.values[context_key] = self.max_sample_metric_type(name, tags, max_metric_samples=max_samples_per_context)
metric = self.values[context_key]
            with metric.lock:
                if keeping_sample:
                    metric.maybe_keep_sample_work_unsafe(value)
                else:
                    metric.skip_sample()

def should_sample(self, rate):
"""Determine if a sample should be kept based on the specified rate."""
if rate >= 1:
return True
return random.random() < rate
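A hedged sketch of the context-level bookkeeping (the context key is normally built by Aggregator.get_context; the literal key here is illustrative): samples rejected by the client-side rate still call skip_sample(), so total_metric_samples reflects everything observed and the flushed rate stays honest.

from datadog.dogstatsd.max_sample_metric import DistributionMetric
from datadog.dogstatsd.max_sample_metric_context import MaxSampleMetricContexts

contexts = MaxSampleMetricContexts(DistributionMetric)
for v in range(10):
    # rate=0.5: roughly half the values are kept; the rest only bump
    # total_metric_samples via skip_sample().
    contexts.sample("payload.size", v, ["env:dev"], 0.5, "payload.size:env:dev", 3)

# flush() returns one list of MetricAggregator entries per context.
for metric_list in contexts.flush():
    for m in metric_list:
        print(m.value, m.rate)  # rate = stored / total observed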
3 changes: 3 additions & 0 deletions datadog/dogstatsd/metric_types.py
@@ -2,3 +2,6 @@ class MetricType:
COUNT = "c"
GAUGE = "g"
SET = "s"
HISTOGRAM = "h"
DISTRIBUTION = "d"
TIMING = "ms"