Edit on GitHub

sqlmesh.core.metric.definition

  1from __future__ import annotations
  2
  3import typing as t
  4from pathlib import Path
  5
  6from sqlglot import exp
  7from sqlglot.helper import first
  8
  9from sqlmesh.core import dialect as d
 10from sqlmesh.core.node import str_or_exp_to_str
 11from sqlmesh.utils import UniqueKeyDict
 12from sqlmesh.utils.errors import ConfigError
 13from sqlmesh.utils.pydantic import PydanticModel, ValidationInfo, field_validator
 14
 15MeasureAndDimTables = t.Tuple[str, t.Tuple[str, ...]]
 16
 17
 18def load_metric_ddl(
 19    expression: exp.Expr, dialect: t.Optional[str], path: Path = Path(), **kwargs: t.Any
 20) -> MetricMeta:
 21    """Returns a MetricMeta from raw Metric DDL."""
 22    if not isinstance(expression, d.Metric):
 23        _raise_metric_config_error(
 24            f"Only METRIC(...) statements are allowed. Found {expression.sql(pretty=True)}", path
 25        )
 26
 27    metric = MetricMeta(
 28        **{
 29            "dialect": dialect,
 30            "description": (
 31                "\n".join(comment.strip() for comment in expression.comments)
 32                if expression.comments
 33                else None
 34            ),
 35            **{prop.name.lower(): prop.args.get("value") for prop in expression.expressions},
 36            **kwargs,
 37        }
 38    )
 39
 40    metric._path = path
 41
 42    return metric
 43
 44
 45def expand_metrics(metas: UniqueKeyDict[str, MetricMeta]) -> UniqueKeyDict[str, Metric]:
 46    """Resolves all metas into standalone metrics."""
 47    metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
 48
 49    for name, meta in metas.items():
 50        if name not in metrics:
 51            metrics[name] = meta.to_metric(metas, metrics)
 52
 53    return metrics
 54
 55
 56def remove_namespace(expression: str | exp.Column) -> str:
 57    """Given a column or a string, rewrite table namespaces like catalog.db to catalog__db"""
 58
 59    if not isinstance(expression, str):
 60        expression = first(
 61            ".".join(p.name for p in column.parts[:-1])
 62            for column in expression.find_all(exp.Column)
 63            if column.table
 64        )
 65    return expression.replace('"', "").replace(".", "__")
 66
 67
 68class MetricMeta(PydanticModel, frozen=True):
 69    """Raw metric definition without relationships or expansion of derived metrics."""
 70
 71    name: str
 72    dialect: str
 73    expression: exp.Expr
 74    description: t.Optional[str] = None
 75    owner: t.Optional[str] = None
 76
 77    _path: Path = Path()
 78
 79    @field_validator("name", mode="before")
 80    @classmethod
 81    def _name_validator(cls, v: t.Any) -> str:
 82        return (cls._string_validator(v) or "").lower()
 83
 84    @field_validator("dialect", "owner", "description", mode="before")
 85    @classmethod
 86    def _string_validator(cls, v: t.Any) -> t.Optional[str]:
 87        return str_or_exp_to_str(v)
 88
 89    @field_validator("expression", mode="before")
 90    def _validate_expression(cls, v: t.Any, info: ValidationInfo) -> exp.Expr:
 91        if isinstance(v, str):
 92            dialect = info.data.get("dialect")
 93            return d.parse_one(v, dialect=dialect)
 94        if isinstance(v, exp.Expr):
 95            return v
 96        return v
 97
 98    def to_metric(
 99        self, metas: t.Dict[str, MetricMeta], metrics: UniqueKeyDict[str, Metric]
100    ) -> Metric:
101        """Converts a metric meta into a fully expanded and standalone metric."""
102        metric_refs = {}
103        agg_or_ref = False
104
105        for node in self.expression.walk():
106            if isinstance(node, exp.Alias):
107                _raise_metric_config_error(
108                    f"Alias found for metric '{self.name}' which is not allowed", self._path
109                )
110            elif isinstance(node, exp.AggFunc):
111                agg_or_ref = True
112            elif isinstance(node, exp.Column) and not node.table:
113                agg_or_ref = True
114                ref = node.sql(dialect=self.dialect)
115
116                if ref not in metrics:
117                    metrics[ref] = metas[ref].to_metric(metas, metrics)
118
119                metric_refs[node] = metrics[ref]
120
121        if not agg_or_ref:
122            _raise_metric_config_error(
123                f"Metric '{self.name}' missing an aggregation or metric ref", self._path
124            )
125
126        if metric_refs:
127            expanded = self.expression.copy()
128            for column in expanded.find_all(exp.Column):
129                metric = metric_refs.get(column)
130
131                if metric:
132                    column.replace(metric.expanded.copy())
133        else:
134            expanded = exp.alias_(self.expression, self.name)
135
136        metric = Metric(**self.dict(), expanded=expanded)
137        metric._path = self._path
138        return metric
139
140
class Metric(MetricMeta, frozen=True):
    # A metric meta together with its expanded expression.
    expanded: exp.Expr

    @property
    def aggs(self) -> t.Dict[exp.AggFunc, MeasureAndDimTables]:
        """Maps each aggregation of the expanded expression to its referenced tables.

        Keys are built from the parent node of every aggregate function, with each
        table-qualified column rewritten to use the single flattened table name
        produced by remove_namespace.
        """

        def flatten_namespace(node: exp.Expr) -> exp.Expr:
            if not (isinstance(node, exp.Column) and node.table):
                return node
            return exp.column(node.this, table=remove_namespace(node))

        tables_by_agg: t.Dict[exp.AggFunc, MeasureAndDimTables] = {}
        for agg in self.expanded.find_all(exp.AggFunc):
            rewritten_parent = t.cast(exp.Expr, agg.parent).transform(flatten_namespace)
            key = t.cast(exp.AggFunc, rewritten_parent)
            tables_by_agg[key] = _get_measure_and_dim_tables(agg)
        return tables_by_agg

    @property
    def formula(self) -> exp.Expr:
        """Returns the post aggregation formula of a metric.

        Every aliased node of the expanded expression is replaced by a column
        named after its alias, and the result is aliased to the metric's name.
        """

        def alias_to_column(node: exp.Expr) -> exp.Expr:
            if isinstance(node, exp.Alias):
                return exp.column(node.args["alias"])
            return node

        post_aggregation = self.expanded.transform(alias_to_column)
        return exp.alias_(post_aggregation, self.name, copy=False)
178
179
def _raise_metric_config_error(msg: str, path: Path) -> t.NoReturn:
    """Raises a ConfigError whose message is `msg` followed by the quoted path.

    Args:
        msg: Description of the problem.
        path: Path appended to the message.

    Raises:
        ConfigError: Always; this function never returns.
    """
    raise ConfigError(f"{msg}. '{path}'")
182
183
def _get_measure_and_dim_tables(expression: exp.Expr) -> MeasureAndDimTables:
    """Collects the tables referenced by table-qualified columns of an expression.

    Returns the measure table first, followed by a tuple of the remaining tables
    in the order they were first seen. The measure table belongs to the first
    column found in the "this" position of an aggregate function, looking through
    binary operations, parentheses, DISTINCT and the non-"this" arguments of
    IF / CASE. A ConfigError is raised when no such column exists.
    """

    def is_measure(node: exp.Expr) -> bool:
        # Climb the parent chain until the aggregate is reached or the chain breaks.
        current = node
        while True:
            parent = current.parent
            if isinstance(parent, exp.AggFunc) and current.arg_key == "this":
                return True
            via_conditional = (
                isinstance(parent, (exp.If, exp.Case)) and current.arg_key != "this"
            )
            if via_conditional or isinstance(parent, (exp.Binary, exp.Paren, exp.Distinct)):
                current = parent
                continue
            return False

    # A dict is used as an insertion-ordered set of table names.
    seen_tables: t.Dict[str, bool] = {}
    measure_table = None

    for node in expression.walk():
        if not (isinstance(node, exp.Column) and node.table):
            continue

        table_name = ".".join(part.sql() for part in node.parts[:-1])
        seen_tables[table_name] = True

        if not measure_table and is_measure(node):
            measure_table = table_name

    if not measure_table:
        raise ConfigError(f"Could not infer a measures table from '{expression}'")

    seen_tables.pop(measure_table)
    return (measure_table, tuple(seen_tables))
MeasureAndDimTables = typing.Tuple[str, typing.Tuple[str, ...]]
def load_metric_ddl( expression: sqlglot.expressions.core.Expr, dialect: Optional[str], path: pathlib.Path = PosixPath('.'), **kwargs: Any) -> MetricMeta:
19def load_metric_ddl(
20    expression: exp.Expr, dialect: t.Optional[str], path: Path = Path(), **kwargs: t.Any
21) -> MetricMeta:
22    """Returns a MetricMeta from raw Metric DDL."""
23    if not isinstance(expression, d.Metric):
24        _raise_metric_config_error(
25            f"Only METRIC(...) statements are allowed. Found {expression.sql(pretty=True)}", path
26        )
27
28    metric = MetricMeta(
29        **{
30            "dialect": dialect,
31            "description": (
32                "\n".join(comment.strip() for comment in expression.comments)
33                if expression.comments
34                else None
35            ),
36            **{prop.name.lower(): prop.args.get("value") for prop in expression.expressions},
37            **kwargs,
38        }
39    )
40
41    metric._path = path
42
43    return metric

Returns a MetricMeta from raw Metric DDL.

def expand_metrics( metas: sqlmesh.utils.UniqueKeyDict[str, MetricMeta]) -> sqlmesh.utils.UniqueKeyDict[str, Metric]:
46def expand_metrics(metas: UniqueKeyDict[str, MetricMeta]) -> UniqueKeyDict[str, Metric]:
47    """Resolves all metas into standalone metrics."""
48    metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
49
50    for name, meta in metas.items():
51        if name not in metrics:
52            metrics[name] = meta.to_metric(metas, metrics)
53
54    return metrics

Resolves all metas into standalone metrics.

def remove_namespace(expression: str | sqlglot.expressions.core.Column) -> str:
57def remove_namespace(expression: str | exp.Column) -> str:
58    """Given a column or a string, rewrite table namespaces like catalog.db to catalog__db"""
59
60    if not isinstance(expression, str):
61        expression = first(
62            ".".join(p.name for p in column.parts[:-1])
63            for column in expression.find_all(exp.Column)
64            if column.table
65        )
66    return expression.replace('"', "").replace(".", "__")

Given a column or a string, rewrite table namespaces like catalog.db to catalog__db

class MetricMeta(sqlmesh.utils.pydantic.PydanticModel):
 69class MetricMeta(PydanticModel, frozen=True):
 70    """Raw metric definition without relationships or expansion of derived metrics."""
 71
 72    name: str
 73    dialect: str
 74    expression: exp.Expr
 75    description: t.Optional[str] = None
 76    owner: t.Optional[str] = None
 77
 78    _path: Path = Path()
 79
 80    @field_validator("name", mode="before")
 81    @classmethod
 82    def _name_validator(cls, v: t.Any) -> str:
 83        return (cls._string_validator(v) or "").lower()
 84
 85    @field_validator("dialect", "owner", "description", mode="before")
 86    @classmethod
 87    def _string_validator(cls, v: t.Any) -> t.Optional[str]:
 88        return str_or_exp_to_str(v)
 89
 90    @field_validator("expression", mode="before")
 91    def _validate_expression(cls, v: t.Any, info: ValidationInfo) -> exp.Expr:
 92        if isinstance(v, str):
 93            dialect = info.data.get("dialect")
 94            return d.parse_one(v, dialect=dialect)
 95        if isinstance(v, exp.Expr):
 96            return v
 97        return v
 98
 99    def to_metric(
100        self, metas: t.Dict[str, MetricMeta], metrics: UniqueKeyDict[str, Metric]
101    ) -> Metric:
102        """Converts a metric meta into a fully expanded and standalone metric."""
103        metric_refs = {}
104        agg_or_ref = False
105
106        for node in self.expression.walk():
107            if isinstance(node, exp.Alias):
108                _raise_metric_config_error(
109                    f"Alias found for metric '{self.name}' which is not allowed", self._path
110                )
111            elif isinstance(node, exp.AggFunc):
112                agg_or_ref = True
113            elif isinstance(node, exp.Column) and not node.table:
114                agg_or_ref = True
115                ref = node.sql(dialect=self.dialect)
116
117                if ref not in metrics:
118                    metrics[ref] = metas[ref].to_metric(metas, metrics)
119
120                metric_refs[node] = metrics[ref]
121
122        if not agg_or_ref:
123            _raise_metric_config_error(
124                f"Metric '{self.name}' missing an aggregation or metric ref", self._path
125            )
126
127        if metric_refs:
128            expanded = self.expression.copy()
129            for column in expanded.find_all(exp.Column):
130                metric = metric_refs.get(column)
131
132                if metric:
133                    column.replace(metric.expanded.copy())
134        else:
135            expanded = exp.alias_(self.expression, self.name)
136
137        metric = Metric(**self.dict(), expanded=expanded)
138        metric._path = self._path
139        return metric

Raw metric definition without relationships or expansion of derived metrics.

name: str
dialect: str
expression: sqlglot.expressions.core.Expr
description: Optional[str]
owner: Optional[str]
def to_metric( self, metas: Dict[str, MetricMeta], metrics: sqlmesh.utils.UniqueKeyDict[str, Metric]) -> Metric:
 99    def to_metric(
100        self, metas: t.Dict[str, MetricMeta], metrics: UniqueKeyDict[str, Metric]
101    ) -> Metric:
102        """Converts a metric meta into a fully expanded and standalone metric."""
103        metric_refs = {}
104        agg_or_ref = False
105
106        for node in self.expression.walk():
107            if isinstance(node, exp.Alias):
108                _raise_metric_config_error(
109                    f"Alias found for metric '{self.name}' which is not allowed", self._path
110                )
111            elif isinstance(node, exp.AggFunc):
112                agg_or_ref = True
113            elif isinstance(node, exp.Column) and not node.table:
114                agg_or_ref = True
115                ref = node.sql(dialect=self.dialect)
116
117                if ref not in metrics:
118                    metrics[ref] = metas[ref].to_metric(metas, metrics)
119
120                metric_refs[node] = metrics[ref]
121
122        if not agg_or_ref:
123            _raise_metric_config_error(
124                f"Metric '{self.name}' missing an aggregation or metric ref", self._path
125            )
126
127        if metric_refs:
128            expanded = self.expression.copy()
129            for column in expanded.find_all(exp.Column):
130                metric = metric_refs.get(column)
131
132                if metric:
133                    column.replace(metric.expanded.copy())
134        else:
135            expanded = exp.alias_(self.expression, self.name)
136
137        metric = Metric(**self.dict(), expanded=expanded)
138        metric._path = self._path
139        return metric

Converts a metric meta into a fully expanded and standalone metric.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class Metric(MetricMeta):
142class Metric(MetricMeta, frozen=True):
143    expanded: exp.Expr
144
145    @property
146    def aggs(self) -> t.Dict[exp.AggFunc, MeasureAndDimTables]:
147        """Returns a dictionary of aggregation to referenced tables.
148
149        This method removes catalog and schema information from columns.
150        """
151        return {
152            t.cast(
153                exp.AggFunc,
154                t.cast(exp.Expr, agg.parent).transform(
155                    lambda node: (
156                        exp.column(node.this, table=remove_namespace(node))
157                        if isinstance(node, exp.Column) and node.table
158                        else node
159                    )
160                ),
161            ): _get_measure_and_dim_tables(agg)
162            for agg in self.expanded.find_all(exp.AggFunc)
163        }
164
165    @property
166    def formula(self) -> exp.Expr:
167        """Returns the post aggregation formula of a metric.
168
169        For simple metrics it is just the metric name. For derived metrics,
170        it consists of the operations of the derived metrics without aggregations.
171        """
172        return exp.alias_(
173            self.expanded.transform(
174                lambda node: exp.column(node.args["alias"]) if isinstance(node, exp.Alias) else node
175            ),
176            self.name,
177            copy=False,
178        )

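A fully expanded, standalone metric: a MetricMeta together with its expanded expression. (Metric defines no docstring of its own; the generated page otherwise shows the description inherited from MetricMeta: "Raw metric definition without relationships or expansion of derived metrics.")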

expanded: sqlglot.expressions.core.Expr
aggs: Dict[sqlglot.expressions.core.AggFunc, Tuple[str, Tuple[str, ...]]]
145    @property
146    def aggs(self) -> t.Dict[exp.AggFunc, MeasureAndDimTables]:
147        """Returns a dictionary of aggregation to referenced tables.
148
149        This method removes catalog and schema information from columns.
150        """
151        return {
152            t.cast(
153                exp.AggFunc,
154                t.cast(exp.Expr, agg.parent).transform(
155                    lambda node: (
156                        exp.column(node.this, table=remove_namespace(node))
157                        if isinstance(node, exp.Column) and node.table
158                        else node
159                    )
160                ),
161            ): _get_measure_and_dim_tables(agg)
162            for agg in self.expanded.find_all(exp.AggFunc)
163        }

Returns a dictionary of aggregation to referenced tables.

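This method removes catalog and schema information from columns by folding the table namespace of each qualified column into a single flattened table name (for example, catalog.db becomes catalog__db).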

formula: sqlglot.expressions.core.Expr
165    @property
166    def formula(self) -> exp.Expr:
167        """Returns the post aggregation formula of a metric.
168
169        For simple metrics it is just the metric name. For derived metrics,
170        it consists of the operations of the derived metrics without aggregations.
171        """
172        return exp.alias_(
173            self.expanded.transform(
174                lambda node: exp.column(node.args["alias"]) if isinstance(node, exp.Alias) else node
175            ),
176            self.name,
177            copy=False,
178        )

Returns the post aggregation formula of a metric.

For simple metrics it is just the metric name. For derived metrics, it consists of the operations of the derived metrics without aggregations.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
MetricMeta
name
dialect
expression
description
owner
to_metric
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields