Edit on GitHub

sqlmesh.core.model.meta

  1from __future__ import annotations
  2
  3import typing as t
  4from enum import Enum
  5from functools import cached_property
  6from typing_extensions import Self
  7
  8from pydantic import Field
  9from sqlglot import Dialect, exp, parse_one
 10from sqlglot.helper import ensure_collection, ensure_list
 11from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
 12
 13from sqlmesh.core import dialect as d
 14from sqlmesh.core.config.common import VirtualEnvironmentMode
 15from sqlmesh.core.config.linter import LinterConfig
 16from sqlmesh.core.dialect import normalize_model_name
 17from sqlmesh.utils import classproperty
 18from sqlmesh.core.model.common import (
 19    bool_validator,
 20    default_catalog_validator,
 21    depends_on_validator,
 22    properties_validator,
 23    parse_properties,
 24)
 25from sqlmesh.core.model.kind import (
 26    CustomKind,
 27    IncrementalByUniqueKeyKind,
 28    ModelKind,
 29    OnDestructiveChange,
 30    SCDType2ByColumnKind,
 31    SCDType2ByTimeKind,
 32    TimeColumn,
 33    ViewKind,
 34    model_kind_validator,
 35    OnAdditiveChange,
 36)
 37from sqlmesh.core.node import _Node, str_or_exp_to_str
 38from sqlmesh.core.reference import Reference
 39from sqlmesh.utils.date import TimeLike
 40from sqlmesh.utils.errors import ConfigError
 41from sqlmesh.utils.pydantic import (
 42    ValidationInfo,
 43    field_validator,
 44    list_of_fields_validator,
 45    model_validator,
 46    get_dialect,
 47)
 48
 49if t.TYPE_CHECKING:
 50    from sqlmesh.core._typing import CustomMaterializationProperties, SessionProperties
 51    from sqlmesh.core.engine_adapter._typing import GrantsConfig
 52
 53FunctionCall = t.Tuple[str, t.Dict[str, exp.Expression]]
 54
 55
 56class GrantsTargetLayer(str, Enum):
 57    """Target layer(s) where grants should be applied."""
 58
 59    ALL = "all"
 60    PHYSICAL = "physical"
 61    VIRTUAL = "virtual"
 62
 63    @classproperty
 64    def default(cls) -> "GrantsTargetLayer":
 65        return GrantsTargetLayer.VIRTUAL
 66
 67    @property
 68    def is_all(self) -> bool:
 69        return self == GrantsTargetLayer.ALL
 70
 71    @property
 72    def is_physical(self) -> bool:
 73        return self == GrantsTargetLayer.PHYSICAL
 74
 75    @property
 76    def is_virtual(self) -> bool:
 77        return self == GrantsTargetLayer.VIRTUAL
 78
 79    def __str__(self) -> str:
 80        return self.name
 81
 82    def __repr__(self) -> str:
 83        return str(self)
 84
 85
 86class ModelMeta(_Node):
 87    """Metadata for models which can be defined in SQL."""
 88
 89    dialect: str = ""
 90    name: str
 91    kind: ModelKind = ViewKind()
 92    retention: t.Optional[int] = None  # not implemented yet
 93    table_format: t.Optional[str] = None
 94    storage_format: t.Optional[str] = None
 95    partitioned_by_: t.List[exp.Expression] = Field(default=[], alias="partitioned_by")
 96    clustered_by: t.List[exp.Expression] = []
 97    default_catalog: t.Optional[str] = None
 98    depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
 99    columns_to_types_: t.Optional[t.Dict[str, exp.DataType]] = Field(default=None, alias="columns")
