Edit on GitHub

sqlmesh.core.model.meta

  1from __future__ import annotations
  2
  3import typing as t
  4from enum import Enum
  5from functools import cached_property
  6from typing_extensions import Self
  7
  8from pydantic import Field
  9from sqlglot import Dialect, exp, parse_one
 10from sqlglot.helper import ensure_collection, ensure_list
 11from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
 12
 13from sqlmesh.core import dialect as d
 14from sqlmesh.core.config.common import VirtualEnvironmentMode
 15from sqlmesh.core.config.linter import LinterConfig
 16from sqlmesh.core.dialect import normalize_model_name
 17from sqlmesh.utils import classproperty
 18from sqlmesh.core.model.common import (
 19    bool_validator,
 20    default_catalog_validator,
 21    depends_on_validator,
 22    properties_validator,
 23    parse_properties,
 24)
 25from sqlmesh.core.model.kind import (
 26    CustomKind,
 27    IncrementalByUniqueKeyKind,
 28    ModelKind,
 29    OnDestructiveChange,
 30    SCDType2ByColumnKind,
 31    SCDType2ByTimeKind,
 32    TimeColumn,
 33    ViewKind,
 34    model_kind_validator,
 35    OnAdditiveChange,
 36)
 37from sqlmesh.core.node import _Node, str_or_exp_to_str
 38from sqlmesh.core.reference import Reference
 39from sqlmesh.utils.date import TimeLike
 40from sqlmesh.utils.errors import ConfigError
 41from sqlmesh.utils.pydantic import (
 42    ValidationInfo,
 43    field_validator,
 44    list_of_fields_validator,
 45    model_validator,
 46    get_dialect,
 47)
 48
 49if t.TYPE_CHECKING:
 50    from sqlmesh.core._typing import CustomMaterializationProperties, SessionProperties
 51    from sqlmesh.core.engine_adapter._typing import GrantsConfig
 52
# Type alias for the (str, dict-of-expressions) pairs stored in
# ModelMeta.audits and ModelMeta.signals. Presumably (function name,
# keyword arguments) -- TODO confirm against d.extract_function_calls.
FunctionCall = t.Tuple[str, t.Dict[str, exp.Expr]]
 54
 55
 56class GrantsTargetLayer(str, Enum):
 57    """Target layer(s) where grants should be applied."""
 58
 59    ALL = "all"
 60    PHYSICAL = "physical"
 61    VIRTUAL = "virtual"
 62
 63    @classproperty
 64    def default(cls) -> "GrantsTargetLayer":
 65        return GrantsTargetLayer.VIRTUAL
 66
 67    @property
 68    def is_all(self) -> bool:
 69        return self == GrantsTargetLayer.ALL
 70
 71    @property
 72    def is_physical(self) -> bool:
 73        return self == GrantsTargetLayer.PHYSICAL
 74
 75    @property
 76    def is_virtual(self) -> bool:
 77        return self == GrantsTargetLayer.VIRTUAL
 78
 79    def __str__(self) -> str:
 80        return self.name
 81
 82    def __repr__(self) -> str:
 83        return str(self)
 84
 85
class ModelMeta(_Node):
    """Metadata for models which can be defined in SQL."""

    # SQL dialect name; stored lowercase (see _dialect_validator).
    dialect: str = ""
    name: str
    # Model kind; defaults to a ViewKind instance.
    kind: ModelKind = ViewKind()
    retention: t.Optional[int] = None  # not implemented yet
    # Both formats are coerced to plain strings by _format_validator.
    table_format: t.Optional[str] = None
    storage_format: t.Optional[str] = None
    # Populated through the "partitioned_by" alias; every expression must
    # reference exactly one column (see _partition_and_cluster_validator).
    partitioned_by_: t.List[exp.Expr] = Field(default=[], alias="partitioned_by")
    # Validated by the same validator as partitioned_by_.
    clustered_by: t.List[exp.Expr] = []
    default_catalog: t.Optional[str] = None
    # Populated through the "depends_on" alias.
    depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
    # Populated through the "columns" alias; maps column name -> data type.
    columns_to_types_: t.Optional[t.Dict[str, exp.DataType]] = Field(default=None, alias="columns")
    # Populated through the "column_descriptions" alias; exposed via the
    # `column_descriptions` property.
    column_descriptions_: t.Optional[t.Dict[str, str]] = Field(
        default=None, alias="column_descriptions"
    )
    # Parsed by _func_call_validator via d.extract_function_calls.
    audits: t.List[FunctionCall] = []
    # `grains` and `references` are normalized by _refs_validator.
    grains: t.List[exp.Expr] = []
    references: t.List[exp.Expr] = []
    physical_schema_override: t.Optional[str] = None
    # Raw property tuples; the `physical_properties`, `virtual_properties` and
    # `session_properties` properties expose them in interpreted form.
    physical_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="physical_properties")
    virtual_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="virtual_properties")
    session_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="session_properties")
    allow_partials: bool = False
    # Parsed by _func_call_validator (tuples are allowed for signals).
    signals: t.List[FunctionCall] = []
    enabled: bool = True
    # Coerced to a string by _physical_version_validator.
    physical_version: t.Optional[str] = None
    # Stored lowercase (see _gateway_validator).
    gateway: t.Optional[str] = None
    optimize_query: t.Optional[bool] = None
    # Populated through the "ignored_rules" alias; declared with exclude=True.
    ignored_rules_: t.Optional[t.Set[str]] = Field(
        default=None, exclude=True, alias="ignored_rules"
    )
    # Declared with exclude=True.
    formatting: t.Optional[bool] = Field(default=None, exclude=True)
    virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default
    # Raw grants tuple, populated through the "grants" alias; the `grants`
    # property exposes it as a dictionary.
    grants_: t.Optional[exp.Tuple] = Field(default=None, alias="grants")
    grants_target_layer: GrantsTargetLayer = GrantsTargetLayer.default

    # Shared validators imported from sqlmesh.core.model.common and
    # sqlmesh.core.model.kind.
    _bool_validator = bool_validator
    _model_kind_validator = model_kind_validator
    _properties_validator = properties_validator
    _default_catalog_validator = default_catalog_validator
    _depends_on_validator = depends_on_validator
129
130    @field_validator("audits", "signals", mode="before")
131    def _func_call_validator(cls, v: t.Any, field: t.Any) -> t.Any:
132        is_signal = getattr(field, "name" if hasattr(field, "name") else "field_name") == "signals"
133
134        return d.extract_function_calls(v, allow_tuples=is_signal)
135
136    @field_validator("tags", mode="before")
137    def _value_or_tuple_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
138        return ensure_list(cls._validate_value_or_tuple(v, info.data))
139
140    @classmethod
141    def _validate_value_or_tuple(
142        cls, v: t.Dict[str, t.Any], data: t.Dict[str, t.Any], normalize: bool = False
143    ) -> t.Any:
144        dialect = data.get("dialect")
145
146        def _normalize(value: t.Any) -> t.Any:
147            return normalize_identifiers(value, dialect=dialect) if normalize else value
148
149        if isinstance(v, exp.Paren):
150            v = [v.unnest()]
151
152        if isinstance(v, (exp.Tuple, exp.Array)):
153            return [_normalize(e).name for e in v.expressions]
154        if isinstance(v, exp.Expr):
155            return _normalize(v).name
156        if isinstance(v, str):
157            value = _normalize(v)
158            return value.name if isinstance(value, exp.Expr) else value
159        if isinstance(v, (list, tuple)):
160            return [cls._validate_value_or_tuple(elm, data, normalize=normalize) for elm in v]
161
162        return v
163
164    @field_validator("table_format", "storage_format", mode="before")
165    def _format_validator(cls, v: t.Any, info: ValidationInfo) -> t.Optional[str]:
166        if isinstance(v, exp.Expr) and not (isinstance(v, (exp.Literal, exp.Identifier))):
167            return v.sql(info.data.get("dialect"))
168        return str_or_exp_to_str(v)
169
170    @field_validator("dialect", mode="before")
171    def _dialect_validator(cls, v: t.Any) -> t.Optional[str]:
172        # dialects are parsed as identifiers and may get normalized as uppercase,
173        # so this ensures they'll be stored as lowercase
174        dialect = str_or_exp_to_str(v)
175        return dialect and dialect.lower()
176
177    @field_validator("physical_version", mode="before")
178    def _physical_version_validator(cls, v: t.Any) -> t.Optional[str]:
179        if v is None:
180            return v
181        return str_or_exp_to_str(v)
182
183    @field_validator("gateway", mode="before")
184    def _gateway_validator(cls, v: t.Any) -> t.Optional[str]:
185        if v is None:
186            return None
187        gateway = str_or_exp_to_str(v)
188        return gateway and gateway.lower()
189
190    @field_validator("partitioned_by_", "clustered_by", mode="before")
191    def _partition_and_cluster_validator(cls, v: t.Any, info: ValidationInfo) -> t.List[exp.Expr]:
192        if (
193            isinstance(v, list)
194            and all(isinstance(i, str) for i in v)
195            and info.field_name == "partitioned_by_"
196        ):
197            # this branch gets hit when we are deserializing from json because `partitioned_by` is stored as a List[str]
198            # however, we should only invoke this if the list contains strings because this validator is also
199            # called by Python models which might pass a List[exp.Expression]
200            string_to_parse = (
201                f"({','.join(v)})"  # recreate the (a, b, c) part of "partitioned_by (a, b, c)"
202            )
203            parsed = parse_one(
204                string_to_parse, into=exp.PartitionedByProperty, dialect=get_dialect(info)
205            )
206            v = parsed.this.expressions if isinstance(parsed.this, exp.Schema) else v
207
208        expressions = list_of_fields_validator(v, info.data)
209
210        for expression in expressions:
211            num_cols = len(list(expression.find_all(exp.Column)))
212
213            error_msg: t.Optional[str] = None
214            if num_cols == 0:
215                error_msg = "does not contain a column"
216            elif num_cols > 1:
217                error_msg = "contains multiple columns"
218
219            if error_msg:
220                raise ConfigError(f"Field '{expression}' {error_msg}")
221
222        return expressions
223
224    @field_validator(
225        "columns_to_types_", "derived_columns_to_types", mode="before", check_fields=False
226    )
227    def _columns_validator(
228        cls, v: t.Any, info: ValidationInfo
229    ) -> t.Optional[t.Dict[str, exp.DataType]]:
230        columns_to_types = {}
231        dialect = info.data.get("dialect")
232
233        if isinstance(v, exp.Schema):
234            for column in v.expressions:
235                expr = column.args.get("kind")
236                if not isinstance(expr, exp.DataType):
237                    raise ConfigError(f"Missing data type for column '{column.name}'.")
238
239                expr.meta["dialect"] = dialect
240                columns_to_types[normalize_identifiers(column, dialect=dialect).name] = expr
241
242            return columns_to_types
243
244        if isinstance(v, dict):
245            dialect_obj = Dialect.get_or_raise(dialect)
246            udt = dialect_obj.SUPPORTS_USER_DEFINED_TYPES
247            for k, data_type in v.items():
248                is_string_type = isinstance(data_type, str)
249                expr = exp.DataType.build(data_type, dialect=dialect, udt=udt)
250                # When deserializing from a string (e.g. JSON roundtrip), normalize the type
251                # through the dialect's type system so that aliases (e.g. INT in BigQuery,
252                # which is an alias for INT64/BIGINT) are resolved to their canonical form.
253                # This ensures stable data hash computation across serialization/deserialization
254                # roundtrips. We skip this for DataType objects passed directly (Python API)
255                # since those should be used as-is.
256                if (
257                    is_string_type
258                    and dialect
259                    and expr.this
260                    not in (
261                        exp.DataType.Type.USERDEFINED,
262                        exp.DataType.Type.UNKNOWN,
263                    )
264                ):
265                    sql_repr = expr.sql(dialect=dialect)
266                    try:
267                        normalized = parse_one(sql_repr, read=dialect, into=exp.DataType)
268                        if normalized is not None:
269                            expr = normalized
270                    except Exception:
271                        pass
272                expr.meta["dialect"] = dialect
273                columns_to_types[normalize_identifiers(k, dialect=dialect).name] = expr
274
275            return columns_to_types
276
277        return v
278
279    @field_validator("column_descriptions_", mode="before")
280    def _column_descriptions_validator(
281        cls, vs: t.Any, info: ValidationInfo
282    ) -> t.Optional[t.Dict[str, str]]:
283        dialect = info.data.get("dialect")
284
285        if vs is None:
286            return None
287
288        if isinstance(vs, exp.Paren):
289            vs = vs.flatten()
290
291        if isinstance(vs, (exp.Tuple, exp.Array)):
292            vs = vs.expressions
293
294        raw_col_descriptions = (
295            vs
296            if isinstance(vs, dict)
297            else {".".join([part.this for part in v.this.parts]): v.expression.name for v in vs}
298        )
299
300        col_descriptions = {
301            normalize_identifiers(k, dialect=dialect).name: v
302            for k, v in raw_col_descriptions.items()
303        }
304
305        columns_to_types = info.data.get("columns_to_types_")
306        if columns_to_types:
307            from sqlmesh.core.console import get_console
308
309            console = get_console()
310            for column_name in list(col_descriptions):
311                if column_name not in columns_to_types:
312                    console.log_warning(
313                        f"In model '{info.data['name']}', a description is provided for column '{column_name}' but it is not a column in the model."
314                    )
315                    del col_descriptions[column_name]
316
317        return col_descriptions
318
319    @field_validator("grains", "references", mode="before")
320    def _refs_validator(cls, vs: t.Any, info: ValidationInfo) -> t.List[exp.Expr]:
321        dialect = info.data.get("dialect")
322
323        if isinstance(vs, exp.Paren):
324            vs = vs.unnest()
325
326        if isinstance(vs, (exp.Tuple, exp.Array)):
327            vs = vs.expressions
328        else:
329            vs = [
330                d.parse_one(v, dialect=dialect) if isinstance(v, str) else v
331                for v in ensure_collection(vs)
332            ]
333
334        refs = []
335
336        for v in vs:
337            v = exp.column(v) if isinstance(v, exp.Identifier) else v
338            v.meta["dialect"] = dialect
339            refs.append(v)
340
341        return refs
342
343    @field_validator("ignored_rules_", mode="before")
344    def ignored_rules_validator(cls, vs: t.Any) -> t.Any:
345        return LinterConfig._validate_rules(vs)
346
347    @field_validator("grants_target_layer", mode="before")
348    def _grants_target_layer_validator(cls, v: t.Any) -> t.Any:
349        if isinstance(v, exp.Identifier):
350            return v.this
351        if isinstance(v, exp.Literal) and v.is_string:
352            return v.this
353        return v
354
355    @field_validator("session_properties_", mode="before")
356    def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
357        # use the generic properties validator to parse the session properties
358        parsed_session_properties = parse_properties(type(cls), v, info)
359        if not parsed_session_properties:
360            return parsed_session_properties
361
362        for eq in parsed_session_properties:
363            prop_name = eq.left.name
364
365            if prop_name == "query_label":
366                query_label = eq.right
367                if not isinstance(
368                    query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar)
369                ):
370                    raise ConfigError(
371                        "Invalid value for `session_properties.query_label`. Must be an array or tuple."
372                    )
373
374                label_tuples: t.List[exp.Expr] = (
375                    [query_label.unnest()]
376                    if isinstance(query_label, exp.Paren)
377                    else query_label.expressions
378                )
379
380                for label_tuple in label_tuples:
381                    if not (
382                        isinstance(label_tuple, exp.Tuple)
383                        and len(label_tuple.expressions) == 2
384                        and all(isinstance(label, exp.Literal) for label in label_tuple.expressions)
385                    ):
386                        raise ConfigError(
387                            "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2."
388                        )
389            elif prop_name == "authorization":
390                authorization = eq.right
391                if not (
392                    isinstance(authorization, exp.Literal) and authorization.is_string
393                ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)):
394                    raise ConfigError(
395                        "Invalid value for `session_properties.authorization`. Must be a string literal."
396                    )
397
398        return parsed_session_properties
399
400    @model_validator(mode="before")
401    def _pre_root_validator(cls, data: t.Any) -> t.Any:
402        if not isinstance(data, dict):
403            return data
404
405        grain = data.pop("grain", None)
406        if grain:
407            grains = data.get("grains")
408            if grains:
409                raise ConfigError(
410                    f"Cannot use argument 'grain' ({grain}) with 'grains' ({grains}), use only grains"
411                )
412            data["grains"] = ensure_list(grain)
413
414        table_properties = data.pop("table_properties", None)
415        if table_properties:
416            if not isinstance(table_properties, str):
417                # Do not warn when deserializing from the state.
418                model_name = data["name"]
419                from sqlmesh.core.console import get_console
420
421                get_console().log_warning(
422                    f"Model '{model_name}' is using the `table_properties` attribute which is deprecated. Please use `physical_properties` instead."
423                )
424            physical_properties = data.get("physical_properties")
425            if physical_properties:
426                raise ConfigError(
427                    f"Cannot use argument 'table_properties' ({table_properties}) with 'physical_properties' ({physical_properties}), use only physical_properties."
428                )
429
430            data["physical_properties"] = table_properties
431
432        return data
433
434    @model_validator(mode="after")
435    def _root_validator(self) -> Self:
436        kind: t.Any = self.kind
437
438        for field in ("partitioned_by_", "clustered_by"):
439            if (
440                getattr(self, field, None)
441                and not kind.is_materialized
442                and not (kind.is_view and kind.materialized)
443            ):
444                name = field[:-1] if field.endswith("_") else field
445                raise ValueError(f"{name} field cannot be set for {kind.name} models")
446        if kind.is_incremental_by_partition and not getattr(self, "partitioned_by_", None):
447            raise ValueError(f"partitioned_by field is required for {kind.name} models")
448
449        # needs to be in a mode=after model validator so that the field validators have run to convert from Expression -> str
450        if (storage_format := self.storage_format) and storage_format.lower() in {
451            "iceberg",
452            "hive",
453            "hudi",
454            "delta",
455        }:
456            from sqlmesh.core.console import get_console
457
458            get_console().log_warning(
459                f"Model {self.name} has `storage_format` set to a table format '{storage_format}' which is deprecated. Please use the `table_format` property instead."
460            )
461
462        # Validate grants configuration for model kind support
463        if self.grants is not None and not kind.supports_grants:
464            raise ValueError(f"grants cannot be set for {kind.name} models")
465
466        return self
467
468    @property
469    def time_column(self) -> t.Optional[TimeColumn]:
470        """The time column for incremental models."""
471        return getattr(self.kind, "time_column", None)
472
473    @property
474    def unique_key(self) -> t.List[exp.Expr]:
475        if isinstance(
476            self.kind, (SCDType2ByTimeKind, SCDType2ByColumnKind, IncrementalByUniqueKeyKind)
477        ):
478            return self.kind.unique_key
479        return []
480
481    @property
482    def column_descriptions(self) -> t.Dict[str, str]:
483        """A dictionary of column names to annotation comments."""
484        return self.column_descriptions_ or {}
485
486    @property
487    def lookback(self) -> int:
488        """The incremental lookback window."""
489        return getattr(self.kind, "lookback", 0) or 0
490
491    def lookback_start(self, start: TimeLike) -> TimeLike:
492        if self.lookback == 0:
493            return start
494
495        for _ in range(self.lookback):
496            start = self.interval_unit.cron_prev(start)
497        return start
498
499    @property
500    def batch_size(self) -> t.Optional[int]:
501        """The maximal number of units in a single task for a backfill."""
502        return getattr(self.kind, "batch_size", None)
503
504    @property
505    def batch_concurrency(self) -> t.Optional[int]:
506        """The maximal number of batches that can run concurrently for a backfill."""
507        return getattr(self.kind, "batch_concurrency", None)
508
509    @cached_property
510    def physical_properties(self) -> t.Dict[str, exp.Expr]:
511        """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated."""
512        if self.physical_properties_:
513            return {e.this.name: e.expression for e in self.physical_properties_.expressions}
514        return {}
515
516    @cached_property
517    def virtual_properties(self) -> t.Dict[str, exp.Expr]:
518        """A dictionary of properties that will be applied to the virtual layer."""
519        if self.virtual_properties_:
520            return {e.this.name: e.expression for e in self.virtual_properties_.expressions}
521        return {}
522
523    @property
524    def session_properties(self) -> SessionProperties:
525        """A dictionary of session properties."""
526        if not self.session_properties_:
527            return {}
528
529        return d.interpret_key_value_pairs(self.session_properties_)
530
531    @property
532    def custom_materialization_properties(self) -> CustomMaterializationProperties:
533        if isinstance(self.kind, CustomKind):
534            return self.kind.materialization_properties
535        return {}
536
537    @cached_property
538    def grants(self) -> t.Optional[GrantsConfig]:
539        """A dictionary of grants mapping permission names to lists of grantees."""
540
541        if self.grants_ is None:
542            return None
543
544        if not self.grants_.expressions:
545            return {}
546
547        grants_dict = {}
548        for eq_expr in self.grants_.expressions:
549            try:
550                permission_name = self._validate_config_expression(eq_expr.left)
551                grantee_list = self._validate_nested_config_values(eq_expr.expression)
552                grants_dict[permission_name] = grantee_list
553            except ConfigError as e:
554                permission_name = (
555                    eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left)
556                )
557                raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}")
558
559        return grants_dict if grants_dict else None
560
561    @property
562    def all_references(self) -> t.List[Reference]:
563        """All references including grains."""
564        return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [
565            Reference(model_name=self.name, expression=e, unique=True) for e in self.references
566        ]
567
568    @property
569    def on(self) -> t.List[str]:
570        """The grains to be used as join condition in table_diff."""
571
572        on: t.List[str] = []
573        for expr in [ref.expression for ref in self.all_references if ref.unique]:
574            if isinstance(expr, exp.Tuple):
575                on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions])
576            else:
577                # Handle a single Column or Paren expression
578                on.append(expr.this.sql(dialect=self.dialect))
579
580        return on
581
582    @property
583    def managed_columns(self) -> t.Dict[str, exp.DataType]:
584        return getattr(self.kind, "managed_columns", {})
585
586    @property
587    def when_matched(self) -> t.Optional[exp.Whens]:
588        if isinstance(self.kind, IncrementalByUniqueKeyKind):
589            return self.kind.when_matched
590        return None
591
592    @property
593    def merge_filter(self) -> t.Optional[exp.Expr]:
594        if isinstance(self.kind, IncrementalByUniqueKeyKind):
595            return self.kind.merge_filter
596        return None
597
598    @property
599    def catalog(self) -> t.Optional[str]:
600        """Returns the catalog of a model."""
601        return self.fully_qualified_table.catalog
602
603    @cached_property
604    def fully_qualified_table(self) -> exp.Table:
605        return exp.to_table(self.fqn)
606
607    @cached_property
608    def fqn(self) -> str:
609        return normalize_model_name(
610            self.name, default_catalog=self.default_catalog, dialect=self.dialect
611        )
612
613    @property
614    def on_destructive_change(self) -> OnDestructiveChange:
615        return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW)
616
617    @property
618    def on_additive_change(self) -> OnAdditiveChange:
619        """Return the model's additive change setting if it has one."""
620        return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW)
621
622    @property
623    def ignored_rules(self) -> t.Set[str]:
624        return self.ignored_rules_ or set()
625
626    def _validate_config_expression(self, expr: exp.Expr) -> str:
627        if isinstance(expr, (d.MacroFunc, d.MacroVar)):
628            raise ConfigError(f"Unresolved macro: {expr.sql(dialect=self.dialect)}")
629
630        if isinstance(expr, exp.Null):
631            raise ConfigError("NULL value")
632
633        if isinstance(expr, exp.Literal):
634            return str(expr.this).strip()
635        if isinstance(expr, (exp.Column, exp.Identifier)):
636            return expr.name
637        return expr.sql(dialect=self.dialect).strip()
638
639    def _validate_nested_config_values(self, value_expr: exp.Expr) -> t.List[str]:
640        result = []
641
642        def flatten_expr(expr: exp.Expr) -> None:
643            if isinstance(expr, exp.Array):
644                for elem in expr.expressions:
645                    flatten_expr(elem)
646            elif isinstance(expr, (exp.Tuple, exp.Paren)):
647                expressions = [expr.unnest()] if isinstance(expr, exp.Paren) else expr.expressions
648                for elem in expressions:
649                    flatten_expr(elem)
650            else:
651                result.append(self._validate_config_expression(expr))
652
653        flatten_expr(value_expr)
654        return result
FunctionCall = typing.Tuple[str, typing.Dict[str, sqlglot.expressions.core.Expr]]
class GrantsTargetLayer(builtins.str, enum.Enum):
57class GrantsTargetLayer(str, Enum):
58    """Target layer(s) where grants should be applied."""
59
60    ALL = "all"
61    PHYSICAL = "physical"
62    VIRTUAL = "virtual"
63
64    @classproperty
65    def default(cls) -> "GrantsTargetLayer":
66        return GrantsTargetLayer.VIRTUAL
67
68    @property
69    def is_all(self) -> bool:
70        return self == GrantsTargetLayer.ALL
71
72    @property
73    def is_physical(self) -> bool:
74        return self == GrantsTargetLayer.PHYSICAL
75
76    @property
77    def is_virtual(self) -> bool:
78        return self == GrantsTargetLayer.VIRTUAL
79
80    def __str__(self) -> str:
81        return self.name
82
83    def __repr__(self) -> str:
84        return str(self)

Target layer(s) where grants should be applied.

ALL = ALL
PHYSICAL = PHYSICAL
VIRTUAL = VIRTUAL
default: GrantsTargetLayer
64    @classproperty
65    def default(cls) -> "GrantsTargetLayer":
66        return GrantsTargetLayer.VIRTUAL

Target layer(s) where grants should be applied.

is_all: bool
68    @property
69    def is_all(self) -> bool:
70        return self == GrantsTargetLayer.ALL
is_physical: bool
72    @property
73    def is_physical(self) -> bool:
74        return self == GrantsTargetLayer.PHYSICAL
is_virtual: bool
76    @property
77    def is_virtual(self) -> bool:
78        return self == GrantsTargetLayer.VIRTUAL
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class ModelMeta(sqlmesh.core.node._Node):
 87class ModelMeta(_Node):
 88    """Metadata for models which can be defined in SQL."""
 89
 90    dialect: str = ""  # stored lower-cased by _dialect_validator
 91    name: str
 92    kind: ModelKind = ViewKind()  # a model without an explicit kind is a view
 93    retention: t.Optional[int] = None  # not implemented yet
 94    table_format: t.Optional[str] = None
 95    storage_format: t.Optional[str] = None
 96    partitioned_by_: t.List[exp.Expr] = Field(default=[], alias="partitioned_by")  # each expression must reference exactly one column
 97    clustered_by: t.List[exp.Expr] = []  # each expression must reference exactly one column
 98    default_catalog: t.Optional[str] = None
 99    depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
