Skip to content

Commit

Permalink
use existing threading function
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewqian2001datadog committed Jul 16, 2024
1 parent bcee4cb commit 9c66402
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 31 deletions.
31 changes: 6 additions & 25 deletions datadog/dogstatsd/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
)
from datadog.dogstatsd.metric_types import MetricType


class Aggregator(object):
def __init__(self, client):
self.client = client

self.metrics_map = {
MetricType.COUNT: {},
MetricType.GAUGE: {},
Expand All @@ -23,45 +21,28 @@ def __init__(self, client):
MetricType.SET: threading.RLock(),
}

self.closed = threading.Event()
self.exited = threading.Event()

def start(self, flush_interval):
self.flush_interval = flush_interval
self.ticker = threading.Timer(self.flush_interval, self.tick)
self.ticker.start()

def tick(self):
while not self.closed.is_set():
self.send_metrics()
time.sleep(self.flush_interval)
self.exited.set()
self.client._start_flush_thread(flush_interval, self.send_metrics)

def stop(self):
self.client._stop_flush_thread()
self.send_metrics()

def send_metrics(self):
for metric in self.flush_metrics():
self.client._report(
metric.name, metric.type, metric.value, metric.tags, metric.timestamp
)

def stop(self):
self.closed.set()
self.ticker.cancel()
self.exited.wait()
self.send_metrics()

def flush_metrics(self):
metrics = []

for metric_type in self.metrics_map.keys():
with self.locks[metric_type]:
current_metrics = self.metrics_map[metric_type]
self.metrics_map[metric_type] = {}

for metric in current_metrics.values():
metrics.extend(
metric.get_data() if isinstance(metric, SetMetric) else [metric]
)

metrics.extend(metric.get_data() if isinstance(metric, SetMetric) else [metric])
return metrics

def get_context(self, name, tags):
Expand Down
10 changes: 5 additions & 5 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ def __init__(
self._flush_interval = flush_interval
self._flush_thread_stop = threading.Event()
self._flush_thread = None
self._start_flush_thread(self._flush_interval)
self._start_flush_thread(self._flush_interval, self.flush)

self._queue = None
self._sender_thread = None
Expand Down Expand Up @@ -514,7 +514,7 @@ def enable_telemetry(self):
self._telemetry = True

# Note: Invocations of this method should be thread-safe
def _start_flush_thread(self, flush_interval):
def _start_flush_thread(self, flush_interval, flush_function):
if self._disable_buffering or self._flush_interval <= MIN_FLUSH_INTERVAL:
log.debug("Statsd periodic buffer flush is disabled")
return
Expand All @@ -528,7 +528,7 @@ def _start_flush_thread(self, flush_interval):
def _flush_thread_loop(self, flush_interval):
while not self._flush_thread_stop.is_set():
time.sleep(flush_interval)
self.flush()
flush_function()

self._flush_thread = threading.Thread(
name="{}_flush_thread".format(self.__class__.__name__),
Expand Down Expand Up @@ -594,7 +594,7 @@ def disable_buffering(self, is_disabled):
log.debug("Statsd buffering is disabled")
else:
self._send = self._send_to_buffer
self._start_flush_thread(self._flush_interval)
self._start_flush_thread(self._flush_interval, self.flush)

@staticmethod
def resolve_host(host, use_default_route):
Expand Down Expand Up @@ -1422,7 +1422,7 @@ def post_fork(self):
self._forking = False

with self._config_lock:
self._start_flush_thread(self._flush_interval)
self._start_flush_thread(self._flush_interval, self.flush)
self._start_sender_thread()

def stop(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/dogstatsd/test_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from mock import Mock
from datadog.dogstatsd import DogStatsd
from datadog.dogstatsd.metric_types import MetricType
from datadog.dogstatsd.aggregator import Aggregator # Update with the correct module name
from datadog.dogstatsd.aggregator import Aggregator


class TestAggregator(unittest.TestCase):
Expand Down

0 comments on commit 9c66402

Please sign in to comment.