100    column_descriptions_: t.Optional[t.Dict[str, str]] = Field(
101        default=None, alias="column_descriptions"
102    )
103    audits: t.List[FunctionCall] = []
104    grains: t.List[exp.Expression] = []
105    references: t.List[exp.Expression] = []
106    physical_schema_override: t.Optional[str] = None
107    physical_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="physical_properties")
108    virtual_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="virtual_properties")
109    session_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="session_properties")
110    allow_partials: bool = False
111    signals: t.List[FunctionCall] = []
112    enabled: bool = True
113    physical_version: t.Optional[str] = None
114    gateway: t.Optional[str] = None
115    optimize_query: t.Optional[bool] = None
116    ignored_rules_: t.Optional[t.Set[str]] = Field(
117        default=None, exclude=True, alias="ignored_rules"
118    )
119    formatting: t.Optional[bool] = Field(default=None, exclude=True)
120    virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default
121    grants_: t.Optional[exp.Tuple] = Field(default=None, alias="grants")
122    grants_target_layer: GrantsTargetLayer = GrantsTargetLayer.default
123
124    _bool_validator = bool_validator
125    _model_kind_validator = model_kind_validator
126    _properties_validator = properties_validator
127    _default_catalog_validator = default_catalog_validator
128    _depends_on_validator = depends_on_validator
129
130    @field_validator("audits", "signals", mode="before")
131    def _func_call_validator(cls, v: t.Any, field: t.Any) -> t.Any:
132        is_signal = getattr(field, "name" if hasattr(field, "name") else "field_name") == "signals"
133
134        return d.extract_function_calls(v, allow_tuples=is_signal)
135
136    @field_validator("tags", mode="before")
137    def _value_or_tuple_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
138        return ensure_list(cls._validate_value_or_tuple(v, info.data))
139
140    @classmethod
141    def _validate_value_or_tuple(
142        cls, v: t.Dict[str, t.Any], data: t.Dict[str, t.Any], normalize: bool = False
143    ) -> t.Any:
144        dialect = data.get("dialect")
145
146        def _normalize(value: t.Any) -> t.Any:
147            return normalize_identifiers(value, dialect=dialect) if normalize else value
148
149        if isinstance(v, exp.Paren):
150            v = [v.unnest()]
151
152        if isinstance(v, (exp.Tuple, exp.Array)):
153            return [_normalize(e).name for e in v.expressions]
154        if isinstance(v, exp.Expression):
155            return _normalize(v).name
156        if isinstance(v, str):
157            value = _normalize(v)
158            return value.name if isinstance(value, exp.Expression) else value
159        if isinstance(v, (list, tuple)):
160            return [cls._validate_value_or_tuple(elm, data, normalize=normalize) for elm in v]
161
162        return v
163
164    @field_validator("table_format", "storage_format", mode="before")
165    def _format_validator(cls, v: t.Any, info: ValidationInfo) -> t.Optional[str]:
166        if isinstance(v, exp.Expression) and not (isinstance(v, (exp.Literal, exp.Identifier))):
167            return v.sql(info.data.get("dialect"))
168        return str_or_exp_to_str(v)
169
170    @field_validator("dialect", mode="before")
171    def _dialect_validator(cls, v: t.Any) -> t.Optional[str]:
172        # dialects are parsed as identifiers and may get normalized as uppercase,
173        # so this ensures they'll be stored as lowercase
174        dialect = str_or_exp_to_str(v)
175        return dialect and dialect.lower()
176
177    @field_validator("physical_version", mode="before")
178    def _physical_version_validator(cls, v: t.Any) -> t.Optional[str]:
179        if v is None:
180            return v
181        return str_or_exp_to_str(v)
182
183    @field_validator("gateway", mode="before")
184    def _gateway_validator(cls, v: t.Any) -> t.Optional[str]:
185        if v is None:
186            return None
187        gateway = str_or_exp_to_str(v)
188        return gateway and gateway.lower()
189
190    @field_validator("partitioned_by_", "clustered_by", mode="before")
191    def _partition_and_cluster_validator(
192        cls, v: t.Any, info: ValidationInfo
193    ) -> t.List[exp.Expression]:
194        if (
195            isinstance(v, list)
196            and all(isinstance(i, str) for i in v)
197            and info.field_name == "partitioned_by_"
198        ):
199            # this branch gets hit when we are deserializing from json because `partitioned_by` is stored as a List[str]
200            # however, we should only invoke this if the list contains strings because this validator is also
201            # called by Python models which might pass a List[exp.Expression]
202            string_to_parse = (
203                f"({','.join(v)})"  # recreate the (a, b, c) part of "partitioned_by (a, b, c)"
204            )
205            parsed = parse_one(
206                string_to_parse, into=exp.PartitionedByProperty, dialect=get_dialect(info)
207            )
208            v = parsed.this.expressions if isinstance(parsed.this, exp.Schema) else v
209
210        expressions = list_of_fields_validator(v, info.data)
211
212        for expression in expressions:
213            num_cols = len(list(expression.find_all(exp.Column)))
214
215            error_msg: t.Optional[str] = None
216            if num_cols == 0:
217                error_msg = "does not contain a column"
218            elif num_cols > 1:
219                error_msg = "contains multiple columns"
220
221            if error_msg:
222                raise ConfigError(f"Field '{expression}' {error_msg}")
223
224        return expressions
225
226    @field_validator(
227        "columns_to_types_", "derived_columns_to_types", mode="before", check_fields=False
228    )
229    def _columns_validator(
230        cls, v: t.Any, info: ValidationInfo
231    ) -> t.Optional[t.Dict[str, exp.DataType]]:
232        columns_to_types = {}
233        dialect = info.data.get("dialect")
234
235        if isinstance(v, exp.Schema):
236            for column in v.expressions:
237                expr = column.args.get("kind")
238                if not isinstance(expr, exp.DataType):
239                    raise ConfigError(f"Missing data type for column '{column.name}'.")
240
241                expr.meta["dialect"] = dialect
242                columns_to_types[normalize_identifiers(column, dialect=dialect).name] = expr
243
244            return columns_to_types
245
246        if isinstance(v, dict):
247            udt = Dialect.get_or_raise(dialect).SUPPORTS_USER_DEFINED_TYPES
248            for k, data_type in v.items():
249                expr = exp.DataType.build(data_type, dialect=dialect, udt=udt)
250                expr.meta["dialect"] = dialect
251                columns_to_types[normalize_identifiers(k, dialect=dialect).name] = expr
252
253            return columns_to_types
254
255        return v
256
257    @field_validator("column_descriptions_", mode="before")
258    def _column_descriptions_validator(
259        cls, vs: t.Any, info: ValidationInfo
260    ) -> t.Optional[t.Dict[str, str]]:
261        dialect = info.data.get("dialect")
262
263        if vs is None:
264            return None
265
266        if isinstance(vs, exp.Paren):
267            vs = vs.flatten()
268
269        if isinstance(vs, (exp.Tuple, exp.Array)):
270            vs = vs.expressions
271
272        raw_col_descriptions = (
273            vs
274            if isinstance(vs, dict)
275            else {".".join([part.this for part in v.this.parts]): v.expression.name for v in vs}
276        )
277
278        col_descriptions = {
279            normalize_identifiers(k, dialect=dialect).name: v
280            for k, v in raw_col_descriptions.items()
281        }
282
283        columns_to_types = info.data.get("columns_to_types_")
284        if columns_to_types:
285            from sqlmesh.core.console import get_console
286
287            console = get_console()
288            for column_name in list(col_descriptions):
289                if column_name not in columns_to_types:
290                    console.log_warning(
291                        f"In model '{info.data['name']}', a description is provided for column '{column_name}' but it is not a column in the model."
292                    )
293                    del col_descriptions[column_name]
294
295        return col_descriptions
296
297    @field_validator("grains", "references", mode="before")
298    def _refs_validator(cls, vs: t.Any, info: ValidationInfo) -> t.List[exp.Expression]:
299        dialect = info.data.get("dialect")
300
301        if isinstance(vs, exp.Paren):
302            vs = vs.unnest()
303
304        if isinstance(vs, (exp.Tuple, exp.Array)):
305            vs = vs.expressions
306        else:
307            vs = [
308                d.parse_one(v, dialect=dialect) if isinstance(v, str) else v
309                for v in ensure_collection(vs)
310            ]
311
312        refs = []
313
314        for v in vs:
315            v = exp.column(v) if isinstance(v, exp.Identifier) else v
316            v.meta["dialect"] = dialect
317            refs.append(v)
318
319        return refs
320
321    @field_validator("ignored_rules_", mode="before")
322    def ignored_rules_validator(cls, vs: t.Any) -> t.Any:
323        return LinterConfig._validate_rules(vs)
324
325    @field_validator("grants_target_layer", mode="before")
326    def _grants_target_layer_validator(cls, v: t.Any) -> t.Any:
327        if isinstance(v, exp.Identifier):
328            return v.this
329        if isinstance(v, exp.Literal) and v.is_string:
330            return v.this
331        return v
332
333    @field_validator("session_properties_", mode="before")
334    def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
335        # use the generic properties validator to parse the session properties
336        parsed_session_properties = parse_properties(type(cls), v, info)
337        if not parsed_session_properties:
338            return parsed_session_properties
339
340        for eq in parsed_session_properties:
341            prop_name = eq.left.name
342
343            if prop_name == "query_label":
344                query_label = eq.right
345                if not isinstance(
346                    query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar)
347                ):
348                    raise ConfigError(
349                        "Invalid value for `session_properties.query_label`. Must be an array or tuple."
350                    )
351
352                label_tuples: t.List[exp.Expression] = (
353                    [query_label.unnest()]
354                    if isinstance(query_label, exp.Paren)
355                    else query_label.expressions
356                )
357
358                for label_tuple in label_tuples:
359                    if not (
360                        isinstance(label_tuple, exp.Tuple)
361                        and len(label_tuple.expressions) == 2
362                        and all(isinstance(label, exp.Literal) for label in label_tuple.expressions)
363                    ):
364                        raise ConfigError(
365                            "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2."
366                        )
367            elif prop_name == "authorization":
368                authorization = eq.right
369                if not (
370                    isinstance(authorization, exp.Literal) and authorization.is_string
371                ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)):
372                    raise ConfigError(
373                        "Invalid value for `session_properties.authorization`. Must be a string literal."
374                    )
375
376        return parsed_session_properties
377
378    @model_validator(mode="before")
379    def _pre_root_validator(cls, data: t.Any) -> t.Any:
380        if not isinstance(data, dict):
381            return data
382
383        grain = data.pop("grain", None)
384        if grain:
385            grains = data.get("grains")
386            if grains:
387                raise ConfigError(
388                    f"Cannot use argument 'grain' ({grain}) with 'grains' ({grains}), use only grains"
389                )
390            data["grains"] = ensure_list(grain)
391
392        table_properties = data.pop("table_properties", None)
393        if table_properties:
394            if not isinstance(table_properties, str):
395                # Do not warn when deserializing from the state.
396                model_name = data["name"]
397                from sqlmesh.core.console import get_console
398
399                get_console().log_warning(
400                    f"Model '{model_name}' is using the `table_properties` attribute which is deprecated. Please use `physical_properties` instead."
401                )
402            physical_properties = data.get("physical_properties")
403            if physical_properties:
404                raise ConfigError(
405                    f"Cannot use argument 'table_properties' ({table_properties}) with 'physical_properties' ({physical_properties}), use only physical_properties."
406                )
407
408            data["physical_properties"] = table_properties
409
410        return data
411
412    @model_validator(mode="after")
413    def _root_validator(self) -> Self:
414        kind: t.Any = self.kind
415
416        for field in ("partitioned_by_", "clustered_by"):
417            if (
418                getattr(self, field, None)
419                and not kind.is_materialized
420                and not (kind.is_view and kind.materialized)
421            ):
422                name = field[:-1] if field.endswith("_") else field
423                raise ValueError(f"{name} field cannot be set for {kind.name} models")
424        if kind.is_incremental_by_partition and not getattr(self, "partitioned_by_", None):
425            raise ValueError(f"partitioned_by field is required for {kind.name} models")
426
427        # needs to be in a mode=after model validator so that the field validators have run to convert from Expression -> str
428        if (storage_format := self.storage_format) and storage_format.lower() in {
429            "iceberg",
430            "hive",
431            "hudi",
432            "delta",
433        }:
434            from sqlmesh.core.console import get_console
435
436            get_console().log_warning(
437                f"Model {self.name} has `storage_format` set to a table format '{storage_format}' which is deprecated. Please use the `table_format` property instead."
438            )
439
440        # Validate grants configuration for model kind support
441        if self.grants is not None and not kind.supports_grants:
442            raise ValueError(f"grants cannot be set for {kind.name} models")
443
444        return self
445
446    @property
447    def time_column(self) -> t.Optional[TimeColumn]:
448        """The time column for incremental models."""
449        return getattr(self.kind, "time_column", None)
450
451    @property
452    def unique_key(self) -> t.List[exp.Expression]:
453        if isinstance(
454            self.kind, (SCDType2ByTimeKind, SCDType2ByColumnKind, IncrementalByUniqueKeyKind)
455        ):
456            return self.kind.unique_key
457        return []
458
459    @property
460    def column_descriptions(self) -> t.Dict[str, str]:
461        """A dictionary of column names to annotation comments."""
462        return self.column_descriptions_ or {}
463
464    @property
465    def lookback(self) -> int:
466        """The incremental lookback window."""
467        return getattr(self.kind, "lookback", 0) or 0
468
469    def lookback_start(self, start: TimeLike) -> TimeLike:
470        if self.lookback == 0:
471            return start
472
473        for _ in range(self.lookback):
474            start = self.interval_unit.cron_prev(start)
475        return start
476
477    @property
478    def batch_size(self) -> t.Optional[int]:
479        """The maximal number of units in a single task for a backfill."""
480        return getattr(self.kind, "batch_size", None)
481
482    @property
483    def batch_concurrency(self) -> t.Optional[int]:
484        """The maximal number of batches that can run concurrently for a backfill."""
485        return getattr(self.kind, "batch_concurrency", None)
486
487    @cached_property
488    def physical_properties(self) -> t.Dict[str, exp.Expression]:
489        """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated."""
490        if self.physical_properties_:
491            return {e.this.name: e.expression for e in self.physical_properties_.expressions}
492        return {}
493
494    @cached_property
495    def virtual_properties(self) -> t.Dict[str, exp.Expression]:
496        """A dictionary of properties that will be applied to the virtual layer."""
497        if self.virtual_properties_:
498            return {e.this.name: e.expression for e in self.virtual_properties_.expressions}
499        return {}
500
501    @property
502    def session_properties(self) -> SessionProperties:
503        """A dictionary of session properties."""
504        if not self.session_properties_:
505            return {}
506
507        return d.interpret_key_value_pairs(self.session_properties_)
508
509    @property
510    def custom_materialization_properties(self) -> CustomMaterializationProperties:
511        if isinstance(self.kind, CustomKind):
512            return self.kind.materialization_properties
513        return {}
514
515    @cached_property
516    def grants(self) -> t.Optional[GrantsConfig]:
517        """A dictionary of grants mapping permission names to lists of grantees."""
518
519        if self.grants_ is None:
520            return None
521
522        if not self.grants_.expressions:
523            return {}
524
525        grants_dict = {}
526        for eq_expr in self.grants_.expressions:
527            try:
528                permission_name = self._validate_config_expression(eq_expr.left)
529                grantee_list = self._validate_nested_config_values(eq_expr.expression)
530                grants_dict[permission_name] = grantee_list
531            except ConfigError as e:
532                permission_name = (
533                    eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left)
534                )
535                raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}")
536
537        return grants_dict if grants_dict else None
538
539    @property
540    def all_references(self) -> t.List[Reference]:
541        """All references including grains."""
542        return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [
543            Reference(model_name=self.name, expression=e, unique=True) for e in self.references
544        ]
545
546    @property
547    def on(self) -> t.List[str]:
548        """The grains to be used as join condition in table_diff."""
549
550        on: t.List[str] = []
551        for expr in [ref.expression for ref in self.all_references if ref.unique]:
552            if isinstance(expr, exp.Tuple):
553                on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions])
554            else:
555                # Handle a single Column or Paren expression
556                on.append(expr.this.sql(dialect=self.dialect))
557
558        return on
559
560    @property
561    def managed_columns(self) -> t.Dict[str, exp.DataType]:
562        return getattr(self.kind, "managed_columns", {})
563
564    @property
565    def when_matched(self) -> t.Optional[exp.Whens]:
566        if isinstance(self.kind, IncrementalByUniqueKeyKind):
567            return self.kind.when_matched
568        return None
569
570    @property
571    def merge_filter(self) -> t.Optional[exp.Expression]:
572        if isinstance(self.kind, IncrementalByUniqueKeyKind):
573            return self.kind.merge_filter
574        return None
575
576    @property
577    def catalog(self) -> t.Optional[str]:
578        """Returns the catalog of a model."""
579        return self.fully_qualified_table.catalog
580
581    @cached_property
582    def fully_qualified_table(self) -> exp.Table:
583        return exp.to_table(self.fqn)
584
585    @cached_property
586    def fqn(self) -> str:
587        return normalize_model_name(
588            self.name, default_catalog=self.default_catalog, dialect=self.dialect
589        )
590
591    @property
592    def on_destructive_change(self) -> OnDestructiveChange:
593        return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW)
594
595    @property
596    def on_additive_change(self) -> OnAdditiveChange:
597        """Return the model's additive change setting if it has one."""
598        return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW)
599
600    @property
601    def ignored_rules(self) -> t.Set[str]:
602        return self.ignored_rules_ or set()
603
604    def _validate_config_expression(self, expr: exp.Expression) -> str:
605        if isinstance(expr, (d.MacroFunc, d.MacroVar)):
606            raise ConfigError(f"Unresolved macro: {expr.sql(dialect=self.dialect)}")
607
608        if isinstance(expr, exp.Null):
609            raise ConfigError("NULL value")
610
611        if isinstance(expr, exp.Literal):
612            return str(expr.this).strip()
613        if isinstance(expr, (exp.Column, exp.Identifier)):
614            return expr.name
615        return expr.sql(dialect=self.dialect).strip()
616
617    def _validate_nested_config_values(self, value_expr: exp.Expression) -> t.List[str]:
618        result = []
619
620        def flatten_expr(expr: exp.Expression) -> None:
621            if isinstance(expr, exp.Array):
622                for elem in expr.expressions:
623                    flatten_expr(elem)
624            elif isinstance(expr, (exp.Tuple, exp.Paren)):
625                expressions = [expr.unnest()] if isinstance(expr, exp.Paren) else expr.expressions
626                for elem in expressions:
627                    flatten_expr(elem)
628            else:
629                result.append(self._validate_config_expression(expr))
630
631        flatten_expr(value_expr)
632        return result
FunctionCall = typing.Tuple[str, typing.Dict[str, sqlglot.expressions.Expression]]
class GrantsTargetLayer(builtins.str, enum.Enum):
class GrantsTargetLayer(str, Enum):
    """Enumerates the layer(s) that grants are applied to."""

    ALL = "all"
    PHYSICAL = "physical"
    VIRTUAL = "virtual"

    @classproperty
    def default(cls) -> "GrantsTargetLayer":
        # VIRTUAL is the layer used when none is specified.
        return GrantsTargetLayer.VIRTUAL

    @property
    def is_all(self) -> bool:
        """Whether this member is ``ALL``."""
        return self is GrantsTargetLayer.ALL

    @property
    def is_physical(self) -> bool:
        """Whether this member is ``PHYSICAL``."""
        return self is GrantsTargetLayer.PHYSICAL

    @property
    def is_virtual(self) -> bool:
        """Whether this member is ``VIRTUAL``."""
        return self is GrantsTargetLayer.VIRTUAL

    def __str__(self) -> str:
        # Rendered as the member name (e.g. "VIRTUAL"), not the lowercase value.
        return self.name

    def __repr__(self) -> str:
        # Same text as __str__.
        return str(self)

Target layer(s) where grants should be applied.

ALL = ALL
PHYSICAL = PHYSICAL
VIRTUAL = VIRTUAL
default: GrantsTargetLayer
64    @classproperty
65    def default(cls) -> "GrantsTargetLayer":
66        return GrantsTargetLayer.VIRTUAL

Target layer(s) where grants should be applied.

is_all: bool
68    @property
69    def is_all(self) -> bool:
70        return self == GrantsTargetLayer.ALL
is_physical: bool
72    @property
73    def is_physical(self) -> bool:
74        return self == GrantsTargetLayer.PHYSICAL
is_virtual: bool
76    @property
77    def is_virtual(self) -> bool:
78        return self == GrantsTargetLayer.VIRTUAL
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class ModelMeta(sqlmesh.core.node._Node):
 87class ModelMeta(_Node):
 88    """Metadata for models which can be defined in SQL."""
 89
    # Fields whose name ends in an underscore are populated through the `alias`
    # given on their `Field(...)`; several have a property of the alias name
    # defined further down in this class.
    dialect: str = ""  # lower-cased by `_dialect_validator`
    name: str
    kind: ModelKind = ViewKind()
    retention: t.Optional[int] = None  # not implemented yet
    table_format: t.Optional[str] = None
    storage_format: t.Optional[str] = None
    partitioned_by_: t.List[exp.Expression] = Field(default=[], alias="partitioned_by")
    clustered_by: t.List[exp.Expression] = []
    default_catalog: t.Optional[str] = None
    depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
    columns_to_types_: t.Optional[t.Dict[str, exp.DataType]] = Field(default=None, alias="columns")
    column_descriptions_: t.Optional[t.Dict[str, str]] = Field(
        default=None, alias="column_descriptions"
    )
    # `audits` and `signals` hold (name, arguments) pairs, see `FunctionCall`.
    audits: t.List[FunctionCall] = []
    grains: t.List[exp.Expression] = []
    references: t.List[exp.Expression] = []
    physical_schema_override: t.Optional[str] = None
    physical_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="physical_properties")
    virtual_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="virtual_properties")
    session_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="session_properties")
    allow_partials: bool = False
    signals: t.List[FunctionCall] = []
    enabled: bool = True
    physical_version: t.Optional[str] = None
    gateway: t.Optional[str] = None  # lower-cased by `_gateway_validator`
    optimize_query: t.Optional[bool] = None
    # The next two fields are declared with `exclude=True`.
    ignored_rules_: t.Optional[t.Set[str]] = Field(
        default=None, exclude=True, alias="ignored_rules"
    )
    formatting: t.Optional[bool] = Field(default=None, exclude=True)
    virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default
    grants_: t.Optional[exp.Tuple] = Field(default=None, alias="grants")
    # `GrantsTargetLayer.default` returns `GrantsTargetLayer.VIRTUAL`.
    grants_target_layer: GrantsTargetLayer = GrantsTargetLayer.default

    # Shared validators imported from `sqlmesh.core.model.common` and
    # `sqlmesh.core.model.kind`, bound here as class attributes.
    _bool_validator = bool_validator
    _model_kind_validator = model_kind_validator
    _properties_validator = properties_validator
    _default_catalog_validator = default_catalog_validator
    _depends_on_validator = depends_on_validator
130
131    @field_validator("audits", "signals", mode="before")
132    def _func_call_validator(cls, v: t.Any, field: t.Any) -> t.Any:
133        is_signal = getattr(field, "name" if hasattr(field, "name") else "field_name") == "signals"
134
135        return d.extract_function_calls(v, allow_tuples=is_signal)
136
137    @field_validator("tags", mode="before")
138    def _value_or_tuple_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
139        return ensure_list(cls._validate_value_or_tuple(v, info.data))
140
141    @classmethod
142    def _validate_value_or_tuple(
143        cls, v: t.Dict[str, t.Any], data: t.Dict[str, t.Any], normalize: bool = False
144    ) -> t.Any:
145        dialect = data.get("dialect")
146
147        def _normalize(value: t.Any) -> t.Any:
148            return normalize_identifiers(value, dialect=dialect) if normalize else value
149
150        if isinstance(v, exp.Paren):
151            v = [v.unnest()]
152
153        if isinstance(v, (exp.Tuple, exp.Array)):
154            return [_normalize(e).name for e in v.expressions]
155        if isinstance(v, exp.Expression):
156            return _normalize(v).name
157        if isinstance(v, str):
158            value = _normalize(v)
159            return value.name if isinstance(value, exp.Expression) else value
160        if isinstance(v, (list, tuple)):
161            return [cls._validate_value_or_tuple(elm, data, normalize=normalize) for elm in v]
162
163        return v
164
165    @field_validator("table_format", "storage_format", mode="before")
166    def _format_validator(cls, v: t.Any, info: ValidationInfo) -> t.Optional[str]:
167        if isinstance(v, exp.Expression) and not (isinstance(v, (exp.Literal, exp.Identifier))):
168            return v.sql(info.data.get("dialect"))
169        return str_or_exp_to_str(v)
170
171    @field_validator("dialect", mode="before")
172    def _dialect_validator(cls, v: t.Any) -> t.Optional[str]:
173        # dialects are parsed as identifiers and may get normalized as uppercase,
174        # so this ensures they'll be stored as lowercase
175        dialect = str_or_exp_to_str(v)
176        return dialect and dialect.lower()
177
178    @field_validator("physical_version", mode="before")
179    def _physical_version_validator(cls, v: t.Any) -> t.Optional[str]:
180        if v is None:
181            return v
182        return str_or_exp_to_str(v)
183
184    @field_validator("gateway", mode="before")
185    def _gateway_validator(cls, v: t.Any) -> t.Optional[str]:
186        if v is None:
187            return None
188        gateway = str_or_exp_to_str(v)
189        return gateway and gateway.lower()
190
191    @field_validator("partitioned_by_", "clustered_by", mode="before")
192    def _partition_and_cluster_validator(
193        cls, v: t.Any, info: ValidationInfo
194    ) -> t.List[exp.Expression]:
195        if (
196            isinstance(v, list)
197            and all(isinstance(i, str) for i in v)
198            and info.field_name == "partitioned_by_"
199        ):
200            # this branch gets hit when we are deserializing from json because `partitioned_by` is stored as a List[str]
201            # however, we should only invoke this if the list contains strings because this validator is also
202            # called by Python models which might pass a List[exp.Expression]
203            string_to_parse = (
204                f"({','.join(v)})"  # recreate the (a, b, c) part of "partitioned_by (a, b, c)"
205            )
206            parsed = parse_one(
207                string_to_parse, into=exp.PartitionedByProperty, dialect=get_dialect(info)
208            )
209            v = parsed.this.expressions if isinstance(parsed.this, exp.Schema) else v
210
211        expressions = list_of_fields_validator(v, info.data)
212
213        for expression in expressions:
214            num_cols = len(list(expression.find_all(exp.Column)))
215
216            error_msg: t.Optional[str] = None
217            if num_cols == 0:
218                error_msg = "does not contain a column"
219            elif num_cols > 1:
220                error_msg = "contains multiple columns"
221
222            if error_msg:
223                raise ConfigError(f"Field '{expression}' {error_msg}")
224
225        return expressions
226
227    @field_validator(
228        "columns_to_types_", "derived_columns_to_types", mode="before", check_fields=False
229    )
230    def _columns_validator(
231        cls, v: t.Any, info: ValidationInfo
232    ) -> t.Optional[t.Dict[str, exp.DataType]]:
233        columns_to_types = {}
234        dialect = info.data.get("dialect")
235
236        if isinstance(v, exp.Schema):
237            for column in v.expressions:
238                expr = column.args.get("kind")
239                if not isinstance(expr, exp.DataType):
240                    raise ConfigError(f"Missing data type for column '{column.name}'.")
241
242                expr.meta["dialect"] = dialect
243                columns_to_types[normalize_identifiers(column, dialect=dialect).name] = expr
244
245            return columns_to_types
246
247        if isinstance(v, dict):
248            udt = Dialect.get_or_raise(dialect).SUPPORTS_USER_DEFINED_TYPES
249            for k, data_type in v.items():
250                expr = exp.DataType.build(data_type, dialect=dialect, udt=udt)
251                expr.meta["dialect"] = dialect
252                columns_to_types[normalize_identifiers(k, dialect=dialect).name] = expr
253
254            return columns_to_types
255
256        return v
257
258    @field_validator("column_descriptions_", mode="before")
259    def _column_descriptions_validator(
260        cls, vs: t.Any, info: ValidationInfo
261    ) -> t.Optional[t.Dict[str, str]]:
262        dialect = info.data.get("dialect")
263
264        if vs is None:
265            return None
266
267        if isinstance(vs, exp.Paren):
268            vs = vs.flatten()
269
270        if isinstance(vs, (exp.Tuple, exp.Array)):
271            vs = vs.expressions
272
273        raw_col_descriptions = (
274            vs
275            if isinstance(vs, dict)
276            else {".".join([part.this for part in v.this.parts]): v.expression.name for v in vs}
277        )
278
279        col_descriptions = {
280            normalize_identifiers(k, dialect=dialect).name: v
281            for k, v in raw_col_descriptions.items()
282        }
283
284        columns_to_types = info.data.get("columns_to_types_")
285        if columns_to_types:
286            from sqlmesh.core.console import get_console
287
288            console = get_console()
289            for column_name in list(col_descriptions):
290                if column_name not in columns_to_types:
291                    console.log_warning(
292                        f"In model '{info.data['name']}', a description is provided for column '{column_name}' but it is not a column in the model."
293                    )
294                    del col_descriptions[column_name]
295
296        return col_descriptions
297
298    @field_validator("grains", "references", mode="before")
299    def _refs_validator(cls, vs: t.Any, info: ValidationInfo) -> t.List[exp.Expression]:
300        dialect = info.data.get("dialect")
301
302        if isinstance(vs, exp.Paren):
303            vs = vs.unnest()
304
305        if isinstance(vs, (exp.Tuple, exp.Array)):
306            vs = vs.expressions
307        else:
308            vs = [
309                d.parse_one(v, dialect=dialect) if isinstance(v, str) else v
310                for v in ensure_collection(vs)
311            ]
312
313        refs = []
314
315        for v in vs:
316            v = exp.column(v) if isinstance(v, exp.Identifier) else v
317            v.meta["dialect"] = dialect
318            refs.append(v)
319
320        return refs
321
322    @field_validator("ignored_rules_", mode="before")
323    def ignored_rules_validator(cls, vs: t.Any) -> t.Any:
324        return LinterConfig._validate_rules(vs)
325
326    @field_validator("grants_target_layer", mode="before")
327    def _grants_target_layer_validator(cls, v: t.Any) -> t.Any:
328        if isinstance(v, exp.Identifier):
329            return v.this
330        if isinstance(v, exp.Literal) and v.is_string:
331            return v.this
332        return v
333
334    @field_validator("session_properties_", mode="before")
335    def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
336        # use the generic properties validator to parse the session properties
337        parsed_session_properties = parse_properties(type(cls), v, info)
338        if not parsed_session_properties:
339            return parsed_session_properties
340
341        for eq in parsed_session_properties:
342            prop_name = eq.left.name
343
344            if prop_name == "query_label":
345                query_label = eq.right
346                if not isinstance(
347                    query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar)
348                ):
349                    raise ConfigError(
350                        "Invalid value for `session_properties.query_label`. Must be an array or tuple."
351                    )
352
353                label_tuples: t.List[exp.Expression] = (
354                    [query_label.unnest()]
355                    if isinstance(query_label, exp.Paren)
356                    else query_label.expressions
357                )
358
359                for label_tuple in label_tuples:
360                    if not (
361                        isinstance(label_tuple, exp.Tuple)
362                        and len(label_tuple.expressions) == 2
363                        and all(isinstance(label, exp.Literal) for label in label_tuple.expressions)
364                    ):
365                        raise ConfigError(
366                            "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2."
367                        )
368            elif prop_name == "authorization":
369                authorization = eq.right
370                if not (
371                    isinstance(authorization, exp.Literal) and authorization.is_string
372                ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)):
373                    raise ConfigError(
374                        "Invalid value for `session_properties.authorization`. Must be a string literal."
375                    )
376
377        return parsed_session_properties
378
379    @model_validator(mode="before")
380    def _pre_root_validator(cls, data: t.Any) -> t.Any:
381        if not isinstance(data, dict):
382            return data
383
384        grain = data.pop("grain", None)
385        if grain:
386            grains = data.get("grains")
387            if grains:
388                raise ConfigError(
389                    f"Cannot use argument 'grain' ({grain}) with 'grains' ({grains}), use only grains"
390                )
391            data["grains"] = ensure_list(grain)
392
393        table_properties = data.pop("table_properties", None)
394        if table_properties:
395            if not isinstance(table_properties, str):
396                # Do not warn when deserializing from the state.
397                model_name = data["name"]
398                from sqlmesh.core.console import get_console
399
400                get_console().log_warning(
401                    f"Model '{model_name}' is using the `table_properties` attribute which is deprecated. Please use `physical_properties` instead."
402                )
403            physical_properties = data.get("physical_properties")
404            if physical_properties:
405                raise ConfigError(
406                    f"Cannot use argument 'table_properties' ({table_properties}) with 'physical_properties' ({physical_properties}), use only physical_properties."
407                )
408
409            data["physical_properties"] = table_properties
410
411        return data
412
413    @model_validator(mode="after")
414    def _root_validator(self) -> Self:
415        kind: t.Any = self.kind
416
417        for field in ("partitioned_by_", "clustered_by"):
418            if (
419                getattr(self, field, None)
420                and not kind.is_materialized
421                and not (kind.is_view and kind.materialized)
422            ):
423                name = field[:-1] if field.endswith("_") else field
424                raise ValueError(f"{name} field cannot be set for {kind.name} models")
425        if kind.is_incremental_by_partition and not getattr(self, "partitioned_by_", None):
426            raise ValueError(f"partitioned_by field is required for {kind.name} models")
427
428        # needs to be in a mode=after model validator so that the field validators have run to convert from Expression -> str
429        if (storage_format := self.storage_format) and storage_format.lower() in {
430            "iceberg",
431            "hive",
432            "hudi",
433            "delta",
434        }:
435            from sqlmesh.core.console import get_console
436
437            get_console().log_warning(
438                f"Model {self.name} has `storage_format` set to a table format '{storage_format}' which is deprecated. Please use the `table_format` property instead."
439            )
440
441        # Validate grants configuration for model kind support
442        if self.grants is not None and not kind.supports_grants:
443            raise ValueError(f"grants cannot be set for {kind.name} models")
444
445        return self
446
447    @property
448    def time_column(self) -> t.Optional[TimeColumn]:
449        """The time column for incremental models."""
450        return getattr(self.kind, "time_column", None)
451
452    @property
453    def unique_key(self) -> t.List[exp.Expression]:
454        if isinstance(
455            self.kind, (SCDType2ByTimeKind, SCDType2ByColumnKind, IncrementalByUniqueKeyKind)
456        ):
457            return self.kind.unique_key
458        return []
459
460    @property
461    def column_descriptions(self) -> t.Dict[str, str]:
462        """A dictionary of column names to annotation comments."""
463        return self.column_descriptions_ or {}
464
465    @property
466    def lookback(self) -> int:
467        """The incremental lookback window."""
468        return getattr(self.kind, "lookback", 0) or 0
469
470    def lookback_start(self, start: TimeLike) -> TimeLike:
471        if self.lookback == 0:
472            return start
473
474        for _ in range(self.lookback):
475            start = self.interval_unit.cron_prev(start)
476        return start
477
478    @property
479    def batch_size(self) -> t.Optional[int]:
480        """The maximal number of units in a single task for a backfill."""
481        return getattr(self.kind, "batch_size", None)
482
483    @property
484    def batch_concurrency(self) -> t.Optional[int]:
485        """The maximal number of batches that can run concurrently for a backfill."""
486        return getattr(self.kind, "batch_concurrency", None)
487
488    @cached_property
489    def physical_properties(self) -> t.Dict[str, exp.Expression]:
490        """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated."""
491        if self.physical_properties_:
492            return {e.this.name: e.expression for e in self.physical_properties_.expressions}
493        return {}
494
495    @cached_property
496    def virtual_properties(self) -> t.Dict[str, exp.Expression]:
497        """A dictionary of properties that will be applied to the virtual layer."""
498        if self.virtual_properties_:
499            return {e.this.name: e.expression for e in self.virtual_properties_.expressions}
500        return {}
501
502    @property
503    def session_properties(self) -> SessionProperties:
504        """A dictionary of session properties."""
505        if not self.session_properties_:
506            return {}
507
508        return d.interpret_key_value_pairs(self.session_properties_)
509
510    @property
511    def custom_materialization_properties(self) -> CustomMaterializationProperties:
512        if isinstance(self.kind, CustomKind):
513            return self.kind.materialization_properties
514        return {}
515
516    @cached_property
517    def grants(self) -> t.Optional[GrantsConfig]:
518        """A dictionary of grants mapping permission names to lists of grantees."""
519
520        if self.grants_ is None:
521            return None
522
523        if not self.grants_.expressions:
524            return {}
525
526        grants_dict = {}
527        for eq_expr in self.grants_.expressions:
528            try:
529                permission_name = self._validate_config_expression(eq_expr.left)
530                grantee_list = self._validate_nested_config_values(eq_expr.expression)
531                grants_dict[permission_name] = grantee_list
532            except ConfigError as e:
533                permission_name = (
534                    eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left)
535                )
536                raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}")
537
538        return grants_dict if grants_dict else None
539
540    @property
541    def all_references(self) -> t.List[Reference]:
542        """All references including grains."""
543        return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [
544            Reference(model_name=self.name, expression=e, unique=True) for e in self.references
545        ]
546
547    @property
548    def on(self) -> t.List[str]:
549        """The grains to be used as join condition in table_diff."""
550
551        on: t.List[str] = []
552        for expr in [ref.expression for ref in self.all_references if ref.unique]:
553            if isinstance(expr, exp.Tuple):
554                on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions])
555            else:
556                # Handle a single Column or Paren expression
557                on.append(expr.this.sql(dialect=self.dialect))
558
559        return on
560
561    @property
562    def managed_columns(self) -> t.Dict[str, exp.DataType]:
563        return getattr(self.kind, "managed_columns", {})
564
565    @property
566    def when_matched(self) -> t.Optional[exp.Whens]:
567        if isinstance(self.kind, IncrementalByUniqueKeyKind):
568            return self.kind.when_matched
569        return None
570
571    @property
572    def merge_filter(self) -> t.Optional[exp.Expression]:
573        if isinstance(self.kind, IncrementalByUniqueKeyKind):
574            return self.kind.merge_filter
575        return None
576
577    @property
578    def catalog(self) -> t.Optional[str]:
579        """Returns the catalog of a model."""
580        return self.fully_qualified_table.catalog
581
582    @cached_property
583    def fully_qualified_table(self) -> exp.Table:
584        return exp.to_table(self.fqn)
585
586    @cached_property
587    def fqn(self) -> str:
588        return normalize_model_name(
589            self.name, default_catalog=self.default_catalog, dialect=self.dialect
590        )
591
592    @property
593    def on_destructive_change(self) -> OnDestructiveChange:
594        return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW)
595
596    @property
597    def on_additive_change(self) -> OnAdditiveChange:
598        """Return the model's additive change setting if it has one."""
599        return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW)
600
601    @property
602    def ignored_rules(self) -> t.Set[str]:
603        return self.ignored_rules_ or set()
604
605    def _validate_config_expression(self, expr: exp.Expression) -> str:
606        if isinstance(expr, (d.MacroFunc, d.MacroVar)):
607            raise ConfigError(f"Unresolved macro: {expr.sql(dialect=self.dialect)}")
608
609        if isinstance(expr, exp.Null):
610            raise ConfigError("NULL value")
611
612        if isinstance(expr, exp.Literal):
613            return str(expr.this).strip()
614        if isinstance(expr, (exp.Column, exp.Identifier)):
615            return expr.name
616        return expr.sql(dialect=self.dialect).strip()
617
618    def _validate_nested_config_values(self, value_expr: exp.Expression) -> t.List[str]:
619        result = []
620
621        def flatten_expr(expr: exp.Expression) -> None:
622            if isinstance(expr, exp.Array):
623                for elem in expr.expressions:
624                    flatten_expr(elem)
625            elif isinstance(expr, (exp.Tuple, exp.Paren)):
626                expressions = [expr.unnest()] if isinstance(expr, exp.Paren) else expr.expressions
627                for elem in expressions:
628                    flatten_expr(elem)
629            else:
630                result.append(self._validate_config_expression(expr))
631
632        flatten_expr(value_expr)
633        return result

Metadata for models which can be defined in SQL.

dialect: str
name: str
retention: Optional[int]
table_format: Optional[str]
storage_format: Optional[str]
partitioned_by_: List[sqlglot.expressions.Expression]
clustered_by: List[sqlglot.expressions.Expression]
default_catalog: Optional[str]
depends_on_: Optional[Set[str]]
columns_to_types_: Optional[Dict[str, sqlglot.expressions.DataType]]
column_descriptions_: Optional[Dict[str, str]]
audits: List[Tuple[str, Dict[str, sqlglot.expressions.Expression]]]
grains: List[sqlglot.expressions.Expression]
references: List[sqlglot.expressions.Expression]
physical_schema_override: Optional[str]
physical_properties_: Optional[sqlglot.expressions.Tuple]
virtual_properties_: Optional[sqlglot.expressions.Tuple]
session_properties_: Optional[sqlglot.expressions.Tuple]
allow_partials: bool
signals: List[Tuple[str, Dict[str, sqlglot.expressions.Expression]]]
enabled: bool
physical_version: Optional[str]
gateway: Optional[str]
optimize_query: Optional[bool]
ignored_rules_: Optional[Set[str]]
formatting: Optional[bool]
grants_: Optional[sqlglot.expressions.Tuple]
grants_target_layer: GrantsTargetLayer
@field_validator('ignored_rules_', mode='before')
def ignored_rules_validator(cls, vs: Any) -> Any:
322    @field_validator("ignored_rules_", mode="before")
323    def ignored_rules_validator(cls, vs: t.Any) -> t.Any:
324        return LinterConfig._validate_rules(vs)
@field_validator('session_properties_', mode='before')
def session_properties_validator(cls, v: Any, info: pydantic_core.core_schema.ValidationInfo) -> Any:
334    @field_validator("session_properties_", mode="before")
335    def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
336        # use the generic properties validator to parse the session properties
337        parsed_session_properties = parse_properties(type(cls), v, info)
338        if not parsed_session_properties:
339            return parsed_session_properties
340
341        for eq in parsed_session_properties:
342            prop_name = eq.left.name
343
344            if prop_name == "query_label":
345                query_label = eq.right
346                if not isinstance(
347                    query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar)
348                ):
349                    raise ConfigError(
350                        "Invalid value for `session_properties.query_label`. Must be an array or tuple."
351                    )
352
353                label_tuples: t.List[exp.Expression] = (
354                    [query_label.unnest()]
355                    if isinstance(query_label, exp.Paren)
356                    else query_label.expressions
357                )
358
359                for label_tuple in label_tuples:
360                    if not (
361                        isinstance(label_tuple, exp.Tuple)
362                        and len(label_tuple.expressions) == 2
363                        and all(isinstance(label, exp.Literal) for label in label_tuple.expressions)
364                    ):
365                        raise ConfigError(
366                            "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2."
367                        )
368            elif prop_name == "authorization":
369                authorization = eq.right
370                if not (
371                    isinstance(authorization, exp.Literal) and authorization.is_string
372                ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)):
373                    raise ConfigError(
374                        "Invalid value for `session_properties.authorization`. Must be a string literal."
375                    )
376
377        return parsed_session_properties
time_column: Optional[sqlmesh.core.model.kind.TimeColumn]
447    @property
448    def time_column(self) -> t.Optional[TimeColumn]:
449        """The time column for incremental models."""
450        return getattr(self.kind, "time_column", None)

The time column for incremental models.

unique_key: List[sqlglot.expressions.Expression]
452    @property
453    def unique_key(self) -> t.List[exp.Expression]:
454        if isinstance(
455            self.kind, (SCDType2ByTimeKind, SCDType2ByColumnKind, IncrementalByUniqueKeyKind)
456        ):
457            return self.kind.unique_key
458        return []
column_descriptions: Dict[str, str]
460    @property
461    def column_descriptions(self) -> t.Dict[str, str]:
462        """A dictionary of column names to annotation comments."""
463        return self.column_descriptions_ or {}

A dictionary of column names to annotation comments.

lookback: int
465    @property
466    def lookback(self) -> int:
467        """The incremental lookback window."""
468        return getattr(self.kind, "lookback", 0) or 0

The incremental lookback window.

def lookback_start( self, start: Union[datetime.date, datetime.datetime, str, int, float]) -> Union[datetime.date, datetime.datetime, str, int, float]:
470    def lookback_start(self, start: TimeLike) -> TimeLike:
471        if self.lookback == 0:
472            return start
473
474        for _ in range(self.lookback):
475            start = self.interval_unit.cron_prev(start)
476        return start
batch_size: Optional[int]
478    @property
479    def batch_size(self) -> t.Optional[int]:
480        """The maximal number of units in a single task for a backfill."""
481        return getattr(self.kind, "batch_size", None)

The maximal number of units in a single task for a backfill.

batch_concurrency: Optional[int]
483    @property
484    def batch_concurrency(self) -> t.Optional[int]:
485        """The maximal number of batches that can run concurrently for a backfill."""
486        return getattr(self.kind, "batch_concurrency", None)

The maximal number of batches that can run concurrently for a backfill.

physical_properties: Dict[str, sqlglot.expressions.Expression]
488    @cached_property
489    def physical_properties(self) -> t.Dict[str, exp.Expression]:
490        """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated."""
491        if self.physical_properties_:
492            return {e.this.name: e.expression for e in self.physical_properties_.expressions}
493        return {}

A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated.

virtual_properties: Dict[str, sqlglot.expressions.Expression]
495    @cached_property
496    def virtual_properties(self) -> t.Dict[str, exp.Expression]:
497        """A dictionary of properties that will be applied to the virtual layer."""
498        if self.virtual_properties_:
499            return {e.this.name: e.expression for e in self.virtual_properties_.expressions}
500        return {}

A dictionary of properties that will be applied to the virtual layer.

session_properties: Dict[str, sqlglot.expressions.Expression | str | int | float | bool]
502    @property
503    def session_properties(self) -> SessionProperties:
504        """A dictionary of session properties."""
505        if not self.session_properties_:
506            return {}
507
508        return d.interpret_key_value_pairs(self.session_properties_)

A dictionary of session properties.

custom_materialization_properties: Dict[str, sqlglot.expressions.Expression | str | int | float | bool]
510    @property
511    def custom_materialization_properties(self) -> CustomMaterializationProperties:
512        if isinstance(self.kind, CustomKind):
513            return self.kind.materialization_properties
514        return {}
grants: Optional[GrantsConfig]
516    @cached_property
517    def grants(self) -> t.Optional[GrantsConfig]:
518        """A dictionary of grants mapping permission names to lists of grantees."""
519
520        if self.grants_ is None:
521            return None
522
523        if not self.grants_.expressions:
524            return {}
525
526        grants_dict = {}
527        for eq_expr in self.grants_.expressions:
528            try:
529                permission_name = self._validate_config_expression(eq_expr.left)
530                grantee_list = self._validate_nested_config_values(eq_expr.expression)
531                grants_dict[permission_name] = grantee_list
532            except ConfigError as e:
533                permission_name = (
534                    eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left)
535                )
536                raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}")
537
538        return grants_dict if grants_dict else None

A dictionary of grants mapping permission names to lists of grantees.

all_references: List[sqlmesh.core.reference.Reference]
540    @property
541    def all_references(self) -> t.List[Reference]:
542        """All references including grains."""
543        return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [
544            Reference(model_name=self.name, expression=e, unique=True) for e in self.references
545        ]

All references including grains.

on: List[str]
547    @property
548    def on(self) -> t.List[str]:
549        """The grains to be used as join condition in table_diff."""
550
551        on: t.List[str] = []
552        for expr in [ref.expression for ref in self.all_references if ref.unique]:
553            if isinstance(expr, exp.Tuple):
554                on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions])
555            else:
556                # Handle a single Column or Paren expression
557                on.append(expr.this.sql(dialect=self.dialect))
558
559        return on

The grains to be used as join condition in table_diff.

managed_columns: Dict[str, sqlglot.expressions.DataType]
561    @property
562    def managed_columns(self) -> t.Dict[str, exp.DataType]:
563        return getattr(self.kind, "managed_columns", {})
when_matched: Optional[sqlglot.expressions.Whens]
565    @property
566    def when_matched(self) -> t.Optional[exp.Whens]:
567        if isinstance(self.kind, IncrementalByUniqueKeyKind):
568            return self.kind.when_matched
569        return None
merge_filter: Optional[sqlglot.expressions.Expression]
571    @property
572    def merge_filter(self) -> t.Optional[exp.Expression]:
573        if isinstance(self.kind, IncrementalByUniqueKeyKind):
574            return self.kind.merge_filter
575        return None
catalog: Optional[str]
577    @property
578    def catalog(self) -> t.Optional[str]:
579        """Returns the catalog of a model."""
580        return self.fully_qualified_table.catalog

Returns the catalog of a model.

fully_qualified_table: sqlglot.expressions.Table
582    @cached_property
583    def fully_qualified_table(self) -> exp.Table:
584        return exp.to_table(self.fqn)
fqn: str
586    @cached_property
587    def fqn(self) -> str:
588        return normalize_model_name(
589            self.name, default_catalog=self.default_catalog, dialect=self.dialect
590        )
on_destructive_change: sqlmesh.core.model.kind.OnDestructiveChange
592    @property
593    def on_destructive_change(self) -> OnDestructiveChange:
594        return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW)
on_additive_change: sqlmesh.core.model.kind.OnAdditiveChange
596    @property
597    def on_additive_change(self) -> OnAdditiveChange:
598        """Return the model's additive change setting if it has one."""
599        return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW)

Return the model's additive change setting if it has one.

ignored_rules: Set[str]
601    @property
602    def ignored_rules(self) -> t.Set[str]:
603        return self.ignored_rules_ or set()
model_config = {'json_encoders': {<class 'sqlglot.expressions.Expression'>: <function _expression_encoder>, <class 'sqlglot.expressions.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Nothing to do when the private-attribute storage already exists.
    if getattr(self, "__pydantic_private__", None) is not None:
        return

    defaults = {}
    for attr_name, attr in self.__private_attributes__.items():
        value = attr.get_default()
        if value is PydanticUndefined:
            # Attributes without a default are left out of the storage.
            continue
        defaults[attr_name] = value
    object_setattr(self, "__pydantic_private__", defaults)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.core.node._Node
project
description
owner
start
end
cron
cron_tz
interval_unit_
tags
stamp
dbt_node_info_
copy
interval_unit
depends_on
data_hash
metadata_hash
is_metadata_only_change
is_data_change
croniter
cron_next
cron_prev
cron_floor
text_diff
is_model
is_audit
dbt_node_info
sqlmesh.core.node.DbtInfoMixin
dbt_unique_id
dbt_fqn
sqlmesh.utils.pydantic.PydanticModel
dict
json
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields