Skip to content

Commit

Permalink
fix buffered_metric_context and aggregator, update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewqian2001datadog committed Dec 10, 2024
1 parent a84af9d commit 583e287
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 57 deletions.
22 changes: 17 additions & 5 deletions datadog/dogstatsd/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,24 @@


class Aggregator(object):
def __init__(self, maxSamplesPerContext=0):
    """Create empty metric stores and one re-entrant lock per metric type.

    Args:
        maxSamplesPerContext: Handed unchanged to every
            ``BufferedMetricContexts`` created here. Defaults to 0.
    """
    # Count, gauge and set metrics are kept in plain dicts. The buffered
    # types must NOT be registered here as well: the flush loop iterates
    # these entries with ``.values()``, which only works on dicts.
    self.metrics_map = {
        MetricType.COUNT: {},
        MetricType.GAUGE: {},
        MetricType.SET: {},
    }
    # Histogram, distribution and timing samples are buffered per context.
    self.buffered_metrics_map = {
        MetricType.HISTOGRAM: BufferedMetricContexts(HistogramMetric, maxSamplesPerContext),
        MetricType.DISTRIBUTION: BufferedMetricContexts(DistributionMetric, maxSamplesPerContext),
        MetricType.TIMING: BufferedMetricContexts(TimingMetric, maxSamplesPerContext),
    }
    # One lock per metric type, covering both maps above.
    self._locks = {
        MetricType.COUNT: threading.RLock(),
        MetricType.GAUGE: threading.RLock(),
        MetricType.SET: threading.RLock(),
        MetricType.HISTOGRAM: threading.RLock(),
        MetricType.DISTRIBUTION: threading.RLock(),
        MetricType.TIMING: threading.RLock(),
    }

def flush_aggregated_metrics(self):
Expand All @@ -37,6 +42,13 @@ def flush_aggregated_metrics(self):
self.metrics_map[metric_type] = {}
for metric in current_metrics.values():
metrics.extend(metric.get_data() if isinstance(metric, SetMetric) else [metric])

for metric_type in self.buffered_metrics_map.keys():
with self._locks[metric_type]:
metric_context = self.buffered_metrics_map[metric_type]
self.buffered_metrics_map[metric_type] = {}
for metricList in metric_context.flush():
metrics.extend(metricList)
return metrics

def flush_aggregated_buffered_metrics(self):
Expand Down Expand Up @@ -99,7 +111,7 @@ def add_buffered_metric(
self, metric_type, name, value, tags, rate
):
context_key = self.get_context(name, tags)
metric_context = self.metrics_map[metric_type]
metric_context = self.buffered_metrics_map[metric_type]
return metric_context.sample(name, value, tags, rate, context_key)


33 changes: 18 additions & 15 deletions datadog/dogstatsd/buffered_metrics.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import random
from datadog.dogstatsd.metric_types import MetricType
from datadog.dogstatsd.metrics import MetricAggregator


class BufferedMetric(object):
def __init__(self, name, value, tags, metric_type, max_metric_samples=0, specified_rate=1.0):
def __init__(self, name, tags, metric_type, max_metric_samples=0, specified_rate=1.0):
self.name = name
self.tags = tags
self.metric_type = metric_type
self.max_metric_samples = max_metric_samples
self.specified_rate = specified_rate
self.data = [value]
self.data = []
self.stored_metric_samples = 1
self.total_metric_samples = 1

Expand All @@ -19,16 +20,21 @@ def sample(self, value):
self.total_metric_samples += 1

def maybe_keep_sample(self, value):
print("max metric samples is ", self.max_metric_samples)
print("stored metric samples is ", self.stored_metric_samples)
if self.max_metric_samples > 0:
if self.stored_metric_samples >= self.max_metric_samples:
i = random.randint(0, self.total_metric_samples - 1)
if i < self.max_metric_samples:
print("REPLACE")
self.data[i] = value
else:
print("APPEND")
self.data.append(value)
self.stored_metric_samples += 1
self.total_metric_samples += 1
else:
print("APPEND2")
self.sample(value)

def skip_sample(self):
Expand All @@ -41,25 +47,22 @@ def flush(self):
else:
rate = self.stored_metric_samples / total_metric_samples

return {
'name': self.name,
'tags': self.tags,
'metric_type': self.metric_type,
'rate': rate,
'values': self.data[:]
}
return [
MetricAggregator(self.name, self.tags, rate, self.metric_type, value)
for value in self.data
]


class HistogramMetric(BufferedMetric):
    """Buffered metric whose type is fixed to ``MetricType.HISTOGRAM``."""

    def __init__(self, name, tags, max_metric_samples=0, rate=1.0):
        """Forward to ``BufferedMetric`` with the histogram metric type.

        Args:
            name: Metric name.
            tags: Tags attached to the metric.
            max_metric_samples: Cap on stored samples (default 0).
            rate: Sample rate supplied by the caller (default 1.0).
        """
        super(HistogramMetric, self).__init__(name, tags, MetricType.HISTOGRAM, max_metric_samples, rate)


