Skip to content

Commit

Permalink
add flag for enabling/disabling extended aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewqian2001datadog committed Jan 5, 2025
1 parent a54c136 commit 7d78065
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 10 deletions.
9 changes: 9 additions & 0 deletions datadog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def initialize(
statsd_host=None, # type: Optional[str]
statsd_port=None, # type: Optional[int]
statsd_disable_aggregation=True, # type: bool
statsd_disable_extended_aggregation=True, # type: bool
statsd_disable_buffering=True, # type: bool
statsd_aggregation_flush_interval=0.3, # type: float
statsd_use_default_route=False, # type: bool
Expand Down Expand Up @@ -82,6 +83,10 @@ def initialize(
(default: True).
:type statsd_disable_aggregation: boolean
:param statsd_disable_extended_aggregation: Enable/disable statsd client aggregation support for histograms, distributions and timing metrics
(default: True).
:type statsd_disable_extended_aggregation: boolean
:param statsd_aggregation_flush_interval: If aggregation is enabled, set the flush interval for
aggregation/buffering
(default: 0.3 seconds)
Expand Down Expand Up @@ -143,6 +148,10 @@ def initialize(
statsd.disable_aggregation()
else:
statsd.enable_aggregation(statsd_aggregation_flush_interval)
if statsd_disable_extended_aggregation:
statsd.disable_extended_aggregation()
else:
statsd.enable_extended_aggregation(statsd_aggregation_flush_interval)
statsd.disable_buffering = statsd_disable_buffering
api._return_raw_response = return_raw_response

Expand Down
44 changes: 34 additions & 10 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def __init__(
max_buffer_size=None, # type: None
flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, # type: float
disable_aggregation=True, # type: bool
disable_extended_aggregation=True, # type: bool
disable_buffering=True, # type: bool
namespace=None, # type: Optional[Text]
constant_tags=None, # type: Optional[List[str]]
Expand Down Expand Up @@ -236,7 +237,10 @@ def __init__(
it overrides the default value.
:type flush_interval: float
:disable_aggregation: If true, metrics (Count, Gauge, Set) are no longered aggregated by the client
:disable_aggregation: If true, metrics (Count, Gauge, Set) are no longer aggregated by the client
:type disable_aggregation: bool
:disable_extended_aggregation: If true, metrics (Histogram, Distribution, Timing) are no longer aggregated by the client
:type disable_aggregation: bool
:disable_buffering: If set, metrics are no longered buffered by the client and
Expand Down Expand Up @@ -446,6 +450,7 @@ def __init__(

self._disable_buffering = disable_buffering
self._disable_aggregation = disable_aggregation
self._disable_extended_aggregation = disable_extended_aggregation

self._flush_interval = flush_interval
self._flush_thread = None
Expand All @@ -459,7 +464,7 @@ def __init__(
else:
self._send = self._send_to_server

if not self._disable_aggregation or not self._disable_buffering:
if not self._disable_aggregation or not self._disable_buffering or not self._disable_extended_aggregation:
self._start_flush_thread()
else:
log.debug("Statsd buffering and aggregation is disabled")
Expand Down Expand Up @@ -559,10 +564,8 @@ def _start_flush_thread(self):
def _flush_thread_loop(self, flush_interval):
while not self._flush_thread_stop.is_set():
time.sleep(flush_interval)
if not self._disable_aggregation:
if not self._disable_aggregation or not self._disable_extended_aggregation:
self.flush_aggregated_metrics()
# Histograms, Distribution and Timing metrics are not aggregated
self.flush_buffered_metrics()
if not self._disable_buffering:
self.flush_buffered_metrics()
self._flush_thread = threading.Thread(
Expand All @@ -582,7 +585,7 @@ def _stop_flush_thread(self):
if not self._flush_thread:
return
try:
if not self._disable_aggregation:
if not self._disable_aggregation or not self._disable_extended_aggregation:
self.flush_aggregated_metrics()
if not self.disable_buffering:
self.flush_buffered_metrics()
Expand Down Expand Up @@ -641,7 +644,7 @@ def disable_aggregation(self):

# If aggregation and buffering has been disabled, flush and kill the background thread
# otherwise start up the flushing thread and enable aggregation.
if self._disable_aggregation and self.disable_buffering:
if self._disable_aggregation and self._disable_extended_aggregation and self.disable_buffering:
self._stop_flush_thread()
log.debug("Statsd aggregation is disabled")

Expand All @@ -655,6 +658,30 @@ def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
self._send = self._send_to_server
self._start_flush_thread()

def disable_extended_aggregation(self):
with self._config_lock:
# If the toggle didn't change anything, this method is a noop
if self._disable_extended_aggregation:
return

self._disable_extended_aggregation = True

# If aggregation and buffering has been disabled, flush and kill the background thread
# otherwise start up the flushing thread and enable aggregation.
if self._disable_aggregation and self._disable_extended_aggregation and self.disable_buffering:
self._stop_flush_thread()
log.debug("Statsd aggregation is disabled")

def enable_extended_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
with self._config_lock:
if not self._disable_aggregation:
return
self._disable_aggregation = False
self._flush_interval = flush_interval
if self._disable_buffering:
self._send = self._send_to_server
self._start_flush_thread()

@staticmethod
def resolve_host(host, use_default_route):
"""
Expand Down Expand Up @@ -829,11 +856,8 @@ def flush_aggregated_metrics(self):
self._report(m.name, m.metric_type, m.value, m.tags, m.rate, m.timestamp)

buffered_metrics = self.aggregator.flush_aggregated_buffered_metrics()
send_method = self._send
self._send = self._send_to_buffer
for m in buffered_metrics:
self._report(m.name, m.metric_type, m.value, m.tags, m.rate, m.timestamp)
self._send = send_method

def gauge(
self,
Expand Down

0 comments on commit 7d78065

Please sign in to comment.