Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - custom grain for conversion metrics #1467

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, semantic_manifest: SemanticManifest) -> None: # noqa: D107
self._time_spine_sources = TimeSpineSource.build_standard_time_spine_sources(semantic_manifest)
self.custom_granularities = TimeSpineSource.build_custom_granularities(list(self._time_spine_sources.values()))
self._semantic_model_lookup = SemanticModelLookup(
model=semantic_manifest, custom_granularities=self.custom_granularities
semantic_manifest=semantic_manifest, custom_granularities=self.custom_granularities
)
self._metric_lookup = MetricLookup(
semantic_manifest=self._semantic_manifest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,20 +271,22 @@ def get_min_queryable_time_granularity(self, metric_reference: MetricReference)

def _get_min_queryable_time_granularity(self, metric_reference: MetricReference) -> TimeGranularity:
agg_time_dimension_specs = self._get_agg_time_dimension_specs_for_metric(metric_reference)
assert (
agg_time_dimension_specs
), f"No agg_time_dimension found for metric {metric_reference}. Something has been misconfigured."

minimum_queryable_granularity = self._semantic_model_lookup.get_defined_time_granularity(
agg_time_dimension_specs[0].reference
)
if len(agg_time_dimension_specs) > 1:
for agg_time_dimension_spec in agg_time_dimension_specs[1:]:
defined_time_granularity = self._semantic_model_lookup.get_defined_time_granularity(
agg_time_dimension_spec.reference
)
if defined_time_granularity.to_int() > minimum_queryable_granularity.to_int():
minimum_queryable_granularity = defined_time_granularity
minimum_queryable_granularity: Optional[TimeGranularity] = None
for agg_time_dimension_spec in agg_time_dimension_specs:
defined_time_granularity = self._semantic_model_lookup.get_defined_time_granularity(
agg_time_dimension_spec.reference, entity_links=agg_time_dimension_spec.entity_links
)
if (
not minimum_queryable_granularity
or defined_time_granularity.to_int() > minimum_queryable_granularity.to_int()
):
minimum_queryable_granularity = defined_time_granularity

if not minimum_queryable_granularity:
raise ValueError(
f"No agg_time_dimension found for metric '{metric_reference.element_name}'. Something has been misconfigured."
)

return minimum_queryable_granularity

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import logging
from typing import Dict, List, Optional, Sequence, Set
from typing import Dict, List, Optional, Sequence, Set, Tuple

from dbt_semantic_interfaces.protocols.dimension import Dimension
from dbt_semantic_interfaces.protocols.entity import Entity
Expand Down Expand Up @@ -39,11 +39,14 @@
class SemanticModelLookup:
"""Tracks semantic information for semantic models held in a set of SemanticModelContainers."""

def __init__(self, model: SemanticManifest, custom_granularities: Dict[str, ExpandedTimeGranularity]) -> None:
def __init__(
self, semantic_manifest: SemanticManifest, custom_granularities: Dict[str, ExpandedTimeGranularity]
) -> None:
"""Initializer.

Args:
model: the semantic manifest used for loading semantic model definitions
semantic_manifest: the semantic manifest used for loading semantic model definitions
custom_granularities: custom time granularities supported in the manifest
"""
self._custom_granularities = custom_granularities
self._measure_index: Dict[MeasureReference, SemanticModel] = {}
Expand All @@ -52,20 +55,20 @@ def __init__(self, model: SemanticManifest, custom_granularities: Dict[str, Expa
self._measure_non_additive_dimension_specs: Dict[MeasureReference, NonAdditiveDimensionSpec] = {}
self._dimension_index: Dict[DimensionReference, List[SemanticModel]] = {}
self._entity_index: Dict[EntityReference, List[SemanticModel]] = {}

self._dimension_ref_to_spec: Dict[DimensionReference, DimensionSpec] = {}
self._entity_ref_to_spec: Dict[EntityReference, EntitySpec] = {}
self._primary_entity_index: Dict[EntityReference, List[SemanticModel]] = {}

self._semantic_model_to_aggregation_time_dimensions: Dict[
SemanticModelReference, ElementGrouper[TimeDimensionReference, MeasureSpec]
] = {}

self._semantic_model_reference_to_semantic_model: Dict[SemanticModelReference, SemanticModel] = {}
for semantic_model in sorted(model.semantic_models, key=lambda semantic_model: semantic_model.name):
for semantic_model in sorted(semantic_manifest.semantic_models, key=lambda semantic_model: semantic_model.name):
self._add_semantic_model(semantic_model)

# Cache for defined time granularity.
self._time_dimension_to_defined_time_granularity: Dict[TimeDimensionReference, TimeGranularity] = {}
self._time_dimension_to_defined_time_granularity: Dict[
Tuple[TimeDimensionReference, EntityReference], TimeGranularity
] = {}

# Cache for agg. time dimension for measure.
self._measure_reference_to_agg_time_dimension_specs: Dict[MeasureReference, Sequence[TimeDimensionSpec]] = {}
Expand All @@ -76,36 +79,65 @@ def get_dimension_references(self) -> Sequence[DimensionReference]:

@staticmethod
def get_dimension_from_semantic_model(
semantic_model: SemanticModel, dimension_reference: LinkableElementReference
semantic_model: SemanticModel, dimension_reference: DimensionReference
) -> Dimension:
"""Get dimension from semantic model."""
for dim in semantic_model.dimensions:
if dim.reference == dimension_reference:
if dim.reference.element_name == dimension_reference.element_name:
return dim
raise ValueError(
f"No dimension with name ({dimension_reference}) in semantic_model with name ({semantic_model.name})"
f"No dimension with name '{dimension_reference.element_name}' in semantic_model with name '{semantic_model.name}'."
)

def get_dimension(self, dimension_reference: DimensionReference) -> Dimension:
"""Retrieves a full dimension object by name."""
def _get_primary_entity_from_entity_links_for_dimension(
self, entity_links: Sequence[EntityReference]
) -> EntityReference:
"""The last entity in the entity_links is the primary entity in the semantic model where the dimension is defined."""
if not entity_links:
raise ValueError("Entity links must be provided to dedupe dimensions across semantic models.")

return entity_links[-1]

def get_dimension(
self, dimension_reference: DimensionReference, entity_links: Sequence[EntityReference]
) -> Dimension:
"""Retrieves a full dimension object from its semantic model."""
# If the reference passed is a TimeDimensionReference, convert to DimensionReference.
dimension_reference = DimensionReference(dimension_reference.element_name)

semantic_models = self._dimension_index.get(dimension_reference)
if not semantic_models:
semantic_models_for_dimension_name = self._dimension_index.get(dimension_reference)
if not semantic_models_for_dimension_name:
raise ValueError(
f"Could not find dimension with name '{dimension_reference.element_name}' in configured semantic models"
f"Could not find dimension with name '{dimension_reference.element_name}' in configured semantic models."
)

return SemanticModelLookup.get_dimension_from_semantic_model(
# Dimension object should match across semantic models, so just use the first semantic model.
semantic_model=semantic_models[0],
dimension_reference=dimension_reference,
primary_entity = self._get_primary_entity_from_entity_links_for_dimension(entity_links)
semantic_models_with_primary_entity = self._primary_entity_index.get(primary_entity)
if not semantic_models_with_primary_entity:
raise ValueError(
"Last entity link for dimension must be the primary entity in the semantic model where the dimension is defined."
f"Could not find primary entity '{primary_entity.element_name}' in configured semantic models."
f"Got dimension '{dimension_reference.element_name}' with entity links: {entity_links}"
)

intersecting_semantic_models = set(semantic_models_for_dimension_name).intersection(
set(semantic_models_with_primary_entity)
)
if len(intersecting_semantic_models) == 0:
raise ValueError(
f"Could not find dimension with name '{dimension_reference.element_name}' in semantic models with primary "
f"entity '{primary_entity.element_name}'. Got dimension '{dimension_reference.element_name}' with entity links:"
f"{entity_links}."
)
if len(intersecting_semantic_models) > 1:
raise ValueError(
f"Found multiple semantic models with dimension '{dimension_reference.element_name}' and primary entity "
f"'{primary_entity.element_name}'. This should have been prevented by validations."
)

def get_time_dimension(self, time_dimension_reference: TimeDimensionReference) -> Dimension:
"""Retrieves a full dimension object by name."""
return self.get_dimension(dimension_reference=time_dimension_reference.dimension_reference)
return SemanticModelLookup.get_dimension_from_semantic_model(
semantic_model=intersecting_semantic_models.pop(), dimension_reference=dimension_reference
)

@property
def measure_references(self) -> Sequence[MeasureReference]:
Expand Down Expand Up @@ -246,6 +278,7 @@ def _add_semantic_model(self, semantic_model: SemanticModel) -> None:
self._dimension_index[dim.reference] = semantic_models_for_dimension

if not StructuredLinkableSpecName.from_name(dim.name).is_element_name:
# TODO?
# TODO: [custom granularity] change this to an assertion once we're sure there aren't exceptions
logger.warning(
LazyFormat(
Expand All @@ -254,18 +287,15 @@ def _add_semantic_model(self, semantic_model: SemanticModel) -> None:
)
)

# TODO: Construct these specs correctly. All of the time dimension specs have the default granularity
self._dimension_ref_to_spec[dim.time_dimension_reference or dim.reference] = (
TimeDimensionSpec(element_name=dim.name, entity_links=())
if dim.type is DimensionType.TIME
else DimensionSpec(element_name=dim.name, entity_links=())
)

for entity in semantic_model.entities:
semantic_models_for_entity = self._entity_index.get(entity.reference, []) + [semantic_model]
self._entity_index[entity.reference] = semantic_models_for_entity

self._entity_ref_to_spec[entity.reference] = EntitySpec(element_name=entity.name, entity_links=())
primary_entity = self.resolved_primary_entity(semantic_model)
if primary_entity:
self._primary_entity_index[primary_entity] = self._primary_entity_index.get(primary_entity, []) + [
semantic_model
]

self._semantic_model_reference_to_semantic_model[semantic_model.reference] = semantic_model

Expand Down Expand Up @@ -352,16 +382,29 @@ def entity_links_for_local_elements(semantic_model: SemanticModel) -> Sequence[E

return sorted(possible_entity_links, key=lambda entity_reference: entity_reference.element_name)

def get_element_spec_for_name(self, element_name: str) -> LinkableInstanceSpec:
def get_linkable_element_spec_from_semantic_model(
self, element_name: str, semantic_model: SemanticModel
) -> LinkableInstanceSpec:
"""Returns the spec for the given name of a linkable element (dimension or entity)."""
if TimeDimensionReference(element_name=element_name) in self._dimension_ref_to_spec:
return self._dimension_ref_to_spec[TimeDimensionReference(element_name=element_name)]
elif DimensionReference(element_name=element_name) in self._dimension_ref_to_spec:
return self._dimension_ref_to_spec[DimensionReference(element_name=element_name)]
elif EntityReference(element_name=element_name) in self._entity_ref_to_spec:
return self._entity_ref_to_spec[EntityReference(element_name=element_name)]
else:
raise ValueError(f"Unable to find linkable element {element_name} in manifest")
if DimensionReference(element_name) in self._dimension_index:
dimension = self.get_dimension_from_semantic_model(
semantic_model=semantic_model, dimension_reference=DimensionReference(element_name)
)
entity_links = (self.get_primary_entity_else_error(semantic_model),)
if dimension.type == DimensionType.TIME:
return TimeDimensionSpec(
element_name=element_name,
entity_links=entity_links,
time_granularity=(
ExpandedTimeGranularity.from_time_granularity(self.resolve_defined_time_granularity(dimension))
),
)
return DimensionSpec(element_name=element_name, entity_links=entity_links)

elif EntityReference(element_name) in self._entity_index:
return EntitySpec(element_name=element_name, entity_links=())

raise ValueError(f"Unable to find linkable element '{element_name}' in semantic model '{semantic_model.name}'.")

def get_agg_time_dimension_specs_for_measure(
self, measure_reference: MeasureReference
Expand All @@ -382,33 +425,37 @@ def _get_agg_time_dimension_specs_for_measure(
# A measure's agg_time_dimension is required to be in the same semantic model as the measure,
# so we can assume the same semantic model for both measure and dimension.
semantic_model = self.get_semantic_model_for_measure(measure_reference)
entity_link = self.resolved_primary_entity(semantic_model)
assert entity_link is not None, (
f"Expected semantic model {semantic_model} to have a primary entity since it has a "
"measure requiring an agg_time_dimension, but found none.",
)
return TimeDimensionSpec.generate_possible_specs_for_time_dimension(
time_dimension_reference=agg_time_dimension,
entity_links=(entity_link,),
entity_links=(self.get_primary_entity_else_error(semantic_model),),
custom_granularities=self._custom_granularities,
)

def get_defined_time_granularity(self, time_dimension_reference: TimeDimensionReference) -> TimeGranularity:
def get_defined_time_granularity(
self, time_dimension_reference: TimeDimensionReference, entity_links: Sequence[EntityReference]
) -> TimeGranularity:
"""Time granularity from the time dimension's YAML definition. If not set, defaults to DAY."""
result = self._time_dimension_to_defined_time_granularity.get(time_dimension_reference)
primary_entity = self._get_primary_entity_from_entity_links_for_dimension(entity_links)

cache_key = (time_dimension_reference, primary_entity)
result = self._time_dimension_to_defined_time_granularity.get(cache_key)
if result is not None:
return result

result = self._get_defined_time_granularity(time_dimension_reference)
self._time_dimension_to_defined_time_granularity[time_dimension_reference] = result
result = self._get_defined_time_granularity(
time_dimension_reference=time_dimension_reference, entity_links=entity_links
)
self._time_dimension_to_defined_time_granularity[cache_key] = result
return result

def _get_defined_time_granularity(self, time_dimension_reference: TimeDimensionReference) -> TimeGranularity:
time_dimension = self.get_dimension(time_dimension_reference)

defined_time_granularity = DEFAULT_TIME_GRANULARITY
if time_dimension.type_params and time_dimension.type_params.time_granularity:
defined_time_granularity = time_dimension.type_params.time_granularity
def _get_defined_time_granularity(
self, time_dimension_reference: TimeDimensionReference, entity_links: Sequence[EntityReference]
) -> TimeGranularity:
return self.resolve_defined_time_granularity(
self.get_dimension(dimension_reference=time_dimension_reference, entity_links=entity_links)
)

return defined_time_granularity
@staticmethod
def resolve_defined_time_granularity(dimension: Dimension) -> TimeGranularity:
    """Resolve the time granularity from the dimension's YAML definition.

    Args:
        dimension: the dimension whose `type_params` are inspected.

    Returns:
        The granularity set in `dimension.type_params.time_granularity`. Falls back to
        DEFAULT_TIME_GRANULARITY when the dimension has no type params, or when the type params do not
        set a granularity, so the result is never None.
    """
    type_params = dimension.type_params
    # Check the granularity itself, not just the presence of type params: type params may exist
    # without a granularity, and callers immediately call `.to_int()` on the returned value.
    if type_params and type_params.time_granularity:
        return type_params.time_granularity
    return DEFAULT_TIME_GRANULARITY
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from metricflow_semantics.model.semantics.linkable_element import ElementPathKey
from metricflow_semantics.naming.linkable_spec_name import StructuredLinkableSpecName
from metricflow_semantics.visitor import VisitorOutputT

if typing.TYPE_CHECKING:
from metricflow_semantics.specs.dimension_spec import DimensionSpec
Expand All @@ -19,7 +20,6 @@
from metricflow_semantics.specs.metadata_spec import MetadataSpec
from metricflow_semantics.specs.metric_spec import MetricSpec
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.visitor import VisitorOutputT


@dataclass(frozen=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from metricflow_semantics.specs.entity_spec import EntitySpec
from metricflow_semantics.specs.group_by_metric_spec import GroupByMetricSpec
from metricflow_semantics.specs.instance_spec import InstanceSpecVisitor, LinkableInstanceSpec
from metricflow_semantics.specs.spec_set import InstanceSpecSet
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -154,6 +155,15 @@ def difference(self, other: LinkableSpecSet) -> LinkableSpecSet: # noqa: D102
def create_from_specs(specs: Sequence[LinkableInstanceSpec]) -> LinkableSpecSet: # noqa: D102
return _group_specs_by_type(specs)

@property
def as_instance_spec_set(self) -> InstanceSpecSet:
    """Repackage this set's specs as an `InstanceSpecSet`.

    The dimension, entity, time dimension and group-by-metric specs are passed through unchanged under
    the keyword of the same name.
    """
    spec_fields = {
        "dimension_specs": self.dimension_specs,
        "entity_specs": self.entity_specs,
        "time_dimension_specs": self.time_dimension_specs,
        "group_by_metric_specs": self.group_by_metric_specs,
    }
    return InstanceSpecSet(**spec_fields)


@dataclass
class _GroupSpecByTypeVisitor(InstanceSpecVisitor[None]):
Expand Down
Loading
Loading