class DistributionMetric(BufferedMetric):
    """Buffered metric whose type is fixed to ``MetricType.DISTRIBUTION``."""

    def __init__(self, name, tags, max_metric_samples=0, rate=1.0):
        """Forward to ``BufferedMetric`` with the distribution metric type.

        Args:
            name: Metric name.
            tags: Tags attached to the metric.
            max_metric_samples: Cap on stored samples (default 0).
            rate: Sample rate supplied by the caller (default 1.0).
        """
        super(DistributionMetric, self).__init__(name, tags, MetricType.DISTRIBUTION, max_metric_samples, rate)


class TimingMetric(BufferedMetric):
    """Buffered metric whose type is fixed to ``MetricType.TIMING``."""

    def __init__(self, name, tags, max_metric_samples=0, rate=1.0):
        """Forward to ``BufferedMetric`` with the timing metric type.

        Args:
            name: Metric name.
            tags: Tags attached to the metric.
            max_metric_samples: Cap on stored samples (default 0).
            rate: Sample rate supplied by the caller (default 1.0).
        """
        super(TimingMetric, self).__init__(name, tags, MetricType.TIMING, max_metric_samples, rate)
30 changes: 15 additions & 15 deletions datadog/dogstatsd/buffered_metrics_context.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,51 @@
from threading import Lock
from random import random
import secrets

from datadog.dogstatsd.buffered_metrics import BufferedMetric

class BufferedMetricContexts:
def __init__(self, buffered_metric_type: BufferedMetric):
def __init__(self, buffered_metric_type: BufferedMetric, maxSamplesPerContext):
self.nb_context = 0
self.lock = Lock()
self.values = {}
self.maxSamplesPerContext = maxSamplesPerContext
self.buffered_metric_type = buffered_metric_type
self.random = random.Random()
self.random_lock = Lock()
self.random = secrets

def flush(self, metrics):
def flush(self):
metrics = []
"""Flush the metrics and reset the stored values."""
with self.lock:
values = self.values.copy()
self.values.clear()

for _, metric in values.items():
with metric.lock:
metrics.append(metric.flush())
metrics.append(metric.flush())

self.nb_context += len(values)
return metrics

def sample(self, name, value, tags, rate, context_key):
"""Sample a metric and store it if it meets the criteria."""
keeping_sample = self.should_sample(rate)

print("keeping sample is ", keeping_sample)
print("context_key is ", context_key)
with self.lock:
if context_key not in self.values:
# Create a new metric if it doesn't exist
self.values[context_key] = self.buffered_metric_type(name, value, tags, 0, rate)

self.values[context_key] = self.buffered_metric_type(name, tags, self.maxSamplesPerContext, rate)
metric = self.values[context_key]

print("values are :", self.values.keys())
if keeping_sample:
with self.random_lock:
metric.maybe_keep_sample(value, self.random, self.random_lock)
metric.maybe_keep_sample(value)
else:
metric.skip_sample()

def should_sample(self, rate):
    """Determine if a sample should be kept based on the specified rate.

    Args:
        rate: Probability of keeping the sample. Values >= 1 always keep,
            values <= 0 never keep.

    Returns:
        bool: True when the sample should be kept.
    """
    if rate >= 1:
        return True
    if rate <= 0:
        # ``random()`` is never negative, so skip the RNG call entirely.
        return False
    return secrets.SystemRandom().random() < rate

def get_nb_context(self):
"""Return the number of contexts."""
Expand Down
68 changes: 46 additions & 22 deletions tests/unit/dogstatsd/test_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,30 @@ def setUp(self):

def test_aggregator_sample(self):
# Samples metrics of each type with the same name and tags, and checks that
# each type's map holds exactly one context, keyed as "<name>:tag1,tag2".
# NOTE(review): this span comes from a rendered diff, so removed and added
# lines appear interleaved - confirm against the repository file.
tags = ["tag1", "tag2"]
for _ in range(2):
self.aggregator.gauge("gaugeTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.GAUGE]), 1)
self.assertIn("gaugeTest:tag1,tag2", self.aggregator.metrics_map[MetricType.GAUGE])

self.aggregator.gauge("gaugeTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.GAUGE]), 1)
self.assertIn("gaugeTest:tag1,tag2", self.aggregator.metrics_map[MetricType.GAUGE])
self.aggregator.count("countTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.COUNT]), 1)
self.assertIn("countTest:tag1,tag2", self.aggregator.metrics_map[MetricType.COUNT])

self.aggregator.count("countTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.COUNT]), 1)
self.assertIn("countTest:tag1,tag2", self.aggregator.metrics_map[MetricType.COUNT])
self.aggregator.set("setTest", "value1", tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.SET]), 1)
self.assertIn("setTest:tag1,tag2", self.aggregator.metrics_map[MetricType.SET])

self.aggregator.set("setTest", "value1", tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.SET]), 1)
self.assertIn("setTest:tag1,tag2", self.aggregator.metrics_map[MetricType.SET])
# Buffered types are looked up in buffered_metrics_map; the context keys
# live on the ``values`` attribute of each entry.
self.aggregator.histogram("histogramTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.HISTOGRAM].values), 1)
self.assertIn("histogramTest:tag1,tag2", self.aggregator.buffered_metrics_map[MetricType.HISTOGRAM].values)

self.aggregator.gauge("gaugeTest", 123, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.GAUGE]), 1)
self.assertIn("gaugeTest:tag1,tag2", self.aggregator.metrics_map[MetricType.GAUGE])
self.aggregator.distribution("distributionTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.DISTRIBUTION].values), 1)
self.assertIn("distributionTest:tag1,tag2", self.aggregator.buffered_metrics_map[MetricType.DISTRIBUTION].values)

self.aggregator.count("countTest", 10, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.COUNT]), 1)
self.assertIn("countTest:tag1,tag2", self.aggregator.metrics_map[MetricType.COUNT])

self.aggregator.set("setTest", "value1", tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.SET]), 1)
self.assertIn("setTest:tag1,tag2", self.aggregator.metrics_map[MetricType.SET])
self.aggregator.timing("timingTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.TIMING].values), 1)
self.assertIn("timingTest:tag1,tag2", self.aggregator.buffered_metrics_map[MetricType.TIMING].values)

def test_aggregator_flush(self):
tags = ["tag1", "tag2"]
Expand All @@ -50,23 +50,47 @@ def test_aggregator_flush(self):
self.aggregator.set("setTest1", "value2", tags, 1)
self.aggregator.set("setTest2", "value1", tags, 1)

self.aggregator.histogram("histogramTest1", 21, tags, 1)
self.aggregator.histogram("histogramTest1", 22, tags, 1)
self.aggregator.histogram("histogramTest2", 23, tags, 1)

self.aggregator.distribution("distributionTest1", 21, tags, 1)
self.aggregator.distribution("distributionTest1", 22, tags, 1)
self.aggregator.distribution("distributionTest2", 23, tags, 1)

self.aggregator.timing("timingTest1", 21, tags, 1)
self.aggregator.timing("timingTest1", 22, tags, 1)
self.aggregator.timing("timingTest2", 23, tags, 1)

metrics = self.aggregator.flush_aggregated_metrics()
self.assertEqual(len(self.aggregator.metrics_map[MetricType.GAUGE]), 0)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.COUNT]), 0)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.SET]), 0)

self.assertEqual(len(metrics), 7)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.HISTOGRAM]), 0)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.DISTRIBUTION]), 0)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.TIMING]), 0)
self.assertEqual(len(metrics), 16)
metrics.sort(key=lambda m: (m.metric_type, m.name, m.value))

expected_metrics = [
{"metric_type": MetricType.COUNT, "name": "countTest1", "tags": tags, "rate": 1, "value": 31, "timestamp": 0},
{"metric_type": MetricType.COUNT, "name": "countTest2", "tags": tags, "rate": 1, "value": 1, "timestamp": 0},
{"metric_type": MetricType.DISTRIBUTION, "name": "distributionTest1", "tags": tags, "rate": 1, "value": 21},
{"metric_type": MetricType.DISTRIBUTION, "name": "distributionTest1", "tags": tags, "rate": 1, "value": 22},
{"metric_type": MetricType.DISTRIBUTION, "name": "distributionTest2", "tags": tags, "rate": 1, "value": 23},
{"metric_type": MetricType.GAUGE, "name": "gaugeTest1", "tags": tags, "rate": 1, "value": 10, "timestamp": 0},
{"metric_type": MetricType.GAUGE, "name": "gaugeTest2", "tags": tags, "rate": 1, "value": 15, "timestamp": 0},
{"metric_type": MetricType.HISTOGRAM, "name": "histogramTest1", "tags": tags, "rate": 1, "value": 21},
{"metric_type": MetricType.HISTOGRAM, "name": "histogramTest1", "tags": tags, "rate": 1, "value": 22},
{"metric_type": MetricType.HISTOGRAM, "name": "histogramTest2", "tags": tags, "rate": 1, "value": 23},
{"metric_type": MetricType.TIMING, "name": "timingTest1", "tags": tags, "rate": 1, "value": 21},
{"metric_type": MetricType.TIMING, "name": "timingTest1", "tags": tags, "rate": 1, "value": 22},
{"metric_type": MetricType.TIMING, "name": "timingTest2", "tags": tags, "rate": 1, "value": 23},
{"metric_type": MetricType.SET, "name": "setTest1", "tags": tags, "rate": 1, "value": "value1", "timestamp": 0},
{"metric_type": MetricType.SET, "name": "setTest1", "tags": tags, "rate": 1, "value": "value2", "timestamp": 0},
{"metric_type": MetricType.SET, "name": "setTest2", "tags": tags, "rate": 1, "value": "value1", "timestamp": 0},
]

for metric, expected in zip(metrics, expected_metrics):
self.assertEqual(metric.name, expected["name"])
self.assertEqual(metric.tags, expected["tags"])
Expand Down

0 comments on commit 583e287

Please sign in to comment.