Skip to content

Commit

Permalink
Size-based packetization for dogstatsd batched metrics (#562)
Browse files Browse the repository at this point in the history
  • Loading branch information
prognant authored Jun 11, 2020
1 parent 058114c commit 19dd434
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 137 deletions.
86 changes: 59 additions & 27 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
# Tag name of entity_id
ENTITY_ID_TAG_NAME = "dd.internal.entity_id"

# Default buffer settings
# Fallback maximum payload sizes, in bytes, used when `max_buffer_len` is not provided
UDP_OPTIMAL_PAYLOAD_LENGTH = 1432
UDS_OPTIMAL_PAYLOAD_LENGTH = 8192

# Mapping of each "DD_" prefixed environment variable to a specific tag name
DD_ENV_TAGS_MAPPING = {
'DD_ENTITY_ID': ENTITY_ID_TAG_NAME,
Expand All @@ -47,10 +51,11 @@
class DogStatsd(object):
OK, WARNING, CRITICAL, UNKNOWN = (0, 1, 2, 3)

def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, max_buffer_size=50, namespace=None,
def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, max_buffer_size=None, namespace=None,
constant_tags=None, use_ms=False, use_default_route=False,
socket_path=None, default_sample_rate=1, disable_telemetry=False,
telemetry_min_flush_interval=DEFAULT_TELEMETRY_MIN_FLUSH_INTERVAL):
telemetry_min_flush_interval=DEFAULT_TELEMETRY_MIN_FLUSH_INTERVAL,
max_buffer_len=0):
"""
Initialize a DogStatsd object.
Expand Down Expand Up @@ -88,9 +93,8 @@ def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, max_buffer_size=50, nam
:param port: the port of the DogStatsd server.
:type port: integer
:param max_buffer_size: Maximum number of metrics to buffer before sending to the server
if sending metrics in batch
:type max_buffer_size: integer
:param max_buffer_size: Deprecated option, do not use it anymore.
:type max_buffer_size: None
:param namespace: Namespace to prefix all metric names
:type namespace: string
Expand All @@ -111,10 +115,19 @@ def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, max_buffer_size=50, nam
:param default_sample_rate: Sample rate to use by default for all metrics
:type default_sample_rate: float
:param max_buffer_len: Maximum number of bytes to buffer before sending to the server
if sending metrics in batch. If not specified it will be adjusted to an optimal value
depending on the connection type.
:type max_buffer_len: integer
"""

self.lock = Lock()

# Check for deprecated option
if max_buffer_size is not None:
log.warning("The parameter max_buffer_size is now deprecated and is not used anymore")

# Check host and port env vars
agent_host = os.environ.get('DD_AGENT_HOST')
if agent_host and host == DEFAULT_HOST:
Expand All @@ -129,20 +142,24 @@ def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, max_buffer_size=50, nam
%s, using %s as port number", dogstatsd_port, port)

# Connection
# A caller-supplied max_buffer_len takes precedence; a falsy value (the default 0)
# means the payload size is chosen from the transport selected below.
self._max_payload_size = max_buffer_len
if socket_path is not None:
    self.socket_path = socket_path
    self.host = None
    self.port = None
    transport = "uds"
    # Unix domain socket transport: use the UDS default payload size
    if not self._max_payload_size:
        self._max_payload_size = UDS_OPTIMAL_PAYLOAD_LENGTH
else:
    self.socket_path = None
    self.host = self.resolve_host(host, use_default_route)
    self.port = int(port)
    transport = "udp"
    # UDP transport: use the UDP default payload size
    if not self._max_payload_size:
        self._max_payload_size = UDP_OPTIMAL_PAYLOAD_LENGTH

# Socket
self.socket = None
self.max_buffer_size = max_buffer_size
self._send = self._send_to_server
self.encoding = 'utf-8'

