Skip to content

Commit

Permalink
Merge 'tests: merge test_metrics.py into prometheus_test.py ' from Ke…
Browse files Browse the repository at this point in the history
…fu Chai

in this series, we move all tests in `test_metrics.py` into `prometheus_test.py ` to drop the repeating code.

Closes #2254

* https://github.com/scylladb/seastar:
  tests: test protobuf support in prometheus_test.py
  tests: enable prometheus_test.py to test metrics without aggregation
  • Loading branch information
xemul committed Jun 10, 2024
2 parents ebf44ab + ca94c44 commit 71d9363
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 160 deletions.
213 changes: 189 additions & 24 deletions tests/unit/prometheus_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,87 @@
#

import argparse
import math
import json
import re
import subprocess
import sys
import time
import unittest
import urllib.request
import urllib.parse
import yaml

from typing import Optional
from collections import namedtuple


class Exposition:
@classmethod
def from_hist(cls,
name: str,
hist: list[tuple[float, int]],
sum_: int,
count: int) -> 'Exposition':
# ignore these values, we might need to verify them in future
_, _ = sum_, count
buckets = (cls.value_to_bucket(le - 1) for le, _ in hist)
deltas = []
last_n = 0
for _, n in hist:
delta = n - last_n
last_n = n
deltas.append(delta)
return cls(name, dict(zip(buckets, deltas)), {})

@staticmethod
def value_to_bucket(value):
low = 2 ** math.floor(math.log(value, 2))
high = 2 * low
dif = (high - low) / 4
return low + dif * math.floor((value - low) / dif)

@staticmethod
def _values_to_histogram(values):
hist = {}
for val in values:
bucket = Exposition.value_to_bucket(val)
if bucket in hist:
hist[bucket] += 1
else:
hist[bucket] = 1
return hist

@classmethod
def from_conf(cls,
name: str,
type_: str,
values: list[str],
labels: dict[str, str]) -> 'Exposition':
if type_ in ('gauge', 'counter'):
assert len(values) == 1
return cls(name, float(values[0]), labels)
if type_ == 'histogram':
hist = cls._values_to_histogram(float(v) for v in values)
return cls(name, hist, {})
raise NotImplementedError(f'unsupported type: {type_}')

def __init__(self,
name: str,
type_: str,
value: str,
labels: Optional[dict[str, str]] = None) -> None:
value: int | list[tuple[float, int]],
labels: dict[str, str]) -> None:
self.name = name
if type_ == 'counter':
self.value = float(value)
elif type_ == 'gauge':
self.value = float(value)
else:
# we don't verify histogram or summary yet
self.value = None
self.value = value
self.labels = labels

def __repr__(self):
return f"{self.name=}, {self.value=}, {self.labels=}"

def __eq__(self, other):
if not isinstance(other, Exposition):
return False
return self.value == other.value


class Metrics:
prefix = 'seastar'
Expand Down Expand Up @@ -89,36 +143,67 @@ def get(self,
full_name = None
if name is not None:
full_name = f'{self.prefix}_{self.group}_{name}'
results: list[Exposition] = []
metric_type = None

# for histogram and summary as they are represented with multiple lines
hist_name = ''
hist_buckets = []
hist_sum = 0
hist_count = 0

for line in self.lines:
if not line:
continue
if line.startswith('# HELP'):
continue
if line.startswith('# TYPE'):
_, _, metric_name, type_ = line.split()
if full_name is None or metric_name == full_name:
metric_type = type_
_, _, type_metric_name, metric_type = line.split()
if hist_buckets:
yield Exposition.from_hist(hist_name,
hist_buckets,
hist_sum,
hist_count)
hist_buckets = []
if metric_type in ('histogram', 'summary'):
hist_name = type_metric_name
continue
matched = self.pattern.match(line)
assert matched, f'malformed metric line: {line}'

metric_name = matched.group('metric_name')
if full_name and metric_name != full_name:
value_metric_name = matched.group('metric_name')
if full_name and not value_metric_name.startswith(full_name):
continue

metric_labels = self._parse_labels(matched.group('labels'))
if labels is not None and metric_labels != labels:
continue

metric_value = matched.group('value')
results.append(Exposition(metric_name,
metric_type,
metric_value,
metric_labels))
return results
metric_value = float(matched.group('value'))
if metric_type == 'histogram':
if value_metric_name == f'{type_metric_name}_bucket':
last_value = 0
if hist_buckets:
last_value = hist_buckets[-1][1]
if metric_value - last_value != 0:
le = metric_labels['le'].strip('"')
hist_buckets.append((float(le), metric_value))
elif value_metric_name == f'{type_metric_name}_sum':
hist_sum = metric_value
elif value_metric_name == f'{type_metric_name}_count':
hist_count = metric_value
else:
raise RuntimeError(f'unknown histogram value: {line}')
elif metric_type == 'summary':
raise NotImplementedError('unsupported type: summary')
else:
yield Exposition(type_metric_name,
metric_value,
metric_labels)
if hist_buckets:
yield Exposition.from_hist(hist_name,
hist_buckets,
hist_sum,
hist_count)

def get_help(self, name: str) -> Optional[str]:
full_name = f'{self.prefix}_{self.group}_{name}'
Expand All @@ -135,6 +220,8 @@ class TestPrometheus(unittest.TestCase):
exporter_process = None
exporter_config = None
port = 10001
prometheus = None
prometheus_scrape_interval = 15

@classmethod
def setUpClass(cls) -> None:
Expand Down Expand Up @@ -175,7 +262,27 @@ def _get_metrics(cls,
body = f.read().decode('utf-8')
return Metrics(body.rstrip().split('\n'))

def test_filtering_by_label(self) -> None:
def test_filtering_by_label_sans_aggregation(self) -> None:
labels = {'private': '1'}
metrics = self._get_metrics(labels=labels)
actual_values = list(metrics.get())
expected_values = []
with open(self.exporter_config, encoding='utf-8') as f:
config = yaml.safe_load(f)
for metric in config['metrics']:
name = metric['name']
metric_name = f'{Metrics.prefix}_{Metrics.group}_{name}'
metric_labels = metric['labels']
if metric_labels != labels:
continue
e = Exposition.from_conf(metric_name,
metric['type'],
metric['values'],
metric_labels)
expected_values.append(e)
self.assertCountEqual(actual_values, expected_values)

def test_filtering_by_label_with_aggregation(self) -> None:
TestCase = namedtuple('TestCase', ['label', 'regex', 'found'])
label = 'private'
tests = [
Expand All @@ -188,7 +295,8 @@ def test_filtering_by_label(self) -> None:
for test in tests:
with self.subTest(regex=test.regex, found=test.found):
metrics = self._get_metrics(labels={test.label: test.regex})
self.assertEqual(len(metrics.get()), test.found)
values = list(metrics.get())
self.assertEqual(len(values), test.found)

def test_aggregated(self) -> None:
name = 'counter_1'
Expand Down Expand Up @@ -218,6 +326,55 @@ def test_help(self) -> None:
else:
self.assertIsNone(msg)

@staticmethod
def _from_native_histogram(values) -> dict[float, float]:
results = {}
for v in values:
bucket = Exposition.value_to_bucket(float(v[2]) - 1)
results[bucket] = float(v[3])
return results

@staticmethod
def _query_prometheus(host: str, query: str, type_: str) -> float | dict[float, float]:
url = f'http://{host}/api/v1/query?query={query}'
headers = {"Accept": "application/json"}
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req) as f:
results = json.load(f)["data"]["result"][0]
if type_ == 'histogram':
buckets = results["histogram"][1]["buckets"]
return TestPrometheus._from_native_histogram(buckets)
return float(results["value"][1])

def test_protobuf(self) -> None:
if self.prometheus is None:
self.skipTest("prometheus is not configured")

# Prometheus does not allow us to push metrics to it, neither
# can we force it to scrape an exporter, so we have to wait
# until prometheus scrapes the server
time.sleep(self.prometheus_scrape_interval + 1)
with open(self.exporter_config, encoding='utf-8') as f:
config = yaml.safe_load(f)

labels = {'private': '1'}
for metric in config['metrics']:
name = metric['name']
metric_name = f'{Metrics.prefix}_{Metrics.group}_{name}'
metric_labels = metric['labels']
if metric_labels != labels:
continue
metric_type = metric['type']
metric_value = metric['values']
e = Exposition.from_conf(metric_name,
metric_type,
metric_value,
metric_labels)
res = self._query_prometheus(self.prometheus,
metric_name,
metric_type)
self.assertEqual(res, e.value)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
Expand All @@ -227,8 +384,16 @@ def test_help(self) -> None:
parser.add_argument('--config',
required=True,
help='Path to the metrics definition file')
parser.add_argument('--prometheus',
help='A Prometheus to connect to')
parser.add_argument('--prometheus-scrape-interval',
type=int,
help='Prometheus scrape interval (in seconds)',
default=15)
opts, remaining = parser.parse_known_args()
remaining.insert(0, sys.argv[0])
TestPrometheus.exporter_path = opts.exporter
TestPrometheus.exporter_config = opts.config
TestPrometheus.prometheus = opts.prometheus
TestPrometheus.prometheus_scrape_interval = opts.prometheus_scrape_interval
unittest.main(argv=remaining)
Loading

0 comments on commit 71d9363

Please sign in to comment.