100    columns_to_types_: t.Optional[t.Dict[str, exp.DataType]] = Field(default=None, alias="columns")
101    column_descriptions_: t.Optional[t.Dict[str, str]] = Field(
102        default=None, alias="column_descriptions"
103    )
104    audits: t.List[FunctionCall] = []  # parsed by _func_call_validator
105    grains: t.List[exp.Expr] = []
106    references: t.List[exp.Expr] = []
107    physical_schema_override: t.Optional[str] = None
108    physical_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="physical_properties")
109    virtual_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="virtual_properties")
110    session_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="session_properties")
111    allow_partials: bool = False
112    signals: t.List[FunctionCall] = []  # parsed by _func_call_validator, which allows tuples for this field
113    enabled: bool = True
114    physical_version: t.Optional[str] = None
115    gateway: t.Optional[str] = None  # stored lower-cased by _gateway_validator
116    optimize_query: t.Optional[bool] = None
117    ignored_rules_: t.Optional[t.Set[str]] = Field(
118        default=None, exclude=True, alias="ignored_rules"
119    )
120    formatting: t.Optional[bool] = Field(default=None, exclude=True)
121    virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default
122    grants_: t.Optional[exp.Tuple] = Field(default=None, alias="grants")  # interpreted lazily by the `grants` property
123    grants_target_layer: GrantsTargetLayer = GrantsTargetLayer.default
124
125    _bool_validator = bool_validator  # shared validators imported from sqlmesh.core.model.common / kind
126    _model_kind_validator = model_kind_validator
127    _properties_validator = properties_validator
128    _default_catalog_validator = default_catalog_validator
129    _depends_on_validator = depends_on_validator
130
131    @field_validator("audits", "signals", mode="before")
132    def _func_call_validator(cls, v: t.Any, field: t.Any) -> t.Any:
133        is_signal = getattr(field, "name" if hasattr(field, "name") else "field_name") == "signals"
134
135        return d.extract_function_calls(v, allow_tuples=is_signal)
136
137    @field_validator("tags", mode="before")
138    def _value_or_tuple_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
139        return ensure_list(cls._validate_value_or_tuple(v, info.data))
140
141    @classmethod
142    def _validate_value_or_tuple(
143        cls, v: t.Dict[str, t.Any], data: t.Dict[str, t.Any], normalize: bool = False
144    ) -> t.Any:
145        dialect = data.get("dialect")
146
147        def _normalize(value: t.Any) -> t.Any:
148            return normalize_identifiers(value, dialect=dialect) if normalize else value
149
150        if isinstance(v, exp.Paren):
151            v = [v.unnest()]
152
153        if isinstance(v, (exp.Tuple, exp.Array)):
154            return [_normalize(e).name for e in v.expressions]
155        if isinstance(v, exp.Expr):
156            return _normalize(v).name
157        if isinstance(v, str):
158            value = _normalize(v)
159            return value.name if isinstance(value, exp.Expr) else value
160        if isinstance(v, (list, tuple)):
161            return [cls._validate_value_or_tuple(elm, data, normalize=normalize) for elm in v]
162
163        return v
164
165    @field_validator("table_format", "storage_format", mode="before")
166    def _format_validator(cls, v: t.Any, info: ValidationInfo) -> t.Optional[str]:
167        if isinstance(v, exp.Expr) and not (isinstance(v, (exp.Literal, exp.Identifier))):
168            return v.sql(info.data.get("dialect"))
169        return str_or_exp_to_str(v)
170
171    @field_validator("dialect", mode="before")
172    def _dialect_validator(cls, v: t.Any) -> t.Optional[str]:
173        # dialects are parsed as identifiers and may get normalized as uppercase,
174        # so this ensures they'll be stored as lowercase
175        dialect = str_or_exp_to_str(v)
176        return dialect and dialect.lower()
177
178    @field_validator("physical_version", mode="before")
179    def _physical_version_validator(cls, v: t.Any) -> t.Optional[str]:
180        if v is None:
181            return v
182        return str_or_exp_to_str(v)
183
184    @field_validator("gateway", mode="before")
185    def _gateway_validator(cls, v: t.Any) -> t.Optional[str]:
186        if v is None:
187            return None
188        gateway = str_or_exp_to_str(v)
189        return gateway and gateway.lower()
190
191    @field_validator("partitioned_by_", "clustered_by", mode="before")
192    def _partition_and_cluster_validator(cls, v: t.Any, info: ValidationInfo) -> t.List[exp.Expr]:
193        if (
194            isinstance(v, list)
195            and all(isinstance(i, str) for i in v)
196            and info.field_name == "partitioned_by_"
197        ):
198            # this branch gets hit when we are deserializing from json because `partitioned_by` is stored as a List[str]
199            # however, we should only invoke this if the list contains strings because this validator is also
200            # called by Python models which might pass a List[exp.Expression]
201            string_to_parse = (
202                f"({','.join(v)})"  # recreate the (a, b, c) part of "partitioned_by (a, b, c)"
203            )
204            parsed = parse_one(
205                string_to_parse, into=exp.PartitionedByProperty, dialect=get_dialect(info)
206            )
207            v = parsed.this.expressions if isinstance(parsed.this, exp.Schema) else v
208
209        expressions = list_of_fields_validator(v, info.data)
210
211        for expression in expressions:
212            num_cols = len(list(expression.find_all(exp.Column)))
213
214            error_msg: t.Optional[str] = None
215            if num_cols == 0:
216                error_msg = "does not contain a column"
217            elif num_cols > 1:
218                error_msg = "contains multiple columns"
219
220            if error_msg:
221                raise ConfigError(f"Field '{expression}' {error_msg}")
222
223        return expressions
224
225    @field_validator(
226        "columns_to_types_", "derived_columns_to_types", mode="before", check_fields=False
227    )
228    def _columns_validator(
229        cls, v: t.Any, info: ValidationInfo
230    ) -> t.Optional[t.Dict[str, exp.DataType]]:
231        columns_to_types = {}
232        dialect = info.data.get("dialect")
233
234        if isinstance(v, exp.Schema):
235            for column in v.expressions:
236                expr = column.args.get("kind")
237                if not isinstance(expr, exp.DataType):
238                    raise ConfigError(f"Missing data type for column '{column.name}'.")
239
240                expr.meta["dialect"] = dialect
241                columns_to_types[normalize_identifiers(column, dialect=dialect).name] = expr
242
243            return columns_to_types
244
245        if isinstance(v, dict):
246            dialect_obj = Dialect.get_or_raise(dialect)
247            udt = dialect_obj.SUPPORTS_USER_DEFINED_TYPES
248            for k, data_type in v.items():
249                is_string_type = isinstance(data_type, str)
250                expr = exp.DataType.build(data_type, dialect=dialect, udt=udt)
251                # When deserializing from a string (e.g. JSON roundtrip), normalize the type
252                # through the dialect's type system so that aliases (e.g. INT in BigQuery,
253                # which is an alias for INT64/BIGINT) are resolved to their canonical form.
254                # This ensures stable data hash computation across serialization/deserialization
255                # roundtrips. We skip this for DataType objects passed directly (Python API)
256                # since those should be used as-is.
257                if (
258                    is_string_type
259                    and dialect
260                    and expr.this
261                    not in (
262                        exp.DataType.Type.USERDEFINED,
263                        exp.DataType.Type.UNKNOWN,
264                    )
265                ):
266                    sql_repr = expr.sql(dialect=dialect)
267                    try:
268                        normalized = parse_one(sql_repr, read=dialect, into=exp.DataType)
269                        if normalized is not None:
270                            expr = normalized
271                    except Exception:
272                        pass
273                expr.meta["dialect"] = dialect
274                columns_to_types[normalize_identifiers(k, dialect=dialect).name] = expr
275
276            return columns_to_types
277
278        return v
279
280    @field_validator("column_descriptions_", mode="before")
281    def _column_descriptions_validator(
282        cls, vs: t.Any, info: ValidationInfo
283    ) -> t.Optional[t.Dict[str, str]]:
284        dialect = info.data.get("dialect")
285
286        if vs is None:
287            return None
288
289        if isinstance(vs, exp.Paren):
290            vs = vs.flatten()
291
292        if isinstance(vs, (exp.Tuple, exp.Array)):
293            vs = vs.expressions
294
295        raw_col_descriptions = (
296            vs
297            if isinstance(vs, dict)
298            else {".".join([part.this for part in v.this.parts]): v.expression.name for v in vs}
299        )
300
301        col_descriptions = {
302            normalize_identifiers(k, dialect=dialect).name: v
303            for k, v in raw_col_descriptions.items()
304        }
305
306        columns_to_types = info.data.get("columns_to_types_")
307        if columns_to_types:
308            from sqlmesh.core.console import get_console
309
310            console = get_console()
311            for column_name in list(col_descriptions):
312                if column_name not in columns_to_types:
313                    console.log_warning(
314                        f"In model '{info.data['name']}', a description is provided for column '{column_name}' but it is not a column in the model."
315                    )
316                    del col_descriptions[column_name]
317
318        return col_descriptions
319
320    @field_validator("grains", "references", mode="before")
321    def _refs_validator(cls, vs: t.Any, info: ValidationInfo) -> t.List[exp.Expr]:
322        dialect = info.data.get("dialect")
323
324        if isinstance(vs, exp.Paren):
325            vs = vs.unnest()
326
327        if isinstance(vs, (exp.Tuple, exp.Array)):
328            vs = vs.expressions
329        else:
330            vs = [
331                d.parse_one(v, dialect=dialect) if isinstance(v, str) else v
332                for v in ensure_collection(vs)
333            ]
334
335        refs = []
336
337        for v in vs:
338            v = exp.column(v) if isinstance(v, exp.Identifier) else v
339            v.meta["dialect"] = dialect
340            refs.append(v)
341
342        return refs
343
344    @field_validator("ignored_rules_", mode="before")
345    def ignored_rules_validator(cls, vs: t.Any) -> t.Any:
346        return LinterConfig._validate_rules(vs)
347
348    @field_validator("grants_target_layer", mode="before")
349    def _grants_target_layer_validator(cls, v: t.Any) -> t.Any:
350        if isinstance(v, exp.Identifier):
351            return v.this
352        if isinstance(v, exp.Literal) and v.is_string:
353            return v.this
354        return v
355
356    @field_validator("session_properties_", mode="before")
357    def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
358        # use the generic properties validator to parse the session properties
359        parsed_session_properties = parse_properties(type(cls), v, info)
360        if not parsed_session_properties:
361            return parsed_session_properties
362
363        for eq in parsed_session_properties:
364            prop_name = eq.left.name
365
366            if prop_name == "query_label":
367                query_label = eq.right
368                if not isinstance(
369                    query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar)
370                ):
371                    raise ConfigError(
372                        "Invalid value for `session_properties.query_label`. Must be an array or tuple."
373                    )
374
375                label_tuples: t.List[exp.Expr] = (
376                    [query_label.unnest()]
377                    if isinstance(query_label, exp.Paren)
378                    else query_label.expressions
379                )
380
381                for label_tuple in label_tuples:
382                    if not (
383                        isinstance(label_tuple, exp.Tuple)
384                        and len(label_tuple.expressions) == 2
385                        and all(isinstance(label, exp.Literal) for label in label_tuple.expressions)
386                    ):
387                        raise ConfigError(
388                            "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2."
389                        )
390            elif prop_name == "authorization":
391                authorization = eq.right
392                if not (
393                    isinstance(authorization, exp.Literal) and authorization.is_string
394                ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)):
395                    raise ConfigError(
396                        "Invalid value for `session_properties.authorization`. Must be a string literal."
397                    )
398
399        return parsed_session_properties
400
401    @model_validator(mode="before")
402    def _pre_root_validator(cls, data: t.Any) -> t.Any:
403        if not isinstance(data, dict):
404            return data
405
406        grain = data.pop("grain", None)
407        if grain:
408            grains = data.get("grains")
409            if grains:
410                raise ConfigError(
411                    f"Cannot use argument 'grain' ({grain}) with 'grains' ({grains}), use only grains"
412                )
413            data["grains"] = ensure_list(grain)
414
415        table_properties = data.pop("table_properties", None)
416        if table_properties:
417            if not isinstance(table_properties, str):
418                # Do not warn when deserializing from the state.
419                model_name = data["name"]
420                from sqlmesh.core.console import get_console
421
422                get_console().log_warning(
423                    f"Model '{model_name}' is using the `table_properties` attribute which is deprecated. Please use `physical_properties` instead."
424                )
425            physical_properties = data.get("physical_properties")
426            if physical_properties:
427                raise ConfigError(
428                    f"Cannot use argument 'table_properties' ({table_properties}) with 'physical_properties' ({physical_properties}), use only physical_properties."
429                )
430
431            data["physical_properties"] = table_properties
432
433        return data
434
435    @model_validator(mode="after")
436    def _root_validator(self) -> Self:
437        kind: t.Any = self.kind
438
439        for field in ("partitioned_by_", "clustered_by"):
440            if (
441                getattr(self, field, None)
442                and not kind.is_materialized
443                and not (kind.is_view and kind.materialized)
444            ):
445                name = field[:-1] if field.endswith("_") else field
446                raise ValueError(f"{name} field cannot be set for {kind.name} models")
447        if kind.is_incremental_by_partition and not getattr(self, "partitioned_by_", None):
448            raise ValueError(f"partitioned_by field is required for {kind.name} models")
449
450        # needs to be in a mode=after model validator so that the field validators have run to convert from Expression -> str
451        if (storage_format := self.storage_format) and storage_format.lower() in {
452            "iceberg",
453            "hive",
454            "hudi",
455            "delta",
456        }:
457            from sqlmesh.core.console import get_console
458
459            get_console().log_warning(
460                f"Model {self.name} has `storage_format` set to a table format '{storage_format}' which is deprecated. Please use the `table_format` property instead."
461            )
462
463        # Validate grants configuration for model kind support
464        if self.grants is not None and not kind.supports_grants:
465            raise ValueError(f"grants cannot be set for {kind.name} models")
466
467        return self
468
469    @property
470    def time_column(self) -> t.Optional[TimeColumn]:
471        """The time column for incremental models."""
472        return getattr(self.kind, "time_column", None)
473
474    @property
475    def unique_key(self) -> t.List[exp.Expr]:
476        if isinstance(
477            self.kind, (SCDType2ByTimeKind, SCDType2ByColumnKind, IncrementalByUniqueKeyKind)
478        ):
479            return self.kind.unique_key
480        return []
481
482    @property
483    def column_descriptions(self) -> t.Dict[str, str]:
484        """A dictionary of column names to annotation comments."""
485        return self.column_descriptions_ or {}
486
487    @property
488    def lookback(self) -> int:
489        """The incremental lookback window."""
490        return getattr(self.kind, "lookback", 0) or 0
491
492    def lookback_start(self, start: TimeLike) -> TimeLike:
493        if self.lookback == 0:
494            return start
495
496        for _ in range(self.lookback):
497            start = self.interval_unit.cron_prev(start)
498        return start
499
500    @property
501    def batch_size(self) -> t.Optional[int]:
502        """The maximal number of units in a single task for a backfill."""
503        return getattr(self.kind, "batch_size", None)
504
505    @property
506    def batch_concurrency(self) -> t.Optional[int]:
507        """The maximal number of batches that can run concurrently for a backfill."""
508        return getattr(self.kind, "batch_concurrency", None)
509
510    @cached_property
511    def physical_properties(self) -> t.Dict[str, exp.Expr]:
512        """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated."""
513        if self.physical_properties_:
514            return {e.this.name: e.expression for e in self.physical_properties_.expressions}
515        return {}
516
517    @cached_property
518    def virtual_properties(self) -> t.Dict[str, exp.Expr]:
519        """A dictionary of properties that will be applied to the virtual layer."""
520        if self.virtual_properties_:
521            return {e.this.name: e.expression for e in self.virtual_properties_.expressions}
522        return {}
523
524    @property
525    def session_properties(self) -> SessionProperties:
526        """A dictionary of session properties."""
527        if not self.session_properties_:
528            return {}
529
530        return d.interpret_key_value_pairs(self.session_properties_)
531
532    @property
533    def custom_materialization_properties(self) -> CustomMaterializationProperties:
534        if isinstance(self.kind, CustomKind):
535            return self.kind.materialization_properties
536        return {}
537
538    @cached_property
539    def grants(self) -> t.Optional[GrantsConfig]:
540        """A dictionary of grants mapping permission names to lists of grantees."""
541
542        if self.grants_ is None:
543            return None
544
545        if not self.grants_.expressions:
546            return {}
547
548        grants_dict = {}
549        for eq_expr in self.grants_.expressions:
550            try:
551                permission_name = self._validate_config_expression(eq_expr.left)
552                grantee_list = self._validate_nested_config_values(eq_expr.expression)
553                grants_dict[permission_name] = grantee_list
554            except ConfigError as e:
555                permission_name = (
556                    eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left)
557                )
558                raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}")
559
560        return grants_dict if grants_dict else None
561
562    @property
563    def all_references(self) -> t.List[Reference]:
564        """All references including grains."""
565        return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [
566            Reference(model_name=self.name, expression=e, unique=True) for e in self.references
567        ]
568
569    @property
570    def on(self) -> t.List[str]:
571        """The grains to be used as join condition in table_diff."""
572
573        on: t.List[str] = []
574        for expr in [ref.expression for ref in self.all_references if ref.unique]:
575            if isinstance(expr, exp.Tuple):
576                on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions])
577            else:
578                # Handle a single Column or Paren expression
579                on.append(expr.this.sql(dialect=self.dialect))
580
581        return on
582
583    @property
584    def managed_columns(self) -> t.Dict[str, exp.DataType]:
585        return getattr(self.kind, "managed_columns", {})
586
587    @property
588    def when_matched(self) -> t.Optional[exp.Whens]:
589        if isinstance(self.kind, IncrementalByUniqueKeyKind):
590            return self.kind.when_matched
591        return None
592
593    @property
594    def merge_filter(self) -> t.Optional[exp.Expr]:
595        if isinstance(self.kind, IncrementalByUniqueKeyKind):
596            return self.kind.merge_filter
597        return None
598
599    @property
600    def catalog(self) -> t.Optional[str]:
601        """Returns the catalog of a model."""
602        return self.fully_qualified_table.catalog
603
604    @cached_property
605    def fully_qualified_table(self) -> exp.Table:
606        return exp.to_table(self.fqn)
607
608    @cached_property
609    def fqn(self) -> str:
610        return normalize_model_name(
611            self.name, default_catalog=self.default_catalog, dialect=self.dialect
612        )
613
614    @property
615    def on_destructive_change(self) -> OnDestructiveChange:
616        return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW)
617
618    @property
619    def on_additive_change(self) -> OnAdditiveChange:
620        """Return the model's additive change setting if it has one."""
621        return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW)
622
623    @property
624    def ignored_rules(self) -> t.Set[str]:
625        return self.ignored_rules_ or set()
626
627    def _validate_config_expression(self, expr: exp.Expr) -> str:
628        if isinstance(expr, (d.MacroFunc, d.MacroVar)):
629            raise ConfigError(f"Unresolved macro: {expr.sql(dialect=self.dialect)}")
630
631        if isinstance(expr, exp.Null):
632            raise ConfigError("NULL value")
633
634        if isinstance(expr, exp.Literal):
635            return str(expr.this).strip()
636        if isinstance(expr, (exp.Column, exp.Identifier)):
637            return expr.name
638        return expr.sql(dialect=self.dialect).strip()
639
640    def _validate_nested_config_values(self, value_expr: exp.Expr) -> t.List[str]:
641        result = []
642
643        def flatten_expr(expr: exp.Expr) -> None:
644            if isinstance(expr, exp.Array):
645                for elem in expr.expressions:
646                    flatten_expr(elem)
647            elif isinstance(expr, (exp.Tuple, exp.Paren)):
648                expressions = [expr.unnest()] if isinstance(expr, exp.Paren) else expr.expressions
649                for elem in expressions:
650                    flatten_expr(elem)
651            else:
652                result.append(self._validate_config_expression(expr))
653
654        flatten_expr(value_expr)
655        return result

Metadata for models which can be defined in SQL.

dialect: str
name: str
retention: Optional[int]
table_format: Optional[str]
storage_format: Optional[str]
partitioned_by_: List[sqlglot.expressions.core.Expr]
clustered_by: List[sqlglot.expressions.core.Expr]
default_catalog: Optional[str]
depends_on_: Optional[Set[str]]
columns_to_types_: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]]
column_descriptions_: Optional[Dict[str, str]]
audits: List[Tuple[str, Dict[str, sqlglot.expressions.core.Expr]]]
grains: List[sqlglot.expressions.core.Expr]
references: List[sqlglot.expressions.core.Expr]
physical_schema_override: Optional[str]
physical_properties_: Optional[sqlglot.expressions.query.Tuple]
virtual_properties_: Optional[sqlglot.expressions.query.Tuple]
session_properties_: Optional[sqlglot.expressions.query.Tuple]
allow_partials: bool
signals: List[Tuple[str, Dict[str, sqlglot.expressions.core.Expr]]]
enabled: bool
physical_version: Optional[str]
gateway: Optional[str]
optimize_query: Optional[bool]
ignored_rules_: Optional[Set[str]]
formatting: Optional[bool]
grants_: Optional[sqlglot.expressions.query.Tuple]
grants_target_layer: GrantsTargetLayer
@field_validator('ignored_rules_', mode='before')
def ignored_rules_validator(cls, vs: Any) -> Any:
344    @field_validator("ignored_rules_", mode="before")
345    def ignored_rules_validator(cls, vs: t.Any) -> t.Any:
346        return LinterConfig._validate_rules(vs)
@field_validator('session_properties_', mode='before')
def session_properties_validator(cls, v: Any, info: pydantic_core.core_schema.ValidationInfo) -> Any:
356    @field_validator("session_properties_", mode="before")
357    def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
358        # use the generic properties validator to parse the session properties
359        parsed_session_properties = parse_properties(type(cls), v, info)
360        if not parsed_session_properties:
361            return parsed_session_properties
362
363        for eq in parsed_session_properties:
364            prop_name = eq.left.name
365
366            if prop_name == "query_label":
367                query_label = eq.right
368                if not isinstance(
369                    query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar)
370                ):
371                    raise ConfigError(
372                        "Invalid value for `session_properties.query_label`. Must be an array or tuple."
373                    )
374
375                label_tuples: t.List[exp.Expr] = (
376                    [query_label.unnest()]
377                    if isinstance(query_label, exp.Paren)
378                    else query_label.expressions
379                )
380
381                for label_tuple in label_tuples:
382                    if not (
383                        isinstance(label_tuple, exp.Tuple)
384                        and len(label_tuple.expressions) == 2
385                        and all(isinstance(label, exp.Literal) for label in label_tuple.expressions)
386                    ):
387                        raise ConfigError(
388                            "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2."
389                        )
390            elif prop_name == "authorization":
391                authorization = eq.right
392                if not (
393                    isinstance(authorization, exp.Literal) and authorization.is_string
394                ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)):
395                    raise ConfigError(
396                        "Invalid value for `session_properties.authorization`. Must be a string literal."
397                    )
398
399        return parsed_session_properties
time_column: Optional[sqlmesh.core.model.kind.TimeColumn]
469    @property
470    def time_column(self) -> t.Optional[TimeColumn]:
471        """The time column for incremental models."""
472        return getattr(self.kind, "time_column", None)

The time column for incremental models.

unique_key: List[sqlglot.expressions.core.Expr]
474    @property
475    def unique_key(self) -> t.List[exp.Expr]:
476        if isinstance(
477            self.kind, (SCDType2ByTimeKind, SCDType2ByColumnKind, IncrementalByUniqueKeyKind)
478        ):
479            return self.kind.unique_key
480        return []
column_descriptions: Dict[str, str]
482    @property
483    def column_descriptions(self) -> t.Dict[str, str]:
484        """A dictionary of column names to annotation comments."""
485        return self.column_descriptions_ or {}

A dictionary of column names to annotation comments.

lookback: int
487    @property
488    def lookback(self) -> int:
489        """The incremental lookback window."""
490        return getattr(self.kind, "lookback", 0) or 0

The incremental lookback window.

def lookback_start( self, start: Union[datetime.date, datetime.datetime, str, int, float]) -> Union[datetime.date, datetime.datetime, str, int, float]:
492    def lookback_start(self, start: TimeLike) -> TimeLike:
493        if self.lookback == 0:
494            return start
495
496        for _ in range(self.lookback):
497            start = self.interval_unit.cron_prev(start)
498        return start
batch_size: Optional[int]
500    @property
501    def batch_size(self) -> t.Optional[int]:
502        """The maximal number of units in a single task for a backfill."""
503        return getattr(self.kind, "batch_size", None)

The maximal number of units in a single task for a backfill.

batch_concurrency: Optional[int]
505    @property
506    def batch_concurrency(self) -> t.Optional[int]:
507        """The maximal number of batches that can run concurrently for a backfill."""
508        return getattr(self.kind, "batch_concurrency", None)

The maximal number of batches that can run concurrently for a backfill.

physical_properties: Dict[str, sqlglot.expressions.core.Expr]
510    @cached_property
511    def physical_properties(self) -> t.Dict[str, exp.Expr]:
512        """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated."""
513        if self.physical_properties_:
514            return {e.this.name: e.expression for e in self.physical_properties_.expressions}
515        return {}

A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated.

virtual_properties: Dict[str, sqlglot.expressions.core.Expr]
517    @cached_property
518    def virtual_properties(self) -> t.Dict[str, exp.Expr]:
519        """A dictionary of properties that will be applied to the virtual layer."""
520        if self.virtual_properties_:
521            return {e.this.name: e.expression for e in self.virtual_properties_.expressions}
522        return {}

A dictionary of properties that will be applied to the virtual layer.

session_properties: Dict[str, sqlglot.expressions.core.Expr | str | int | float | bool]
524    @property
525    def session_properties(self) -> SessionProperties:
526        """A dictionary of session properties."""
527        if not self.session_properties_:
528            return {}
529
530        return d.interpret_key_value_pairs(self.session_properties_)

A dictionary of session properties.

custom_materialization_properties: Dict[str, sqlglot.expressions.core.Expr | str | int | float | bool]
532    @property
533    def custom_materialization_properties(self) -> CustomMaterializationProperties:
534        if isinstance(self.kind, CustomKind):
535            return self.kind.materialization_properties
536        return {}
grants: Optional[sqlmesh.core.engine_adapter._typing.GrantsConfig]
538    @cached_property
539    def grants(self) -> t.Optional[GrantsConfig]:
540        """A dictionary of grants mapping permission names to lists of grantees."""
541
542        if self.grants_ is None:
543            return None
544
545        if not self.grants_.expressions:
546            return {}
547
548        grants_dict = {}
549        for eq_expr in self.grants_.expressions:
550            try:
551                permission_name = self._validate_config_expression(eq_expr.left)
552                grantee_list = self._validate_nested_config_values(eq_expr.expression)
553                grants_dict[permission_name] = grantee_list
554            except ConfigError as e:
555                permission_name = (
556                    eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left)
557                )
558                raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}")
559
560        return grants_dict if grants_dict else None

A dictionary of grants mapping permission names to lists of grantees.

all_references: List[sqlmesh.core.reference.Reference]
562    @property
563    def all_references(self) -> t.List[Reference]:
564        """All references including grains."""
565        return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [
566            Reference(model_name=self.name, expression=e, unique=True) for e in self.references
567        ]

All references including grains.

on: List[str]
569    @property
570    def on(self) -> t.List[str]:
571        """The grains to be used as join condition in table_diff."""
572
573        on: t.List[str] = []
574        for expr in [ref.expression for ref in self.all_references if ref.unique]:
575            if isinstance(expr, exp.Tuple):
576                on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions])
577            else:
578                # Handle a single Column or Paren expression
579                on.append(expr.this.sql(dialect=self.dialect))
580
581        return on

The grains to be used as join condition in table_diff.

managed_columns: Dict[str, sqlglot.expressions.datatypes.DataType]
583    @property
584    def managed_columns(self) -> t.Dict[str, exp.DataType]:
585        return getattr(self.kind, "managed_columns", {})
when_matched: Optional[sqlglot.expressions.dml.Whens]
587    @property
588    def when_matched(self) -> t.Optional[exp.Whens]:
589        if isinstance(self.kind, IncrementalByUniqueKeyKind):
590            return self.kind.when_matched
591        return None
merge_filter: Optional[sqlglot.expressions.core.Expr]
593    @property
594    def merge_filter(self) -> t.Optional[exp.Expr]:
595        if isinstance(self.kind, IncrementalByUniqueKeyKind):
596            return self.kind.merge_filter
597        return None
catalog: Optional[str]
599    @property
600    def catalog(self) -> t.Optional[str]:
601        """Returns the catalog of a model."""
602        return self.fully_qualified_table.catalog

Returns the catalog of a model.

fully_qualified_table: sqlglot.expressions.query.Table
604    @cached_property
605    def fully_qualified_table(self) -> exp.Table:
606        return exp.to_table(self.fqn)
fqn: str
608    @cached_property
609    def fqn(self) -> str:
610        return normalize_model_name(
611            self.name, default_catalog=self.default_catalog, dialect=self.dialect
612        )
on_destructive_change: sqlmesh.core.model.kind.OnDestructiveChange
614    @property
615    def on_destructive_change(self) -> OnDestructiveChange:
616        return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW)
on_additive_change: sqlmesh.core.model.kind.OnAdditiveChange
618    @property
619    def on_additive_change(self) -> OnAdditiveChange:
620        """Return the model's additive change setting if it has one."""
621        return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW)

Return the model's additive change setting if it has one.

ignored_rules: Set[str]
623    @property
624    def ignored_rules(self) -> t.Set[str]:
625        return self.ignored_rules_ or set()
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model; should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance.

    Written to behave like a BaseModel method. ``context`` is accepted because
    pydantic-core passes it when calling this function; it is not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Leave instances whose private storage has already been set up untouched.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    # Keep only the private attributes that actually define a default.
    defaults = {
        name: default
        for name, private_attr in self.__private_attributes__.items()
        if (default := private_attr.get_default()) is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', defaults)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.core.node._Node
project
description
owner
start
end
cron
cron_tz
interval_unit_
tags
stamp
dbt_node_info_
copy
interval_unit
depends_on
data_hash
metadata_hash
is_metadata_only_change
is_data_change
croniter
cron_next
cron_prev
cron_floor
text_diff
is_model
is_audit
dbt_node_info
sqlmesh.core.node.DbtInfoMixin
dbt_unique_id
dbt_fqn
sqlmesh.utils.pydantic.PydanticModel
dict
json
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields