Edit on GitHub

sqlmesh.dbt.model

  1from __future__ import annotations
  2
  3import datetime
  4import typing as t
  5import logging
  6
  7from sqlglot import exp
  8from sqlglot.errors import SqlglotError
  9from sqlglot.helper import ensure_list
 10
 11from sqlmesh.core import dialect as d
 12from sqlmesh.core.config.base import UpdateStrategy
 13from sqlmesh.core.config.common import VirtualEnvironmentMode
 14from sqlmesh.core.console import get_console
 15from sqlmesh.core.model import (
 16    EmbeddedKind,
 17    FullKind,
 18    IncrementalByTimeRangeKind,
 19    IncrementalByUniqueKeyKind,
 20    IncrementalUnmanagedKind,
 21    Model,
 22    ModelKind,
 23    SCDType2ByColumnKind,
 24    ViewKind,
 25    ManagedKind,
 26    create_sql_model,
 27)
 28from sqlmesh.core.model.kind import (
 29    SCDType2ByTimeKind,
 30    OnDestructiveChange,
 31    OnAdditiveChange,
 32    on_destructive_change_validator,
 33    on_additive_change_validator,
 34    DbtCustomKind,
 35)
 36from sqlmesh.dbt.basemodel import BaseModelConfig, Materialization, SnapshotStrategy
 37from sqlmesh.dbt.common import SqlStr, sql_str_validator
 38from sqlmesh.utils.errors import ConfigError
 39from sqlmesh.utils.pydantic import field_validator
 40
 41if t.TYPE_CHECKING:
 42    from sqlmesh.core.audit.definition import ModelAudit
 43    from sqlmesh.dbt.context import DbtContext
 44    from sqlmesh.dbt.package import MaterializationConfig
 45
 46logger = logging.getLogger(__name__)
 47
 48
 49logger = logging.getLogger(__name__)
 50
 51
 52INCREMENTAL_BY_TIME_RANGE_STRATEGIES = set(
 53    ["delete+insert", "insert_overwrite", "microbatch", "incremental_by_time_range"]
 54)
 55INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES = set(["merge"])
 56
 57
 58def collection_to_str(collection: t.Iterable) -> str:
 59    return ", ".join(f"'{item}'" for item in sorted(collection))
 60
 61
 62class ModelConfig(BaseModelConfig):
 63    """
 64    ModelConfig contains all config parameters available to DBT models
 65
 66    See https://docs.getdbt.com/reference/configs-and-properties for
 67    a more detailed description of each config parameter under the
 68    General propreties, General configs, and For models sections.
 69
 70    Args:
 71        sql: The model sql
 72        time_column: The name of the time column
 73        cron: A cron string specifying how often the model should be refreshed, leveraging the
 74            [croniter](https://github.com/kiorky/croniter) library.
 75        interval_unit: The duration of an interval for the model. By default, it is computed from the cron expression.
 76        batch_size: The maximum number of incremental intervals that can be run per backfill job. If this is None,
 77            then backfilling this model will do all of history in one job. If this is set, a model's backfill
 78            will be chunked such that each individual job will only contain jobs with max `batch_size` intervals.
 79        lookback: The number of previous incremental intervals in the lookback window.
 80        cluster_by: Field(s) to use for clustering in data warehouses that support clustering
 81        incremental_strategy: Strategy used to build the incremental model
 82        materialized: How the model will be materialized in the database
 83        sql_header: SQL statement to run before table/view creation. Currently implemented as a pre-hook.
 84        unique_key: List of columns that define row uniqueness for the model
 85        partition_by: List of partition columns or dictionary of bigquery partition by parameters ([dbt bigquery config](https://docs.getdbt.com/reference/resource-configs/bigquery-configs)).
 86    """
 87
 88    # sqlmesh fields
 89    sql: SqlStr = SqlStr("")
 90    time_column: t.Optional[t.Union[str, t.Dict[str, str]]] = None
 91    cron: t.Optional[str] = None
 92    interval_unit: t.Optional[str] = None
 93    batch_concurrency: t.Optional[int] = None
 94    forward_only: bool = True
 95    disable_restatement: t.Optional[bool] = None
 96    allow_partials: bool = True
 97    physical_version: t.Optional[str] = None
 98    auto_restatement_cron: t.Optional[str] = None
 99    auto_restatement_intervals: t.Optional[int] = None
100    partition_by_time_column: t.Optional[bool] = None
101    on_destructive_change: t.Optional[OnDestructiveChange] = None
102    on_additive_change: t.Optional[OnAdditiveChange] = None
103
104    # DBT configuration fields
105    cluster_by: t.Optional[t.List[str]] = None
106    incremental_strategy: t.Optional[str] = None
107    materialized: str = Materialization.VIEW.value
108    sql_header: t.Optional[str] = None
109    unique_key: t.Optional[t.List[str]] = None
110    partition_by: t.Optional[t.Union[t.List[str], t.Dict[str, t.Any]]] = None
111    full_refresh: t.Optional[bool] = None
112    on_schema_change: str = "ignore"
113
114    # Snapshot (SCD Type 2) Fields
115    updated_at: t.Optional[str] = None
116    strategy: t.Optional[str] = None
117    invalidate_hard_deletes: bool = False
118    target_schema: t.Optional[str] = None
119    check_cols: t.Optional[t.Union[t.List[str], str]] = None
120
121    # Microbatch Fields
122    event_time: t.Optional[str] = None
123    begin: t.Optional[datetime.datetime] = None
124    concurrent_batches: t.Optional[bool] = None
125
126    # Shared SQLMesh and DBT configuration fields
127    batch_size: t.Optional[t.Union[int, str]] = None
128    lookback: t.Optional[int] = None
129
130    # redshift
131    bind: t.Optional[bool] = None
132
133    # bigquery
134    require_partition_filter: t.Optional[bool] = None
135    partition_expiration_days: t.Optional[int] = None
136
137    # snowflake
138    snowflake_warehouse: t.Optional[str] = None
139    # note: for Snowflake dynamic tables, in the DBT adapter we only support properties that DBT supports
140    # which are defined here: https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables
141    target_lag: t.Optional[str] = None
142
143    # clickhouse
144    engine: t.Optional[str] = None
145    order_by: t.Optional[t.Union[t.List[str], str]] = None
146    primary_key: t.Optional[t.Union[t.List[str], str]] = None
147    sharding_key: t.Optional[t.Union[t.List[str], str]] = None
148    ttl: t.Optional[t.Union[t.List[str], str]] = None
149    settings: t.Optional[t.Dict[str, t.Any]] = None
150    query_settings: t.Optional[t.Dict[str, t.Any]] = None
151    inserts_only: t.Optional[bool] = None
152    incremental_predicates: t.Optional[t.List[str]] = None
153
154    _sql_validator = sql_str_validator
155    _on_destructive_change_validator = on_destructive_change_validator
156    _on_additive_change_validator = on_additive_change_validator
157
158    @field_validator(
159        "unique_key",
160        "cluster_by",
161        "tags",
162        mode="before",
163    )
164    @classmethod
165    def _validate_list(cls, v: t.Union[str, t.List[str]]) -> t.List[str]:
166        return ensure_list(v)
167
168    @field_validator("check_cols", mode="before")
169    @classmethod
170    def _validate_check_cols(cls, v: t.Union[str, t.List[str]]) -> t.Union[str, t.List[str]]:
171        if isinstance(v, str) and v.lower() == "all":
172            return "*"
173        return ensure_list(v)
174
175    @field_validator("updated_at", mode="before")
176    @classmethod
177    def _validate_updated_at(cls, v: t.Optional[str]) -> t.Optional[str]:
178        """
179        Extract column name if updated_at contains a cast.
180
181        SCDType2ByTimeKind and SCDType2ByColumnKind expect a column, and the casting is done later.
182        """
183        if v is None:
184            return None
185        parsed = d.parse_one(v)
186        if isinstance(parsed, exp.Cast) and isinstance(parsed.this, exp.Column):
187            return parsed.this.name
188
189        return v
190
191    @field_validator("sql", mode="before")
192    @classmethod
193    def _validate_sql(cls, v: t.Union[str, SqlStr]) -> SqlStr:
194        return SqlStr(v)
195
196    @field_validator("partition_by", mode="before")
197    @classmethod
198    def _validate_partition_by(
199        cls, v: t.Any
200    ) -> t.Optional[t.Union[t.List[str], t.Dict[str, t.Any]]]:
201        if v is None:
202            return None
203        if isinstance(v, str):
204            return [v]
205        if isinstance(v, list):
206            return v
207        if isinstance(v, dict):
208            if not v.get("field"):
209                raise ConfigError("'field' key required for partition_by.")
210            if "granularity" in v and v["granularity"].lower() not in (
211                "day",
212                "month",
213                "year",
214                "hour",
215            ):
216                granularity = v["granularity"]
217                raise ConfigError(f"Unexpected granularity '{granularity}' in partition_by '{v}'.")
218            if "data_type" in v and v["data_type"].lower() not in (
219                "timestamp",
220                "date",
221                "datetime",
222                "int64",
223            ):
224                data_type = v["data_type"]
225                raise ConfigError(f"Unexpected data_type '{data_type}' in partition_by '{v}'.")
226            return {"data_type": "date", "granularity": "day", **v}
227        raise ConfigError(f"Invalid format for partition_by '{v}'")
228
229    @field_validator("materialized", mode="before")
230    @classmethod
231    def _validate_materialized(cls, v: str) -> str:
232        unsupported_materializations = [
233            "materialized_view",  # multiple engines
234            "dictionary",  # clickhouse only
235            "distributed_table",  # clickhouse only
236            "distributed_incremental",  # clickhouse only
237        ]
238        if v in unsupported_materializations:
239            fallback = v.split("_")
240            msg = f"SQLMesh does not support the '{v}' model materialization."
241            if len(fallback) == 1:
242                # dictionary materialization
243                raise ConfigError(msg)
244            else:
245                get_console().log_warning(
246                    f"{msg} Falling back to the '{fallback[1]}' materialization."
247                )
248
249            return fallback[1]
250        return v
251
252    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
253        **BaseModelConfig._FIELD_UPDATE_STRATEGY,
254        **{
255            "sql": UpdateStrategy.IMMUTABLE,
256            "time_column": UpdateStrategy.IMMUTABLE,
257        },
258    }
259
260    @property
261    def model_materialization(self) -> Materialization:
262        return Materialization(self.materialized.lower())
263
264    @property
265    def snapshot_strategy(self) -> t.Optional[SnapshotStrategy]:
266        return SnapshotStrategy(self.strategy.lower()) if self.strategy else None
267
268    @property
269    def table_schema(self) -> str:
270        return self.target_schema or super().table_schema
271
272    def model_kind(self, context: DbtContext) -> ModelKind:
273        """
274        Get the sqlmesh ModelKind
275        Returns:
276            The sqlmesh ModelKind
277        """
278        target = context.target
279        materialization = self.model_materialization
280
281        # args common to all sqlmesh incremental kinds, regardless of materialization
282        incremental_kind_kwargs: t.Dict[str, t.Any] = {}
283        on_schema_change = self.on_schema_change.lower()
284        if materialization == Materialization.SNAPSHOT:
285            # dbt snapshots default to `append_new_columns` behavior and can't be changed
286            on_schema_change = "append_new_columns"
287
288        if on_schema_change == "ignore":
289            on_destructive_change = OnDestructiveChange.IGNORE
290            on_additive_change = OnAdditiveChange.IGNORE
291        elif on_schema_change == "fail":
292            on_destructive_change = OnDestructiveChange.ERROR
293            on_additive_change = OnAdditiveChange.ERROR
294        elif on_schema_change == "append_new_columns":
295            on_destructive_change = OnDestructiveChange.IGNORE
296            on_additive_change = OnAdditiveChange.ALLOW
297        elif on_schema_change == "sync_all_columns":
298            on_destructive_change = OnDestructiveChange.ALLOW
299            on_additive_change = OnAdditiveChange.ALLOW
300        else:
301            raise ConfigError(
302                f"{self.canonical_name(context)}: Invalid on_schema_change value '{on_schema_change}'. "
303                "Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'."
304            )
305
306        incremental_kind_kwargs["on_destructive_change"] = (
307            self._get_field_value("on_destructive_change") or on_destructive_change
308        )
309        incremental_kind_kwargs["on_additive_change"] = (
310            self._get_field_value("on_additive_change") or on_additive_change
311        )
312        auto_restatement_cron_value = self._get_field_value("auto_restatement_cron")
313        if auto_restatement_cron_value is not None:
314            incremental_kind_kwargs["auto_restatement_cron"] = auto_restatement_cron_value
315
316        if materialization == Materialization.TABLE:
317            return FullKind()
318        if materialization == Materialization.VIEW:
319            return ViewKind()
320        if materialization == Materialization.INCREMENTAL:
321            incremental_by_kind_kwargs: t.Dict[str, t.Any] = {"dialect": self.dialect(context)}
322            forward_only_value = self._get_field_value("forward_only")
323            if forward_only_value is not None:
324                incremental_kind_kwargs["forward_only"] = forward_only_value
325
326            is_incremental_by_time_range = self.time_column or (
327                self.incremental_strategy
328                and self.incremental_strategy in {"microbatch", "incremental_by_time_range"}
329            )
330            # Get shared incremental by kwargs
331            for field in ("batch_size", "batch_concurrency", "lookback"):
332                field_val = self._get_field_value(field)
333                if field_val is not None:
334                    # Check if `batch_size` is representing an interval unit and if so that will be handled at the model level
335                    if field == "batch_size" and isinstance(field_val, str):
336                        continue
337                    incremental_by_kind_kwargs[field] = field_val
338
339            disable_restatement = self.disable_restatement
340            if disable_restatement is None:
341                if is_incremental_by_time_range:
342                    disable_restatement = False
343                else:
344                    disable_restatement = (
345                        not self.full_refresh if self.full_refresh is not None else False
346                    )
347            incremental_by_kind_kwargs["disable_restatement"] = disable_restatement
348
349            if is_incremental_by_time_range:
350                strategy = self.incremental_strategy or target.default_incremental_strategy(
351                    IncrementalByTimeRangeKind
352                )
353
354                if strategy not in INCREMENTAL_BY_TIME_RANGE_STRATEGIES:
355                    get_console().log_warning(
356                        f"SQLMesh incremental by time strategy is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
357                        f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}."
358                    )
359
360                if self.time_column and strategy != "incremental_by_time_range":
361                    get_console().log_warning(
362                        f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. "
363                        f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'."
364                    )
365
366                if strategy == "microbatch":
367                    if self.time_column:
368                        raise ConfigError(
369                            f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead."
370                        )
371                    time_column = self._get_field_value("event_time")
372                    if not time_column:
373                        raise ConfigError(
374                            f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy."
375                        )
376                    # dbt microbatch always processes batches in a size of 1
377                    incremental_by_kind_kwargs["batch_size"] = 1
378                else:
379                    if not self.time_column:
380                        raise ConfigError(
381                            f"{self.canonical_name(context)}: 'time_column' is required for incremental by time range models not defined using microbatch."
382                        )
383                    time_column = self.time_column
384
385                incremental_by_time_range_kwargs = {
386                    "time_column": time_column,
387                }
388                if self.auto_restatement_intervals:
389                    incremental_by_time_range_kwargs["auto_restatement_intervals"] = (
390                        self.auto_restatement_intervals
391                    )
392                if self.partition_by_time_column is not None:
393                    incremental_by_time_range_kwargs["partition_by_time_column"] = (
394                        self.partition_by_time_column
395                    )
396
397                return IncrementalByTimeRangeKind(
398                    **incremental_kind_kwargs,
399                    **incremental_by_kind_kwargs,
400                    **incremental_by_time_range_kwargs,
401                )
402
403            if self.unique_key:
404                strategy = self.incremental_strategy or target.default_incremental_strategy(
405                    IncrementalByUniqueKeyKind
406                )
407                if (
408                    self.incremental_strategy
409                    and strategy not in INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES
410                ):
411                    get_console().log_warning(
412                        f"Unique key is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
413                        f"Supported strategies include {collection_to_str(INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES)}. Falling back to 'merge' strategy."
414                    )
415
416                merge_filter = None
417                if self.incremental_predicates:
418                    dialect = self.dialect(context)
419                    merge_filter = exp.and_(
420                        *[
421                            d.parse_one(predicate, dialect=dialect)
422                            for predicate in self.incremental_predicates
423                        ],
424                        dialect=dialect,
425                    ).transform(d.replace_merge_table_aliases)
426
427                return IncrementalByUniqueKeyKind(
428                    unique_key=self.unique_key,
429                    merge_filter=merge_filter,
430                    **incremental_kind_kwargs,
431                    **incremental_by_kind_kwargs,
432                )
433
434            strategy = self.incremental_strategy or target.default_incremental_strategy(
435                IncrementalUnmanagedKind
436            )
437            return IncrementalUnmanagedKind(
438                insert_overwrite=strategy in INCREMENTAL_BY_TIME_RANGE_STRATEGIES,
439                disable_restatement=incremental_by_kind_kwargs["disable_restatement"],
440                **incremental_kind_kwargs,
441            )
442        if materialization == Materialization.EPHEMERAL:
443            return EmbeddedKind()
444        if materialization == Materialization.SNAPSHOT:
445            if not self.snapshot_strategy:
446                raise ConfigError(
447                    f"{self.canonical_name(context)}: SQLMesh snapshot strategy is required for snapshot materialization."
448                )
449            shared_kwargs = {
450                "dialect": self.dialect(context),
451                "unique_key": self.unique_key,
452                "invalidate_hard_deletes": self.invalidate_hard_deletes,
453                "valid_from_name": "dbt_valid_from",
454                "valid_to_name": "dbt_valid_to",
455                "time_data_type": (
456                    exp.DataType.build("TIMESTAMPTZ")
457                    if target.dialect == "bigquery"
458                    else exp.DataType.build("TIMESTAMP")
459                ),
460                **incremental_kind_kwargs,
461            }
462            if self.snapshot_strategy.is_check:
463                return SCDType2ByColumnKind(
464                    columns=self.check_cols, execution_time_as_valid_from=True, **shared_kwargs
465                )
466            return SCDType2ByTimeKind(
467                updated_at_name=self.updated_at, updated_at_as_valid_from=True, **shared_kwargs
468            )
469
470        if materialization == Materialization.DYNAMIC_TABLE:
471            return ManagedKind()
472
473        if materialization == Materialization.CUSTOM:
474            if custom_materialization := self._get_custom_materialization(context):
475                return DbtCustomKind(
476                    materialization=self.materialized,
477                    adapter=custom_materialization.adapter,
478                    dialect=self.dialect(context),
479                    definition=custom_materialization.definition,
480                )
481
482            raise ConfigError(
483                f"Unknown materialization '{self.materialized}'. Custom materializations must be defined in your dbt project."
484            )
485
486        raise ConfigError(f"{materialization.value} materialization not supported.")
487
488    def _big_query_partition_by_expr(self, context: DbtContext) -> exp.Expr:
489        assert isinstance(self.partition_by, dict)
490        data_type = self.partition_by["data_type"].lower()
491        raw_field = self.partition_by["field"]
492        try:
493            field = d.parse_one(raw_field, dialect="bigquery")
494        except SqlglotError as e:
495            raise ConfigError(
496                f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{raw_field}' in '{self.path}': {e}"
497            ) from e
498
499        if data_type == "date" and self.partition_by["granularity"].lower() == "day":
500            return field
501
502        if data_type == "int64":
503            if "range" not in self.partition_by:
504                raise ConfigError(
505                    f"Range is required for int64 partitioning in model '{self.canonical_name(context)}'."
506                )
507
508            range_ = self.partition_by["range"]
509            start = range_["start"]
510            end = range_["end"]
511            interval = range_["interval"]
512
513            return exp.func(
514                "RANGE_BUCKET",
515                field,
516                exp.func("GENERATE_ARRAY", start, end, interval, dialect="bigquery"),
517                dialect="bigquery",
518            )
519
520        return d.parse_one(
521            f"{data_type}_trunc({self.partition_by['field']}, {self.partition_by['granularity'].upper()})",
522            dialect="bigquery",
523        )
524
525    def _get_custom_materialization(self, context: DbtContext) -> t.Optional[MaterializationConfig]:
526        materializations = context.manifest.materializations()
527        name, target_adapter = self.materialized, context.target.dialect
528
529        adapter_specific_key = f"{name}_{target_adapter}"
530        default_key = f"{name}_default"
531        if adapter_specific_key in materializations:
532            return materializations[adapter_specific_key]
533        if default_key in materializations:
534            return materializations[default_key]
535        return None
536
537    @property
538    def sqlmesh_config_fields(self) -> t.Set[str]:
539        return super().sqlmesh_config_fields | {
540            "cron",
541            "interval_unit",
542            "allow_partials",
543            "physical_version",
544            "start",
545            # In microbatch models `begin` is the same as `start`
546            "begin",
547        }
548
549    def to_sqlmesh(
550        self,
551        context: DbtContext,
552        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
553        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
554    ) -> Model:
555        """Converts the dbt model into a SQLMesh model."""
556        model_dialect = self.dialect(context)
557        query = d.jinja_query(self.sql)
558        kind = self.model_kind(context)
559
560        optional_kwargs: t.Dict[str, t.Any] = {}
561        physical_properties: t.Dict[str, t.Any] = {}
562
563        if self.partition_by:
564            if isinstance(kind, (ViewKind, EmbeddedKind)):
565                logger.warning(
566                    "Ignoring partition_by config for model '%s'; partition_by is not supported for %s.",
567                    self.name,
568                    "views" if isinstance(kind, ViewKind) else "ephemeral models",
569                )
570            elif context.target.dialect == "snowflake":
571                logger.warning(
572                    "Ignoring partition_by config for model '%s' targeting %s. The partition_by config is not supported for Snowflake.",
573                    self.name,
574                    context.target.dialect,
575                )
576            else:
577                partitioned_by = []
578                if isinstance(self.partition_by, list):
579                    for p in self.partition_by:
580                        try:
581                            partitioned_by.append(d.parse_one(p, dialect=model_dialect))
582                        except SqlglotError as e:
583                            raise ConfigError(
584                                f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{p}' in '{self.path}': {e}"
585                            ) from e
586                elif isinstance(self.partition_by, dict):
587                    if context.target.dialect == "bigquery":
588                        partitioned_by.append(self._big_query_partition_by_expr(context))
589                    else:
590                        logger.warning(
591                            "Ignoring partition_by config for model '%s' targeting %s. The format of the config field is only supported for BigQuery.",
592                            self.name,
593                            context.target.dialect,
594                        )
595
596                if partitioned_by:
597                    optional_kwargs["partitioned_by"] = partitioned_by
598
599        if self.cluster_by:
600            if isinstance(kind, (ViewKind, EmbeddedKind)):
601                logger.warning(
602                    "Ignoring cluster_by config for model '%s'; cluster_by is not supported for %s.",
603                    self.name,
604                    "views" if isinstance(kind, ViewKind) else "ephemeral models",
605                )
606            else:
607                clustered_by = []
608                for c in self.cluster_by:
609                    try:
610                        cluster_expr = exp.maybe_parse(
611                            c, into=exp.Cluster, prefix="CLUSTER BY", dialect=model_dialect
612                        )
613                        for expr in cluster_expr.expressions:
614                            clustered_by.append(
615                                expr.this if isinstance(expr, exp.Ordered) else expr
616                            )
617                    except SqlglotError as e:
618                        raise ConfigError(
619                            f"Failed to parse model '{self.canonical_name(context)}' cluster_by field '{c}' in '{self.path}': {e}"
620                        ) from e
621                optional_kwargs["clustered_by"] = clustered_by
622
623        model_kwargs = self.sqlmesh_model_kwargs(context)
624        if self.sql_header:
625            model_kwargs["pre_statements"].insert(0, d.jinja_statement(self.sql_header))
626
627        if context.target.dialect == "bigquery":
628            dbt_max_partition_blob = self._dbt_max_partition_blob()
629            if dbt_max_partition_blob:
630                model_kwargs["pre_statements"].append(d.jinja_statement(dbt_max_partition_blob))
631
632            if self.partition_expiration_days is not None:
633                physical_properties["partition_expiration_days"] = self.partition_expiration_days
634            if self.require_partition_filter is not None:
635                physical_properties["require_partition_filter"] = self.require_partition_filter
636
637            if physical_properties:
638                model_kwargs["physical_properties"] = physical_properties
639
640        if context.target.dialect == "snowflake":
641            if self.snowflake_warehouse is not None:
642                model_kwargs["session_properties"] = {"warehouse": self.snowflake_warehouse}
643
644            if self.model_materialization == Materialization.DYNAMIC_TABLE:
645                if not self.snowflake_warehouse:
646                    raise ConfigError("`snowflake_warehouse` must be set for dynamic tables")
647                if not self.target_lag:
648                    raise ConfigError("`target_lag` must be set for dynamic tables")
649
650                model_kwargs["physical_properties"] = {
651                    "warehouse": self.snowflake_warehouse,
652                    "target_lag": self.target_lag,
653                }
654
655        if context.target.dialect == "clickhouse":
656            if self.model_materialization == Materialization.INCREMENTAL:
657                # `inserts_only` overrides incremental_strategy setting (if present)
658                # https://github.com/ClickHouse/dbt-clickhouse/blob/065f3a724fa09205446ecadac7a00d92b2d8c646/README.md?plain=1#L108
659                if self.inserts_only:
660                    self.incremental_strategy = "append"
661
662                if self.incremental_strategy == "delete+insert":
663                    get_console().log_warning(
664                        f"The '{self.incremental_strategy}' incremental strategy is not supported - SQLMesh will use the temp table/partition swap strategy."
665                    )
666
667                if self.incremental_predicates:
668                    get_console().log_warning(
669                        "SQLMesh does not support 'incremental_predicates' - they will not be applied."
670                    )
671
672            if self.query_settings:
673                get_console().log_warning(
674                    "SQLMesh does not support the 'query_settings' model configuration parameter. Specify the query settings directly in the model query."
675                )
676
677            if self.engine:
678                optional_kwargs["storage_format"] = self.engine
679
680            if self.order_by:
681                order_by = []
682                for o in self.order_by if isinstance(self.order_by, list) else [self.order_by]:
683                    try:
684                        order_by.append(d.parse_one(o, dialect=model_dialect))
685                    except SqlglotError as e:
686                        raise ConfigError(
687                            f"Failed to parse model '{self.canonical_name(context)}' 'order_by' field '{o}' in '{self.path}': {e}"
688                        ) from e
689                physical_properties["order_by"] = order_by
690
691            if self.primary_key:
692                primary_key = []
693                for p in self.primary_key:
694                    try:
695                        primary_key.append(d.parse_one(p, dialect=model_dialect))
696                    except SqlglotError as e:
697                        raise ConfigError(
698                            f"Failed to parse model '{self.canonical_name(context)}' 'primary_key' field '{p}' in '{self.path}': {e}"
699                        ) from e
700                physical_properties["primary_key"] = primary_key
701
702            if self.sharding_key:
703                get_console().log_warning(
704                    "SQLMesh does not support the 'sharding_key' model configuration parameter or distributed materializations."
705                )
706
707            if self.ttl:
708                physical_properties["ttl"] = exp.var(
709                    self.ttl[0] if isinstance(self.ttl, list) else self.ttl
710                )
711
712            if self.settings:
713                physical_properties.update({k: exp.var(v) for k, v in self.settings.items()})
714
715            if physical_properties:
716                model_kwargs["physical_properties"] = physical_properties
717
718        kind = self.model_kind(context)
719
720        # A falsy grants config (None or {}) is considered as unmanaged per dbt semantics
721        if self.grants and kind.supports_grants:
722            model_kwargs["grants"] = self.grants
723
724        allow_partials = model_kwargs.pop("allow_partials", None)
725        if allow_partials is None:
726            # Set allow_partials to True for dbt models to preserve the original semantics.
727            allow_partials = True
728
729        # pop begin for all models so we don't pass it through for non-incremental materializations
730        # (happens if model config is microbatch but project config overrides)
731        begin = model_kwargs.pop("begin", None)
732        if kind.is_incremental:
733            if self.batch_size and isinstance(self.batch_size, str):
734                if "interval_unit" in model_kwargs:
735                    get_console().log_warning(
736                        f"Both 'interval_unit' and 'batch_size' are set for model '{self.canonical_name(context)}'. 'interval_unit' will be used."
737                    )
738                else:
739                    model_kwargs["interval_unit"] = self.batch_size
740                    self.batch_size = None
741            if begin:
742                if "start" in model_kwargs:
743                    get_console().log_warning(
744                        f"Both 'begin' and 'start' are set for model '{self.canonical_name(context)}'. 'start' will be used."
745                    )
746                else:
747                    model_kwargs["start"] = begin
748            # If user explicitly disables concurrent batches then we want to set depends on past to true which we
749            # will do by including the model in the depends_on
750            if self.concurrent_batches is not None and self.concurrent_batches is False:
751                depends_on = model_kwargs.get("depends_on", set())
752                depends_on.add(self.canonical_name(context))
753
754        model_kwargs["start"] = model_kwargs.get(
755            "start", context.sqlmesh_config.model_defaults.start
756        )
757
758        model = create_sql_model(
759            self.canonical_name(context),
760            query,
761            dialect=model_dialect,
762            kind=kind,
763            audit_definitions=audit_definitions,
764            # This ensures that we bypass query rendering that would otherwise be required to extract additional
765            # dependencies from the model's SQL.
766            # Note: any table dependencies that are not referenced using the `ref` macro will not be included.
767            extract_dependencies_from_query=False,
768            allow_partials=allow_partials,
769            virtual_environment_mode=virtual_environment_mode,
770            dbt_node_info=self.node_info,
771            **optional_kwargs,
772            **model_kwargs,
773        )
774        return model
775
776    def _dbt_max_partition_blob(self) -> t.Optional[str]:
777        """Returns a SQL blob which declares the _dbt_max_partition variable. Only applicable to BigQuery."""
778        if (
779            not isinstance(self.partition_by, dict)
780            or self.model_materialization != Materialization.INCREMENTAL
781        ):
782            return None
783
784        from sqlmesh.core.engine_adapter.bigquery import select_partitions_expr
785
786        data_type = self.partition_by["data_type"]
787        select_max_partition_expr = select_partitions_expr(
788            "{{ adapter.resolve_schema(this) }}",
789            "{{ adapter.resolve_identifier(this) }}",
790            data_type,
791            granularity=self.partition_by.get("granularity"),
792            catalog="{{ target.database }}",
793        )
794
795        data_type = data_type.upper()
796        default_value = "NULL"
797        if data_type in ("DATE", "DATETIME", "TIMESTAMP"):
798            default_value = f"CAST('1970-01-01' AS {data_type})"
799
800        return f"""
801{{% if is_incremental() %}}
802  DECLARE _dbt_max_partition {data_type} DEFAULT (
803    COALESCE(({select_max_partition_expr}), {default_value})
804  );
805{{% endif %}}
806"""
logger = <Logger sqlmesh.dbt.model (WARNING)>
INCREMENTAL_BY_TIME_RANGE_STRATEGIES = {'microbatch', 'delete+insert', 'insert_overwrite', 'incremental_by_time_range'}
INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES = {'merge'}
def collection_to_str(collection: Iterable) -> str:
59def collection_to_str(collection: t.Iterable) -> str:
60    return ", ".join(f"'{item}'" for item in sorted(collection))
class ModelConfig(sqlmesh.dbt.basemodel.BaseModelConfig):
 63class ModelConfig(BaseModelConfig):
 64    """
 65    ModelConfig contains all config parameters available to DBT models
 66
 67    See https://docs.getdbt.com/reference/configs-and-properties for
 68    a more detailed description of each config parameter under the
 69    General propreties, General configs, and For models sections.
 70
 71    Args:
 72        sql: The model sql
 73        time_column: The name of the time column
 74        cron: A cron string specifying how often the model should be refreshed, leveraging the
 75            [croniter](https://github.com/kiorky/croniter) library.
 76        interval_unit: The duration of an interval for the model. By default, it is computed from the cron expression.
 77        batch_size: The maximum number of incremental intervals that can be run per backfill job. If this is None,
 78            then backfilling this model will do all of history in one job. If this is set, a model's backfill
 79            will be chunked such that each individual job will only contain jobs with max `batch_size` intervals.
 80        lookback: The number of previous incremental intervals in the lookback window.
 81        cluster_by: Field(s) to use for clustering in data warehouses that support clustering
 82        incremental_strategy: Strategy used to build the incremental model
 83        materialized: How the model will be materialized in the database
 84        sql_header: SQL statement to run before table/view creation. Currently implemented as a pre-hook.
 85        unique_key: List of columns that define row uniqueness for the model
 86        partition_by: List of partition columns or dictionary of bigquery partition by parameters ([dbt bigquery config](https://docs.getdbt.com/reference/resource-configs/bigquery-configs)).
 87    """
 88
 89    # sqlmesh fields
 90    sql: SqlStr = SqlStr("")
 91    time_column: t.Optional[t.Union[str, t.Dict[str, str]]] = None
 92    cron: t.Optional[str] = None
 93    interval_unit: t.Optional[str] = None
 94    batch_concurrency: t.Optional[int] = None
 95    forward_only: bool = True
 96    disable_restatement: t.Optional[bool] = None
 97    allow_partials: bool = True
 98    physical_version: t.Optional[str] = None
 99    auto_restatement_cron: t.Optional[str] = None
100    auto_restatement_intervals: t.Optional[int] = None
101    partition_by_time_column: t.Optional[bool] = None
102    on_destructive_change: t.Optional[OnDestructiveChange] = None
103    on_additive_change: t.Optional[OnAdditiveChange] = None
104
105    # DBT configuration fields
106    cluster_by: t.Optional[t.List[str]] = None
107    incremental_strategy: t.Optional[str] = None
108    materialized: str = Materialization.VIEW.value
109    sql_header: t.Optional[str] = None
110    unique_key: t.Optional[t.List[str]] = None
111    partition_by: t.Optional[t.Union[t.List[str], t.Dict[str, t.Any]]] = None
112    full_refresh: t.Optional[bool] = None
113    on_schema_change: str = "ignore"
114
115    # Snapshot (SCD Type 2) Fields
116    updated_at: t.Optional[str] = None
117    strategy: t.Optional[str] = None
118    invalidate_hard_deletes: bool = False
119    target_schema: t.Optional[str] = None
120    check_cols: t.Optional[t.Union[t.List[str], str]] = None
121
122    # Microbatch Fields
123    event_time: t.Optional[str] = None
124    begin: t.Optional[datetime.datetime] = None
125    concurrent_batches: t.Optional[bool] = None
126
127    # Shared SQLMesh and DBT configuration fields
128    batch_size: t.Optional[t.Union[int, str]] = None
129    lookback: t.Optional[int] = None
130
131    # redshift
132    bind: t.Optional[bool] = None
133
134    # bigquery
135    require_partition_filter: t.Optional[bool] = None
136    partition_expiration_days: t.Optional[int] = None
137
138    # snowflake
139    snowflake_warehouse: t.Optional[str] = None
140    # note: for Snowflake dynamic tables, in the DBT adapter we only support properties that DBT supports
141    # which are defined here: https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables
142    target_lag: t.Optional[str] = None
143
144    # clickhouse
145    engine: t.Optional[str] = None
146    order_by: t.Optional[t.Union[t.List[str], str]] = None
147    primary_key: t.Optional[t.Union[t.List[str], str]] = None
148    sharding_key: t.Optional[t.Union[t.List[str], str]] = None
149    ttl: t.Optional[t.Union[t.List[str], str]] = None
150    settings: t.Optional[t.Dict[str, t.Any]] = None
151    query_settings: t.Optional[t.Dict[str, t.Any]] = None
152    inserts_only: t.Optional[bool] = None
153    incremental_predicates: t.Optional[t.List[str]] = None
154
155    _sql_validator = sql_str_validator
156    _on_destructive_change_validator = on_destructive_change_validator
157    _on_additive_change_validator = on_additive_change_validator
158
159    @field_validator(
160        "unique_key",
161        "cluster_by",
162        "tags",
163        mode="before",
164    )
165    @classmethod
166    def _validate_list(cls, v: t.Union[str, t.List[str]]) -> t.List[str]:
167        return ensure_list(v)
168
169    @field_validator("check_cols", mode="before")
170    @classmethod
171    def _validate_check_cols(cls, v: t.Union[str, t.List[str]]) -> t.Union[str, t.List[str]]:
172        if isinstance(v, str) and v.lower() == "all":
173            return "*"
174        return ensure_list(v)
175
176    @field_validator("updated_at", mode="before")
177    @classmethod
178    def _validate_updated_at(cls, v: t.Optional[str]) -> t.Optional[str]:
179        """
180        Extract column name if updated_at contains a cast.
181
182        SCDType2ByTimeKind and SCDType2ByColumnKind expect a column, and the casting is done later.
183        """
184        if v is None:
185            return None
186        parsed = d.parse_one(v)
187        if isinstance(parsed, exp.Cast) and isinstance(parsed.this, exp.Column):
188            return parsed.this.name
189
190        return v
191
192    @field_validator("sql", mode="before")
193    @classmethod
194    def _validate_sql(cls, v: t.Union[str, SqlStr]) -> SqlStr:
195        return SqlStr(v)
196
197    @field_validator("partition_by", mode="before")
198    @classmethod
199    def _validate_partition_by(
200        cls, v: t.Any
201    ) -> t.Optional[t.Union[t.List[str], t.Dict[str, t.Any]]]:
202        if v is None:
203            return None
204        if isinstance(v, str):
205            return [v]
206        if isinstance(v, list):
207            return v
208        if isinstance(v, dict):
209            if not v.get("field"):
210                raise ConfigError("'field' key required for partition_by.")
211            if "granularity" in v and v["granularity"].lower() not in (
212                "day",
213                "month",
214                "year",
215                "hour",
216            ):
217                granularity = v["granularity"]
218                raise ConfigError(f"Unexpected granularity '{granularity}' in partition_by '{v}'.")
219            if "data_type" in v and v["data_type"].lower() not in (
220                "timestamp",
221                "date",
222                "datetime",
223                "int64",
224            ):
225                data_type = v["data_type"]
226                raise ConfigError(f"Unexpected data_type '{data_type}' in partition_by '{v}'.")
227            return {"data_type": "date", "granularity": "day", **v}
228        raise ConfigError(f"Invalid format for partition_by '{v}'")
229
230    @field_validator("materialized", mode="before")
231    @classmethod
232    def _validate_materialized(cls, v: str) -> str:
233        unsupported_materializations = [
234            "materialized_view",  # multiple engines
235            "dictionary",  # clickhouse only
236            "distributed_table",  # clickhouse only
237            "distributed_incremental",  # clickhouse only
238        ]
239        if v in unsupported_materializations:
240            fallback = v.split("_")
241            msg = f"SQLMesh does not support the '{v}' model materialization."
242            if len(fallback) == 1:
243                # dictionary materialization
244                raise ConfigError(msg)
245            else:
246                get_console().log_warning(
247                    f"{msg} Falling back to the '{fallback[1]}' materialization."
248                )
249
250            return fallback[1]
251        return v
252
253    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
254        **BaseModelConfig._FIELD_UPDATE_STRATEGY,
255        **{
256            "sql": UpdateStrategy.IMMUTABLE,
257            "time_column": UpdateStrategy.IMMUTABLE,
258        },
259    }
260
261    @property
262    def model_materialization(self) -> Materialization:
263        return Materialization(self.materialized.lower())
264
265    @property
266    def snapshot_strategy(self) -> t.Optional[SnapshotStrategy]:
267        return SnapshotStrategy(self.strategy.lower()) if self.strategy else None
268
269    @property
270    def table_schema(self) -> str:
271        return self.target_schema or super().table_schema
272
273    def model_kind(self, context: DbtContext) -> ModelKind:
274        """
275        Get the sqlmesh ModelKind
276        Returns:
277            The sqlmesh ModelKind
278        """
279        target = context.target
280        materialization = self.model_materialization
281
282        # args common to all sqlmesh incremental kinds, regardless of materialization
283        incremental_kind_kwargs: t.Dict[str, t.Any] = {}
284        on_schema_change = self.on_schema_change.lower()
285        if materialization == Materialization.SNAPSHOT:
286            # dbt snapshots default to `append_new_columns` behavior and can't be changed
287            on_schema_change = "append_new_columns"
288
289        if on_schema_change == "ignore":
290            on_destructive_change = OnDestructiveChange.IGNORE
291            on_additive_change = OnAdditiveChange.IGNORE
292        elif on_schema_change == "fail":
293            on_destructive_change = OnDestructiveChange.ERROR
294            on_additive_change = OnAdditiveChange.ERROR
295        elif on_schema_change == "append_new_columns":
296            on_destructive_change = OnDestructiveChange.IGNORE
297            on_additive_change = OnAdditiveChange.ALLOW
298        elif on_schema_change == "sync_all_columns":
299            on_destructive_change = OnDestructiveChange.ALLOW
300            on_additive_change = OnAdditiveChange.ALLOW
301        else:
302            raise ConfigError(
303                f"{self.canonical_name(context)}: Invalid on_schema_change value '{on_schema_change}'. "
304                "Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'."
305            )
306
307        incremental_kind_kwargs["on_destructive_change"] = (
308            self._get_field_value("on_destructive_change") or on_destructive_change
309        )
310        incremental_kind_kwargs["on_additive_change"] = (
311            self._get_field_value("on_additive_change") or on_additive_change
312        )
313        auto_restatement_cron_value = self._get_field_value("auto_restatement_cron")
314        if auto_restatement_cron_value is not None:
315            incremental_kind_kwargs["auto_restatement_cron"] = auto_restatement_cron_value
316
317        if materialization == Materialization.TABLE:
318            return FullKind()
319        if materialization == Materialization.VIEW:
320            return ViewKind()
321        if materialization == Materialization.INCREMENTAL:
322            incremental_by_kind_kwargs: t.Dict[str, t.Any] = {"dialect": self.dialect(context)}
323            forward_only_value = self._get_field_value("forward_only")
324            if forward_only_value is not None:
325                incremental_kind_kwargs["forward_only"] = forward_only_value
326
327            is_incremental_by_time_range = self.time_column or (
328                self.incremental_strategy
329                and self.incremental_strategy in {"microbatch", "incremental_by_time_range"}
330            )
331            # Get shared incremental by kwargs
332            for field in ("batch_size", "batch_concurrency", "lookback"):
333                field_val = self._get_field_value(field)
334                if field_val is not None:
335                    # Check if `batch_size` is representing an interval unit and if so that will be handled at the model level
336                    if field == "batch_size" and isinstance(field_val, str):
337                        continue
338                    incremental_by_kind_kwargs[field] = field_val
339
340            disable_restatement = self.disable_restatement
341            if disable_restatement is None:
342                if is_incremental_by_time_range:
343                    disable_restatement = False
344                else:
345                    disable_restatement = (
346                        not self.full_refresh if self.full_refresh is not None else False
347                    )
348            incremental_by_kind_kwargs["disable_restatement"] = disable_restatement
349
350            if is_incremental_by_time_range:
351                strategy = self.incremental_strategy or target.default_incremental_strategy(
352                    IncrementalByTimeRangeKind
353                )
354
355                if strategy not in INCREMENTAL_BY_TIME_RANGE_STRATEGIES:
356                    get_console().log_warning(
357                        f"SQLMesh incremental by time strategy is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
358                        f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}."
359                    )
360
361                if self.time_column and strategy != "incremental_by_time_range":
362                    get_console().log_warning(
363                        f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. "
364                        f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'."
365                    )
366
367                if strategy == "microbatch":
368                    if self.time_column:
369                        raise ConfigError(
370                            f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead."
371                        )
372                    time_column = self._get_field_value("event_time")
373                    if not time_column:
374                        raise ConfigError(
375                            f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy."
376                        )
377                    # dbt microbatch always processes batches in a size of 1
378                    incremental_by_kind_kwargs["batch_size"] = 1
379                else:
380                    if not self.time_column:
381                        raise ConfigError(
382                            f"{self.canonical_name(context)}: 'time_column' is required for incremental by time range models not defined using microbatch."
383                        )
384                    time_column = self.time_column
385
386                incremental_by_time_range_kwargs = {
387                    "time_column": time_column,
388                }
389                if self.auto_restatement_intervals:
390                    incremental_by_time_range_kwargs["auto_restatement_intervals"] = (
391                        self.auto_restatement_intervals
392                    )
393                if self.partition_by_time_column is not None:
394                    incremental_by_time_range_kwargs["partition_by_time_column"] = (
395                        self.partition_by_time_column
396                    )
397
398                return IncrementalByTimeRangeKind(
399                    **incremental_kind_kwargs,
400                    **incremental_by_kind_kwargs,
401                    **incremental_by_time_range_kwargs,
402                )
403
404            if self.unique_key:
405                strategy = self.incremental_strategy or target.default_incremental_strategy(
406                    IncrementalByUniqueKeyKind
407                )
408                if (
409                    self.incremental_strategy
410                    and strategy not in INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES
411                ):
412                    get_console().log_warning(
413                        f"Unique key is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
414                        f"Supported strategies include {collection_to_str(INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES)}. Falling back to 'merge' strategy."
415                    )
416
417                merge_filter = None
418                if self.incremental_predicates:
419                    dialect = self.dialect(context)
420                    merge_filter = exp.and_(
421                        *[
422                            d.parse_one(predicate, dialect=dialect)
423                            for predicate in self.incremental_predicates
424                        ],
425                        dialect=dialect,
426                    ).transform(d.replace_merge_table_aliases)
427
428                return IncrementalByUniqueKeyKind(
429                    unique_key=self.unique_key,
430                    merge_filter=merge_filter,
431                    **incremental_kind_kwargs,
432                    **incremental_by_kind_kwargs,
433                )
434
435            strategy = self.incremental_strategy or target.default_incremental_strategy(
436                IncrementalUnmanagedKind
437            )
438            return IncrementalUnmanagedKind(
439                insert_overwrite=strategy in INCREMENTAL_BY_TIME_RANGE_STRATEGIES,
440                disable_restatement=incremental_by_kind_kwargs["disable_restatement"],
441                **incremental_kind_kwargs,
442            )
443        if materialization == Materialization.EPHEMERAL:
444            return EmbeddedKind()
445        if materialization == Materialization.SNAPSHOT:
446            if not self.snapshot_strategy:
447                raise ConfigError(
448                    f"{self.canonical_name(context)}: SQLMesh snapshot strategy is required for snapshot materialization."
449                )
450            shared_kwargs = {
451                "dialect": self.dialect(context),
452                "unique_key": self.unique_key,
453                "invalidate_hard_deletes": self.invalidate_hard_deletes,
454                "valid_from_name": "dbt_valid_from",
455                "valid_to_name": "dbt_valid_to",
456                "time_data_type": (
457                    exp.DataType.build("TIMESTAMPTZ")
458                    if target.dialect == "bigquery"
459                    else exp.DataType.build("TIMESTAMP")
460                ),
461                **incremental_kind_kwargs,
462            }
463            if self.snapshot_strategy.is_check:
464                return SCDType2ByColumnKind(
465                    columns=self.check_cols, execution_time_as_valid_from=True, **shared_kwargs
466                )
467            return SCDType2ByTimeKind(
468                updated_at_name=self.updated_at, updated_at_as_valid_from=True, **shared_kwargs
469            )
470
471        if materialization == Materialization.DYNAMIC_TABLE:
472            return ManagedKind()
473
474        if materialization == Materialization.CUSTOM:
475            if custom_materialization := self._get_custom_materialization(context):
476                return DbtCustomKind(
477                    materialization=self.materialized,
478                    adapter=custom_materialization.adapter,
479                    dialect=self.dialect(context),
480                    definition=custom_materialization.definition,
481                )
482
483            raise ConfigError(
484                f"Unknown materialization '{self.materialized}'. Custom materializations must be defined in your dbt project."
485            )
486
487        raise ConfigError(f"{materialization.value} materialization not supported.")
488
489    def _big_query_partition_by_expr(self, context: DbtContext) -> exp.Expr:
490        assert isinstance(self.partition_by, dict)
491        data_type = self.partition_by["data_type"].lower()
492        raw_field = self.partition_by["field"]
493        try:
494            field = d.parse_one(raw_field, dialect="bigquery")
495        except SqlglotError as e:
496            raise ConfigError(
497                f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{raw_field}' in '{self.path}': {e}"
498            ) from e
499
500        if data_type == "date" and self.partition_by["granularity"].lower() == "day":
501            return field
502
503        if data_type == "int64":
504            if "range" not in self.partition_by:
505                raise ConfigError(
506                    f"Range is required for int64 partitioning in model '{self.canonical_name(context)}'."
507                )
508
509            range_ = self.partition_by["range"]
510            start = range_["start"]
511            end = range_["end"]
512            interval = range_["interval"]
513
514            return exp.func(
515                "RANGE_BUCKET",
516                field,
517                exp.func("GENERATE_ARRAY", start, end, interval, dialect="bigquery"),
518                dialect="bigquery",
519            )
520
521        return d.parse_one(
522            f"{data_type}_trunc({self.partition_by['field']}, {self.partition_by['granularity'].upper()})",
523            dialect="bigquery",
524        )
525
526    def _get_custom_materialization(self, context: DbtContext) -> t.Optional[MaterializationConfig]:
527        materializations = context.manifest.materializations()
528        name, target_adapter = self.materialized, context.target.dialect
529
530        adapter_specific_key = f"{name}_{target_adapter}"
531        default_key = f"{name}_default"
532        if adapter_specific_key in materializations:
533            return materializations[adapter_specific_key]
534        if default_key in materializations:
535            return materializations[default_key]
536        return None
537
538    @property
539    def sqlmesh_config_fields(self) -> t.Set[str]:
540        return super().sqlmesh_config_fields | {
541            "cron",
542            "interval_unit",
543            "allow_partials",
544            "physical_version",
545            "start",
546            # In microbatch models `begin` is the same as `start`
547            "begin",
548        }
549
550    def to_sqlmesh(
551        self,
552        context: DbtContext,
553        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
554        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
555    ) -> Model:
556        """Converts the dbt model into a SQLMesh model."""
557        model_dialect = self.dialect(context)
558        query = d.jinja_query(self.sql)
559        kind = self.model_kind(context)
560
561        optional_kwargs: t.Dict[str, t.Any] = {}
562        physical_properties: t.Dict[str, t.Any] = {}
563
564        if self.partition_by:
565            if isinstance(kind, (ViewKind, EmbeddedKind)):
566                logger.warning(
567                    "Ignoring partition_by config for model '%s'; partition_by is not supported for %s.",
568                    self.name,
569                    "views" if isinstance(kind, ViewKind) else "ephemeral models",
570                )
571            elif context.target.dialect == "snowflake":
572                logger.warning(
573                    "Ignoring partition_by config for model '%s' targeting %s. The partition_by config is not supported for Snowflake.",
574                    self.name,
575                    context.target.dialect,
576                )
577            else:
578                partitioned_by = []
579                if isinstance(self.partition_by, list):
580                    for p in self.partition_by:
581                        try:
582                            partitioned_by.append(d.parse_one(p, dialect=model_dialect))
583                        except SqlglotError as e:
584                            raise ConfigError(
585                                f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{p}' in '{self.path}': {e}"
586                            ) from e
587                elif isinstance(self.partition_by, dict):
588                    if context.target.dialect == "bigquery":
589                        partitioned_by.append(self._big_query_partition_by_expr(context))
590                    else:
591                        logger.warning(
592                            "Ignoring partition_by config for model '%s' targeting %s. The format of the config field is only supported for BigQuery.",
593                            self.name,
594                            context.target.dialect,
595                        )
596
597                if partitioned_by:
598                    optional_kwargs["partitioned_by"] = partitioned_by
599
600        if self.cluster_by:
601            if isinstance(kind, (ViewKind, EmbeddedKind)):
602                logger.warning(
603                    "Ignoring cluster_by config for model '%s'; cluster_by is not supported for %s.",
604                    self.name,
605                    "views" if isinstance(kind, ViewKind) else "ephemeral models",
606                )
607            else:
608                clustered_by = []
609                for c in self.cluster_by:
610                    try:
611                        cluster_expr = exp.maybe_parse(
612                            c, into=exp.Cluster, prefix="CLUSTER BY", dialect=model_dialect
613                        )
614                        for expr in cluster_expr.expressions:
615                            clustered_by.append(
616                                expr.this if isinstance(expr, exp.Ordered) else expr
617                            )
618                    except SqlglotError as e:
619                        raise ConfigError(
620                            f"Failed to parse model '{self.canonical_name(context)}' cluster_by field '{c}' in '{self.path}': {e}"
621                        ) from e
622                optional_kwargs["clustered_by"] = clustered_by
623
624        model_kwargs = self.sqlmesh_model_kwargs(context)
625        if self.sql_header:
626            model_kwargs["pre_statements"].insert(0, d.jinja_statement(self.sql_header))
627
628        if context.target.dialect == "bigquery":
629            dbt_max_partition_blob = self._dbt_max_partition_blob()
630            if dbt_max_partition_blob:
631                model_kwargs["pre_statements"].append(d.jinja_statement(dbt_max_partition_blob))
632
633            if self.partition_expiration_days is not None:
634                physical_properties["partition_expiration_days"] = self.partition_expiration_days
635            if self.require_partition_filter is not None:
636                physical_properties["require_partition_filter"] = self.require_partition_filter
637
638            if physical_properties:
639                model_kwargs["physical_properties"] = physical_properties
640
641        if context.target.dialect == "snowflake":
642            if self.snowflake_warehouse is not None:
643                model_kwargs["session_properties"] = {"warehouse": self.snowflake_warehouse}
644
645            if self.model_materialization == Materialization.DYNAMIC_TABLE:
646                if not self.snowflake_warehouse:
647                    raise ConfigError("`snowflake_warehouse` must be set for dynamic tables")
648                if not self.target_lag:
649                    raise ConfigError("`target_lag` must be set for dynamic tables")
650
651                model_kwargs["physical_properties"] = {
652                    "warehouse": self.snowflake_warehouse,
653                    "target_lag": self.target_lag,
654                }
655
656        if context.target.dialect == "clickhouse":
657            if self.model_materialization == Materialization.INCREMENTAL:
658                # `inserts_only` overrides incremental_strategy setting (if present)
659                # https://github.com/ClickHouse/dbt-clickhouse/blob/065f3a724fa09205446ecadac7a00d92b2d8c646/README.md?plain=1#L108
660                if self.inserts_only:
661                    self.incremental_strategy = "append"
662
663                if self.incremental_strategy == "delete+insert":
664                    get_console().log_warning(
665                        f"The '{self.incremental_strategy}' incremental strategy is not supported - SQLMesh will use the temp table/partition swap strategy."
666                    )
667
668                if self.incremental_predicates:
669                    get_console().log_warning(
670                        "SQLMesh does not support 'incremental_predicates' - they will not be applied."
671                    )
672
673            if self.query_settings:
674                get_console().log_warning(
675                    "SQLMesh does not support the 'query_settings' model configuration parameter. Specify the query settings directly in the model query."
676                )
677
678            if self.engine:
679                optional_kwargs["storage_format"] = self.engine
680
681            if self.order_by:
682                order_by = []
683                for o in self.order_by if isinstance(self.order_by, list) else [self.order_by]:
684                    try:
685                        order_by.append(d.parse_one(o, dialect=model_dialect))
686                    except SqlglotError as e:
687                        raise ConfigError(
688                            f"Failed to parse model '{self.canonical_name(context)}' 'order_by' field '{o}' in '{self.path}': {e}"
689                        ) from e
690                physical_properties["order_by"] = order_by
691
692            if self.primary_key:
693                primary_key = []
694                for p in self.primary_key:
695                    try:
696                        primary_key.append(d.parse_one(p, dialect=model_dialect))
697                    except SqlglotError as e:
698                        raise ConfigError(
699                            f"Failed to parse model '{self.canonical_name(context)}' 'primary_key' field '{p}' in '{self.path}': {e}"
700                        ) from e
701                physical_properties["primary_key"] = primary_key
702
703            if self.sharding_key:
704                get_console().log_warning(
705                    "SQLMesh does not support the 'sharding_key' model configuration parameter or distributed materializations."
706                )
707
708            if self.ttl:
709                physical_properties["ttl"] = exp.var(
710                    self.ttl[0] if isinstance(self.ttl, list) else self.ttl
711                )
712
713            if self.settings:
714                physical_properties.update({k: exp.var(v) for k, v in self.settings.items()})
715
716            if physical_properties:
717                model_kwargs["physical_properties"] = physical_properties
718
719        kind = self.model_kind(context)
720
721        # A falsy grants config (None or {}) is considered as unmanaged per dbt semantics
722        if self.grants and kind.supports_grants:
723            model_kwargs["grants"] = self.grants
724
725        allow_partials = model_kwargs.pop("allow_partials", None)
726        if allow_partials is None:
727            # Set allow_partials to True for dbt models to preserve the original semantics.
728            allow_partials = True
729
730        # pop begin for all models so we don't pass it through for non-incremental materializations
731        # (happens if model config is microbatch but project config overrides)
732        begin = model_kwargs.pop("begin", None)
733        if kind.is_incremental:
734            if self.batch_size and isinstance(self.batch_size, str):
735                if "interval_unit" in model_kwargs:
736                    get_console().log_warning(
737                        f"Both 'interval_unit' and 'batch_size' are set for model '{self.canonical_name(context)}'. 'interval_unit' will be used."
738                    )
739                else:
740                    model_kwargs["interval_unit"] = self.batch_size
741                    self.batch_size = None
742            if begin:
743                if "start" in model_kwargs:
744                    get_console().log_warning(
745                        f"Both 'begin' and 'start' are set for model '{self.canonical_name(context)}'. 'start' will be used."
746                    )
747                else:
748                    model_kwargs["start"] = begin
749            # If user explicitly disables concurrent batches then we want to set depends on past to true which we
750            # will do by including the model in the depends_on
751            if self.concurrent_batches is not None and self.concurrent_batches is False:
752                depends_on = model_kwargs.get("depends_on", set())
753                depends_on.add(self.canonical_name(context))
754
755        model_kwargs["start"] = model_kwargs.get(
756            "start", context.sqlmesh_config.model_defaults.start
757        )
758
759        model = create_sql_model(
760            self.canonical_name(context),
761            query,
762            dialect=model_dialect,
763            kind=kind,
764            audit_definitions=audit_definitions,
765            # This ensures that we bypass query rendering that would otherwise be required to extract additional
766            # dependencies from the model's SQL.
767            # Note: any table dependencies that are not referenced using the `ref` macro will not be included.
768            extract_dependencies_from_query=False,
769            allow_partials=allow_partials,
770            virtual_environment_mode=virtual_environment_mode,
771            dbt_node_info=self.node_info,
772            **optional_kwargs,
773            **model_kwargs,
774        )
775        return model
776
777    def _dbt_max_partition_blob(self) -> t.Optional[str]:
778        """Returns a SQL blob which declares the _dbt_max_partition variable. Only applicable to BigQuery."""
779        if (
780            not isinstance(self.partition_by, dict)
781            or self.model_materialization != Materialization.INCREMENTAL
782        ):
783            return None
784
785        from sqlmesh.core.engine_adapter.bigquery import select_partitions_expr
786
787        data_type = self.partition_by["data_type"]
788        select_max_partition_expr = select_partitions_expr(
789            "{{ adapter.resolve_schema(this) }}",
790            "{{ adapter.resolve_identifier(this) }}",
791            data_type,
792            granularity=self.partition_by.get("granularity"),
793            catalog="{{ target.database }}",
794        )
795
796        data_type = data_type.upper()
797        default_value = "NULL"
798        if data_type in ("DATE", "DATETIME", "TIMESTAMP"):
799            default_value = f"CAST('1970-01-01' AS {data_type})"
800
801        return f"""
802{{% if is_incremental() %}}
803  DECLARE _dbt_max_partition {data_type} DEFAULT (
804    COALESCE(({select_max_partition_expr}), {default_value})
805  );
806{{% endif %}}
807"""

ModelConfig contains all config parameters available to DBT models

See https://docs.getdbt.com/reference/configs-and-properties for a more detailed description of each config parameter under the General propreties, General configs, and For models sections.

Arguments:
  • sql: The model sql
  • time_column: The name of the time column
  • cron: A cron string specifying how often the model should be refreshed, leveraging the croniter library.
  • interval_unit: The duration of an interval for the model. By default, it is computed from the cron expression.
  • batch_size: The maximum number of incremental intervals that can be run per backfill job. If this is None, then backfilling this model will do all of history in one job. If this is set, a model's backfill will be chunked such that each individual job will only contain jobs with max batch_size intervals.
  • lookback: The number of previous incremental intervals in the lookback window.
  • cluster_by: Field(s) to use for clustering in data warehouses that support clustering
  • incremental_strategy: Strategy used to build the incremental model
  • materialized: How the model will be materialized in the database
  • sql_header: SQL statement to run before table/view creation. Currently implemented as a pre-hook.
  • unique_key: List of columns that define row uniqueness for the model
  • partition_by: List of partition columns or dictionary of bigquery partition by parameters (dbt bigquery config).
time_column: Union[str, Dict[str, str], NoneType]
cron: Optional[str]
interval_unit: Optional[str]
batch_concurrency: Optional[int]
forward_only: bool
disable_restatement: Optional[bool]
allow_partials: bool
physical_version: Optional[str]
auto_restatement_cron: Optional[str]
auto_restatement_intervals: Optional[int]
partition_by_time_column: Optional[bool]
on_destructive_change: Optional[sqlmesh.core.model.kind.OnDestructiveChange]
on_additive_change: Optional[sqlmesh.core.model.kind.OnAdditiveChange]
cluster_by: Optional[List[str]]
incremental_strategy: Optional[str]
materialized: str
sql_header: Optional[str]
unique_key: Optional[List[str]]
partition_by: Union[List[str], Dict[str, Any], NoneType]
full_refresh: Optional[bool]
on_schema_change: str
updated_at: Optional[str]
strategy: Optional[str]
invalidate_hard_deletes: bool
target_schema: Optional[str]
check_cols: Union[List[str], str, NoneType]
event_time: Optional[str]
begin: Optional[datetime.datetime]
concurrent_batches: Optional[bool]
batch_size: Union[int, str, NoneType]
lookback: Optional[int]
bind: Optional[bool]
require_partition_filter: Optional[bool]
partition_expiration_days: Optional[int]
snowflake_warehouse: Optional[str]
target_lag: Optional[str]
engine: Optional[str]
order_by: Union[List[str], str, NoneType]
primary_key: Union[List[str], str, NoneType]
sharding_key: Union[List[str], str, NoneType]
ttl: Union[List[str], str, NoneType]
settings: Optional[Dict[str, Any]]
query_settings: Optional[Dict[str, Any]]
inserts_only: Optional[bool]
incremental_predicates: Optional[List[str]]
model_materialization: sqlmesh.dbt.basemodel.Materialization
261    @property
262    def model_materialization(self) -> Materialization:
263        return Materialization(self.materialized.lower())
snapshot_strategy: Optional[sqlmesh.dbt.basemodel.SnapshotStrategy]
265    @property
266    def snapshot_strategy(self) -> t.Optional[SnapshotStrategy]:
267        return SnapshotStrategy(self.strategy.lower()) if self.strategy else None
table_schema: str
269    @property
270    def table_schema(self) -> str:
271        return self.target_schema or super().table_schema

Get the full schema name

273    def model_kind(self, context: DbtContext) -> ModelKind:
274        """
275        Get the sqlmesh ModelKind
276        Returns:
277            The sqlmesh ModelKind
278        """
279        target = context.target
280        materialization = self.model_materialization
281
282        # args common to all sqlmesh incremental kinds, regardless of materialization
283        incremental_kind_kwargs: t.Dict[str, t.Any] = {}
284        on_schema_change = self.on_schema_change.lower()
285        if materialization == Materialization.SNAPSHOT:
286            # dbt snapshots default to `append_new_columns` behavior and can't be changed
287            on_schema_change = "append_new_columns"
288
289        if on_schema_change == "ignore":
290            on_destructive_change = OnDestructiveChange.IGNORE
291            on_additive_change = OnAdditiveChange.IGNORE
292        elif on_schema_change == "fail":
293            on_destructive_change = OnDestructiveChange.ERROR
294            on_additive_change = OnAdditiveChange.ERROR
295        elif on_schema_change == "append_new_columns":
296            on_destructive_change = OnDestructiveChange.IGNORE
297            on_additive_change = OnAdditiveChange.ALLOW
298        elif on_schema_change == "sync_all_columns":
299            on_destructive_change = OnDestructiveChange.ALLOW
300            on_additive_change = OnAdditiveChange.ALLOW
301        else:
302            raise ConfigError(
303                f"{self.canonical_name(context)}: Invalid on_schema_change value '{on_schema_change}'. "
304                "Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'."
305            )
306
307        incremental_kind_kwargs["on_destructive_change"] = (
308            self._get_field_value("on_destructive_change") or on_destructive_change
309        )
310        incremental_kind_kwargs["on_additive_change"] = (
311            self._get_field_value("on_additive_change") or on_additive_change
312        )
313        auto_restatement_cron_value = self._get_field_value("auto_restatement_cron")
314        if auto_restatement_cron_value is not None:
315            incremental_kind_kwargs["auto_restatement_cron"] = auto_restatement_cron_value
316
317        if materialization == Materialization.TABLE:
318            return FullKind()
319        if materialization == Materialization.VIEW:
320            return ViewKind()
321        if materialization == Materialization.INCREMENTAL:
322            incremental_by_kind_kwargs: t.Dict[str, t.Any] = {"dialect": self.dialect(context)}
323            forward_only_value = self._get_field_value("forward_only")
324            if forward_only_value is not None:
325                incremental_kind_kwargs["forward_only"] = forward_only_value
326
327            is_incremental_by_time_range = self.time_column or (
328                self.incremental_strategy
329                and self.incremental_strategy in {"microbatch", "incremental_by_time_range"}
330            )
331            # Get shared incremental by kwargs
332            for field in ("batch_size", "batch_concurrency", "lookback"):
333                field_val = self._get_field_value(field)
334                if field_val is not None:
335                    # Check if `batch_size` is representing an interval unit and if so that will be handled at the model level
336                    if field == "batch_size" and isinstance(field_val, str):
337                        continue
338                    incremental_by_kind_kwargs[field] = field_val
339
340            disable_restatement = self.disable_restatement
341            if disable_restatement is None:
342                if is_incremental_by_time_range:
343                    disable_restatement = False
344                else:
345                    disable_restatement = (
346                        not self.full_refresh if self.full_refresh is not None else False
347                    )
348            incremental_by_kind_kwargs["disable_restatement"] = disable_restatement
349
350            if is_incremental_by_time_range:
351                strategy = self.incremental_strategy or target.default_incremental_strategy(
352                    IncrementalByTimeRangeKind
353                )
354
355                if strategy not in INCREMENTAL_BY_TIME_RANGE_STRATEGIES:
356                    get_console().log_warning(
357                        f"SQLMesh incremental by time strategy is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
358                        f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}."
359                    )
360
361                if self.time_column and strategy != "incremental_by_time_range":
362                    get_console().log_warning(
363                        f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. "
364                        f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'."
365                    )
366
367                if strategy == "microbatch":
368                    if self.time_column:
369                        raise ConfigError(
370                            f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead."
371                        )
372                    time_column = self._get_field_value("event_time")
373                    if not time_column:
374                        raise ConfigError(
375                            f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy."
376                        )
377                    # dbt microbatch always processes batches in a size of 1
378                    incremental_by_kind_kwargs["batch_size"] = 1
379                else:
380                    if not self.time_column:
381                        raise ConfigError(
382                            f"{self.canonical_name(context)}: 'time_column' is required for incremental by time range models not defined using microbatch."
383                        )
384                    time_column = self.time_column
385
386                incremental_by_time_range_kwargs = {
387                    "time_column": time_column,
388                }
389                if self.auto_restatement_intervals:
390                    incremental_by_time_range_kwargs["auto_restatement_intervals"] = (
391                        self.auto_restatement_intervals
392                    )
393                if self.partition_by_time_column is not None:
394                    incremental_by_time_range_kwargs["partition_by_time_column"] = (
395                        self.partition_by_time_column
396                    )
397
398                return IncrementalByTimeRangeKind(
399                    **incremental_kind_kwargs,
400                    **incremental_by_kind_kwargs,
401                    **incremental_by_time_range_kwargs,
402                )
403
404            if self.unique_key:
405                strategy = self.incremental_strategy or target.default_incremental_strategy(
406                    IncrementalByUniqueKeyKind
407                )
408                if (
409                    self.incremental_strategy
410                    and strategy not in INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES
411                ):
412                    get_console().log_warning(
413                        f"Unique key is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
414                        f"Supported strategies include {collection_to_str(INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES)}. Falling back to 'merge' strategy."
415                    )
416
417                merge_filter = None
418                if self.incremental_predicates:
419                    dialect = self.dialect(context)
420                    merge_filter = exp.and_(
421                        *[
422                            d.parse_one(predicate, dialect=dialect)
423                            for predicate in self.incremental_predicates
424                        ],
425                        dialect=dialect,
426                    ).transform(d.replace_merge_table_aliases)
427
428                return IncrementalByUniqueKeyKind(
429                    unique_key=self.unique_key,
430                    merge_filter=merge_filter,
431                    **incremental_kind_kwargs,
432                    **incremental_by_kind_kwargs,
433                )
434
435            strategy = self.incremental_strategy or target.default_incremental_strategy(
436                IncrementalUnmanagedKind
437            )
438            return IncrementalUnmanagedKind(
439                insert_overwrite=strategy in INCREMENTAL_BY_TIME_RANGE_STRATEGIES,
440                disable_restatement=incremental_by_kind_kwargs["disable_restatement"],
441                **incremental_kind_kwargs,
442            )
443        if materialization == Materialization.EPHEMERAL:
444            return EmbeddedKind()
445        if materialization == Materialization.SNAPSHOT:
446            if not self.snapshot_strategy:
447                raise ConfigError(
448                    f"{self.canonical_name(context)}: SQLMesh snapshot strategy is required for snapshot materialization."
449                )
450            shared_kwargs = {
451                "dialect": self.dialect(context),
452                "unique_key": self.unique_key,
453                "invalidate_hard_deletes": self.invalidate_hard_deletes,
454                "valid_from_name": "dbt_valid_from",
455                "valid_to_name": "dbt_valid_to",
456                "time_data_type": (
457                    exp.DataType.build("TIMESTAMPTZ")
458                    if target.dialect == "bigquery"
459                    else exp.DataType.build("TIMESTAMP")
460                ),
461                **incremental_kind_kwargs,
462            }
463            if self.snapshot_strategy.is_check:
464                return SCDType2ByColumnKind(
465                    columns=self.check_cols, execution_time_as_valid_from=True, **shared_kwargs
466                )
467            return SCDType2ByTimeKind(
468                updated_at_name=self.updated_at, updated_at_as_valid_from=True, **shared_kwargs
469            )
470
471        if materialization == Materialization.DYNAMIC_TABLE:
472            return ManagedKind()
473
474        if materialization == Materialization.CUSTOM:
475            if custom_materialization := self._get_custom_materialization(context):
476                return DbtCustomKind(
477                    materialization=self.materialized,
478                    adapter=custom_materialization.adapter,
479                    dialect=self.dialect(context),
480                    definition=custom_materialization.definition,
481                )
482
483            raise ConfigError(
484                f"Unknown materialization '{self.materialized}'. Custom materializations must be defined in your dbt project."
485            )
486
487        raise ConfigError(f"{materialization.value} materialization not supported.")

Get the sqlmesh ModelKind

Returns:

The sqlmesh ModelKind

sqlmesh_config_fields: Set[str]
538    @property
539    def sqlmesh_config_fields(self) -> t.Set[str]:
540        return super().sqlmesh_config_fields | {
541            "cron",
542            "interval_unit",
543            "allow_partials",
544            "physical_version",
545            "start",
546            # In microbatch models `begin` is the same as `start`
547            "begin",
548        }

SQLMesh config fields that can be set in dbt projects.

Returns:

A set of SQLMesh config fields that can be set in dbt projects.

550    def to_sqlmesh(
551        self,
552        context: DbtContext,
553        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
554        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
555    ) -> Model:
556        """Converts the dbt model into a SQLMesh model."""
557        model_dialect = self.dialect(context)
558        query = d.jinja_query(self.sql)
559        kind = self.model_kind(context)
560
561        optional_kwargs: t.Dict[str, t.Any] = {}
562        physical_properties: t.Dict[str, t.Any] = {}
563
564        if self.partition_by:
565            if isinstance(kind, (ViewKind, EmbeddedKind)):
566                logger.warning(
567                    "Ignoring partition_by config for model '%s'; partition_by is not supported for %s.",
568                    self.name,
569                    "views" if isinstance(kind, ViewKind) else "ephemeral models",
570                )
571            elif context.target.dialect == "snowflake":
572                logger.warning(
573                    "Ignoring partition_by config for model '%s' targeting %s. The partition_by config is not supported for Snowflake.",
574                    self.name,
575                    context.target.dialect,
576                )
577            else:
578                partitioned_by = []
579                if isinstance(self.partition_by, list):
580                    for p in self.partition_by:
581                        try:
582                            partitioned_by.append(d.parse_one(p, dialect=model_dialect))
583                        except SqlglotError as e:
584                            raise ConfigError(
585                                f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{p}' in '{self.path}': {e}"
586                            ) from e
587                elif isinstance(self.partition_by, dict):
588                    if context.target.dialect == "bigquery":
589                        partitioned_by.append(self._big_query_partition_by_expr(context))
590                    else:
591                        logger.warning(
592                            "Ignoring partition_by config for model '%s' targeting %s. The format of the config field is only supported for BigQuery.",
593                            self.name,
594                            context.target.dialect,
595                        )
596
597                if partitioned_by:
598                    optional_kwargs["partitioned_by"] = partitioned_by
599
600        if self.cluster_by:
601            if isinstance(kind, (ViewKind, EmbeddedKind)):
602                logger.warning(
603                    "Ignoring cluster_by config for model '%s'; cluster_by is not supported for %s.",
604                    self.name,
605                    "views" if isinstance(kind, ViewKind) else "ephemeral models",
606                )
607            else:
608                clustered_by = []
609                for c in self.cluster_by:
610                    try:
611                        cluster_expr = exp.maybe_parse(
612                            c, into=exp.Cluster, prefix="CLUSTER BY", dialect=model_dialect
613                        )
614                        for expr in cluster_expr.expressions:
615                            clustered_by.append(
616                                expr.this if isinstance(expr, exp.Ordered) else expr
617                            )
618                    except SqlglotError as e:
619                        raise ConfigError(
620                            f"Failed to parse model '{self.canonical_name(context)}' cluster_by field '{c}' in '{self.path}': {e}"
621                        ) from e
622                optional_kwargs["clustered_by"] = clustered_by
623
624        model_kwargs = self.sqlmesh_model_kwargs(context)
625        if self.sql_header:
626            model_kwargs["pre_statements"].insert(0, d.jinja_statement(self.sql_header))
627
628        if context.target.dialect == "bigquery":
629            dbt_max_partition_blob = self._dbt_max_partition_blob()
630            if dbt_max_partition_blob:
631                model_kwargs["pre_statements"].append(d.jinja_statement(dbt_max_partition_blob))
632
633            if self.partition_expiration_days is not None:
634                physical_properties["partition_expiration_days"] = self.partition_expiration_days
635            if self.require_partition_filter is not None:
636                physical_properties["require_partition_filter"] = self.require_partition_filter
637
638            if physical_properties:
639                model_kwargs["physical_properties"] = physical_properties
640
641        if context.target.dialect == "snowflake":
642            if self.snowflake_warehouse is not None:
643                model_kwargs["session_properties"] = {"warehouse": self.snowflake_warehouse}
644
645            if self.model_materialization == Materialization.DYNAMIC_TABLE:
646                if not self.snowflake_warehouse:
647                    raise ConfigError("`snowflake_warehouse` must be set for dynamic tables")
648                if not self.target_lag:
649                    raise ConfigError("`target_lag` must be set for dynamic tables")
650
651                model_kwargs["physical_properties"] = {
652                    "warehouse": self.snowflake_warehouse,
653                    "target_lag": self.target_lag,
654                }
655
656        if context.target.dialect == "clickhouse":
657            if self.model_materialization == Materialization.INCREMENTAL:
658                # `inserts_only` overrides incremental_strategy setting (if present)
659                # https://github.com/ClickHouse/dbt-clickhouse/blob/065f3a724fa09205446ecadac7a00d92b2d8c646/README.md?plain=1#L108
660                if self.inserts_only:
661                    self.incremental_strategy = "append"
662
663                if self.incremental_strategy == "delete+insert":
664                    get_console().log_warning(
665                        f"The '{self.incremental_strategy}' incremental strategy is not supported - SQLMesh will use the temp table/partition swap strategy."
666                    )
667
668                if self.incremental_predicates:
669                    get_console().log_warning(
670                        "SQLMesh does not support 'incremental_predicates' - they will not be applied."
671                    )
672
673            if self.query_settings:
674                get_console().log_warning(
675                    "SQLMesh does not support the 'query_settings' model configuration parameter. Specify the query settings directly in the model query."
676                )
677
678            if self.engine:
679                optional_kwargs["storage_format"] = self.engine
680
681            if self.order_by:
682                order_by = []
683                for o in self.order_by if isinstance(self.order_by, list) else [self.order_by]:
684                    try:
685                        order_by.append(d.parse_one(o, dialect=model_dialect))
686                    except SqlglotError as e:
687                        raise ConfigError(
688                            f"Failed to parse model '{self.canonical_name(context)}' 'order_by' field '{o}' in '{self.path}': {e}"
689                        ) from e
690                physical_properties["order_by"] = order_by
691
692            if self.primary_key:
693                primary_key = []
694                for p in self.primary_key:
695                    try:
696                        primary_key.append(d.parse_one(p, dialect=model_dialect))
697                    except SqlglotError as e:
698                        raise ConfigError(
699                            f"Failed to parse model '{self.canonical_name(context)}' 'primary_key' field '{p}' in '{self.path}': {e}"
700                        ) from e
701                physical_properties["primary_key"] = primary_key
702
703            if self.sharding_key:
704                get_console().log_warning(
705                    "SQLMesh does not support the 'sharding_key' model configuration parameter or distributed materializations."
706                )
707
708            if self.ttl:
709                physical_properties["ttl"] = exp.var(
710                    self.ttl[0] if isinstance(self.ttl, list) else self.ttl
711                )
712
713            if self.settings:
714                physical_properties.update({k: exp.var(v) for k, v in self.settings.items()})
715
716            if physical_properties:
717                model_kwargs["physical_properties"] = physical_properties
718
719        kind = self.model_kind(context)
720
721        # A falsy grants config (None or {}) is considered as unmanaged per dbt semantics
722        if self.grants and kind.supports_grants:
723            model_kwargs["grants"] = self.grants
724
725        allow_partials = model_kwargs.pop("allow_partials", None)
726        if allow_partials is None:
727            # Set allow_partials to True for dbt models to preserve the original semantics.
728            allow_partials = True
729
730        # pop begin for all models so we don't pass it through for non-incremental materializations
731        # (happens if model config is microbatch but project config overrides)
732        begin = model_kwargs.pop("begin", None)
733        if kind.is_incremental:
734            if self.batch_size and isinstance(self.batch_size, str):
735                if "interval_unit" in model_kwargs:
736                    get_console().log_warning(
737                        f"Both 'interval_unit' and 'batch_size' are set for model '{self.canonical_name(context)}'. 'interval_unit' will be used."
738                    )
739                else:
740                    model_kwargs["interval_unit"] = self.batch_size
741                    self.batch_size = None
742            if begin:
743                if "start" in model_kwargs:
744                    get_console().log_warning(
745                        f"Both 'begin' and 'start' are set for model '{self.canonical_name(context)}'. 'start' will be used."
746                    )
747                else:
748                    model_kwargs["start"] = begin
749            # If user explicitly disables concurrent batches then we want to set depends on past to true which we
750            # will do by including the model in the depends_on
751            if self.concurrent_batches is not None and self.concurrent_batches is False:
752                depends_on = model_kwargs.get("depends_on", set())
753                depends_on.add(self.canonical_name(context))
754
755        model_kwargs["start"] = model_kwargs.get(
756            "start", context.sqlmesh_config.model_defaults.start
757        )
758
759        model = create_sql_model(
760            self.canonical_name(context),
761            query,
762            dialect=model_dialect,
763            kind=kind,
764            audit_definitions=audit_definitions,
765            # This ensures that we bypass query rendering that would otherwise be required to extract additional
766            # dependencies from the model's SQL.
767            # Note: any table dependencies that are not referenced using the `ref` macro will not be included.
768            extract_dependencies_from_query=False,
769            allow_partials=allow_partials,
770            virtual_environment_mode=virtual_environment_mode,
771            dbt_node_info=self.node_info,
772            **optional_kwargs,
773            **model_kwargs,
774        )
775        return model

Converts the dbt model into a SQLMesh model.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.