sqlmesh.core.metric.definition
1from __future__ import annotations 2 3import typing as t 4from pathlib import Path 5 6from sqlglot import exp 7from sqlglot.helper import first 8 9from sqlmesh.core import dialect as d 10from sqlmesh.core.node import str_or_exp_to_str 11from sqlmesh.utils import UniqueKeyDict 12from sqlmesh.utils.errors import ConfigError 13from sqlmesh.utils.pydantic import PydanticModel, ValidationInfo, field_validator 14 15MeasureAndDimTables = t.Tuple[str, t.Tuple[str, ...]] 16 17 18def load_metric_ddl( 19 expression: exp.Expr, dialect: t.Optional[str], path: Path = Path(), **kwargs: t.Any 20) -> MetricMeta: 21 """Returns a MetricMeta from raw Metric DDL.""" 22 if not isinstance(expression, d.Metric): 23 _raise_metric_config_error( 24 f"Only METRIC(...) statements are allowed. Found {expression.sql(pretty=True)}", path 25 ) 26 27 metric = MetricMeta( 28 **{ 29 "dialect": dialect, 30 "description": ( 31 "\n".join(comment.strip() for comment in expression.comments) 32 if expression.comments 33 else None 34 ), 35 **{prop.name.lower(): prop.args.get("value") for prop in expression.expressions}, 36 **kwargs, 37 } 38 ) 39 40 metric._path = path 41 42 return metric 43 44 45def expand_metrics(metas: UniqueKeyDict[str, MetricMeta]) -> UniqueKeyDict[str, Metric]: 46 """Resolves all metas into standalone metrics.""" 47 metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics") 48 49 for name, meta in metas.items(): 50 if name not in metrics: 51 metrics[name] = meta.to_metric(metas, metrics) 52 53 return metrics 54 55 56def remove_namespace(expression: str | exp.Column) -> str: 57 """Given a column or a string, rewrite table namespaces like catalog.db to catalog__db""" 58 59 if not isinstance(expression, str): 60 expression = first( 61 ".".join(p.name for p in column.parts[:-1]) 62 for column in expression.find_all(exp.Column) 63 if column.table 64 ) 65 return expression.replace('"', "").replace(".", "__") 66 67 68class MetricMeta(PydanticModel, frozen=True): 69 """Raw metric definition without relationships or expansion of derived metrics.""" 70 71 name: str 72 dialect: str 73 expression: exp.Expr 74 description: t.Optional[str] = None 75 owner: t.Optional[str] = None 76 77 _path: Path = Path() 78 79 @field_validator("name", mode="before") 80 @classmethod 81 def _name_validator(cls, v: t.Any) -> str: 82 return (cls._string_validator(v) or "").lower() 83 84 @field_validator("dialect", "owner", "description", mode="before") 85 @classmethod 86 def _string_validator(cls, v: t.Any) -> t.Optional[str]: 87 return str_or_exp_to_str(v) 88 89 @field_validator("expression", mode="before") 90 def _validate_expression(cls, v: t.Any, info: ValidationInfo) -> exp.Expr: 91 if isinstance(v, str): 92 dialect = info.data.get("dialect") 93 return d.parse_one(v, dialect=dialect) 94 if isinstance(v, exp.Expr): 95 return v 96 return v 97 98 def to_metric( 99 self, metas: t.Dict[str, MetricMeta], metrics: UniqueKeyDict[str, Metric] 100 ) -> Metric: 101 """Converts a metric meta into a fully expanded and standalone metric.""" 102 metric_refs = {} 103 agg_or_ref = False 104 105 for node in self.expression.walk(): 106 if isinstance(node, exp.Alias): 107 _raise_metric_config_error( 108 f"Alias found for metric '{self.name}' which is not allowed", self._path 109 ) 110 elif isinstance(node, exp.AggFunc): 111 agg_or_ref = True 112 elif isinstance(node, exp.Column) and not node.table: 113 agg_or_ref = True 114 ref = node.sql(dialect=self.dialect) 115 116 if ref not in metrics: 117 metrics[ref] = metas[ref].to_metric(metas, metrics) 118 119 metric_refs[node] = metrics[ref] 120 121 if not agg_or_ref: 122 _raise_metric_config_error( 123 f"Metric '{self.name}' missing an aggregation or metric ref", self._path 124 ) 125 126 if metric_refs: 127 expanded = self.expression.copy() 128 for column in expanded.find_all(exp.Column): 129 metric = metric_refs.get(column) 130 131 if metric: 132 column.replace(metric.expanded.copy()) 133 else: 134 expanded = exp.alias_(self.expression, self.name) 135 136 metric = Metric(**self.dict(), expanded=expanded) 137 metric._path = self._path 138 return metric 139 140 141class Metric(MetricMeta, frozen=True): 142 expanded: exp.Expr 143 144 @property 145 def aggs(self) -> t.Dict[exp.AggFunc, MeasureAndDimTables]: 146 """Returns a dictionary of aggregation to referenced tables. 147 148 This method removes catalog and schema information from columns. 149 """ 150 return { 151 t.cast( 152 exp.AggFunc, 153 t.cast(exp.Expr, agg.parent).transform( 154 lambda node: ( 155 exp.column(node.this, table=remove_namespace(node)) 156 if isinstance(node, exp.Column) and node.table 157 else node 158 ) 159 ), 160 ): _get_measure_and_dim_tables(agg) 161 for agg in self.expanded.find_all(exp.AggFunc) 162 } 163 164 @property 165 def formula(self) -> exp.Expr: 166 """Returns the post aggregation formula of a metric. 167 168 For simple metrics it is just the metric name. For derived metrics, 169 it consists of the operations of the derived metrics without aggregations. 170 """ 171 return exp.alias_( 172 self.expanded.transform( 173 lambda node: exp.column(node.args["alias"]) if isinstance(node, exp.Alias) else node 174 ), 175 self.name, 176 copy=False, 177 ) 178 179 180def _raise_metric_config_error(msg: str, path: Path) -> None: 181 raise ConfigError(f"{msg}. '{path}'") 182 183 184def _get_measure_and_dim_tables(expression: exp.Expr) -> MeasureAndDimTables: 185 """Finds all the table references in a metric definition. 186 187 Additionally ensure than the first table returned is the 'measure' or numeric value being aggregated. 188 """ 189 190 tables = {} 191 measure_table = None 192 193 def is_measure(node: exp.Expr) -> bool: 194 parent = node.parent 195 196 if isinstance(parent, exp.AggFunc) and node.arg_key == "this": 197 return True 198 if isinstance(parent, (exp.If, exp.Case)) and node.arg_key != "this": 199 return is_measure(parent) 200 if isinstance(parent, (exp.Binary, exp.Paren, exp.Distinct)): 201 return is_measure(parent) 202 return False 203 204 for node in expression.walk(): 205 if isinstance(node, exp.Column) and node.table: 206 table = ".".join(p.sql() for p in node.parts[:-1]) 207 tables[table] = True 208 209 if not measure_table and is_measure(node): 210 measure_table = table 211 212 if not measure_table: 213 raise ConfigError(f"Could not infer a measures table from '{expression}'") 214 215 tables.pop(measure_table) 216 return (measure_table, tuple(tables.keys()))
19def load_metric_ddl( 20 expression: exp.Expr, dialect: t.Optional[str], path: Path = Path(), **kwargs: t.Any 21) -> MetricMeta: 22 """Returns a MetricMeta from raw Metric DDL.""" 23 if not isinstance(expression, d.Metric): 24 _raise_metric_config_error( 25 f"Only METRIC(...) statements are allowed. Found {expression.sql(pretty=True)}", path 26 ) 27 28 metric = MetricMeta( 29 **{ 30 "dialect": dialect, 31 "description": ( 32 "\n".join(comment.strip() for comment in expression.comments) 33 if expression.comments 34 else None 35 ), 36 **{prop.name.lower(): prop.args.get("value") for prop in expression.expressions}, 37 **kwargs, 38 } 39 ) 40 41 metric._path = path 42 43 return metric
Returns a MetricMeta from raw Metric DDL.
46def expand_metrics(metas: UniqueKeyDict[str, MetricMeta]) -> UniqueKeyDict[str, Metric]: 47 """Resolves all metas into standalone metrics.""" 48 metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics") 49 50 for name, meta in metas.items(): 51 if name not in metrics: 52 metrics[name] = meta.to_metric(metas, metrics) 53 54 return metrics
Resolves all metas into standalone metrics.
57def remove_namespace(expression: str | exp.Column) -> str: 58 """Given a column or a string, rewrite table namespaces like catalog.db to catalog__db""" 59 60 if not isinstance(expression, str): 61 expression = first( 62 ".".join(p.name for p in column.parts[:-1]) 63 for column in expression.find_all(exp.Column) 64 if column.table 65 ) 66 return expression.replace('"', "").replace(".", "__")
Given a column or a string, rewrite table namespaces like catalog.db to catalog__db
69class MetricMeta(PydanticModel, frozen=True): 70 """Raw metric definition without relationships or expansion of derived metrics.""" 71 72 name: str 73 dialect: str 74 expression: exp.Expr 75 description: t.Optional[str] = None 76 owner: t.Optional[str] = None 77 78 _path: Path = Path() 79 80 @field_validator("name", mode="before") 81 @classmethod 82 def _name_validator(cls, v: t.Any) -> str: 83 return (cls._string_validator(v) or "").lower() 84 85 @field_validator("dialect", "owner", "description", mode="before") 86 @classmethod 87 def _string_validator(cls, v: t.Any) -> t.Optional[str]: 88 return str_or_exp_to_str(v) 89 90 @field_validator("expression", mode="before") 91 def _validate_expression(cls, v: t.Any, info: ValidationInfo) -> exp.Expr: 92 if isinstance(v, str): 93 dialect = info.data.get("dialect") 94 return d.parse_one(v, dialect=dialect) 95 if isinstance(v, exp.Expr): 96 return v 97 return v 98 99 def to_metric( 100 self, metas: t.Dict[str, MetricMeta], metrics: UniqueKeyDict[str, Metric] 101 ) -> Metric: 102 """Converts a metric meta into a fully expanded and standalone metric.""" 103 metric_refs = {} 104 agg_or_ref = False 105 106 for node in self.expression.walk(): 107 if isinstance(node, exp.Alias): 108 _raise_metric_config_error( 109 f"Alias found for metric '{self.name}' which is not allowed", self._path 110 ) 111 elif isinstance(node, exp.AggFunc): 112 agg_or_ref = True 113 elif isinstance(node, exp.Column) and not node.table: 114 agg_or_ref = True 115 ref = node.sql(dialect=self.dialect) 116 117 if ref not in metrics: 118 metrics[ref] = metas[ref].to_metric(metas, metrics) 119 120 metric_refs[node] = metrics[ref] 121 122 if not agg_or_ref: 123 _raise_metric_config_error( 124 f"Metric '{self.name}' missing an aggregation or metric ref", self._path 125 ) 126 127 if metric_refs: 128 expanded = self.expression.copy() 129 for column in expanded.find_all(exp.Column): 130 metric = metric_refs.get(column) 131 132 if metric: 133 column.replace(metric.expanded.copy()) 134 else: 135 expanded = exp.alias_(self.expression, self.name) 136 137 metric = Metric(**self.dict(), expanded=expanded) 138 metric._path = self._path 139 return metric
Raw metric definition without relationships or expansion of derived metrics.
99 def to_metric( 100 self, metas: t.Dict[str, MetricMeta], metrics: UniqueKeyDict[str, Metric] 101 ) -> Metric: 102 """Converts a metric meta into a fully expanded and standalone metric.""" 103 metric_refs = {} 104 agg_or_ref = False 105 106 for node in self.expression.walk(): 107 if isinstance(node, exp.Alias): 108 _raise_metric_config_error( 109 f"Alias found for metric '{self.name}' which is not allowed", self._path 110 ) 111 elif isinstance(node, exp.AggFunc): 112 agg_or_ref = True 113 elif isinstance(node, exp.Column) and not node.table: 114 agg_or_ref = True 115 ref = node.sql(dialect=self.dialect) 116 117 if ref not in metrics: 118 metrics[ref] = metas[ref].to_metric(metas, metrics) 119 120 metric_refs[node] = metrics[ref] 121 122 if not agg_or_ref: 123 _raise_metric_config_error( 124 f"Metric '{self.name}' missing an aggregation or metric ref", self._path 125 ) 126 127 if metric_refs: 128 expanded = self.expression.copy() 129 for column in expanded.find_all(exp.Column): 130 metric = metric_refs.get(column) 131 132 if metric: 133 column.replace(metric.expanded.copy()) 134 else: 135 expanded = exp.alias_(self.expression, self.name) 136 137 metric = Metric(**self.dict(), expanded=expanded) 138 metric._path = self._path 139 return metric
Converts a metric meta into a fully expanded and standalone metric.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
358def init_private_attributes(self: BaseModel, context: Any, /) -> None: 359 """This function is meant to behave like a BaseModel method to initialise private attributes. 360 361 It takes context as an argument since that's what pydantic-core passes when calling it. 362 363 Args: 364 self: The BaseModel instance. 365 context: The context. 366 """ 367 if getattr(self, '__pydantic_private__', None) is None: 368 pydantic_private = {} 369 for name, private_attr in self.__private_attributes__.items(): 370 default = private_attr.get_default() 371 if default is not PydanticUndefined: 372 pydantic_private[name] = default 373 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
142class Metric(MetricMeta, frozen=True): 143 expanded: exp.Expr 144 145 @property 146 def aggs(self) -> t.Dict[exp.AggFunc, MeasureAndDimTables]: 147 """Returns a dictionary of aggregation to referenced tables. 148 149 This method removes catalog and schema information from columns. 150 """ 151 return { 152 t.cast( 153 exp.AggFunc, 154 t.cast(exp.Expr, agg.parent).transform( 155 lambda node: ( 156 exp.column(node.this, table=remove_namespace(node)) 157 if isinstance(node, exp.Column) and node.table 158 else node 159 ) 160 ), 161 ): _get_measure_and_dim_tables(agg) 162 for agg in self.expanded.find_all(exp.AggFunc) 163 } 164 165 @property 166 def formula(self) -> exp.Expr: 167 """Returns the post aggregation formula of a metric. 168 169 For simple metrics it is just the metric name. For derived metrics, 170 it consists of the operations of the derived metrics without aggregations. 171 """ 172 return exp.alias_( 173 self.expanded.transform( 174 lambda node: exp.column(node.args["alias"]) if isinstance(node, exp.Alias) else node 175 ), 176 self.name, 177 copy=False, 178 )
Raw metric definition without relationships or expansion of derived metrics.
145 @property 146 def aggs(self) -> t.Dict[exp.AggFunc, MeasureAndDimTables]: 147 """Returns a dictionary of aggregation to referenced tables. 148 149 This method removes catalog and schema information from columns. 150 """ 151 return { 152 t.cast( 153 exp.AggFunc, 154 t.cast(exp.Expr, agg.parent).transform( 155 lambda node: ( 156 exp.column(node.this, table=remove_namespace(node)) 157 if isinstance(node, exp.Column) and node.table 158 else node 159 ) 160 ), 161 ): _get_measure_and_dim_tables(agg) 162 for agg in self.expanded.find_all(exp.AggFunc) 163 }
Returns a dictionary of aggregation to referenced tables.
This method removes catalog and schema information from columns.
165 @property 166 def formula(self) -> exp.Expr: 167 """Returns the post aggregation formula of a metric. 168 169 For simple metrics it is just the metric name. For derived metrics, 170 it consists of the operations of the derived metrics without aggregations. 171 """ 172 return exp.alias_( 173 self.expanded.transform( 174 lambda node: exp.column(node.args["alias"]) if isinstance(node, exp.Alias) else node 175 ), 176 self.name, 177 copy=False, 178 )
Returns the post aggregation formula of a metric.
For simple metrics it is just the metric name. For derived metrics, it consists of the operations of the derived metrics without aggregations.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
358def init_private_attributes(self: BaseModel, context: Any, /) -> None: 359 """This function is meant to behave like a BaseModel method to initialise private attributes. 360 361 It takes context as an argument since that's what pydantic-core passes when calling it. 362 363 Args: 364 self: The BaseModel instance. 365 context: The context. 366 """ 367 if getattr(self, '__pydantic_private__', None) is None: 368 pydantic_private = {} 369 for name, private_attr in self.__private_attributes__.items(): 370 default = private_attr.get_default() 371 if default is not PydanticUndefined: 372 pydantic_private[name] = default 373 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs