diff --git a/datadog/__init__.py b/datadog/__init__.py
index dec93813a..2e7cf5f8f 100644
--- a/datadog/__init__.py
+++ b/datadog/__init__.py
@@ -46,6 +46,7 @@ def initialize(
     statsd_constant_tags=None,  # type: Optional[List[str]]
     return_raw_response=False,  # type: bool
     hostname_from_config=True,  # type: bool
+    cardinality=None,  # type: Optional[str]
     **kwargs  # type: Any
 ):
     # type: (...) -> None
@@ -112,6 +113,12 @@ def initialize(
 
     :param hostname_from_config: Set the hostname from the Datadog agent config (agent 5). Will be deprecated
     :type hostname_from_config: boolean
+
+    :param cardinality: Set the global cardinality for all metrics. \
+        Possible values are "none", "low", "orchestrator" and "high".
+        Can also be set via the DATADOG_CARDINALITY or DD_CARDINALITY environment variables.
+    :type cardinality: string
+
     """
     # API configuration
     api._api_key = api_key or api._api_key or os.environ.get("DATADOG_API_KEY", os.environ.get("DD_API_KEY"))
@@ -146,6 +153,9 @@ def initialize(
     statsd.disable_buffering = statsd_disable_buffering
     api._return_raw_response = return_raw_response
 
+    # Set the global cardinality for all metrics
+    statsd.cardinality = cardinality or os.environ.get("DATADOG_CARDINALITY", os.environ.get("DD_CARDINALITY"))
+
     # HTTP client and API options
     for key, value in iteritems(kwargs):
        attribute = "_{}".format(key)
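
For reviewers, a minimal usage sketch of the new global setting in initialize() above. The host/port values and metric name are illustrative, not part of this change; the environment fallback mirrors the DATADOG_CARDINALITY / DD_CARDINALITY lookup added above.

import os

from datadog import initialize, statsd

# Option 1: set the cardinality explicitly for every metric sent by the module-level client.
initialize(statsd_host="localhost", statsd_port=8125, cardinality="low")

# Option 2: leave the argument unset and let the environment fallback apply.
os.environ["DD_CARDINALITY"] = "orchestrator"
initialize(statsd_host="localhost", statsd_port=8125)

statsd.increment("example.page.views")  # inherits the client-level cardinality
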
diff --git a/datadog/dogstatsd/aggregator.py b/datadog/dogstatsd/aggregator.py
index 4a805b75e..22810ce58 100644
--- a/datadog/dogstatsd/aggregator.py
+++ b/datadog/dogstatsd/aggregator.py
@@ -5,10 +5,10 @@
     SetMetric,
 )
 from datadog.dogstatsd.metric_types import MetricType
 
 
 class Aggregator(object):
-    def __init__(self):
+    def __init__(self, cardinality=None):
         self.metrics_map = {
             MetricType.COUNT: {},
             MetricType.GAUGE: {},
@@ -19,6 +19,7 @@ def __init__(self):
             MetricType.GAUGE: threading.RLock(),
             MetricType.SET: threading.RLock(),
         }
+        self._cardinality = cardinality
 
     def flush_aggregated_metrics(self):
         metrics = []
@@ -34,29 +35,34 @@ def get_context(self, name, tags):
         tags_str = ",".join(tags) if tags is not None else ""
         return "{}:{}".format(name, tags_str)
 
-    def count(self, name, value, tags, rate, timestamp=0):
+    def count(self, name, value, tags, rate, timestamp=0, cardinality=None):
         return self.add_metric(
-            MetricType.COUNT, CountMetric, name, value, tags, rate, timestamp
+            MetricType.COUNT, CountMetric, name, value, tags, rate, timestamp, cardinality
         )
 
-    def gauge(self, name, value, tags, rate, timestamp=0):
+    def gauge(self, name, value, tags, rate, timestamp=0, cardinality=None):
         return self.add_metric(
-            MetricType.GAUGE, GaugeMetric, name, value, tags, rate, timestamp
+            MetricType.GAUGE, GaugeMetric, name, value, tags, rate, timestamp, cardinality
         )
 
-    def set(self, name, value, tags, rate, timestamp=0):
+    def set(self, name, value, tags, rate, timestamp=0, cardinality=None):
         return self.add_metric(
-            MetricType.SET, SetMetric, name, value, tags, rate, timestamp
+            MetricType.SET, SetMetric, name, value, tags, rate, timestamp, cardinality
        )
 
     def add_metric(
-        self, metric_type, metric_class, name, value, tags, rate, timestamp=0
+        self, metric_type, metric_class, name, value, tags, rate, timestamp=0, cardinality=None
     ):
         context = self.get_context(name, tags)
         with self._locks[metric_type]:
             if context in self.metrics_map[metric_type]:
                 self.metrics_map[metric_type][context].aggregate(value)
             else:
+                if cardinality is None:
+                    cardinality = self._cardinality
+                # Imported here to avoid a circular import with datadog.dogstatsd.base
+                from datadog.dogstatsd.base import DogStatsd
+                DogStatsd.validate_cardinality(cardinality)
                 self.metrics_map[metric_type][context] = metric_class(
-                    name, value, tags, rate, timestamp
+                    name, value, tags, rate, timestamp, cardinality
                 )
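
A small sketch of the fallback implemented in Aggregator.add_metric() above, exercising the aggregator directly. Metric names are illustrative; in normal use the aggregator is driven by DogStatsd rather than called directly.

from datadog.dogstatsd.aggregator import Aggregator

agg = Aggregator(cardinality="low")

# No per-metric value: the client-level default ("low") is attached to the new context.
agg.count("example.jobs.processed", 1, tags=None, rate=1)

# An explicit per-metric value takes precedence over the default.
agg.gauge("example.queue.depth", 42, tags=None, rate=1, cardinality="high")

for m in agg.flush_aggregated_metrics():
    print(m.name, m.metric_type, m.cardinality)
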
diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py
index da9ece563..394399f19 100644
--- a/datadog/dogstatsd/base.py
+++ b/datadog/dogstatsd/base.py
@@ -140,6 +140,12 @@ def post_fork_child():
 class DogStatsd(object):
     OK, WARNING, CRITICAL, UNKNOWN = (0, 1, 2, 3)
 
+    # Cardinality
+    CARDINALITY_NONE = "none"
+    CARDINALITY_LOW = "low"
+    CARDINALITY_ORCHESTRATOR = "orchestrator"
+    CARDINALITY_HIGH = "high"
+
     def __init__(
         self,
         host=DEFAULT_HOST,  # type: Text
@@ -162,6 +168,7 @@ def __init__(
         max_buffer_len=0,  # type: int
         container_id=None,  # type: Optional[Text]
         origin_detection_enabled=True,  # type: bool
+        cardinality=None,  # type: Optional[Text]
         socket_timeout=0,  # type: Optional[float]
         telemetry_socket_timeout=0,  # type: Optional[float]
         disable_background_sender=True,  # type: bool
@@ -307,6 +314,14 @@ def __init__(
             More on this: https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#origin-detection-over-udp
         :type origin_detection_enabled: boolean
 
+        :param cardinality: Set the cardinality of the client. Optional.
+            This feature requires Datadog Agent version >=7.63.0.
+            When configured, the provided cardinality is sent to the Agent to enrich the metrics with
+            specific cardinality tags from Origin Detection.
+            Default: None.
+            More on this: https://docs.datadoghq.com/containers/kubernetes/tag/?tab=datadogoperator#out-of-the-box-tags
+        :type cardinality: string
+
         :param socket_timeout: Set timeout for socket operations, in seconds. Optional.
             If sets to zero, never wait if operation can not be completed immediately. If set to None, wait forever.
             This option does not affect hostname resolution when using UDP.
@@ -415,6 +430,7 @@ def __init__(
         self.namespace = namespace
         self.use_ms = use_ms
         self.default_sample_rate = default_sample_rate
+        self.cardinality = cardinality
 
         # Origin detection
         self._container_id = None
@@ -450,7 +466,7 @@ def __init__(
         self._flush_interval = flush_interval
         self._flush_thread = None
         self._flush_thread_stop = threading.Event()
-        self.aggregator = Aggregator()
+        self.aggregator = Aggregator(self.cardinality)
 
         # Indicates if the process is about to fork, so we shouldn't start any new threads yet.
         self._forking = False
@@ -824,7 +840,7 @@ def flush_aggregated_metrics(self):
         """
         metrics = self.aggregator.flush_aggregated_metrics()
         for m in metrics:
-            self._report(m.name, m.metric_type, m.value, m.tags, m.rate, m.timestamp)
+            self._report(m.name, m.metric_type, m.value, m.tags, m.rate, m.timestamp, m.cardinality)
 
     def gauge(
         self,
@@ -832,6 +848,7 @@ def gauge(
         metric,  # type: Text
         value,  # type: float
         tags=None,  # type: Optional[List[str]]
         sample_rate=None,  # type: Optional[float]
+        cardinality=None,  # type: Optional[str]
     ):  # type(...) -> None
         """
         Record the value of a gauge, optionally setting a list of tags and a
@@ -841,9 +858,9 @@ def gauge(
         >>> statsd.gauge("active.connections", 1001, tags=["protocol:http"])
         """
         if self._disable_aggregation:
-            self._report(metric, "g", value, tags, sample_rate)
+            self._report(metric, "g", value, tags, sample_rate, cardinality=cardinality)
         else:
-            self.aggregator.gauge(metric, value, tags, sample_rate)
+            self.aggregator.gauge(metric, value, tags, sample_rate, cardinality=cardinality)
 
     # Minimum Datadog Agent version: 7.40.0
     def gauge_with_timestamp(
@@ -853,6 +870,7 @@ def gauge_with_timestamp(
         self,
         metric,  # type: Text
         value,  # type: float
         timestamp,  # type: int
         tags=None,  # type: Optional[List[str]]
         sample_rate=None,  # type: Optional[float]
+        cardinality=None,  # type: Optional[str]
     ):  # type(...) -> None
         """u
         Record the value of a gauge with a Unix timestamp (in seconds),
@@ -864,9 +882,9 @@ def gauge_with_timestamp(
         >>> statsd.gauge("active.connections", 1001, 1713804588, tags=["protocol:http"])
         """
         if self._disable_aggregation:
-            self._report(metric, "g", value, tags, sample_rate, timestamp)
+            self._report(metric, "g", value, tags, sample_rate, timestamp, cardinality)
         else:
-            self.aggregator.gauge(metric, value, tags, sample_rate, timestamp)
+            self.aggregator.gauge(metric, value, tags, sample_rate, timestamp, cardinality)
 
     def count(
         self,
@@ -874,6 +892,7 @@ def count(
         metric,  # type: Text
         value,  # type: float
         tags=None,  # type: Optional[List[str]]
         sample_rate=None,  # type: Optional[float]
+        cardinality=None,  # type: Optional[str]
     ):  # type(...) -> None
         """
         Count tracks how many times something happened per second, tags and a sample
@@ -882,9 +901,9 @@ def count(
         >>> statsd.count("page.views", 123)
         """
         if self._disable_aggregation:
-            self._report(metric, "c", value, tags, sample_rate)
+            self._report(metric, "c", value, tags, sample_rate, cardinality=cardinality)
         else:
-            self.aggregator.count(metric, value, tags, sample_rate)
+            self.aggregator.count(metric, value, tags, sample_rate, cardinality=cardinality)
 
     # Minimum Datadog Agent version: 7.40.0
     def count_with_timestamp(
@@ -894,6 +913,7 @@ def count_with_timestamp(
         self,
         metric,  # type: Text
         value,  # type: float
         timestamp=0,  # type: int
         tags=None,  # type: Optional[List[str]]
         sample_rate=None,  # type: Optional[float]
+        cardinality=None,  # type: Optional[str]
     ):  # type(...) -> None
         """
         Count how many times something happened at a given Unix timestamp in seconds,
@@ -904,9 +924,9 @@ def count_with_timestamp(
         >>> statsd.count("files.transferred", 124, timestamp=1713804588)
         """
         if self._disable_aggregation:
-            self._report(metric, "c", value, tags, sample_rate, timestamp)
+            self._report(metric, "c", value, tags, sample_rate, timestamp, cardinality)
         else:
-            self.aggregator.count(metric, value, tags, sample_rate, timestamp)
+            self.aggregator.count(metric, value, tags, sample_rate, timestamp, cardinality)
 
     def increment(
         self,
@@ -914,6 +934,7 @@ def increment(
         metric,  # type: Text
         value=1,  # type: float
         tags=None,  # type: Optional[List[str]]
         sample_rate=None,  # type: Optional[float]
+        cardinality=None,  # type: Optional[str]
     ):  # type(...) -> None
         """
         Increment a counter, optionally setting a value, tags and a sample
@@ -923,9 +944,9 @@ def increment(
         >>> statsd.increment("files.transferred", 124)
         """
         if self._disable_aggregation:
-            self._report(metric, "c", value, tags, sample_rate)
+            self._report(metric, "c", value, tags, sample_rate, cardinality=cardinality)
         else:
-            self.aggregator.count(metric, value, tags, sample_rate)
+            self.aggregator.count(metric, value, tags, sample_rate, cardinality=cardinality)
 
     def decrement(
         self,
@@ -933,6 +954,7 @@ def decrement(
         metric,  # type: Text
         value=1,  # type: float
         tags=None,  # type: Optional[List[str]]
         sample_rate=None,  # type: Optional[float]
+        cardinality=None,  # type: Optional[str]
     ):  # type(...) -> None
         """
         Decrement a counter, optionally setting a value, tags and a sample
@@ -943,9 +965,9 @@ def decrement(
         >>> statsd.decrement("files.remaining")
         >>> statsd.decrement("active.connections", 2)
         """
         metric_value = -value if value else value
         if self._disable_aggregation:
-            self._report(metric, "c", metric_value, tags, sample_rate)
+            self._report(metric, "c", metric_value, tags, sample_rate, cardinality=cardinality)
         else:
-            self.aggregator.count(metric, metric_value, tags, sample_rate)
+            self.aggregator.count(metric, metric_value, tags, sample_rate, cardinality=cardinality)
 
     def histogram(
         self,
@@ -953,6 +975,7 @@ def histogram(
         metric,  # type: Text
         value,  # type: float
         tags=None,  # type: Optional[List[str]]
         sample_rate=None,  # type: Optional[float]
+        cardinality=None,  # type: Optional[str]
     ):  # type(...) -> None
         """
         Sample a histogram value, optionally setting tags and a sample rate.
@@ -960,7 +983,7 @@ def histogram(
         >>> statsd.histogram("uploaded.file.size", 1445)
         >>> statsd.histogram("album.photo.count", 26, tags=["gender:female"])
         """
-        self._report(metric, "h", value, tags, sample_rate)
+        self._report(metric, "h", value, tags, sample_rate, cardinality=cardinality)
 
     def distribution(
         self,
@@ -968,6 +991,7 @@ def distribution(
         metric,  # type: Text
         value,  # type: float
         tags=None,  # type: Optional[List[str]]
         sample_rate=None,  # type: Optional[float]
+        cardinality=None,  # type: Optional[str]
     ):  # type(...) -> None
         """
         Send a global distribution value, optionally setting tags and a sample rate.
@@ -975,7 +999,7 @@ def distribution(
         >>> statsd.distribution("uploaded.file.size", 1445)
         >>> statsd.distribution("album.photo.count", 26, tags=["gender:female"])
         """
-        self._report(metric, "d", value, tags, sample_rate)
+        self._report(metric, "d", value, tags, sample_rate, cardinality=cardinality)
 
     def timing(
         self,
@@ -983,13 +1007,14 @@ def timing(
         metric,  # type: Text
         value,  # type: float
         tags=None,  # type: Optional[List[str]]
         sample_rate=None,  # type: Optional[float]
+        cardinality=None,  # type: Optional[str]
     ):  # type(...) -> None
         """
         Record a timing, optionally setting tags and a sample rate.
 
         >>> statsd.timing("query.response.time", 1234)
         """
-        self._report(metric, "ms", value, tags, sample_rate)
+        self._report(metric, "ms", value, tags, sample_rate, cardinality=cardinality)
 
     def timed(self, metric=None, tags=None, sample_rate=None, use_ms=None):
         """
@@ -1047,16 +1072,16 @@ def get_user(user_id):
         """
         return DistributedContextManagerDecorator(self, metric, tags, sample_rate, use_ms)
 
-    def set(self, metric, value, tags=None, sample_rate=None):
+    def set(self, metric, value, tags=None, sample_rate=None, cardinality=None):
         """
         Sample a set value.
 
         >>> statsd.set("visitors.uniques", 999)
         """
         if self._disable_aggregation:
-            self._report(metric, "s", value, tags, sample_rate)
+            self._report(metric, "s", value, tags, sample_rate, cardinality=cardinality)
         else:
-            self.aggregator.set(metric, value, tags, sample_rate)
+            self.aggregator.set(metric, value, tags, sample_rate, cardinality=cardinality)
 
     def close_socket(self):
         """
@@ -1078,10 +1103,10 @@ def close_socket(self):
         self.telemetry_socket = None
 
     def _serialize_metric(
-        self, metric, metric_type, value, tags, sample_rate=1, timestamp=0
+        self, metric, metric_type, value, tags, sample_rate=1, timestamp=0, cardinality=None
     ):
         # Create/format the metric packet
-        return "%s%s:%s|%s%s%s%s%s%s" % (
+        return "%s%s:%s|%s%s%s%s%s%s%s" % (
             (self.namespace + ".") if self.namespace else "",
             metric,
             value,
@@ -1090,10 +1115,11 @@ def _serialize_metric(
             ("|#" + ",".join(normalize_tags(tags))) if tags else "",
             ("|c:" + self._container_id if self._container_id else ""),
             ("|e:" + self._external_data if self._external_data else ""),
+            ("|card:" + cardinality if cardinality else ""),
             ("|T" + text(timestamp)) if timestamp > 0 else "",
         )
 
-    def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
+    def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0, cardinality=None):
         """
         Create a metric packet and send it.
 
@@ -1120,10 +1146,15 @@ def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
         if not allows_timestamp or timestamp < 0:
             timestamp = 0
 
+        if cardinality is None:
+            cardinality = self.cardinality
+
+        self.validate_cardinality(cardinality)
+
         # Resolve the full tag list
         tags = self._add_constant_tags(tags)
         payload = self._serialize_metric(
-            metric, metric_type, value, tags, sample_rate, timestamp
+            metric, metric_type, value, tags, sample_rate, timestamp, cardinality
         )
 
         # Send it
@@ -1299,6 +1330,7 @@ def event(
         priority=None,
         tags=None,
         hostname=None,
+        cardinality=None,
     ):
         """
         Send an event. Attributes are the same as the Event API.
@@ -1327,6 +1359,11 @@ def event(
             message,
         )
 
+        if cardinality is None:
+            cardinality = self.cardinality
+
+        self.validate_cardinality(cardinality)
+
         if date_happened:
             string = "%s|d:%d" % (string, date_happened)
         if hostname:
@@ -1343,6 +1380,8 @@ def event(
             string = "%s|#%s" % (string, ",".join(tags))
         if self._container_id:
             string = "%s|c:%s" % (string, self._container_id)
+        if cardinality:
+            string = "%s|card:%s" % (string, cardinality)
 
         if len(string) > 8 * 1024:
             raise ValueError(
@@ -1362,6 +1401,7 @@ def service_check(
         status,
         tags=None,
         timestamp=None,
+        cardinality=None,
         hostname=None,
         message=None,
     ):
@@ -1377,6 +1417,11 @@ def service_check(
         # Append all client level tags to every status check
         tags = self._add_constant_tags(tags)
 
+        if cardinality is None:
+            cardinality = self.cardinality
+
+        self.validate_cardinality(cardinality)
+
         if timestamp:
             string = u"{0}|d:{1}".format(string, timestamp)
         if hostname:
@@ -1387,6 +1432,8 @@ def service_check(
             string = u"{0}|m:{1}".format(string, message)
         if self._container_id:
             string = u"{0}|c:{1}".format(string, self._container_id)
+        if cardinality:
+            string = u"{0}|card:{1}".format(string, cardinality)
 
         if self._telemetry:
             self.service_checks_count += 1
@@ -1547,5 +1594,21 @@ def stop(self):
         self.flush_buffered_metrics()
         self.close_socket()
 
+    @staticmethod
+    def validate_cardinality(cardinality):
+        if cardinality not in (
+            None,
+            statsd.CARDINALITY_NONE,
+            statsd.CARDINALITY_LOW,
+            statsd.CARDINALITY_ORCHESTRATOR,
+            statsd.CARDINALITY_HIGH,
+        ):
+            logging.warning(
+                "Cardinality must be one of the following: 'none', 'low', 'orchestrator' or 'high'. "
+                "Falling back to default cardinality."
+            )
+            return None
+        return cardinality
+
 
 statsd = DogStatsd()
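
A short sketch of the per-call parameter and the resulting wire format. Host/port and metric names are illustrative; the |card: placement follows _serialize_metric() above.

from datadog.dogstatsd.base import DogStatsd

client = DogStatsd(host="localhost", port=8125)

# Histograms always go through _report(), so the payload is built immediately:
# "uploaded.file.size:1445|h|card:high"
client.histogram("uploaded.file.size", 1445, cardinality=DogStatsd.CARDINALITY_HIGH)

# Events and service checks carry the same field via their |card: suffix.
client.event("Deploy finished", "All pods healthy", cardinality=DogStatsd.CARDINALITY_LOW)

# An unrecognized value only triggers the validate_cardinality() warning;
# the string is still forwarded as-is in the packet.
client.timing("query.response.time", 1234, cardinality="medium")
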
diff --git a/datadog/dogstatsd/metrics.py b/datadog/dogstatsd/metrics.py
index 570c3dcf0..130bb35f5 100644
--- a/datadog/dogstatsd/metrics.py
+++ b/datadog/dogstatsd/metrics.py
@@ -2,22 +2,23 @@
 
 
 class MetricAggregator(object):
-    def __init__(self, name, tags, rate, metric_type, value=0, timestamp=0):
+    def __init__(self, name, tags, rate, metric_type, value=0, timestamp=0, cardinality=None):
         self.name = name
         self.tags = tags
         self.rate = rate
         self.metric_type = metric_type
         self.value = value
         self.timestamp = timestamp
+        self.cardinality = cardinality
 
     def aggregate(self, value):
         raise NotImplementedError("Subclasses should implement this method.")
 
 
 class CountMetric(MetricAggregator):
-    def __init__(self, name, value, tags, rate, timestamp=0):
+    def __init__(self, name, value, tags, rate, timestamp=0, cardinality=None):
         super(CountMetric, self).__init__(
-            name, tags, rate, MetricType.COUNT, value, timestamp
+            name, tags, rate, MetricType.COUNT, value, timestamp, cardinality
         )
 
     def aggregate(self, v):
@@ -25,9 +26,9 @@ def aggregate(self, v):
 
 
 class GaugeMetric(MetricAggregator):
-    def __init__(self, name, value, tags, rate, timestamp=0):
+    def __init__(self, name, value, tags, rate, timestamp=0, cardinality=None):
         super(GaugeMetric, self).__init__(
-            name, tags, rate, MetricType.GAUGE, value, timestamp
+            name, tags, rate, MetricType.GAUGE, value, timestamp, cardinality
         )
 
     def aggregate(self, v):
@@ -35,10 +36,10 @@ def aggregate(self, v):
 
 
 class SetMetric(MetricAggregator):
-    def __init__(self, name, value, tags, rate, timestamp=0):
+    def __init__(self, name, value, tags, rate, timestamp=0, cardinality=None):
         default_value = 0
         super(SetMetric, self).__init__(
-            name, tags, rate, MetricType.SET, default_value, default_value
+            name, tags, rate, MetricType.SET, default_value, default_value, cardinality
         )
         self.data = set()
         self.data.add(value)
diff --git a/tests/unit/dogstatsd/test_statsd.py b/tests/unit/dogstatsd/test_statsd.py
index b9a24cfc1..79e38046e 100644
--- a/tests/unit/dogstatsd/test_statsd.py
+++ b/tests/unit/dogstatsd/test_statsd.py
@@ -269,6 +269,11 @@ def test_initialization(self):
         self.assertIsNone(statsd.host)
         self.assertIsNone(statsd.port)
 
+        # Add cardinality
+        options['cardinality'] = 'none'
+        initialize(**options)
+        self.assertEqual(statsd.cardinality, 'none')
+
     def test_dogstatsd_initialization_with_env_vars(self):
         """
         Dogstatsd can retrieve its config from env vars when
@@ -317,6 +322,10 @@ def test_report_metric_with_unsupported_ts(self):
         self.statsd._report('set', 's', 123, tags=None, sample_rate=None, timestamp=100)
         self.assert_equal_telemetry('set:123|s\n', self.recv(2))
 
+    def test_report_with_cardinality(self):
+        self.statsd._report('report', 'g', 123.4, tags=None, sample_rate=None, cardinality="orchestrator")
+        self.assert_equal_telemetry('report:123.4|g|card:orchestrator\n', self.recv(2))
+
     def test_gauge(self):
         self.statsd.gauge('gauge', 123.4)
         self.assert_equal_telemetry('gauge:123.4|g\n', self.recv(2))
@@ -325,6 +334,14 @@ def test_gauge_with_ts(self):
         self.statsd.gauge_with_timestamp("gauge", 123.4, timestamp=1066)
         self.assert_equal_telemetry("gauge:123.4|g|T1066\n", self.recv(2))
 
+    def test_gauge_with_cardinality(self):
+        self.statsd.gauge('gauge', 123.4, cardinality="high")
+        self.assert_equal_telemetry('gauge:123.4|g|card:high\n', self.recv(2))
+
+        self.statsd._reset_telemetry()
+        self.statsd.gauge_with_timestamp("gauge", 123.4, timestamp=1066, cardinality="none")
+        self.assert_equal_telemetry("gauge:123.4|g|card:none|T1066\n", self.recv(2))
+
     def test_gauge_with_invalid_ts_should_be_ignored(self):
         self.statsd.gauge_with_timestamp("gauge", 123.4, timestamp=-500)
         self.assert_equal_telemetry("gauge:123.4|g\n", self.recv(2))
@@ -364,6 +381,16 @@ def test_count_with_ts(self):
         self.statsd.flush()
         self.assert_equal_telemetry("page.views:11|c|T2121\n", self.recv(2))
 
+    def test_count_with_cardinality(self):
+        self.statsd.count('page.views', 11, cardinality="low")
+        self.statsd.flush()
+        self.assert_equal_telemetry('page.views:11|c|card:low\n', self.recv(2))
+
+        self.statsd._reset_telemetry()
+        self.statsd.count_with_timestamp("page.views", 11, timestamp=2121, cardinality="high")
+        self.statsd.flush()
+        self.assert_equal_telemetry("page.views:11|c|card:high|T2121\n", self.recv(2))
+
     def test_count_with_invalid_ts_should_be_ignored(self):
         self.statsd.count_with_timestamp("page.views", 1, timestamp=-1066)
         self.statsd.flush()
@@ -373,6 +400,10 @@ def test_histogram(self):
         self.statsd.histogram('histo', 123.4)
         self.assert_equal_telemetry('histo:123.4|h\n', self.recv(2))
 
+    def test_histogram_with_cardinality(self):
+        self.statsd.histogram('histo', 123.4, cardinality="low")
+        self.assert_equal_telemetry('histo:123.4|h|card:low\n', self.recv(2))
+
     def test_pipe_in_tags(self):
         self.statsd.gauge('gt', 123.4, tags=['pipe|in:tag', 'red'])
         self.assert_equal_telemetry('gt:123.4|g|#pipe_in:tag,red\n', self.recv(2))
@@ -451,8 +482,9 @@ def test_event(self):
             u'L1\nL2',
             priority='low',
             date_happened=1375296969,
+            cardinality="orchestrator",
         )
-        event2 = u'_e{5,6}:Title|L1\\nL2|d:1375296969|p:low\n'
+        event2 = u'_e{5,6}:Title|L1\\nL2|d:1375296969|p:low|card:orchestrator\n'
         self.assert_equal_telemetry(
             event2,
             self.recv(2),
@@ -564,8 +596,10 @@ def test_service_check(self):
         self.statsd.service_check(
             'my_check.name', self.statsd.WARNING,
             tags=['key1:val1', 'key2:val2'],
             timestamp=now,
-            hostname='i-abcd1234', message=u"♬ †øU \n†øU ¥ºu|m: T0µ ♪")
-        check = u'_sc|my_check.name|{0}|d:{1}|h:i-abcd1234|#key1:val1,key2:val2|m:{2}'.format(self.statsd.WARNING, now, u'♬ †øU \\n†øU ¥ºu|m\\: T0µ ♪\n')
+            hostname='i-abcd1234', message=u"♬ †øU \n†øU ¥ºu|m: T0µ ♪",
+            cardinality="low",
+        )
+        check = u'_sc|my_check.name|{0}|d:{1}|h:i-abcd1234|#key1:val1,key2:val2|m:{2}|card:low\n'.format(self.statsd.WARNING, now, u'♬ †øU \\n†øU ¥ºu|m\\: T0µ ♪')
         self.assert_equal_telemetry(
             check,
             self.recv(2),