sqlmesh.dbt.model
1from __future__ import annotations 2 3import datetime 4import typing as t 5import logging 6 7from sqlglot import exp 8from sqlglot.errors import SqlglotError 9from sqlglot.helper import ensure_list 10 11from sqlmesh.core import dialect as d 12from sqlmesh.core.config.base import UpdateStrategy 13from sqlmesh.core.config.common import VirtualEnvironmentMode 14from sqlmesh.core.console import get_console 15from sqlmesh.core.model import ( 16 EmbeddedKind, 17 FullKind, 18 IncrementalByTimeRangeKind, 19 IncrementalByUniqueKeyKind, 20 IncrementalUnmanagedKind, 21 Model, 22 ModelKind, 23 SCDType2ByColumnKind, 24 ViewKind, 25 ManagedKind, 26 create_sql_model, 27) 28from sqlmesh.core.model.kind import ( 29 SCDType2ByTimeKind, 30 OnDestructiveChange, 31 OnAdditiveChange, 32 on_destructive_change_validator, 33 on_additive_change_validator, 34 DbtCustomKind, 35) 36from sqlmesh.dbt.basemodel import BaseModelConfig, Materialization, SnapshotStrategy 37from sqlmesh.dbt.common import SqlStr, sql_str_validator 38from sqlmesh.utils.errors import ConfigError 39from sqlmesh.utils.pydantic import field_validator 40 41if t.TYPE_CHECKING: 42 from sqlmesh.core.audit.definition import ModelAudit 43 from sqlmesh.dbt.context import DbtContext 44 from sqlmesh.dbt.package import MaterializationConfig 45 46logger = logging.getLogger(__name__) 47 48 49logger = logging.getLogger(__name__) 50 51 52INCREMENTAL_BY_TIME_RANGE_STRATEGIES = set( 53 ["delete+insert", "insert_overwrite", "microbatch", "incremental_by_time_range"] 54) 55INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES = set(["merge"]) 56 57 58def collection_to_str(collection: t.Iterable) -> str: 59 return ", ".join(f"'{item}'" for item in sorted(collection)) 60 61 62class ModelConfig(BaseModelConfig): 63 """ 64 ModelConfig contains all config parameters available to DBT models 65 66 See https://docs.getdbt.com/reference/configs-and-properties for 67 a more detailed description of each config parameter under the 68 General propreties, General 
configs, and For models sections. 69 70 Args: 71 sql: The model sql 72 time_column: The name of the time column 73 cron: A cron string specifying how often the model should be refreshed, leveraging the 74 [croniter](https://github.com/kiorky/croniter) library. 75 interval_unit: The duration of an interval for the model. By default, it is computed from the cron expression. 76 batch_size: The maximum number of incremental intervals that can be run per backfill job. If this is None, 77 then backfilling this model will do all of history in one job. If this is set, a model's backfill 78 will be chunked such that each individual job will only contain jobs with max `batch_size` intervals. 79 lookback: The number of previous incremental intervals in the lookback window. 80 cluster_by: Field(s) to use for clustering in data warehouses that support clustering 81 incremental_strategy: Strategy used to build the incremental model 82 materialized: How the model will be materialized in the database 83 sql_header: SQL statement to run before table/view creation. Currently implemented as a pre-hook. 84 unique_key: List of columns that define row uniqueness for the model 85 partition_by: List of partition columns or dictionary of bigquery partition by parameters ([dbt bigquery config](https://docs.getdbt.com/reference/resource-configs/bigquery-configs)). 
86 """ 87 88 # sqlmesh fields 89 sql: SqlStr = SqlStr("") 90 time_column: t.Optional[t.Union[str, t.Dict[str, str]]] = None 91 cron: t.Optional[str] = None 92 interval_unit: t.Optional[str] = None 93 batch_concurrency: t.Optional[int] = None 94 forward_only: bool = True 95 disable_restatement: t.Optional[bool] = None 96 allow_partials: bool = True 97 physical_version: t.Optional[str] = None 98 auto_restatement_cron: t.Optional[str] = None 99 auto_restatement_intervals: t.Optional[int] = None 100 partition_by_time_column: t.Optional[bool] = None 101 on_destructive_change: t.Optional[OnDestructiveChange] = None 102 on_additive_change: t.Optional[OnAdditiveChange] = None 103 104 # DBT configuration fields 105 cluster_by: t.Optional[t.List[str]] = None 106 incremental_strategy: t.Optional[str] = None 107 materialized: str = Materialization.VIEW.value 108 sql_header: t.Optional[str] = None 109 unique_key: t.Optional[t.List[str]] = None 110 partition_by: t.Optional[t.Union[t.List[str], t.Dict[str, t.Any]]] = None 111 full_refresh: t.Optional[bool] = None 112 on_schema_change: str = "ignore" 113 114 # Snapshot (SCD Type 2) Fields 115 updated_at: t.Optional[str] = None 116 strategy: t.Optional[str] = None 117 invalidate_hard_deletes: bool = False 118 target_schema: t.Optional[str] = None 119 check_cols: t.Optional[t.Union[t.List[str], str]] = None 120 121 # Microbatch Fields 122 event_time: t.Optional[str] = None 123 begin: t.Optional[datetime.datetime] = None 124 concurrent_batches: t.Optional[bool] = None 125 126 # Shared SQLMesh and DBT configuration fields 127 batch_size: t.Optional[t.Union[int, str]] = None 128 lookback: t.Optional[int] = None 129 130 # redshift 131 bind: t.Optional[bool] = None 132 133 # bigquery 134 require_partition_filter: t.Optional[bool] = None 135 partition_expiration_days: t.Optional[int] = None 136 137 # snowflake 138 snowflake_warehouse: t.Optional[str] = None 139 # note: for Snowflake dynamic tables, in the DBT adapter we only support 
properties that DBT supports 140 # which are defined here: https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables 141 target_lag: t.Optional[str] = None 142 143 # clickhouse 144 engine: t.Optional[str] = None 145 order_by: t.Optional[t.Union[t.List[str], str]] = None 146 primary_key: t.Optional[t.Union[t.List[str], str]] = None 147 sharding_key: t.Optional[t.Union[t.List[str], str]] = None 148 ttl: t.Optional[t.Union[t.List[str], str]] = None 149 settings: t.Optional[t.Dict[str, t.Any]] = None 150 query_settings: t.Optional[t.Dict[str, t.Any]] = None 151 inserts_only: t.Optional[bool] = None 152 incremental_predicates: t.Optional[t.List[str]] = None 153 154 _sql_validator = sql_str_validator 155 _on_destructive_change_validator = on_destructive_change_validator 156 _on_additive_change_validator = on_additive_change_validator 157 158 @field_validator( 159 "unique_key", 160 "cluster_by", 161 "tags", 162 mode="before", 163 ) 164 @classmethod 165 def _validate_list(cls, v: t.Union[str, t.List[str]]) -> t.List[str]: 166 return ensure_list(v) 167 168 @field_validator("check_cols", mode="before") 169 @classmethod 170 def _validate_check_cols(cls, v: t.Union[str, t.List[str]]) -> t.Union[str, t.List[str]]: 171 if isinstance(v, str) and v.lower() == "all": 172 return "*" 173 return ensure_list(v) 174 175 @field_validator("updated_at", mode="before") 176 @classmethod 177 def _validate_updated_at(cls, v: t.Optional[str]) -> t.Optional[str]: 178 """ 179 Extract column name if updated_at contains a cast. 180 181 SCDType2ByTimeKind and SCDType2ByColumnKind expect a column, and the casting is done later. 
182 """ 183 if v is None: 184 return None 185 parsed = d.parse_one(v) 186 if isinstance(parsed, exp.Cast) and isinstance(parsed.this, exp.Column): 187 return parsed.this.name 188 189 return v 190 191 @field_validator("sql", mode="before") 192 @classmethod 193 def _validate_sql(cls, v: t.Union[str, SqlStr]) -> SqlStr: 194 return SqlStr(v) 195 196 @field_validator("partition_by", mode="before") 197 @classmethod 198 def _validate_partition_by( 199 cls, v: t.Any 200 ) -> t.Optional[t.Union[t.List[str], t.Dict[str, t.Any]]]: 201 if v is None: 202 return None 203 if isinstance(v, str): 204 return [v] 205 if isinstance(v, list): 206 return v 207 if isinstance(v, dict): 208 if not v.get("field"): 209 raise ConfigError("'field' key required for partition_by.") 210 if "granularity" in v and v["granularity"].lower() not in ( 211 "day", 212 "month", 213 "year", 214 "hour", 215 ): 216 granularity = v["granularity"] 217 raise ConfigError(f"Unexpected granularity '{granularity}' in partition_by '{v}'.") 218 if "data_type" in v and v["data_type"].lower() not in ( 219 "timestamp", 220 "date", 221 "datetime", 222 "int64", 223 ): 224 data_type = v["data_type"] 225 raise ConfigError(f"Unexpected data_type '{data_type}' in partition_by '{v}'.") 226 return {"data_type": "date", "granularity": "day", **v} 227 raise ConfigError(f"Invalid format for partition_by '{v}'") 228 229 @field_validator("materialized", mode="before") 230 @classmethod 231 def _validate_materialized(cls, v: str) -> str: 232 unsupported_materializations = [ 233 "materialized_view", # multiple engines 234 "dictionary", # clickhouse only 235 "distributed_table", # clickhouse only 236 "distributed_incremental", # clickhouse only 237 ] 238 if v in unsupported_materializations: 239 fallback = v.split("_") 240 msg = f"SQLMesh does not support the '{v}' model materialization." 
241 if len(fallback) == 1: 242 # dictionary materialization 243 raise ConfigError(msg) 244 else: 245 get_console().log_warning( 246 f"{msg} Falling back to the '{fallback[1]}' materialization." 247 ) 248 249 return fallback[1] 250 return v 251 252 _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { 253 **BaseModelConfig._FIELD_UPDATE_STRATEGY, 254 **{ 255 "sql": UpdateStrategy.IMMUTABLE, 256 "time_column": UpdateStrategy.IMMUTABLE, 257 }, 258 } 259 260 @property 261 def model_materialization(self) -> Materialization: 262 return Materialization(self.materialized.lower()) 263 264 @property 265 def snapshot_strategy(self) -> t.Optional[SnapshotStrategy]: 266 return SnapshotStrategy(self.strategy.lower()) if self.strategy else None 267 268 @property 269 def table_schema(self) -> str: 270 return self.target_schema or super().table_schema 271 272 def model_kind(self, context: DbtContext) -> ModelKind: 273 """ 274 Get the sqlmesh ModelKind 275 Returns: 276 The sqlmesh ModelKind 277 """ 278 target = context.target 279 materialization = self.model_materialization 280 281 # args common to all sqlmesh incremental kinds, regardless of materialization 282 incremental_kind_kwargs: t.Dict[str, t.Any] = {} 283 on_schema_change = self.on_schema_change.lower() 284 if materialization == Materialization.SNAPSHOT: 285 # dbt snapshots default to `append_new_columns` behavior and can't be changed 286 on_schema_change = "append_new_columns" 287 288 if on_schema_change == "ignore": 289 on_destructive_change = OnDestructiveChange.IGNORE 290 on_additive_change = OnAdditiveChange.IGNORE 291 elif on_schema_change == "fail": 292 on_destructive_change = OnDestructiveChange.ERROR 293 on_additive_change = OnAdditiveChange.ERROR 294 elif on_schema_change == "append_new_columns": 295 on_destructive_change = OnDestructiveChange.IGNORE 296 on_additive_change = OnAdditiveChange.ALLOW 297 elif on_schema_change == "sync_all_columns": 298 on_destructive_change = OnDestructiveChange.ALLOW 
299 on_additive_change = OnAdditiveChange.ALLOW 300 else: 301 raise ConfigError( 302 f"{self.canonical_name(context)}: Invalid on_schema_change value '{on_schema_change}'. " 303 "Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'." 304 ) 305 306 incremental_kind_kwargs["on_destructive_change"] = ( 307 self._get_field_value("on_destructive_change") or on_destructive_change 308 ) 309 incremental_kind_kwargs["on_additive_change"] = ( 310 self._get_field_value("on_additive_change") or on_additive_change 311 ) 312 auto_restatement_cron_value = self._get_field_value("auto_restatement_cron") 313 if auto_restatement_cron_value is not None: 314 incremental_kind_kwargs["auto_restatement_cron"] = auto_restatement_cron_value 315 316 if materialization == Materialization.TABLE: 317 return FullKind() 318 if materialization == Materialization.VIEW: 319 return ViewKind() 320 if materialization == Materialization.INCREMENTAL: 321 incremental_by_kind_kwargs: t.Dict[str, t.Any] = {"dialect": self.dialect(context)} 322 forward_only_value = self._get_field_value("forward_only") 323 if forward_only_value is not None: 324 incremental_kind_kwargs["forward_only"] = forward_only_value 325 326 is_incremental_by_time_range = self.time_column or ( 327 self.incremental_strategy 328 and self.incremental_strategy in {"microbatch", "incremental_by_time_range"} 329 ) 330 # Get shared incremental by kwargs 331 for field in ("batch_size", "batch_concurrency", "lookback"): 332 field_val = self._get_field_value(field) 333 if field_val is not None: 334 # Check if `batch_size` is representing an interval unit and if so that will be handled at the model level 335 if field == "batch_size" and isinstance(field_val, str): 336 continue 337 incremental_by_kind_kwargs[field] = field_val 338 339 disable_restatement = self.disable_restatement 340 if disable_restatement is None: 341 if is_incremental_by_time_range: 342 disable_restatement = False 343 else: 344 disable_restatement = ( 345 
not self.full_refresh if self.full_refresh is not None else False 346 ) 347 incremental_by_kind_kwargs["disable_restatement"] = disable_restatement 348 349 if is_incremental_by_time_range: 350 strategy = self.incremental_strategy or target.default_incremental_strategy( 351 IncrementalByTimeRangeKind 352 ) 353 354 if strategy not in INCREMENTAL_BY_TIME_RANGE_STRATEGIES: 355 get_console().log_warning( 356 f"SQLMesh incremental by time strategy is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. " 357 f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}." 358 ) 359 360 if self.time_column and strategy != "incremental_by_time_range": 361 get_console().log_warning( 362 f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. " 363 f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'." 364 ) 365 366 if strategy == "microbatch": 367 if self.time_column: 368 raise ConfigError( 369 f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead." 370 ) 371 time_column = self._get_field_value("event_time") 372 if not time_column: 373 raise ConfigError( 374 f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy." 375 ) 376 # dbt microbatch always processes batches in a size of 1 377 incremental_by_kind_kwargs["batch_size"] = 1 378 else: 379 if not self.time_column: 380 raise ConfigError( 381 f"{self.canonical_name(context)}: 'time_column' is required for incremental by time range models not defined using microbatch." 
382 ) 383 time_column = self.time_column 384 385 incremental_by_time_range_kwargs = { 386 "time_column": time_column, 387 } 388 if self.auto_restatement_intervals: 389 incremental_by_time_range_kwargs["auto_restatement_intervals"] = ( 390 self.auto_restatement_intervals 391 ) 392 if self.partition_by_time_column is not None: 393 incremental_by_time_range_kwargs["partition_by_time_column"] = ( 394 self.partition_by_time_column 395 ) 396 397 return IncrementalByTimeRangeKind( 398 **incremental_kind_kwargs, 399 **incremental_by_kind_kwargs, 400 **incremental_by_time_range_kwargs, 401 ) 402 403 if self.unique_key: 404 strategy = self.incremental_strategy or target.default_incremental_strategy( 405 IncrementalByUniqueKeyKind 406 ) 407 if ( 408 self.incremental_strategy 409 and strategy not in INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES 410 ): 411 get_console().log_warning( 412 f"Unique key is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. " 413 f"Supported strategies include {collection_to_str(INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES)}. Falling back to 'merge' strategy." 
414 ) 415 416 merge_filter = None 417 if self.incremental_predicates: 418 dialect = self.dialect(context) 419 merge_filter = exp.and_( 420 *[ 421 d.parse_one(predicate, dialect=dialect) 422 for predicate in self.incremental_predicates 423 ], 424 dialect=dialect, 425 ).transform(d.replace_merge_table_aliases) 426 427 return IncrementalByUniqueKeyKind( 428 unique_key=self.unique_key, 429 merge_filter=merge_filter, 430 **incremental_kind_kwargs, 431 **incremental_by_kind_kwargs, 432 ) 433 434 strategy = self.incremental_strategy or target.default_incremental_strategy( 435 IncrementalUnmanagedKind 436 ) 437 return IncrementalUnmanagedKind( 438 insert_overwrite=strategy in INCREMENTAL_BY_TIME_RANGE_STRATEGIES, 439 disable_restatement=incremental_by_kind_kwargs["disable_restatement"], 440 **incremental_kind_kwargs, 441 ) 442 if materialization == Materialization.EPHEMERAL: 443 return EmbeddedKind() 444 if materialization == Materialization.SNAPSHOT: 445 if not self.snapshot_strategy: 446 raise ConfigError( 447 f"{self.canonical_name(context)}: SQLMesh snapshot strategy is required for snapshot materialization." 
448 ) 449 shared_kwargs = { 450 "dialect": self.dialect(context), 451 "unique_key": self.unique_key, 452 "invalidate_hard_deletes": self.invalidate_hard_deletes, 453 "valid_from_name": "dbt_valid_from", 454 "valid_to_name": "dbt_valid_to", 455 "time_data_type": ( 456 exp.DataType.build("TIMESTAMPTZ") 457 if target.dialect == "bigquery" 458 else exp.DataType.build("TIMESTAMP") 459 ), 460 **incremental_kind_kwargs, 461 } 462 if self.snapshot_strategy.is_check: 463 return SCDType2ByColumnKind( 464 columns=self.check_cols, execution_time_as_valid_from=True, **shared_kwargs 465 ) 466 return SCDType2ByTimeKind( 467 updated_at_name=self.updated_at, updated_at_as_valid_from=True, **shared_kwargs 468 ) 469 470 if materialization == Materialization.DYNAMIC_TABLE: 471 return ManagedKind() 472 473 if materialization == Materialization.CUSTOM: 474 if custom_materialization := self._get_custom_materialization(context): 475 return DbtCustomKind( 476 materialization=self.materialized, 477 adapter=custom_materialization.adapter, 478 dialect=self.dialect(context), 479 definition=custom_materialization.definition, 480 ) 481 482 raise ConfigError( 483 f"Unknown materialization '{self.materialized}'. Custom materializations must be defined in your dbt project." 
484 ) 485 486 raise ConfigError(f"{materialization.value} materialization not supported.") 487 488 def _big_query_partition_by_expr(self, context: DbtContext) -> exp.Expr: 489 assert isinstance(self.partition_by, dict) 490 data_type = self.partition_by["data_type"].lower() 491 raw_field = self.partition_by["field"] 492 try: 493 field = d.parse_one(raw_field, dialect="bigquery") 494 except SqlglotError as e: 495 raise ConfigError( 496 f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{raw_field}' in '{self.path}': {e}" 497 ) from e 498 499 if data_type == "date" and self.partition_by["granularity"].lower() == "day": 500 return field 501 502 if data_type == "int64": 503 if "range" not in self.partition_by: 504 raise ConfigError( 505 f"Range is required for int64 partitioning in model '{self.canonical_name(context)}'." 506 ) 507 508 range_ = self.partition_by["range"] 509 start = range_["start"] 510 end = range_["end"] 511 interval = range_["interval"] 512 513 return exp.func( 514 "RANGE_BUCKET", 515 field, 516 exp.func("GENERATE_ARRAY", start, end, interval, dialect="bigquery"), 517 dialect="bigquery", 518 ) 519 520 return d.parse_one( 521 f"{data_type}_trunc({self.partition_by['field']}, {self.partition_by['granularity'].upper()})", 522 dialect="bigquery", 523 ) 524 525 def _get_custom_materialization(self, context: DbtContext) -> t.Optional[MaterializationConfig]: 526 materializations = context.manifest.materializations() 527 name, target_adapter = self.materialized, context.target.dialect 528 529 adapter_specific_key = f"{name}_{target_adapter}" 530 default_key = f"{name}_default" 531 if adapter_specific_key in materializations: 532 return materializations[adapter_specific_key] 533 if default_key in materializations: 534 return materializations[default_key] 535 return None 536 537 @property 538 def sqlmesh_config_fields(self) -> t.Set[str]: 539 return super().sqlmesh_config_fields | { 540 "cron", 541 "interval_unit", 542 
"allow_partials", 543 "physical_version", 544 "start", 545 # In microbatch models `begin` is the same as `start` 546 "begin", 547 } 548 549 def to_sqlmesh( 550 self, 551 context: DbtContext, 552 audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None, 553 virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default, 554 ) -> Model: 555 """Converts the dbt model into a SQLMesh model.""" 556 model_dialect = self.dialect(context) 557 query = d.jinja_query(self.sql) 558 kind = self.model_kind(context) 559 560 optional_kwargs: t.Dict[str, t.Any] = {} 561 physical_properties: t.Dict[str, t.Any] = {} 562 563 if self.partition_by: 564 if isinstance(kind, (ViewKind, EmbeddedKind)): 565 logger.warning( 566 "Ignoring partition_by config for model '%s'; partition_by is not supported for %s.", 567 self.name, 568 "views" if isinstance(kind, ViewKind) else "ephemeral models", 569 ) 570 elif context.target.dialect == "snowflake": 571 logger.warning( 572 "Ignoring partition_by config for model '%s' targeting %s. The partition_by config is not supported for Snowflake.", 573 self.name, 574 context.target.dialect, 575 ) 576 else: 577 partitioned_by = [] 578 if isinstance(self.partition_by, list): 579 for p in self.partition_by: 580 try: 581 partitioned_by.append(d.parse_one(p, dialect=model_dialect)) 582 except SqlglotError as e: 583 raise ConfigError( 584 f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{p}' in '{self.path}': {e}" 585 ) from e 586 elif isinstance(self.partition_by, dict): 587 if context.target.dialect == "bigquery": 588 partitioned_by.append(self._big_query_partition_by_expr(context)) 589 else: 590 logger.warning( 591 "Ignoring partition_by config for model '%s' targeting %s. 
The format of the config field is only supported for BigQuery.", 592 self.name, 593 context.target.dialect, 594 ) 595 596 if partitioned_by: 597 optional_kwargs["partitioned_by"] = partitioned_by 598 599 if self.cluster_by: 600 if isinstance(kind, (ViewKind, EmbeddedKind)): 601 logger.warning( 602 "Ignoring cluster_by config for model '%s'; cluster_by is not supported for %s.", 603 self.name, 604 "views" if isinstance(kind, ViewKind) else "ephemeral models", 605 ) 606 else: 607 clustered_by = [] 608 for c in self.cluster_by: 609 try: 610 cluster_expr = exp.maybe_parse( 611 c, into=exp.Cluster, prefix="CLUSTER BY", dialect=model_dialect 612 ) 613 for expr in cluster_expr.expressions: 614 clustered_by.append( 615 expr.this if isinstance(expr, exp.Ordered) else expr 616 ) 617 except SqlglotError as e: 618 raise ConfigError( 619 f"Failed to parse model '{self.canonical_name(context)}' cluster_by field '{c}' in '{self.path}': {e}" 620 ) from e 621 optional_kwargs["clustered_by"] = clustered_by 622 623 model_kwargs = self.sqlmesh_model_kwargs(context) 624 if self.sql_header: 625 model_kwargs["pre_statements"].insert(0, d.jinja_statement(self.sql_header)) 626 627 if context.target.dialect == "bigquery": 628 dbt_max_partition_blob = self._dbt_max_partition_blob() 629 if dbt_max_partition_blob: 630 model_kwargs["pre_statements"].append(d.jinja_statement(dbt_max_partition_blob)) 631 632 if self.partition_expiration_days is not None: 633 physical_properties["partition_expiration_days"] = self.partition_expiration_days 634 if self.require_partition_filter is not None: 635 physical_properties["require_partition_filter"] = self.require_partition_filter 636 637 if physical_properties: 638 model_kwargs["physical_properties"] = physical_properties 639 640 if context.target.dialect == "snowflake": 641 if self.snowflake_warehouse is not None: 642 model_kwargs["session_properties"] = {"warehouse": self.snowflake_warehouse} 643 644 if self.model_materialization == 
Materialization.DYNAMIC_TABLE: 645 if not self.snowflake_warehouse: 646 raise ConfigError("`snowflake_warehouse` must be set for dynamic tables") 647 if not self.target_lag: 648 raise ConfigError("`target_lag` must be set for dynamic tables") 649 650 model_kwargs["physical_properties"] = { 651 "warehouse": self.snowflake_warehouse, 652 "target_lag": self.target_lag, 653 } 654 655 if context.target.dialect == "clickhouse": 656 if self.model_materialization == Materialization.INCREMENTAL: 657 # `inserts_only` overrides incremental_strategy setting (if present) 658 # https://github.com/ClickHouse/dbt-clickhouse/blob/065f3a724fa09205446ecadac7a00d92b2d8c646/README.md?plain=1#L108 659 if self.inserts_only: 660 self.incremental_strategy = "append" 661 662 if self.incremental_strategy == "delete+insert": 663 get_console().log_warning( 664 f"The '{self.incremental_strategy}' incremental strategy is not supported - SQLMesh will use the temp table/partition swap strategy." 665 ) 666 667 if self.incremental_predicates: 668 get_console().log_warning( 669 "SQLMesh does not support 'incremental_predicates' - they will not be applied." 670 ) 671 672 if self.query_settings: 673 get_console().log_warning( 674 "SQLMesh does not support the 'query_settings' model configuration parameter. Specify the query settings directly in the model query." 
675 ) 676 677 if self.engine: 678 optional_kwargs["storage_format"] = self.engine 679 680 if self.order_by: 681 order_by = [] 682 for o in self.order_by if isinstance(self.order_by, list) else [self.order_by]: 683 try: 684 order_by.append(d.parse_one(o, dialect=model_dialect)) 685 except SqlglotError as e: 686 raise ConfigError( 687 f"Failed to parse model '{self.canonical_name(context)}' 'order_by' field '{o}' in '{self.path}': {e}" 688 ) from e 689 physical_properties["order_by"] = order_by 690 691 if self.primary_key: 692 primary_key = [] 693 for p in self.primary_key: 694 try: 695 primary_key.append(d.parse_one(p, dialect=model_dialect)) 696 except SqlglotError as e: 697 raise ConfigError( 698 f"Failed to parse model '{self.canonical_name(context)}' 'primary_key' field '{p}' in '{self.path}': {e}" 699 ) from e 700 physical_properties["primary_key"] = primary_key 701 702 if self.sharding_key: 703 get_console().log_warning( 704 "SQLMesh does not support the 'sharding_key' model configuration parameter or distributed materializations." 705 ) 706 707 if self.ttl: 708 physical_properties["ttl"] = exp.var( 709 self.ttl[0] if isinstance(self.ttl, list) else self.ttl 710 ) 711 712 if self.settings: 713 physical_properties.update({k: exp.var(v) for k, v in self.settings.items()}) 714 715 if physical_properties: 716 model_kwargs["physical_properties"] = physical_properties 717 718 kind = self.model_kind(context) 719 720 # A falsy grants config (None or {}) is considered as unmanaged per dbt semantics 721 if self.grants and kind.supports_grants: 722 model_kwargs["grants"] = self.grants 723 724 allow_partials = model_kwargs.pop("allow_partials", None) 725 if allow_partials is None: 726 # Set allow_partials to True for dbt models to preserve the original semantics. 
727 allow_partials = True 728 729 # pop begin for all models so we don't pass it through for non-incremental materializations 730 # (happens if model config is microbatch but project config overrides) 731 begin = model_kwargs.pop("begin", None) 732 if kind.is_incremental: 733 if self.batch_size and isinstance(self.batch_size, str): 734 if "interval_unit" in model_kwargs: 735 get_console().log_warning( 736 f"Both 'interval_unit' and 'batch_size' are set for model '{self.canonical_name(context)}'. 'interval_unit' will be used." 737 ) 738 else: 739 model_kwargs["interval_unit"] = self.batch_size 740 self.batch_size = None 741 if begin: 742 if "start" in model_kwargs: 743 get_console().log_warning( 744 f"Both 'begin' and 'start' are set for model '{self.canonical_name(context)}'. 'start' will be used." 745 ) 746 else: 747 model_kwargs["start"] = begin 748 # If user explicitly disables concurrent batches then we want to set depends on past to true which we 749 # will do by including the model in the depends_on 750 if self.concurrent_batches is not None and self.concurrent_batches is False: 751 depends_on = model_kwargs.get("depends_on", set()) 752 depends_on.add(self.canonical_name(context)) 753 754 model_kwargs["start"] = model_kwargs.get( 755 "start", context.sqlmesh_config.model_defaults.start 756 ) 757 758 model = create_sql_model( 759 self.canonical_name(context), 760 query, 761 dialect=model_dialect, 762 kind=kind, 763 audit_definitions=audit_definitions, 764 # This ensures that we bypass query rendering that would otherwise be required to extract additional 765 # dependencies from the model's SQL. 766 # Note: any table dependencies that are not referenced using the `ref` macro will not be included. 
767 extract_dependencies_from_query=False, 768 allow_partials=allow_partials, 769 virtual_environment_mode=virtual_environment_mode, 770 dbt_node_info=self.node_info, 771 **optional_kwargs, 772 **model_kwargs, 773 ) 774 return model 775 776 def _dbt_max_partition_blob(self) -> t.Optional[str]: 777 """Returns a SQL blob which declares the _dbt_max_partition variable. Only applicable to BigQuery.""" 778 if ( 779 not isinstance(self.partition_by, dict) 780 or self.model_materialization != Materialization.INCREMENTAL 781 ): 782 return None 783 784 from sqlmesh.core.engine_adapter.bigquery import select_partitions_expr 785 786 data_type = self.partition_by["data_type"] 787 select_max_partition_expr = select_partitions_expr( 788 "{{ adapter.resolve_schema(this) }}", 789 "{{ adapter.resolve_identifier(this) }}", 790 data_type, 791 granularity=self.partition_by.get("granularity"), 792 catalog="{{ target.database }}", 793 ) 794 795 data_type = data_type.upper() 796 default_value = "NULL" 797 if data_type in ("DATE", "DATETIME", "TIMESTAMP"): 798 default_value = f"CAST('1970-01-01' AS {data_type})" 799 800 return f""" 801{{% if is_incremental() %}} 802 DECLARE _dbt_max_partition {data_type} DEFAULT ( 803 COALESCE(({select_max_partition_expr}), {default_value}) 804 ); 805{{% endif %}} 806"""
class ModelConfig(BaseModelConfig):
    """
    ModelConfig contains all config parameters available to DBT models

    See https://docs.getdbt.com/reference/configs-and-properties for
    a more detailed description of each config parameter under the
    General properties, General configs, and For models sections.

    Args:
        sql: The model sql
        time_column: The name of the time column
        cron: A cron string specifying how often the model should be refreshed, leveraging the
            [croniter](https://github.com/kiorky/croniter) library.
        interval_unit: The duration of an interval for the model. By default, it is computed from the cron expression.
        batch_size: The maximum number of incremental intervals that can be run per backfill job. If this is None,
            then backfilling this model will do all of history in one job. If this is set, a model's backfill
            will be chunked such that each individual job will contain at most `batch_size` intervals.
            A string value is treated as an interval unit instead (see `to_sqlmesh`).
        lookback: The number of previous incremental intervals in the lookback window.
        cluster_by: Field(s) to use for clustering in data warehouses that support clustering
        incremental_strategy: Strategy used to build the incremental model
        materialized: How the model will be materialized in the database
        sql_header: SQL statement to run before table/view creation. Currently implemented as a pre-hook.
        unique_key: List of columns that define row uniqueness for the model
        partition_by: List of partition columns or dictionary of bigquery partition by parameters ([dbt bigquery config](https://docs.getdbt.com/reference/resource-configs/bigquery-configs)).
        primary_key: (clickhouse) Primary key expression(s); a single string is treated as one expression.
    """

    # sqlmesh fields
    sql: SqlStr = SqlStr("")
    time_column: t.Optional[t.Union[str, t.Dict[str, str]]] = None
    cron: t.Optional[str] = None
    interval_unit: t.Optional[str] = None
    batch_concurrency: t.Optional[int] = None
    forward_only: bool = True
    disable_restatement: t.Optional[bool] = None
    allow_partials: bool = True
    physical_version: t.Optional[str] = None
    auto_restatement_cron: t.Optional[str] = None
    auto_restatement_intervals: t.Optional[int] = None
    partition_by_time_column: t.Optional[bool] = None
    on_destructive_change: t.Optional[OnDestructiveChange] = None
    on_additive_change: t.Optional[OnAdditiveChange] = None

    # DBT configuration fields
    cluster_by: t.Optional[t.List[str]] = None
    incremental_strategy: t.Optional[str] = None
    materialized: str = Materialization.VIEW.value
    sql_header: t.Optional[str] = None
    unique_key: t.Optional[t.List[str]] = None
    partition_by: t.Optional[t.Union[t.List[str], t.Dict[str, t.Any]]] = None
    full_refresh: t.Optional[bool] = None
    on_schema_change: str = "ignore"

    # Snapshot (SCD Type 2) Fields
    updated_at: t.Optional[str] = None
    strategy: t.Optional[str] = None
    invalidate_hard_deletes: bool = False
    target_schema: t.Optional[str] = None
    check_cols: t.Optional[t.Union[t.List[str], str]] = None

    # Microbatch Fields
    event_time: t.Optional[str] = None
    begin: t.Optional[datetime.datetime] = None
    concurrent_batches: t.Optional[bool] = None

    # Shared SQLMesh and DBT configuration fields
    batch_size: t.Optional[t.Union[int, str]] = None
    lookback: t.Optional[int] = None

    # redshift
    bind: t.Optional[bool] = None

    # bigquery
    require_partition_filter: t.Optional[bool] = None
    partition_expiration_days: t.Optional[int] = None

    # snowflake
    snowflake_warehouse: t.Optional[str] = None
    # note: for Snowflake dynamic tables, in the DBT adapter we only support properties that DBT supports
    # which are defined here: https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables
    target_lag: t.Optional[str] = None

    # clickhouse
    engine: t.Optional[str] = None
    order_by: t.Optional[t.Union[t.List[str], str]] = None
    primary_key: t.Optional[t.Union[t.List[str], str]] = None
    sharding_key: t.Optional[t.Union[t.List[str], str]] = None
    ttl: t.Optional[t.Union[t.List[str], str]] = None
    settings: t.Optional[t.Dict[str, t.Any]] = None
    query_settings: t.Optional[t.Dict[str, t.Any]] = None
    inserts_only: t.Optional[bool] = None
    incremental_predicates: t.Optional[t.List[str]] = None

    _sql_validator = sql_str_validator
    _on_destructive_change_validator = on_destructive_change_validator
    _on_additive_change_validator = on_additive_change_validator

    @field_validator(
        "unique_key",
        "cluster_by",
        "tags",
        mode="before",
    )
    @classmethod
    def _validate_list(cls, v: t.Union[str, t.List[str]]) -> t.List[str]:
        """Normalize a single value into a list via `ensure_list`."""
        return ensure_list(v)

    @field_validator("check_cols", mode="before")
    @classmethod
    def _validate_check_cols(cls, v: t.Union[str, t.List[str]]) -> t.Union[str, t.List[str]]:
        """Map dbt's `all` (case-insensitive) to `*`; otherwise normalize to a list."""
        if isinstance(v, str) and v.lower() == "all":
            return "*"
        return ensure_list(v)

    @field_validator("updated_at", mode="before")
    @classmethod
    def _validate_updated_at(cls, v: t.Optional[str]) -> t.Optional[str]:
        """
        Extract column name if updated_at contains a cast.

        SCDType2ByTimeKind and SCDType2ByColumnKind expect a column, and the casting is done later.
        """
        if v is None:
            return None
        parsed = d.parse_one(v)
        if isinstance(parsed, exp.Cast) and isinstance(parsed.this, exp.Column):
            return parsed.this.name

        return v

    @field_validator("sql", mode="before")
    @classmethod
    def _validate_sql(cls, v: t.Union[str, SqlStr]) -> SqlStr:
        """Wrap the raw model SQL in `SqlStr`."""
        return SqlStr(v)

    @field_validator("partition_by", mode="before")
    @classmethod
    def _validate_partition_by(
        cls, v: t.Any
    ) -> t.Optional[t.Union[t.List[str], t.Dict[str, t.Any]]]:
        """
        Validate `partition_by` as a column (string), a list of columns, or a BigQuery-style dict.

        Dicts must contain a non-empty `field`, and `granularity` / `data_type` are checked against
        the supported values. Missing keys default to `data_type="date"` and `granularity="day"`.

        Raises:
            ConfigError: If the dict is invalid or `v` has an unsupported type.
        """
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        if isinstance(v, list):
            return v
        if isinstance(v, dict):
            if not v.get("field"):
                raise ConfigError("'field' key required for partition_by.")
            if "granularity" in v and v["granularity"].lower() not in (
                "day",
                "month",
                "year",
                "hour",
            ):
                granularity = v["granularity"]
                raise ConfigError(f"Unexpected granularity '{granularity}' in partition_by '{v}'.")
            if "data_type" in v and v["data_type"].lower() not in (
                "timestamp",
                "date",
                "datetime",
                "int64",
            ):
                data_type = v["data_type"]
                raise ConfigError(f"Unexpected data_type '{data_type}' in partition_by '{v}'.")
            return {"data_type": "date", "granularity": "day", **v}
        raise ConfigError(f"Invalid format for partition_by '{v}'")

    @field_validator("materialized", mode="before")
    @classmethod
    def _validate_materialized(cls, v: str) -> str:
        """
        Reject or downgrade materializations that SQLMesh does not support.

        `dictionary` raises. The other listed values fall back to the part after the first
        underscore (e.g. `materialized_view` -> `view`) and log a warning.

        Raises:
            ConfigError: For the `dictionary` materialization.
        """
        unsupported_materializations = [
            "materialized_view",  # multiple engines
            "dictionary",  # clickhouse only
            "distributed_table",  # clickhouse only
            "distributed_incremental",  # clickhouse only
        ]
        if v in unsupported_materializations:
            fallback = v.split("_")
            msg = f"SQLMesh does not support the '{v}' model materialization."
            if len(fallback) == 1:
                # dictionary materialization
                raise ConfigError(msg)
            else:
                get_console().log_warning(
                    f"{msg} Falling back to the '{fallback[1]}' materialization."
                )

            return fallback[1]
        return v

    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
        **BaseModelConfig._FIELD_UPDATE_STRATEGY,
        **{
            "sql": UpdateStrategy.IMMUTABLE,
            "time_column": UpdateStrategy.IMMUTABLE,
        },
    }

    @property
    def model_materialization(self) -> Materialization:
        """The `materialized` value (lower-cased) as a `Materialization` enum member."""
        return Materialization(self.materialized.lower())

    @property
    def snapshot_strategy(self) -> t.Optional[SnapshotStrategy]:
        """The snapshot `strategy` (lower-cased) as a `SnapshotStrategy`, or None if unset."""
        return SnapshotStrategy(self.strategy.lower()) if self.strategy else None

    @property
    def table_schema(self) -> str:
        """`target_schema` when set, otherwise the parent config's table schema."""
        return self.target_schema or super().table_schema

    def model_kind(self, context: DbtContext) -> ModelKind:
        """
        Get the sqlmesh ModelKind

        Returns:
            The sqlmesh ModelKind

        Raises:
            ConfigError: On an invalid `on_schema_change`, missing time column / event time,
                missing snapshot strategy, or an unknown materialization.
        """
        target = context.target
        materialization = self.model_materialization

        # args common to all sqlmesh incremental kinds, regardless of materialization
        incremental_kind_kwargs: t.Dict[str, t.Any] = {}
        on_schema_change = self.on_schema_change.lower()
        if materialization == Materialization.SNAPSHOT:
            # dbt snapshots default to `append_new_columns` behavior and can't be changed
            on_schema_change = "append_new_columns"

        if on_schema_change == "ignore":
            on_destructive_change = OnDestructiveChange.IGNORE
            on_additive_change = OnAdditiveChange.IGNORE
        elif on_schema_change == "fail":
            on_destructive_change = OnDestructiveChange.ERROR
            on_additive_change = OnAdditiveChange.ERROR
        elif on_schema_change == "append_new_columns":
            on_destructive_change = OnDestructiveChange.IGNORE
            on_additive_change = OnAdditiveChange.ALLOW
        elif on_schema_change == "sync_all_columns":
            on_destructive_change = OnDestructiveChange.ALLOW
            on_additive_change = OnAdditiveChange.ALLOW
        else:
            raise ConfigError(
                f"{self.canonical_name(context)}: Invalid on_schema_change value '{on_schema_change}'. "
                "Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'."
            )

        # Explicit SQLMesh settings take precedence over values derived from on_schema_change.
        incremental_kind_kwargs["on_destructive_change"] = (
            self._get_field_value("on_destructive_change") or on_destructive_change
        )
        incremental_kind_kwargs["on_additive_change"] = (
            self._get_field_value("on_additive_change") or on_additive_change
        )
        auto_restatement_cron_value = self._get_field_value("auto_restatement_cron")
        if auto_restatement_cron_value is not None:
            incremental_kind_kwargs["auto_restatement_cron"] = auto_restatement_cron_value

        if materialization == Materialization.TABLE:
            return FullKind()
        if materialization == Materialization.VIEW:
            return ViewKind()
        if materialization == Materialization.INCREMENTAL:
            incremental_by_kind_kwargs: t.Dict[str, t.Any] = {"dialect": self.dialect(context)}
            forward_only_value = self._get_field_value("forward_only")
            if forward_only_value is not None:
                incremental_kind_kwargs["forward_only"] = forward_only_value

            is_incremental_by_time_range = self.time_column or (
                self.incremental_strategy
                and self.incremental_strategy in {"microbatch", "incremental_by_time_range"}
            )
            # Get shared incremental by kwargs
            for field in ("batch_size", "batch_concurrency", "lookback"):
                field_val = self._get_field_value(field)
                if field_val is not None:
                    # Check if `batch_size` is representing an interval unit and if so that will be handled at the model level
                    if field == "batch_size" and isinstance(field_val, str):
                        continue
                    incremental_by_kind_kwargs[field] = field_val

            disable_restatement = self.disable_restatement
            if disable_restatement is None:
                if is_incremental_by_time_range:
                    disable_restatement = False
                else:
                    disable_restatement = (
                        not self.full_refresh if self.full_refresh is not None else False
                    )
            incremental_by_kind_kwargs["disable_restatement"] = disable_restatement

            if is_incremental_by_time_range:
                strategy = self.incremental_strategy or target.default_incremental_strategy(
                    IncrementalByTimeRangeKind
                )

                if strategy not in INCREMENTAL_BY_TIME_RANGE_STRATEGIES:
                    get_console().log_warning(
                        f"SQLMesh incremental by time strategy is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
                        f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}."
                    )

                if self.time_column and strategy != "incremental_by_time_range":
                    get_console().log_warning(
                        f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. "
                        f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'."
                    )

                if strategy == "microbatch":
                    if self.time_column:
                        raise ConfigError(
                            f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead."
                        )
                    time_column = self._get_field_value("event_time")
                    if not time_column:
                        raise ConfigError(
                            f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy."
                        )
                    # dbt microbatch always processes batches in a size of 1
                    incremental_by_kind_kwargs["batch_size"] = 1
                else:
                    if not self.time_column:
                        raise ConfigError(
                            f"{self.canonical_name(context)}: 'time_column' is required for incremental by time range models not defined using microbatch."
                        )
                    time_column = self.time_column

                incremental_by_time_range_kwargs = {
                    "time_column": time_column,
                }
                if self.auto_restatement_intervals:
                    incremental_by_time_range_kwargs["auto_restatement_intervals"] = (
                        self.auto_restatement_intervals
                    )
                if self.partition_by_time_column is not None:
                    incremental_by_time_range_kwargs["partition_by_time_column"] = (
                        self.partition_by_time_column
                    )

                return IncrementalByTimeRangeKind(
                    **incremental_kind_kwargs,
                    **incremental_by_kind_kwargs,
                    **incremental_by_time_range_kwargs,
                )

            if self.unique_key:
                strategy = self.incremental_strategy or target.default_incremental_strategy(
                    IncrementalByUniqueKeyKind
                )
                if (
                    self.incremental_strategy
                    and strategy not in INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES
                ):
                    get_console().log_warning(
                        f"Unique key is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
                        f"Supported strategies include {collection_to_str(INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES)}. Falling back to 'merge' strategy."
                    )

                merge_filter = None
                if self.incremental_predicates:
                    dialect = self.dialect(context)
                    merge_filter = exp.and_(
                        *[
                            d.parse_one(predicate, dialect=dialect)
                            for predicate in self.incremental_predicates
                        ],
                        dialect=dialect,
                    ).transform(d.replace_merge_table_aliases)

                return IncrementalByUniqueKeyKind(
                    unique_key=self.unique_key,
                    merge_filter=merge_filter,
                    **incremental_kind_kwargs,
                    **incremental_by_kind_kwargs,
                )

            strategy = self.incremental_strategy or target.default_incremental_strategy(
                IncrementalUnmanagedKind
            )
            return IncrementalUnmanagedKind(
                insert_overwrite=strategy in INCREMENTAL_BY_TIME_RANGE_STRATEGIES,
                disable_restatement=incremental_by_kind_kwargs["disable_restatement"],
                **incremental_kind_kwargs,
            )
        if materialization == Materialization.EPHEMERAL:
            return EmbeddedKind()
        if materialization == Materialization.SNAPSHOT:
            if not self.snapshot_strategy:
                raise ConfigError(
                    f"{self.canonical_name(context)}: SQLMesh snapshot strategy is required for snapshot materialization."
                )
            shared_kwargs = {
                "dialect": self.dialect(context),
                "unique_key": self.unique_key,
                "invalidate_hard_deletes": self.invalidate_hard_deletes,
                "valid_from_name": "dbt_valid_from",
                "valid_to_name": "dbt_valid_to",
                "time_data_type": (
                    exp.DataType.build("TIMESTAMPTZ")
                    if target.dialect == "bigquery"
                    else exp.DataType.build("TIMESTAMP")
                ),
                **incremental_kind_kwargs,
            }
            if self.snapshot_strategy.is_check:
                return SCDType2ByColumnKind(
                    columns=self.check_cols, execution_time_as_valid_from=True, **shared_kwargs
                )
            return SCDType2ByTimeKind(
                updated_at_name=self.updated_at, updated_at_as_valid_from=True, **shared_kwargs
            )

        if materialization == Materialization.DYNAMIC_TABLE:
            return ManagedKind()

        if materialization == Materialization.CUSTOM:
            if custom_materialization := self._get_custom_materialization(context):
                return DbtCustomKind(
                    materialization=self.materialized,
                    adapter=custom_materialization.adapter,
                    dialect=self.dialect(context),
                    definition=custom_materialization.definition,
                )

            raise ConfigError(
                f"Unknown materialization '{self.materialized}'. Custom materializations must be defined in your dbt project."
            )

        raise ConfigError(f"{materialization.value} materialization not supported.")

    def _big_query_partition_by_expr(self, context: DbtContext) -> exp.Expr:
        """
        Build the BigQuery partition expression from a dict-style `partition_by`.

        - date/day: the bare field.
        - int64: `RANGE_BUCKET(field, GENERATE_ARRAY(start, end, interval))`.
        - otherwise: `<data_type>_trunc(field, GRANULARITY)`.

        Raises:
            ConfigError: If the field cannot be parsed, or int64 partitioning lacks a `range`.
        """
        assert isinstance(self.partition_by, dict)
        data_type = self.partition_by["data_type"].lower()
        raw_field = self.partition_by["field"]
        try:
            field = d.parse_one(raw_field, dialect="bigquery")
        except SqlglotError as e:
            raise ConfigError(
                f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{raw_field}' in '{self.path}': {e}"
            ) from e

        if data_type == "date" and self.partition_by["granularity"].lower() == "day":
            return field

        if data_type == "int64":
            if "range" not in self.partition_by:
                raise ConfigError(
                    f"Range is required for int64 partitioning in model '{self.canonical_name(context)}'."
                )

            range_ = self.partition_by["range"]
            start = range_["start"]
            end = range_["end"]
            interval = range_["interval"]

            return exp.func(
                "RANGE_BUCKET",
                field,
                exp.func("GENERATE_ARRAY", start, end, interval, dialect="bigquery"),
                dialect="bigquery",
            )

        return d.parse_one(
            f"{data_type}_trunc({self.partition_by['field']}, {self.partition_by['granularity'].upper()})",
            dialect="bigquery",
        )

    def _get_custom_materialization(self, context: DbtContext) -> t.Optional[MaterializationConfig]:
        """
        Look up this model's custom materialization in the manifest.

        Checks `<name>_<target dialect>` first, then `<name>_default`. Returns None if neither exists.
        """
        materializations = context.manifest.materializations()
        name, target_adapter = self.materialized, context.target.dialect

        adapter_specific_key = f"{name}_{target_adapter}"
        default_key = f"{name}_default"
        if adapter_specific_key in materializations:
            return materializations[adapter_specific_key]
        if default_key in materializations:
            return materializations[default_key]
        return None

    @property
    def sqlmesh_config_fields(self) -> t.Set[str]:
        """Config fields forwarded to SQLMesh, extending the base set with model-level scheduling fields."""
        return super().sqlmesh_config_fields | {
            "cron",
            "interval_unit",
            "allow_partials",
            "physical_version",
            "start",
            # In microbatch models `begin` is the same as `start`
            "begin",
        }

    def to_sqlmesh(
        self,
        context: DbtContext,
        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
    ) -> Model:
        """
        Converts the dbt model into a SQLMesh model.

        NOTE: this method mutates the config. On ClickHouse, `inserts_only` overwrites
        `incremental_strategy`, and a string `batch_size` is moved into `interval_unit` and then
        cleared.

        Args:
            context: The dbt context providing the target, manifest and SQLMesh config.
            audit_definitions: Forwarded to `create_sql_model`.
            virtual_environment_mode: Forwarded to `create_sql_model`.

        Returns:
            The model built by `create_sql_model`.

        Raises:
            ConfigError: If a partition_by / cluster_by / order_by / primary_key expression fails to
                parse, or a Snowflake dynamic table is missing `snowflake_warehouse` or `target_lag`.
        """
        model_dialect = self.dialect(context)
        query = d.jinja_query(self.sql)
        kind = self.model_kind(context)

        optional_kwargs: t.Dict[str, t.Any] = {}
        physical_properties: t.Dict[str, t.Any] = {}

        if self.partition_by:
            if isinstance(kind, (ViewKind, EmbeddedKind)):
                logger.warning(
                    "Ignoring partition_by config for model '%s'; partition_by is not supported for %s.",
                    self.name,
                    "views" if isinstance(kind, ViewKind) else "ephemeral models",
                )
            elif context.target.dialect == "snowflake":
                logger.warning(
                    "Ignoring partition_by config for model '%s' targeting %s. The partition_by config is not supported for Snowflake.",
                    self.name,
                    context.target.dialect,
                )
            else:
                partitioned_by = []
                if isinstance(self.partition_by, list):
                    for p in self.partition_by:
                        try:
                            partitioned_by.append(d.parse_one(p, dialect=model_dialect))
                        except SqlglotError as e:
                            raise ConfigError(
                                f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{p}' in '{self.path}': {e}"
                            ) from e
                elif isinstance(self.partition_by, dict):
                    if context.target.dialect == "bigquery":
                        partitioned_by.append(self._big_query_partition_by_expr(context))
                    else:
                        logger.warning(
                            "Ignoring partition_by config for model '%s' targeting %s. The format of the config field is only supported for BigQuery.",
                            self.name,
                            context.target.dialect,
                        )

                if partitioned_by:
                    optional_kwargs["partitioned_by"] = partitioned_by

        if self.cluster_by:
            if isinstance(kind, (ViewKind, EmbeddedKind)):
                logger.warning(
                    "Ignoring cluster_by config for model '%s'; cluster_by is not supported for %s.",
                    self.name,
                    "views" if isinstance(kind, ViewKind) else "ephemeral models",
                )
            else:
                clustered_by = []
                for c in self.cluster_by:
                    try:
                        cluster_expr = exp.maybe_parse(
                            c, into=exp.Cluster, prefix="CLUSTER BY", dialect=model_dialect
                        )
                        # Strip ASC/DESC wrappers; only the clustered expressions are kept.
                        for expr in cluster_expr.expressions:
                            clustered_by.append(
                                expr.this if isinstance(expr, exp.Ordered) else expr
                            )
                    except SqlglotError as e:
                        raise ConfigError(
                            f"Failed to parse model '{self.canonical_name(context)}' cluster_by field '{c}' in '{self.path}': {e}"
                        ) from e
                optional_kwargs["clustered_by"] = clustered_by

        model_kwargs = self.sqlmesh_model_kwargs(context)
        if self.sql_header:
            model_kwargs["pre_statements"].insert(0, d.jinja_statement(self.sql_header))

        if context.target.dialect == "bigquery":
            dbt_max_partition_blob = self._dbt_max_partition_blob()
            if dbt_max_partition_blob:
                model_kwargs["pre_statements"].append(d.jinja_statement(dbt_max_partition_blob))

            if self.partition_expiration_days is not None:
                physical_properties["partition_expiration_days"] = self.partition_expiration_days
            if self.require_partition_filter is not None:
                physical_properties["require_partition_filter"] = self.require_partition_filter

            if physical_properties:
                model_kwargs["physical_properties"] = physical_properties

        if context.target.dialect == "snowflake":
            if self.snowflake_warehouse is not None:
                model_kwargs["session_properties"] = {"warehouse": self.snowflake_warehouse}

            if self.model_materialization == Materialization.DYNAMIC_TABLE:
                if not self.snowflake_warehouse:
                    raise ConfigError("`snowflake_warehouse` must be set for dynamic tables")
                if not self.target_lag:
                    raise ConfigError("`target_lag` must be set for dynamic tables")

                model_kwargs["physical_properties"] = {
                    "warehouse": self.snowflake_warehouse,
                    "target_lag": self.target_lag,
                }

        if context.target.dialect == "clickhouse":
            if self.model_materialization == Materialization.INCREMENTAL:
                # `inserts_only` overrides incremental_strategy setting (if present)
                # https://github.com/ClickHouse/dbt-clickhouse/blob/065f3a724fa09205446ecadac7a00d92b2d8c646/README.md?plain=1#L108
                if self.inserts_only:
                    self.incremental_strategy = "append"

                if self.incremental_strategy == "delete+insert":
                    get_console().log_warning(
                        f"The '{self.incremental_strategy}' incremental strategy is not supported - SQLMesh will use the temp table/partition swap strategy."
                    )

                if self.incremental_predicates:
                    get_console().log_warning(
                        "SQLMesh does not support 'incremental_predicates' - they will not be applied."
                    )

            if self.query_settings:
                get_console().log_warning(
                    "SQLMesh does not support the 'query_settings' model configuration parameter. Specify the query settings directly in the model query."
                )

            if self.engine:
                optional_kwargs["storage_format"] = self.engine

            if self.order_by:
                order_by = []
                for o in self.order_by if isinstance(self.order_by, list) else [self.order_by]:
                    try:
                        order_by.append(d.parse_one(o, dialect=model_dialect))
                    except SqlglotError as e:
                        raise ConfigError(
                            f"Failed to parse model '{self.canonical_name(context)}' 'order_by' field '{o}' in '{self.path}': {e}"
                        ) from e
                physical_properties["order_by"] = order_by

            if self.primary_key:
                primary_key = []
                # `primary_key` may be a single string; normalize it (as `order_by` does above) so a
                # value like "id" is parsed as one expression instead of character by character.
                for p in ensure_list(self.primary_key):
                    try:
                        primary_key.append(d.parse_one(p, dialect=model_dialect))
                    except SqlglotError as e:
                        raise ConfigError(
                            f"Failed to parse model '{self.canonical_name(context)}' 'primary_key' field '{p}' in '{self.path}': {e}"
                        ) from e
                physical_properties["primary_key"] = primary_key

            if self.sharding_key:
                get_console().log_warning(
                    "SQLMesh does not support the 'sharding_key' model configuration parameter or distributed materializations."
                )

            if self.ttl:
                physical_properties["ttl"] = exp.var(
                    self.ttl[0] if isinstance(self.ttl, list) else self.ttl
                )

            if self.settings:
                physical_properties.update({k: exp.var(v) for k, v in self.settings.items()})

            if physical_properties:
                model_kwargs["physical_properties"] = physical_properties

        # Recompute the kind: the ClickHouse branch above may have rewritten `incremental_strategy`,
        # which `model_kind` reads.
        kind = self.model_kind(context)

        # A falsy grants config (None or {}) is considered as unmanaged per dbt semantics
        if self.grants and kind.supports_grants:
            model_kwargs["grants"] = self.grants

        allow_partials = model_kwargs.pop("allow_partials", None)
        if allow_partials is None:
            # Set allow_partials to True for dbt models to preserve the original semantics.
            allow_partials = True

        # pop begin for all models so we don't pass it through for non-incremental materializations
        # (happens if model config is microbatch but project config overrides)
        begin = model_kwargs.pop("begin", None)
        if kind.is_incremental:
            if self.batch_size and isinstance(self.batch_size, str):
                if "interval_unit" in model_kwargs:
                    get_console().log_warning(
                        f"Both 'interval_unit' and 'batch_size' are set for model '{self.canonical_name(context)}'. 'interval_unit' will be used."
                    )
                else:
                    model_kwargs["interval_unit"] = self.batch_size
                    self.batch_size = None
            if begin:
                if "start" in model_kwargs:
                    get_console().log_warning(
                        f"Both 'begin' and 'start' are set for model '{self.canonical_name(context)}'. 'start' will be used."
                    )
                else:
                    model_kwargs["start"] = begin
            # If user explicitly disables concurrent batches then we want to set depends on past to true which we
            # will do by including the model in the depends_on. `setdefault` makes sure the self-reference is
            # kept even when no dependencies were collected yet.
            if self.concurrent_batches is False:
                model_kwargs.setdefault("depends_on", set()).add(self.canonical_name(context))

        model_kwargs["start"] = model_kwargs.get(
            "start", context.sqlmesh_config.model_defaults.start
        )

        model = create_sql_model(
            self.canonical_name(context),
            query,
            dialect=model_dialect,
            kind=kind,
            audit_definitions=audit_definitions,
            # This ensures that we bypass query rendering that would otherwise be required to extract additional
            # dependencies from the model's SQL.
            # Note: any table dependencies that are not referenced using the `ref` macro will not be included.
            extract_dependencies_from_query=False,
            allow_partials=allow_partials,
            virtual_environment_mode=virtual_environment_mode,
            dbt_node_info=self.node_info,
            **optional_kwargs,
            **model_kwargs,
        )
        return model

    def _dbt_max_partition_blob(self) -> t.Optional[str]:
        """Returns a SQL blob which declares the _dbt_max_partition variable. Only applicable to BigQuery.

        Returns None unless `partition_by` is a dict and the model is incremental. The declaration is
        wrapped in `{% if is_incremental() %}` and defaults to the 1970-01-01 epoch for
        DATE/DATETIME/TIMESTAMP types and to NULL for other types.
        """
        if (
            not isinstance(self.partition_by, dict)
            or self.model_materialization != Materialization.INCREMENTAL
        ):
            return None

        from sqlmesh.core.engine_adapter.bigquery import select_partitions_expr

        data_type = self.partition_by["data_type"]
        select_max_partition_expr = select_partitions_expr(
            "{{ adapter.resolve_schema(this) }}",
            "{{ adapter.resolve_identifier(this) }}",
            data_type,
            granularity=self.partition_by.get("granularity"),
            catalog="{{ target.database }}",
        )

        data_type = data_type.upper()
        default_value = "NULL"
        if data_type in ("DATE", "DATETIME", "TIMESTAMP"):
            default_value = f"CAST('1970-01-01' AS {data_type})"

        return f"""
{{% if is_incremental() %}}
  DECLARE _dbt_max_partition {data_type} DEFAULT (
    COALESCE(({select_max_partition_expr}), {default_value})
  );
{{% endif %}}
"""
ModelConfig contains all config parameters available to DBT models
See https://docs.getdbt.com/reference/configs-and-properties for a more detailed description of each config parameter under the General properties, General configs, and For models sections.
Arguments:
- sql: The model sql
- time_column: The name of the time column
- cron: A cron string specifying how often the model should be refreshed, leveraging the croniter library.
- interval_unit: The duration of an interval for the model. By default, it is computed from the cron expression.
- batch_size: The maximum number of incremental intervals that can be run per backfill job. If this is None,
then backfilling this model will do all of history in one job. If this is set, a model's backfill
will be chunked such that each individual job will contain at most `batch_size` intervals.
- lookback: The number of previous incremental intervals in the lookback window.
- cluster_by: Field(s) to use for clustering in data warehouses that support clustering
- incremental_strategy: Strategy used to build the incremental model
- materialized: How the model will be materialized in the database
- sql_header: SQL statement to run before table/view creation. Currently implemented as a pre-hook.
- unique_key: List of columns that define row uniqueness for the model
- partition_by: List of partition columns or dictionary of bigquery partition by parameters (dbt bigquery config).
@property
def table_schema(self) -> str:
    """Return the schema the model's table lives in.

    A non-empty `target_schema` (one of the snapshot fields) wins; otherwise the lookup is
    delegated to the parent config's `table_schema`.
    """
    explicit_schema = self.target_schema
    if explicit_schema:
        return explicit_schema
    return super().table_schema
Get the full schema name
def model_kind(self, context: DbtContext) -> ModelKind:
    """
    Translate this dbt model configuration into the matching SQLMesh ModelKind.

    The on_schema_change setting is mapped to destructive/additive change defaults. Snapshots are
    always treated as `append_new_columns`. Explicit SQLMesh `on_destructive_change` /
    `on_additive_change` values override those defaults.

    Returns:
        The sqlmesh ModelKind

    Raises:
        ConfigError: On an invalid on_schema_change, missing time column / event time, missing
            snapshot strategy, or an unknown materialization.
    """
    dbt_target = context.target
    mat = self.model_materialization

    configured_mode = self.on_schema_change.lower()
    # dbt snapshots default to `append_new_columns` behavior and can't be changed
    schema_change_mode = (
        "append_new_columns" if mat == Materialization.SNAPSHOT else configured_mode
    )

    # on_schema_change -> (destructive default, additive default)
    schema_change_defaults = {
        "ignore": (OnDestructiveChange.IGNORE, OnAdditiveChange.IGNORE),
        "fail": (OnDestructiveChange.ERROR, OnAdditiveChange.ERROR),
        "append_new_columns": (OnDestructiveChange.IGNORE, OnAdditiveChange.ALLOW),
        "sync_all_columns": (OnDestructiveChange.ALLOW, OnAdditiveChange.ALLOW),
    }
    if schema_change_mode not in schema_change_defaults:
        raise ConfigError(
            f"{self.canonical_name(context)}: Invalid on_schema_change value '{schema_change_mode}'. "
            "Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'."
        )
    default_destructive, default_additive = schema_change_defaults[schema_change_mode]

    # kwargs shared by every incremental-style kind, regardless of materialization
    shared_kind_kwargs: t.Dict[str, t.Any] = {
        "on_destructive_change": self._get_field_value("on_destructive_change")
        or default_destructive,
        "on_additive_change": self._get_field_value("on_additive_change") or default_additive,
    }
    restatement_cron = self._get_field_value("auto_restatement_cron")
    if restatement_cron is not None:
        shared_kind_kwargs["auto_restatement_cron"] = restatement_cron

    if mat == Materialization.TABLE:
        return FullKind()
    if mat == Materialization.VIEW:
        return ViewKind()

    if mat == Materialization.INCREMENTAL:
        by_kind_kwargs: t.Dict[str, t.Any] = {"dialect": self.dialect(context)}
        forward_only = self._get_field_value("forward_only")
        if forward_only is not None:
            shared_kind_kwargs["forward_only"] = forward_only

        by_time_range = self.time_column or (
            self.incremental_strategy
            and self.incremental_strategy in {"microbatch", "incremental_by_time_range"}
        )

        # A string `batch_size` denotes an interval unit and is handled at the model level instead.
        for option in ("batch_size", "batch_concurrency", "lookback"):
            option_value = self._get_field_value(option)
            if option_value is None or (option == "batch_size" and isinstance(option_value, str)):
                continue
            by_kind_kwargs[option] = option_value

        restatement_disabled = self.disable_restatement
        if restatement_disabled is None:
            if by_time_range or self.full_refresh is None:
                restatement_disabled = False
            else:
                restatement_disabled = not self.full_refresh
        by_kind_kwargs["disable_restatement"] = restatement_disabled

        if by_time_range:
            strategy = self.incremental_strategy or dbt_target.default_incremental_strategy(
                IncrementalByTimeRangeKind
            )

            if strategy not in INCREMENTAL_BY_TIME_RANGE_STRATEGIES:
                get_console().log_warning(
                    f"SQLMesh incremental by time strategy is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
                    f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}."
                )

            if self.time_column and strategy != "incremental_by_time_range":
                get_console().log_warning(
                    f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. "
                    f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'."
                )

            if strategy != "microbatch":
                if not self.time_column:
                    raise ConfigError(
                        f"{self.canonical_name(context)}: 'time_column' is required for incremental by time range models not defined using microbatch."
                    )
                time_column = self.time_column
            else:
                if self.time_column:
                    raise ConfigError(
                        f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead."
                    )
                time_column = self._get_field_value("event_time")
                if not time_column:
                    raise ConfigError(
                        f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy."
                    )
                # dbt microbatch always processes batches in a size of 1
                by_kind_kwargs["batch_size"] = 1

            time_range_kwargs: t.Dict[str, t.Any] = {"time_column": time_column}
            if self.auto_restatement_intervals:
                time_range_kwargs["auto_restatement_intervals"] = self.auto_restatement_intervals
            if self.partition_by_time_column is not None:
                time_range_kwargs["partition_by_time_column"] = self.partition_by_time_column

            return IncrementalByTimeRangeKind(
                **shared_kind_kwargs,
                **by_kind_kwargs,
                **time_range_kwargs,
            )

        if self.unique_key:
            strategy = self.incremental_strategy or dbt_target.default_incremental_strategy(
                IncrementalByUniqueKeyKind
            )
            if self.incremental_strategy and strategy not in INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES:
                get_console().log_warning(
                    f"Unique key is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
                    f"Supported strategies include {collection_to_str(INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES)}. Falling back to 'merge' strategy."
                )

            merge_filter = None
            if self.incremental_predicates:
                predicate_dialect = self.dialect(context)
                parsed_predicates = [
                    d.parse_one(predicate, dialect=predicate_dialect)
                    for predicate in self.incremental_predicates
                ]
                merge_filter = exp.and_(*parsed_predicates, dialect=predicate_dialect).transform(
                    d.replace_merge_table_aliases
                )

            return IncrementalByUniqueKeyKind(
                unique_key=self.unique_key,
                merge_filter=merge_filter,
                **shared_kind_kwargs,
                **by_kind_kwargs,
            )

        strategy = self.incremental_strategy or dbt_target.default_incremental_strategy(
            IncrementalUnmanagedKind
        )
        return IncrementalUnmanagedKind(
            insert_overwrite=strategy in INCREMENTAL_BY_TIME_RANGE_STRATEGIES,
            disable_restatement=by_kind_kwargs["disable_restatement"],
            **shared_kind_kwargs,
        )

    if mat == Materialization.EPHEMERAL:
        return EmbeddedKind()

    if mat == Materialization.SNAPSHOT:
        snapshot_strategy = self.snapshot_strategy
        if not snapshot_strategy:
            raise ConfigError(
                f"{self.canonical_name(context)}: SQLMesh snapshot strategy is required for snapshot materialization."
            )
        scd_kwargs = {
            "dialect": self.dialect(context),
            "unique_key": self.unique_key,
            "invalidate_hard_deletes": self.invalidate_hard_deletes,
            "valid_from_name": "dbt_valid_from",
            "valid_to_name": "dbt_valid_to",
            "time_data_type": exp.DataType.build(
                "TIMESTAMPTZ" if dbt_target.dialect == "bigquery" else "TIMESTAMP"
            ),
            **shared_kind_kwargs,
        }
        if snapshot_strategy.is_check:
            return SCDType2ByColumnKind(
                columns=self.check_cols, execution_time_as_valid_from=True, **scd_kwargs
            )
        return SCDType2ByTimeKind(
            updated_at_name=self.updated_at, updated_at_as_valid_from=True, **scd_kwargs
        )

    if mat == Materialization.DYNAMIC_TABLE:
        return ManagedKind()

    if mat == Materialization.CUSTOM:
        custom = self._get_custom_materialization(context)
        if not custom:
            raise ConfigError(
                f"Unknown materialization '{self.materialized}'. Custom materializations must be defined in your dbt project."
            )
        return DbtCustomKind(
            materialization=self.materialized,
            adapter=custom.adapter,
            dialect=self.dialect(context),
            definition=custom.definition,
        )

    raise ConfigError(f"{mat.value} materialization not supported.")
Get the sqlmesh ModelKind
Returns:
The sqlmesh ModelKind
@property
def sqlmesh_config_fields(self) -> t.Set[str]:
    """SQLMesh config fields that can be set in dbt projects.

    Extends the fields accepted by the base config with scheduling,
    partial-interval and versioning fields that apply to models.

    Returns:
        A set of SQLMesh config fields that can be set in dbt projects.
    """
    inherited_fields = super().sqlmesh_config_fields
    model_only_fields = {
        "cron",
        "interval_unit",
        "allow_partials",
        "physical_version",
        "start",
        # `begin` is the microbatch spelling of `start`
        "begin",
    }
    return inherited_fields | model_only_fields
SQLMesh config fields that can be set in dbt projects.
Returns:
A set of SQLMesh config fields that can be set in dbt projects.
def to_sqlmesh(
    self,
    context: DbtContext,
    audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
    virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
) -> Model:
    """Converts the dbt model into a SQLMesh model.

    Translates dbt model config (partitioning, clustering, adapter-specific
    options for BigQuery, Snowflake and ClickHouse, grants, batching and
    `begin`/`start`) into keyword arguments for `create_sql_model`.

    Note: this method mutates the config in two cases. With ClickHouse
    `inserts_only`, `self.incremental_strategy` is set to "append". A string
    `batch_size` consumed as `interval_unit` resets `self.batch_size` to None.

    Args:
        context: The dbt context used to resolve the target, dialect and defaults.
        audit_definitions: Optional audit definitions keyed by name, forwarded
            to `create_sql_model`.
        virtual_environment_mode: Virtual environment mode for the resulting model.

    Returns:
        The SQLMesh model.

    Raises:
        ConfigError: If a partition_by / cluster_by / order_by / primary_key
            expression fails to parse, or a Snowflake dynamic table lacks
            `snowflake_warehouse` or `target_lag`. Errors raised by
            `model_kind` also propagate.
    """
    model_dialect = self.dialect(context)
    query = d.jinja_query(self.sql)
    # `model_kind` can log warnings, so it is resolved once here and only
    # re-resolved below if the config it depends on is changed.
    kind = self.model_kind(context)

    optional_kwargs: t.Dict[str, t.Any] = {}
    physical_properties: t.Dict[str, t.Any] = {}

    if self.partition_by:
        if isinstance(kind, (ViewKind, EmbeddedKind)):
            logger.warning(
                "Ignoring partition_by config for model '%s'; partition_by is not supported for %s.",
                self.name,
                "views" if isinstance(kind, ViewKind) else "ephemeral models",
            )
        elif context.target.dialect == "snowflake":
            logger.warning(
                "Ignoring partition_by config for model '%s' targeting %s. The partition_by config is not supported for Snowflake.",
                self.name,
                context.target.dialect,
            )
        else:
            partitioned_by = []
            if isinstance(self.partition_by, list):
                for p in self.partition_by:
                    try:
                        partitioned_by.append(d.parse_one(p, dialect=model_dialect))
                    except SqlglotError as e:
                        raise ConfigError(
                            f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{p}' in '{self.path}': {e}"
                        ) from e
            elif isinstance(self.partition_by, dict):
                if context.target.dialect == "bigquery":
                    partitioned_by.append(self._big_query_partition_by_expr(context))
                else:
                    logger.warning(
                        "Ignoring partition_by config for model '%s' targeting %s. The format of the config field is only supported for BigQuery.",
                        self.name,
                        context.target.dialect,
                    )

            if partitioned_by:
                optional_kwargs["partitioned_by"] = partitioned_by

    if self.cluster_by:
        if isinstance(kind, (ViewKind, EmbeddedKind)):
            logger.warning(
                "Ignoring cluster_by config for model '%s'; cluster_by is not supported for %s.",
                self.name,
                "views" if isinstance(kind, ViewKind) else "ephemeral models",
            )
        else:
            clustered_by = []
            for c in self.cluster_by:
                try:
                    cluster_expr = exp.maybe_parse(
                        c, into=exp.Cluster, prefix="CLUSTER BY", dialect=model_dialect
                    )
                    # Ordering modifiers (ASC/DESC) are dropped; only the column expression is kept.
                    for expr in cluster_expr.expressions:
                        clustered_by.append(
                            expr.this if isinstance(expr, exp.Ordered) else expr
                        )
                except SqlglotError as e:
                    raise ConfigError(
                        f"Failed to parse model '{self.canonical_name(context)}' cluster_by field '{c}' in '{self.path}': {e}"
                    ) from e
            optional_kwargs["clustered_by"] = clustered_by

    model_kwargs = self.sqlmesh_model_kwargs(context)
    if self.sql_header:
        # The dbt sql_header must run before any other pre-statement.
        model_kwargs["pre_statements"].insert(0, d.jinja_statement(self.sql_header))

    if context.target.dialect == "bigquery":
        dbt_max_partition_blob = self._dbt_max_partition_blob()
        if dbt_max_partition_blob:
            model_kwargs["pre_statements"].append(d.jinja_statement(dbt_max_partition_blob))

        if self.partition_expiration_days is not None:
            physical_properties["partition_expiration_days"] = self.partition_expiration_days
        if self.require_partition_filter is not None:
            physical_properties["require_partition_filter"] = self.require_partition_filter

        if physical_properties:
            model_kwargs["physical_properties"] = physical_properties

    if context.target.dialect == "snowflake":
        if self.snowflake_warehouse is not None:
            model_kwargs["session_properties"] = {"warehouse": self.snowflake_warehouse}

        if self.model_materialization == Materialization.DYNAMIC_TABLE:
            if not self.snowflake_warehouse:
                raise ConfigError("`snowflake_warehouse` must be set for dynamic tables")
            if not self.target_lag:
                raise ConfigError("`target_lag` must be set for dynamic tables")

            model_kwargs["physical_properties"] = {
                "warehouse": self.snowflake_warehouse,
                "target_lag": self.target_lag,
            }

    if context.target.dialect == "clickhouse":
        if self.model_materialization == Materialization.INCREMENTAL:
            # `inserts_only` overrides incremental_strategy setting (if present)
            # https://github.com/ClickHouse/dbt-clickhouse/blob/065f3a724fa09205446ecadac7a00d92b2d8c646/README.md?plain=1#L108
            if self.inserts_only and self.incremental_strategy != "append":
                self.incremental_strategy = "append"
                # The model kind depends on the incremental strategy, so resolve it again.
                kind = self.model_kind(context)

            if self.incremental_strategy == "delete+insert":
                get_console().log_warning(
                    f"The '{self.incremental_strategy}' incremental strategy is not supported - SQLMesh will use the temp table/partition swap strategy."
                )

            if self.incremental_predicates:
                get_console().log_warning(
                    "SQLMesh does not support 'incremental_predicates' - they will not be applied."
                )

        if self.query_settings:
            get_console().log_warning(
                "SQLMesh does not support the 'query_settings' model configuration parameter. Specify the query settings directly in the model query."
            )

        if self.engine:
            optional_kwargs["storage_format"] = self.engine

        if self.order_by:
            order_by = []
            for o in self.order_by if isinstance(self.order_by, list) else [self.order_by]:
                try:
                    order_by.append(d.parse_one(o, dialect=model_dialect))
                except SqlglotError as e:
                    raise ConfigError(
                        f"Failed to parse model '{self.canonical_name(context)}' 'order_by' field '{o}' in '{self.path}': {e}"
                    ) from e
            physical_properties["order_by"] = order_by

        if self.primary_key:
            primary_key = []
            for p in self.primary_key:
                try:
                    primary_key.append(d.parse_one(p, dialect=model_dialect))
                except SqlglotError as e:
                    raise ConfigError(
                        f"Failed to parse model '{self.canonical_name(context)}' 'primary_key' field '{p}' in '{self.path}': {e}"
                    ) from e
            physical_properties["primary_key"] = primary_key

        if self.sharding_key:
            get_console().log_warning(
                "SQLMesh does not support the 'sharding_key' model configuration parameter or distributed materializations."
            )

        if self.ttl:
            # Only the first TTL entry is used when a list is given.
            physical_properties["ttl"] = exp.var(
                self.ttl[0] if isinstance(self.ttl, list) else self.ttl
            )

        if self.settings:
            physical_properties.update({k: exp.var(v) for k, v in self.settings.items()})

        if physical_properties:
            model_kwargs["physical_properties"] = physical_properties

    # A falsy grants config (None or {}) is considered as unmanaged per dbt semantics
    if self.grants and kind.supports_grants:
        model_kwargs["grants"] = self.grants

    allow_partials = model_kwargs.pop("allow_partials", None)
    if allow_partials is None:
        # Set allow_partials to True for dbt models to preserve the original semantics.
        allow_partials = True

    # pop begin for all models so we don't pass it through for non-incremental materializations
    # (happens if model config is microbatch but project config overrides)
    begin = model_kwargs.pop("begin", None)
    if kind.is_incremental:
        if self.batch_size and isinstance(self.batch_size, str):
            # A string batch_size is a time unit, which maps onto SQLMesh's interval_unit.
            if "interval_unit" in model_kwargs:
                get_console().log_warning(
                    f"Both 'interval_unit' and 'batch_size' are set for model '{self.canonical_name(context)}'. 'interval_unit' will be used."
                )
            else:
                model_kwargs["interval_unit"] = self.batch_size
                self.batch_size = None
        if begin:
            if "start" in model_kwargs:
                get_console().log_warning(
                    f"Both 'begin' and 'start' are set for model '{self.canonical_name(context)}'. 'start' will be used."
                )
            else:
                model_kwargs["start"] = begin
        # If user explicitly disables concurrent batches then we want to set depends on past to true which we
        # will do by including the model in the depends_on
        if self.concurrent_batches is False:
            # setdefault stores the set back into model_kwargs; a plain .get(..., set())
            # would drop the self-dependency when no depends_on was provided.
            model_kwargs.setdefault("depends_on", set()).add(self.canonical_name(context))

    model_kwargs["start"] = model_kwargs.get(
        "start", context.sqlmesh_config.model_defaults.start
    )

    model = create_sql_model(
        self.canonical_name(context),
        query,
        dialect=model_dialect,
        kind=kind,
        audit_definitions=audit_definitions,
        # This ensures that we bypass query rendering that would otherwise be required to extract additional
        # dependencies from the model's SQL.
        # Note: any table dependencies that are not referenced using the `ref` macro will not be included.
        extract_dependencies_from_query=False,
        allow_partials=allow_partials,
        virtual_environment_mode=virtual_environment_mode,
        dbt_node_info=self.node_info,
        **optional_kwargs,
        **model_kwargs,
    )
    return model
Converts the dbt model into a SQLMesh model.
Configuration for the model; it should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise a model's private-attribute storage with declared defaults.

    This function is meant to behave like a BaseModel method. It accepts
    `context` only because pydantic-core passes it when calling this hook.
    If `__pydantic_private__` is already populated, nothing is changed.

    Args:
        self: The BaseModel instance.
        context: The context (unused).
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        # Private state already exists; leave it untouched.
        return

    # Evaluate each declared private attribute's default once, in declaration order.
    candidate_defaults = (
        (attr_name, attr.get_default())
        for attr_name, attr in self.__private_attributes__.items()
    )
    # Attributes without a default (PydanticUndefined) are omitted entirely.
    object_setattr(
        self,
        '__pydantic_private__',
        {
            attr_name: value
            for attr_name, value in candidate_defaults
            if value is not PydanticUndefined
        },
    )
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- sqlmesh.dbt.basemodel.BaseModelConfig
- owner
- stamp
- table_format
- storage_format
- path
- dependencies
- tests
- dialect_
- grain
- unique_id
- name
- package_name
- fqn_
- schema_
- database
- alias
- pre_hook
- post_hook
- grants
- columns
- quoting
- version
- latest_version
- table_name
- config_name
- dialect
- canonical_name
- relation_info
- tests_ref_source_dependencies
- remove_tests_with_invalid_refs
- fqn
- node_info
- sqlmesh_model_kwargs