Expand All @@ -168,7 +185,7 @@ def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, max_buffer_size=50, nam
"client_version:{}".format(__version__),
"client_transport:{}".format(transport),
]
self._reset_telementry()
self._reset_telemetry()
self._telemetry_flush_interval = telemetry_min_flush_interval
self._telemetry = not disable_telemetry

Expand All @@ -179,7 +196,7 @@ def enable_telemetry(self):
self._telemetry = True

def __enter__(self):
self.open_buffer(self.max_buffer_size)
self.open_buffer()
return self

def __exit__(self, type, value, traceback):
Expand Down Expand Up @@ -221,17 +238,19 @@ def get_socket(self):

return self.socket

def open_buffer(self, max_buffer_size=50):
def open_buffer(self, max_buffer_size=None):
"""
Open a buffer to send a batch of metrics in one packet.
Open a buffer to send a batch of metrics.
You can also use this as a context manager.
>>> with DogStatsd() as batch:
>>> batch.gauge('users.online', 123)
>>> batch.gauge('active.connections', 1001)
"""
self.max_buffer_size = max_buffer_size
if max_buffer_size is not None:
log.warning("The parameter max_buffer_size is now deprecated and is not used anymore")
self._current_buffer_total_size = 0
self.buffer = []
self._send = self._send_to_buffer

Expand Down Expand Up @@ -385,7 +404,7 @@ def _report(self, metric, metric_type, value, tags, sample_rate):
# Send it
self._send(payload)

def _reset_telementry(self):
def _reset_telemetry(self):
self.metrics_count = 0
self.events_count = 0
self.service_checks_count = 0
Expand All @@ -408,25 +427,30 @@ def _flush_telemetry(self):
))

def _is_telemetry_flush_time(self):
if self._telemetry \
and self._last_flush_time + self._telemetry_flush_interval < time.time():
return True
return False
return self._telemetry and self._last_flush_time + self._telemetry_flush_interval < time.time()

def _send_to_server(self, packet):
flush_telemetry = self._is_telemetry_flush_time()
if flush_telemetry:
packet += "\n"+self._flush_telemetry()
self._xmit_packet(packet, self._telemetry)
if self._is_telemetry_flush_time():
telemetry = self._flush_telemetry()
if self._xmit_packet(telemetry, False):
self._reset_telemetry()
self.packets_sent += 1
self.bytes_sent += len(telemetry)
else:
# Telemetry packet has been dropped, keep telemetry data for the next flush
self._last_flush_time = time.time()
self.bytes_dropped += len(telemetry)
self.packets_dropped += 1

def _xmit_packet(self, packet, telemetry):
try:
# If set, use socket directly
(self.socket or self.get_socket()).send(packet.encode(self.encoding))
if self._telemetry:
if flush_telemetry:
self._reset_telementry()
if telemetry:
self.packets_sent += 1
self.bytes_sent += len(packet)
return
return True
except socket.timeout:
# dogstatsd is overflowing, drop the packets (mimics the UDP behaviour)
pass
Expand All @@ -441,18 +465,26 @@ def _send_to_server(self, packet):
self.close_socket()
except Exception as e:
log.error("Unexpected error: %s", str(e))

if self._telemetry:
if telemetry:
self.bytes_dropped += len(packet)
self.packets_dropped += 1
return False

def _send_to_buffer(self, packet):
self.buffer.append(packet)
if len(self.buffer) >= self.max_buffer_size:
if self._should_flush(len(packet)):
self._flush_buffer()
self.buffer.append(packet)
# Update the current buffer length, including line break to anticipate the final packet size
self._current_buffer_total_size += (len(packet) + 1)

def _should_flush(self, length_to_be_added):
if self._current_buffer_total_size + length_to_be_added > self._max_payload_size:
return True
return False

def _flush_buffer(self):
self._send_to_server("\n".join(self.buffer))
self._current_buffer_total_size = 0
self.buffer = []

def _escape_event_content(self, string):
Expand Down
Loading

0 comments on commit 19dd434

Please sign in to comment.