Skip to content

Commit

Permalink
Unit test coverage
Browse files Browse the repository at this point in the history
`crucible_svc.py` test coverage is now at 96%. While the remaining 4% is worth
some effort later, subsequent ILAB PRs will change some of this code anyway
and will require test adjustments so it's good enough for now.
  • Loading branch information
dbutenhof committed Jan 10, 2025
1 parent 840a9cf commit 4adaf7e
Show file tree
Hide file tree
Showing 3 changed files with 1,191 additions and 94 deletions.
87 changes: 20 additions & 67 deletions backend/app/services/crucible_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class Point:
value: float


colors = [
COLOR_NAMES = [
"black",
"aqua",
"blue",
Expand Down Expand Up @@ -1104,10 +1104,12 @@ async def get_runs(
s = self._normalize_date(start)
results["startDate"] = datetime.fromtimestamp(
s / 1000.0, tz=timezone.utc
)
).isoformat()
if end:
e = self._normalize_date(end)
results["endDate"] = datetime.fromtimestamp(e / 1000.0, tz=timezone.utc)
results["endDate"] = datetime.fromtimestamp(
e / 1000.0, tz=timezone.utc
).isoformat()

if s and e and s > e:
raise HTTPException(
Expand Down Expand Up @@ -1345,7 +1347,6 @@ async def get_samples(
sample = s["sample"]
sample["iteration"] = s["iteration"]["num"]
sample["primary_metric"] = s["iteration"]["primary-metric"]
sample["status"] = s["iteration"]["status"]
samples.append(sample)
return samples

Expand Down Expand Up @@ -1400,61 +1401,6 @@ async def get_periods(
body.append(period)
return body

async def get_timeline(self, run: str, **kwargs) -> dict[str, Any]:
    """Report the relative timeline of a run.

    Nests the run's iterations, each iteration's samples, and each
    sample's periods into a single response document.

    Args:
        run: run ID
        kwargs: additional OpenSearch parameters

    Returns:
        {"run": {"id": ..., "begin": ..., "end": ..., "iterations": [...]}}
    """
    iteration_docs = await self.search(
        index="iteration",
        filters=[{"term": {"run.id": run}}],
        **kwargs,
        ignore_unavailable=True,
    )
    sample_docs = await self.search(
        index="sample",
        filters=[{"term": {"run.id": run}}],
        **kwargs,
        ignore_unavailable=True,
    )
    period_docs = await self.search(
        index="period",
        filters=[{"term": {"run.id": run}}],
        **kwargs,
        ignore_unavailable=True,
    )

    # Group sample hits by owning iteration and period hits by owning
    # sample so the nesting loop below is O(1) per lookup.
    samples_by_iteration = defaultdict(list)
    for hit in self._hits(sample_docs):
        samples_by_iteration[hit["iteration"]["id"]].append(hit)
    periods_by_sample = defaultdict(list)
    for hit in self._hits(period_docs):
        periods_by_sample[hit["sample"]["id"]].append(hit)

    iterations = []
    run_obj = {"id": run, "iterations": iterations}
    for hit in self._hits(iteration_docs):
        # Capture the run's begin/end once, from the first iteration hit.
        if "begin" not in run_obj:
            run_obj["begin"] = self._format_timestamp(hit["run"]["begin"])
            run_obj["end"] = self._format_timestamp(hit["run"]["end"])
        iteration = hit["iteration"]
        iteration["samples"] = []
        for s_hit in samples_by_iteration.get(iteration["id"], []):
            sample = s_hit["sample"]
            sample["periods"] = [
                self._format_period(p_hit["period"])
                for p_hit in periods_by_sample.get(sample["id"], [])
            ]
            iteration["samples"].append(sample)
        iterations.append(iteration)
    return {"run": run_obj}

async def get_metrics_list(self, run: str, **kwargs) -> dict[str, Any]:
"""Return a list of metrics available for a run
Expand Down Expand Up @@ -1494,12 +1440,14 @@ async def get_metrics_list(self, run: str, **kwargs) -> dict[str, Any]:
if name in met:
record = met[name]
else:
record = {"periods": [], "breakouts": defaultdict(set)}
record = {"periods": [], "breakouts": defaultdict(list)}
met[name] = record
if "period" in h:
record["periods"].append(h["period"]["id"])
for n, v in desc["names"].items():
record["breakouts"][n].add(v)
# mimic a set, since the set type doesn't serialize
if v not in record["breakouts"][n]:
record["breakouts"][n].append(v)
return met

async def get_metric_breakouts(
Expand Down Expand Up @@ -1555,8 +1503,8 @@ async def get_metric_breakouts(
f"Metric name {metric_name} not found for run {run}",
)
classes = set()
response = {"label": metric, "class": classes}
breakouts = defaultdict(set)
response = {"label": metric}
breakouts = defaultdict(list)
pl = set()
for m in self._hits(metrics):
desc = m["metric_desc"]
Expand All @@ -1567,11 +1515,13 @@ async def get_metric_breakouts(
if "period" in m:
pl.add(m["period"]["id"])
for n, v in desc["names"].items():
breakouts[n].add(v)
if v not in breakouts[n]:
breakouts[n].append(v)
# We want to help filter a consistent summary, so only show those
# names with more than one value.
if len(pl) > 1:
response["periods"] = pl
response["periods"] = sorted(pl)
response["class"] = sorted(classes)
response["breakouts"] = {n: v for n, v in breakouts.items() if len(v) > 1}
self.logger.info("Processing took %.3f seconds", time.time() - start)
return response
Expand Down Expand Up @@ -1634,6 +1584,9 @@ async def get_metrics_data(
filters.extend(await self._build_timestamp_range_filters(periods))

response = []

# NOTE -- _get_metric_ids already failed if we found multiple IDs but
# aggregation wasn't specified.
if len(ids) > 1:
# Find the minimum sample interval of the selected metrics
aggdur = await self.search(
Expand Down Expand Up @@ -1961,9 +1914,9 @@ async def get_metrics_graph(self, graphdata: GraphList) -> dict[str, Any]:
if g.color:
color = g.color
else:
color = colors[cindex]
color = COLOR_NAMES[cindex]
cindex += 1
if cindex >= len(colors):
if cindex >= len(COLOR_NAMES):
cindex = 0
graphitem = {
"x": x,
Expand Down
91 changes: 69 additions & 22 deletions backend/tests/fake_elastic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Optional, Union

Expand All @@ -8,10 +9,21 @@
class Request:
index: str
body: dict[str, Any]
doc_type: str
params: Any
headers: Any
kwargs: dict[str, Any]
doc_type: Optional[str] = None
params: Optional[Any] = None
headers: Optional[Any] = None
kwargs: Optional[dict[str, Any]] = None

def __eq__(self, other) -> bool:
    """Compare two recorded requests field by field.

    The kwargs field is normalized so that an empty dict and None
    compare as equal; all other fields must match exactly.
    """
    if (self.index, self.body, self.doc_type) != (
        other.index,
        other.body,
        other.doc_type,
    ):
        return False
    if self.params != other.params or self.headers != other.headers:
        return False
    # Treat a falsy kwargs (None or {}) on both sides as a match.
    if not self.kwargs and not other.kwargs:
        return True
    return self.kwargs == other.kwargs


class FakeAsyncElasticsearch(AsyncElasticsearch):
Expand All @@ -30,21 +42,37 @@ def __init__(self, hosts: Union[str, list[str]], **kwargs):
self.hosts = hosts
self.args = kwargs
self.closed = False
self.data = {}
self.data = defaultdict(list)
self.requests = []

# Testing helpers to manage fake searches
def set_query(
self,
root_index: str,
hit_list: Optional[list[dict[str, Any]]] = None,
aggregation_list: Optional[dict[str, Any]] = None,
aggregations: Optional[dict[str, Any]] = None,
version: int = 7,
repeat: int = 1,
):
"""Add a canned response to an Opensearch query
The overall response and items in the hit and aggregation lists will be
augmented with the usual boilerplate.
Multiple returns for a single index can be queued, in order, via
successive calls. To return the same result on multiple calls, specify
a "repeat" value greater than 1.
Args:
root_index: CDM index name (run, period, etc)
hit_list: list of hit objects to be returned
aggregations: mapping of aggregation names to the objects to return
version: CDM version
repeat: number of successive calls that should receive this response
"""
ver = f"v{version:d}dev"
index = f"cdm{ver}-{root_index}"
hits = []
aggregations = None
if hit_list:
for d in hit_list:
source = d
Expand All @@ -57,16 +85,18 @@ def set_query(
"_source": source,
}
)
if aggregation_list:
aggregations = {
k: {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": v,
}
for k, v in aggregation_list.items()
}
self.data[index] = {
aggregate_response = {}
if aggregations:
for agg, val in aggregations.items():
if isinstance(val, list):
aggregate_response[agg] = {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": val,
}
else:
aggregate_response[agg] = val
response = {
"took": 1,
"timed_out": False,
"_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0},
Expand All @@ -76,8 +106,10 @@ def set_query(
"hits": hits,
},
}
if aggregations:
self.data[index]["aggregations"] = aggregations
if aggregate_response:
response["aggregations"] = aggregate_response
for c in range(repeat):
self.data[index].append(response)

# Faked AsyncElasticsearch methods
async def close(self):
Expand All @@ -92,6 +124,22 @@ async def ping(self, **kwargs):
async def search(
self, body=None, index=None, doc_type=None, params=None, headers=None, **kwargs
):
"""Return a canned response to a search query.
Args:
body: query body
index: Opensearch index name
doc_type: document type (rarely used)
params: Opensearch search parameters (rarely used)
headers: HTTP headers (rarely used)
kwargs: whatever else you might pass to search
Only the index is used here; to verify the correct Opensearch query
bodies and parameters, the full request is recorded for inspection.
Return:
A JSON dict with the first canned result for the index, or an error
"""
self.requests.append(
Request(
index=index,
Expand All @@ -102,9 +150,8 @@ async def search(
kwargs=kwargs,
)
)
if index in self.data:
target = self.data[index]
del self.data[index]
if index in self.data and len(self.data[index]) > 0:
target = self.data[index].pop(0)
return target
return {
"error": {
Expand Down
Loading

0 comments on commit 4adaf7e

Please sign in to comment.