sqlmesh.dbt.basemodel
from __future__ import annotations

import typing as t
from abc import abstractmethod
from enum import Enum
from pathlib import Path
import logging

from pydantic import Field
from sqlglot.helper import ensure_list

from sqlmesh.core import dialect as d
from sqlmesh.core.config.base import UpdateStrategy
from sqlmesh.core.config.common import VirtualEnvironmentMode
from sqlmesh.core.model import Model
from sqlmesh.core.model.common import ParsableSql
from sqlmesh.core.node import DbtNodeInfo
from sqlmesh.dbt.column import (
    ColumnConfig,
    column_descriptions_to_sqlmesh,
    column_types_to_sqlmesh,
)
from sqlmesh.dbt.common import (
    DbtConfig,
    Dependencies,
    GeneralConfig,
    RAW_CODE_KEY,
    SqlStr,
    sql_str_validator,
)
from sqlmesh.dbt.relation import Policy, RelationType
from sqlmesh.dbt.test import TestConfig
from sqlmesh.dbt.util import DBT_VERSION
from sqlmesh.utils import AttributeDict
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.pydantic import field_validator

if t.TYPE_CHECKING:
    from sqlmesh.core.audit.definition import ModelAudit
    from sqlmesh.dbt.context import DbtContext


BMC = t.TypeVar("BMC", bound="BaseModelConfig")


logger = logging.getLogger(__name__)


class Materialization(str, Enum):
    """DBT model materializations.

    Any value that does not match a named member resolves to ``CUSTOM``.
    """

    TABLE = "table"
    VIEW = "view"
    INCREMENTAL = "incremental"
    EPHEMERAL = "ephemeral"
    SNAPSHOT = "snapshot"

    # Snowflake, https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables
    DYNAMIC_TABLE = "dynamic_table"

    CUSTOM = "custom"

    @classmethod
    def _missing_(cls, value):  # type: ignore
        # Called by Enum when a lookup by value fails: fall back to CUSTOM instead of raising.
        return cls.CUSTOM


class SnapshotStrategy(str, Enum):
    """DBT snapshot strategies"""

    TIMESTAMP = "timestamp"
    CHECK = "check"

    @property
    def is_timestamp(self) -> bool:
        """True when this member is ``TIMESTAMP``."""
        return self == SnapshotStrategy.TIMESTAMP

    @property
    def is_check(self) -> bool:
        """True when this member is ``CHECK``."""
        return self == SnapshotStrategy.CHECK


class Hook(DbtConfig):
    """
    Args:
        sql: The sql to execute.
        transaction: bool indicating if the hook is executed in the same transaction as the model query.
    """

    sql: SqlStr
    transaction: bool = True

    _sql_validator = sql_str_validator


class BaseModelConfig(GeneralConfig):
    """
    Configuration shared by dbt model-like nodes that are converted into SQLMesh models.

    Args:
        owner: The owner of the model.
        stamp: An optional arbitrary string sequence used to create new model versions without making
            changes to any of the functional components of the definition.
        table_format: Optional table format. Declared here but not read by this class.
        storage_format: The storage format used to store the physical table, only applicable in certain engines.
            (e.g. 'parquet')
        path: The file path of the model
        dependencies: The macro, source, var, and ref dependencies used to execute the model and its hooks
        tests: The tests attached to the model. They are emitted as the audits of the SQLMesh model.
        dialect: The SQL dialect of the model. The context's default dialect is used when unset.
        grain: A column expression (or a list of them) that is parsed with the model dialect and
            passed on as the model's grain.
        unique_id: The node's unique id. Used to look the node up in the dbt manifest.
        name: Name of the model.
        package_name: Name of the package that defines the model.
        fqn: The parts of the node's fully qualified name. Exposed dot-joined through the `fqn` property.
        database: Database the model is stored in
        schema: Custom schema name added to the model schema name
        alias: Relation identifier for this model instead of the filename
        pre-hook: List of SQL statements to run before the model is built.
        post-hook: List of SQL statements to run after the model is built.
        full_refresh: Forces the model to always do a full refresh or never do a full refresh
        grants: Set or revoke permissions to the database object for this model
        columns: Column information for the model
        quoting: Define which components of the qualified name (database, schema, identifier) to quote when resolving the ref() method
        event_time: Name of the event time field. With dbt >= 1.9 it is exposed on the relation
            as `event_time_filter`.
        version: Optional model version. Declared here but not read by this class.
        latest_version: Optional latest model version. Declared here but not read by this class.
    """

    # sqlmesh fields
    owner: t.Optional[str] = None
    stamp: t.Optional[str] = None
    table_format: t.Optional[str] = None
    storage_format: t.Optional[str] = None
    path: Path = Path()
    dependencies: Dependencies = Dependencies()
    tests: t.List[TestConfig] = []
    dialect_: t.Optional[str] = Field(None, alias="dialect")
    grain: t.Union[str, t.List[str]] = []

    # DBT configuration fields
    unique_id: str = ""
    name: str = ""
    package_name: str = ""
    fqn_: t.List[str] = Field(default_factory=list, alias="fqn")
    schema_: str = Field("", alias="schema")
    database: t.Optional[str] = None
    alias: t.Optional[str] = None
    pre_hook: t.List[Hook] = Field([], alias="pre-hook")
    post_hook: t.List[Hook] = Field([], alias="post-hook")
    full_refresh: t.Optional[bool] = None
    grants: t.Dict[str, t.List[str]] = {}
    columns: t.Dict[str, ColumnConfig] = {}
    quoting: t.Dict[str, t.Optional[bool]] = {}
    event_time: t.Optional[str] = None

    version: t.Optional[int] = None
    latest_version: t.Optional[int] = None

    # Cache for canonical_name(); filled on the first call.
    _canonical_name: t.Optional[str] = None

    @field_validator("pre_hook", "post_hook", mode="before")
    @classmethod
    def _validate_hooks(cls, v: t.Union[str, t.List[t.Union[SqlStr, str]]]) -> t.List[Hook]:
        """Coerce raw hook config (a single item or a list of Hook / str / dict items) into Hooks.

        Raises:
            ConfigError: If an item is not a Hook, a string or a dict.
        """
        hooks = []
        for hook in ensure_list(v):
            if isinstance(hook, Hook):
                hooks.append(hook)
            elif isinstance(hook, str):
                # A bare string is the hook's SQL; `transaction` keeps its default.
                hooks.append(Hook(sql=hook))
            elif isinstance(hook, dict):
                hooks.append(Hook(**hook))
            else:
                raise ConfigError(f"Invalid hook data: {hook}")

        return hooks

    @field_validator("grants", mode="before")
    @classmethod
    def _validate_grants(
        cls, v: t.Optional[t.Dict[str, str]]
    ) -> t.Optional[t.Dict[str, t.List[str]]]:
        """Wrap every grant value in a list; `None` is passed through unchanged."""
        if v is None:
            return None
        return {key: ensure_list(value) for key, value in v.items()}

    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
        **GeneralConfig._FIELD_UPDATE_STRATEGY,
        **{
            "grants": UpdateStrategy.KEY_EXTEND,
            "path": UpdateStrategy.IMMUTABLE,
            "pre-hook": UpdateStrategy.EXTEND,
            "post-hook": UpdateStrategy.EXTEND,
            "columns": UpdateStrategy.KEY_EXTEND,
        },
    }

    @property
    def table_schema(self) -> str:
        """
        Get the full schema name
        """
        return self.schema_

    @property
    def table_name(self) -> str:
        """
        Get the table name
        """
        return self.alias or self.path.stem

    @property
    def config_name(self) -> str:
        """
        Get the model's config name (package_name.name)
        """
        return f"{self.package_name}.{self.name}"

    def dialect(self, context: DbtContext) -> str:
        """Get the model's dialect, falling back to the context's default dialect."""
        return self.dialect_ or context.default_dialect

    def canonical_name(self, context: DbtContext) -> str:
        """
        Get the sqlmesh model name

        The name is rendered once and cached on the instance.

        Returns:
            The sqlmesh model name
        """
        if not self._canonical_name:
            relation = context.create_relation(
                self.relation_info,
                quote_policy=Policy(database=False, schema=False, identifier=False),
            )
            # Leave the database out of the name when it is the target's own database.
            if relation.database == context.target.database:
                relation = relation.include(database=False)
            self._canonical_name = relation.render()
        return self._canonical_name

    @property
    def model_materialization(self) -> Materialization:
        """The materialization of the model; this base implementation always reports TABLE."""
        return Materialization.TABLE

    @property
    def relation_info(self) -> AttributeDict[str, t.Any]:
        """Get the attributes (database, schema, identifier, type, quote policy) describing the relation."""
        if self.model_materialization == Materialization.VIEW:
            relation_type = RelationType.View
        elif self.model_materialization == Materialization.EPHEMERAL:
            relation_type = RelationType.CTE
        else:
            relation_type = RelationType.Table

        extras = {}
        if DBT_VERSION >= (1, 9, 0) and self.event_time:
            extras["event_time_filter"] = {
                "field_name": self.event_time,
            }

        return AttributeDict(
            {
                "database": self.database,
                "schema": self.table_schema,
                "identifier": self.table_name,
                "type": relation_type.value,
                "quote_policy": AttributeDict(self.quoting),
                **extras,
            }
        )

    @property
    def tests_ref_source_dependencies(self) -> Dependencies:
        """Get the union of all test dependencies, without this model's own ref and without macros."""
        dependencies = Dependencies()
        for test in self.tests:
            dependencies = dependencies.union(test.dependencies)
        dependencies.refs.discard(self.name)
        dependencies.macros = []
        return dependencies

    def remove_tests_with_invalid_refs(self, context: DbtContext) -> None:
        """
        Removes tests that reference models or sources that do not exist in the context in order to match dbt behavior.

        Args:
            context: The dbt context this model resides within.

        Returns:
            None
        """
        self.tests = [
            test
            for test in self.tests
            if all(ref in context.refs for ref in test.dependencies.refs)
            and all(source in context.sources for source in test.dependencies.sources)
        ]

    @property
    def fqn(self) -> str:
        """Get the dot-joined fully qualified name."""
        return ".".join(self.fqn_)

    @property
    def sqlmesh_config_fields(self) -> t.Set[str]:
        return {"description", "owner", "stamp", "storage_format"}

    @property
    def node_info(self) -> DbtNodeInfo:
        """Get the DbtNodeInfo (unique id, name, fqn and alias) for this node."""
        return DbtNodeInfo(unique_id=self.unique_id, name=self.name, fqn=self.fqn, alias=self.alias)

    def sqlmesh_model_kwargs(
        self,
        context: DbtContext,
        column_types_override: t.Optional[t.Dict[str, ColumnConfig]] = None,
    ) -> t.Dict[str, t.Any]:
        """Get common sqlmesh model parameters

        Note that tests referencing unknown models or sources are removed from `self.tests` first.

        Args:
            context: The dbt context this model resides within.
            column_types_override: When non-empty, these columns are converted into the "columns"
                entry of the result.

        Returns:
            The keyword arguments shared by the SQLMesh models built from this config.
        """
        self.remove_tests_with_invalid_refs(context)

        # NOTE(review): if copy() is shallow, the in-place set updates below also touch
        # self.dependencies - confirm this is intended.
        dependencies = self.dependencies.copy()
        if dependencies.has_dynamic_var_names:
            # Include ALL variables as dependencies since we couldn't determine
            # precisely which variables are referenced in the model
            dependencies.variables |= set(context.variables)

        if (
            getattr(self, "model_materialization", None) == Materialization.CUSTOM
            and hasattr(self, "_get_custom_materialization")
            and (custom_mat := self._get_custom_materialization(context))
        ):
            # include custom materialization dependencies as they might use macros
            dependencies = dependencies.union(custom_mat.dependencies)

        model_dialect = self.dialect(context)

        # Only keep refs and sources that exist in the context to match dbt behavior
        dependencies.refs.intersection_update(context.refs)
        dependencies.sources.intersection_update(context.sources)
        model_context = context.context_for_dependencies(
            dependencies.union(self.tests_ref_source_dependencies)
        )
        jinja_macros = model_context.jinja_macros.trim(
            dependencies.macros, package=self.package_name
        )
        jinja_macros.add_globals(self._model_jinja_context(model_context, dependencies))

        model_kwargs = {
            "audits": [(test.canonical_name, {}) for test in self.tests],
            "column_descriptions": column_descriptions_to_sqlmesh(self.columns) or None,
            "depends_on": {
                model.canonical_name(context) for model in model_context.refs.values()
            }.union(
                {
                    source.canonical_name(context)
                    for source in model_context.sources.values()
                    if source.fqn not in context.model_fqns
                    # Allow dbt projects to reference a model as a source without causing a cycle
                },
            ),
            "jinja_macros": jinja_macros,
            "path": self.path,
            "pre_statements": [
                ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
                for hook in self.pre_hook
            ],
            "post_statements": [
                ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
                for hook in self.post_hook
            ],
            "tags": self.tags,
            "physical_schema_mapping": context.sqlmesh_config.physical_schema_mapping,
            "default_catalog": context.target.database,
            "grain": [d.parse_one(g, dialect=model_dialect) for g in ensure_list(self.grain)],
            **self.sqlmesh_config_kwargs,
        }

        # dbt doesn't respect the data_type field for DDL statements - instead, it optionally uses
        # it to validate the actual data types at runtime through contracts or external plugins.
        # Only the `column_types` config of seed models is actually respected. We don't set the
        # columns attribute to self.columns intentionally in all other cases, as that could result
        # in unfaithful types when models are materialized.
        #
        # See:
        # - https://docs.getdbt.com/reference/resource-properties/columns
        # - https://docs.getdbt.com/reference/resource-configs/contract
        # - https://docs.getdbt.com/reference/resource-configs/column_types
        if column_types_override:
            model_kwargs["columns"] = (
                column_types_to_sqlmesh(column_types_override, model_dialect) or None
            )

        return model_kwargs

    @abstractmethod
    def to_sqlmesh(
        self,
        context: DbtContext,
        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
    ) -> Model:
        """Convert DBT model into sqlmesh Model"""

    def _model_jinja_context(
        self, context: DbtContext, dependencies: Dependencies
    ) -> t.Dict[str, t.Any]:
        """Build the jinja globals (`this`, `model`, `schema`, `config`, ...) for this model.

        Args:
            context: The dbt context used to look up the manifest node and the shared jinja globals.
            dependencies: The model's dependencies; `model_attrs` selects which node attributes
                are exposed through `model`.

        Returns:
            The jinja globals. The context's own jinja globals are merged in last.
        """
        if context._manifest and self.unique_id in context._manifest._manifest.nodes:
            attributes = context._manifest._manifest.nodes[self.unique_id].to_dict()
            if dependencies.model_attrs.all_attrs:
                model_node: AttributeDict[str, t.Any] = AttributeDict(attributes)
            else:
                # Only expose the node attributes that are listed in the dependencies.
                model_node = AttributeDict(
                    {
                        key: value
                        for key, value in attributes.items()
                        if key in dependencies.model_attrs.attrs
                    }
                )

            # We exclude the raw SQL code to reduce the payload size. It's still accessible through
            # the JinjaQuery instance stored in the resulting SQLMesh model's `query` field.
            model_node.pop(RAW_CODE_KEY, None)
        else:
            model_node = AttributeDict({})

        return {
            "this": self.relation_info,
            "model": model_node,
            "schema": self.table_schema,
            "config": self.config_attribute_dict,
            **context.jinja_globals,
        }
class Materialization(str, Enum):
    """The materializations a dbt model can declare.

    Any value that does not match a named member resolves to ``CUSTOM``.
    """

    TABLE = "table"
    VIEW = "view"
    INCREMENTAL = "incremental"
    EPHEMERAL = "ephemeral"
    SNAPSHOT = "snapshot"

    # Snowflake only, see
    # https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables
    DYNAMIC_TABLE = "dynamic_table"

    CUSTOM = "custom"

    @classmethod
    def _missing_(cls, value):  # type: ignore
        # Called by Enum when a lookup by value fails: fall back to CUSTOM instead of raising.
        return cls.CUSTOM
DBT model materializations
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class SnapshotStrategy(str, Enum):
    """The snapshot strategies supported by dbt."""

    TIMESTAMP = "timestamp"
    CHECK = "check"

    @property
    def is_timestamp(self) -> bool:
        """True when this member is ``TIMESTAMP``."""
        return self is SnapshotStrategy.TIMESTAMP

    @property
    def is_check(self) -> bool:
        """True when this member is ``CHECK``."""
        return self is SnapshotStrategy.CHECK
DBT snapshot strategies
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class Hook(DbtConfig):
    """
    A SQL statement attached to a model as a pre- or post-hook.

    Args:
        sql: The sql to execute.
        transaction: Whether the hook is executed in the same transaction as the model query.
            Defaults to True.
    """

    sql: SqlStr
    transaction: bool = True

    # Validation of `sql` is delegated to the shared SQL string validator.
    _sql_validator = sql_str_validator
Arguments:
- sql: The sql to execute.
- transaction: bool indicating if the hook is executed in the same transaction as the model query.
Configuration for the model; should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class BaseModelConfig(GeneralConfig):
    """
    Configuration shared by dbt model-like nodes that are converted into SQLMesh models.

    Args:
        owner: The owner of the model.
        stamp: An optional arbitrary string sequence used to create new model versions without making
            changes to any of the functional components of the definition.
        table_format: Optional table format. Declared here but not read by this class.
        storage_format: The storage format used to store the physical table, only applicable in certain engines.
            (e.g. 'parquet')
        path: The file path of the model
        dependencies: The macro, source, var, and ref dependencies used to execute the model and its hooks
        tests: The tests attached to the model. They are emitted as the audits of the SQLMesh model.
        dialect: The SQL dialect of the model. The context's default dialect is used when unset.
        grain: A column expression (or a list of them) that is parsed with the model dialect and
            passed on as the model's grain.
        unique_id: The node's unique id. Used to look the node up in the dbt manifest.
        name: Name of the model.
        package_name: Name of the package that defines the model.
        fqn: The parts of the node's fully qualified name. Exposed dot-joined through the `fqn` property.
        database: Database the model is stored in
        schema: Custom schema name added to the model schema name
        alias: Relation identifier for this model instead of the filename
        pre-hook: List of SQL statements to run before the model is built.
        post-hook: List of SQL statements to run after the model is built.
        full_refresh: Forces the model to always do a full refresh or never do a full refresh
        grants: Set or revoke permissions to the database object for this model
        columns: Column information for the model
        quoting: Define which components of the qualified name (database, schema, identifier) to quote when resolving the ref() method
        event_time: Name of the event time field. With dbt >= 1.9 it is exposed on the relation
            as `event_time_filter`.
        version: Optional model version. Declared here but not read by this class.
        latest_version: Optional latest model version. Declared here but not read by this class.
    """

    # sqlmesh fields
    owner: t.Optional[str] = None
    stamp: t.Optional[str] = None
    table_format: t.Optional[str] = None
    storage_format: t.Optional[str] = None
    path: Path = Path()
    dependencies: Dependencies = Dependencies()
    tests: t.List[TestConfig] = []
    dialect_: t.Optional[str] = Field(None, alias="dialect")
    grain: t.Union[str, t.List[str]] = []

    # DBT configuration fields
    unique_id: str = ""
    name: str = ""
    package_name: str = ""
    fqn_: t.List[str] = Field(default_factory=list, alias="fqn")
    schema_: str = Field("", alias="schema")
    database: t.Optional[str] = None
    alias: t.Optional[str] = None
    pre_hook: t.List[Hook] = Field([], alias="pre-hook")
    post_hook: t.List[Hook] = Field([], alias="post-hook")
    full_refresh: t.Optional[bool] = None
    grants: t.Dict[str, t.List[str]] = {}
    columns: t.Dict[str, ColumnConfig] = {}
    quoting: t.Dict[str, t.Optional[bool]] = {}
    event_time: t.Optional[str] = None

    version: t.Optional[int] = None
    latest_version: t.Optional[int] = None

    # Cache for canonical_name(); filled on the first call.
    _canonical_name: t.Optional[str] = None

    @field_validator("pre_hook", "post_hook", mode="before")
    @classmethod
    def _validate_hooks(cls, v: t.Union[str, t.List[t.Union[SqlStr, str]]]) -> t.List[Hook]:
        """Coerce raw hook config (a single item or a list of Hook / str / dict items) into Hooks.

        Raises:
            ConfigError: If an item is not a Hook, a string or a dict.
        """
        hooks = []
        for hook in ensure_list(v):
            if isinstance(hook, Hook):
                hooks.append(hook)
            elif isinstance(hook, str):
                # A bare string is the hook's SQL; `transaction` keeps its default.
                hooks.append(Hook(sql=hook))
            elif isinstance(hook, dict):
                hooks.append(Hook(**hook))
            else:
                raise ConfigError(f"Invalid hook data: {hook}")

        return hooks

    @field_validator("grants", mode="before")
    @classmethod
    def _validate_grants(
        cls, v: t.Optional[t.Dict[str, str]]
    ) -> t.Optional[t.Dict[str, t.List[str]]]:
        """Wrap every grant value in a list; `None` is passed through unchanged."""
        if v is None:
            return None
        return {key: ensure_list(value) for key, value in v.items()}

    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
        **GeneralConfig._FIELD_UPDATE_STRATEGY,
        **{
            "grants": UpdateStrategy.KEY_EXTEND,
            "path": UpdateStrategy.IMMUTABLE,
            "pre-hook": UpdateStrategy.EXTEND,
            "post-hook": UpdateStrategy.EXTEND,
            "columns": UpdateStrategy.KEY_EXTEND,
        },
    }

    @property
    def table_schema(self) -> str:
        """
        Get the full schema name
        """
        return self.schema_

    @property
    def table_name(self) -> str:
        """
        Get the table name
        """
        return self.alias or self.path.stem

    @property
    def config_name(self) -> str:
        """
        Get the model's config name (package_name.name)
        """
        return f"{self.package_name}.{self.name}"

    def dialect(self, context: DbtContext) -> str:
        """Get the model's dialect, falling back to the context's default dialect."""
        return self.dialect_ or context.default_dialect

    def canonical_name(self, context: DbtContext) -> str:
        """
        Get the sqlmesh model name

        The name is rendered once and cached on the instance.

        Returns:
            The sqlmesh model name
        """
        if not self._canonical_name:
            relation = context.create_relation(
                self.relation_info,
                quote_policy=Policy(database=False, schema=False, identifier=False),
            )
            # Leave the database out of the name when it is the target's own database.
            if relation.database == context.target.database:
                relation = relation.include(database=False)
            self._canonical_name = relation.render()
        return self._canonical_name

    @property
    def model_materialization(self) -> Materialization:
        """The materialization of the model; this base implementation always reports TABLE."""
        return Materialization.TABLE

    @property
    def relation_info(self) -> AttributeDict[str, t.Any]:
        """Get the attributes (database, schema, identifier, type, quote policy) describing the relation."""
        if self.model_materialization == Materialization.VIEW:
            relation_type = RelationType.View
        elif self.model_materialization == Materialization.EPHEMERAL:
            relation_type = RelationType.CTE
        else:
            relation_type = RelationType.Table

        extras = {}
        if DBT_VERSION >= (1, 9, 0) and self.event_time:
            extras["event_time_filter"] = {
                "field_name": self.event_time,
            }

        return AttributeDict(
            {
                "database": self.database,
                "schema": self.table_schema,
                "identifier": self.table_name,
                "type": relation_type.value,
                "quote_policy": AttributeDict(self.quoting),
                **extras,
            }
        )

    @property
    def tests_ref_source_dependencies(self) -> Dependencies:
        """Get the union of all test dependencies, without this model's own ref and without macros."""
        dependencies = Dependencies()
        for test in self.tests:
            dependencies = dependencies.union(test.dependencies)
        dependencies.refs.discard(self.name)
        dependencies.macros = []
        return dependencies

    def remove_tests_with_invalid_refs(self, context: DbtContext) -> None:
        """
        Removes tests that reference models or sources that do not exist in the context in order to match dbt behavior.

        Args:
            context: The dbt context this model resides within.

        Returns:
            None
        """
        self.tests = [
            test
            for test in self.tests
            if all(ref in context.refs for ref in test.dependencies.refs)
            and all(source in context.sources for source in test.dependencies.sources)
        ]

    @property
    def fqn(self) -> str:
        """Get the dot-joined fully qualified name."""
        return ".".join(self.fqn_)

    @property
    def sqlmesh_config_fields(self) -> t.Set[str]:
        return {"description", "owner", "stamp", "storage_format"}

    @property
    def node_info(self) -> DbtNodeInfo:
        """Get the DbtNodeInfo (unique id, name, fqn and alias) for this node."""
        return DbtNodeInfo(unique_id=self.unique_id, name=self.name, fqn=self.fqn, alias=self.alias)

    def sqlmesh_model_kwargs(
        self,
        context: DbtContext,
        column_types_override: t.Optional[t.Dict[str, ColumnConfig]] = None,
    ) -> t.Dict[str, t.Any]:
        """Get common sqlmesh model parameters

        Note that tests referencing unknown models or sources are removed from `self.tests` first.

        Args:
            context: The dbt context this model resides within.
            column_types_override: When non-empty, these columns are converted into the "columns"
                entry of the result.

        Returns:
            The keyword arguments shared by the SQLMesh models built from this config.
        """
        self.remove_tests_with_invalid_refs(context)

        # NOTE(review): if copy() is shallow, the in-place set updates below also touch
        # self.dependencies - confirm this is intended.
        dependencies = self.dependencies.copy()
        if dependencies.has_dynamic_var_names:
            # Include ALL variables as dependencies since we couldn't determine
            # precisely which variables are referenced in the model
            dependencies.variables |= set(context.variables)

        if (
            getattr(self, "model_materialization", None) == Materialization.CUSTOM
            and hasattr(self, "_get_custom_materialization")
            and (custom_mat := self._get_custom_materialization(context))
        ):
            # include custom materialization dependencies as they might use macros
            dependencies = dependencies.union(custom_mat.dependencies)

        model_dialect = self.dialect(context)

        # Only keep refs and sources that exist in the context to match dbt behavior
        dependencies.refs.intersection_update(context.refs)
        dependencies.sources.intersection_update(context.sources)
        model_context = context.context_for_dependencies(
            dependencies.union(self.tests_ref_source_dependencies)
        )
        jinja_macros = model_context.jinja_macros.trim(
            dependencies.macros, package=self.package_name
        )
        jinja_macros.add_globals(self._model_jinja_context(model_context, dependencies))

        model_kwargs = {
            "audits": [(test.canonical_name, {}) for test in self.tests],
            "column_descriptions": column_descriptions_to_sqlmesh(self.columns) or None,
            "depends_on": {
                model.canonical_name(context) for model in model_context.refs.values()
            }.union(
                {
                    source.canonical_name(context)
                    for source in model_context.sources.values()
                    if source.fqn not in context.model_fqns
                    # Allow dbt projects to reference a model as a source without causing a cycle
                },
            ),
            "jinja_macros": jinja_macros,
            "path": self.path,
            "pre_statements": [
                ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
                for hook in self.pre_hook
            ],
            "post_statements": [
                ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
                for hook in self.post_hook
            ],
            "tags": self.tags,
            "physical_schema_mapping": context.sqlmesh_config.physical_schema_mapping,
            "default_catalog": context.target.database,
            "grain": [d.parse_one(g, dialect=model_dialect) for g in ensure_list(self.grain)],
            **self.sqlmesh_config_kwargs,
        }

        # dbt doesn't respect the data_type field for DDL statements - instead, it optionally uses
        # it to validate the actual data types at runtime through contracts or external plugins.
        # Only the `column_types` config of seed models is actually respected. We don't set the
        # columns attribute to self.columns intentionally in all other cases, as that could result
        # in unfaithful types when models are materialized.
        #
        # See:
        # - https://docs.getdbt.com/reference/resource-properties/columns
        # - https://docs.getdbt.com/reference/resource-configs/contract
        # - https://docs.getdbt.com/reference/resource-configs/column_types
        if column_types_override:
            model_kwargs["columns"] = (
                column_types_to_sqlmesh(column_types_override, model_dialect) or None
            )

        return model_kwargs

    @abstractmethod
    def to_sqlmesh(
        self,
        context: DbtContext,
        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
    ) -> Model:
        """Convert DBT model into sqlmesh Model"""

    def _model_jinja_context(
        self, context: DbtContext, dependencies: Dependencies
    ) -> t.Dict[str, t.Any]:
        """Build the jinja globals (`this`, `model`, `schema`, `config`, ...) for this model.

        Args:
            context: The dbt context used to look up the manifest node and the shared jinja globals.
            dependencies: The model's dependencies; `model_attrs` selects which node attributes
                are exposed through `model`.

        Returns:
            The jinja globals. The context's own jinja globals are merged in last.
        """
        if context._manifest and self.unique_id in context._manifest._manifest.nodes:
            attributes = context._manifest._manifest.nodes[self.unique_id].to_dict()
            if dependencies.model_attrs.all_attrs:
                model_node: AttributeDict[str, t.Any] = AttributeDict(attributes)
            else:
                # Only expose the node attributes that are listed in the dependencies.
                model_node = AttributeDict(
                    {
                        key: value
                        for key, value in attributes.items()
                        if key in dependencies.model_attrs.attrs
                    }
                )

            # We exclude the raw SQL code to reduce the payload size. It's still accessible through
            # the JinjaQuery instance stored in the resulting SQLMesh model's `query` field.
            model_node.pop(RAW_CODE_KEY, None)
        else:
            model_node = AttributeDict({})

        return {
            "this": self.relation_info,
            "model": model_node,
            "schema": self.table_schema,
            "config": self.config_attribute_dict,
            **context.jinja_globals,
        }
Arguments:
- owner: The owner of the model.
- stamp: An optional arbitrary string sequence used to create new model versions without making changes to any of the functional components of the definition.
- storage_format: The storage format used to store the physical table, only applicable in certain engines. (e.g. 'parquet')
- path: The file path of the model
- dependencies: The macro, source, var, and ref dependencies used to execute the model and its hooks
- name: Name of the model.
- package_name: Name of the package that defines the model.
- database: Database the model is stored in
- schema: Custom schema name added to the model schema name
- alias: Relation identifier for this model instead of the filename
- pre-hook: List of SQL statements to run before the model is built.
- post-hook: List of SQL statements to run after the model is built.
- full_refresh: Forces the model to always do a full refresh or never do a full refresh
- grants: Set or revoke permissions to the database object for this model
- columns: Column information for the model
- quoting: Define which components of the qualified name (database, schema, identifier) to quote when resolving the ref() method
@property
def table_schema(self) -> str:
    """
    Return the schema name stored in the `schema_` field.
    """
    schema_name = self.schema_
    return schema_name
Get the full schema name
@property
def table_name(self) -> str:
    """
    Return the alias when one is set, otherwise the stem of the model's file path.
    """
    if self.alias:
        return self.alias
    return self.path.stem
Get the table name
@property
def config_name(self) -> str:
    """
    Return the config name of the model, formatted as `package_name.name`.
    """
    return "{}.{}".format(self.package_name, self.name)
Get the model's config name (package_name.name)
def canonical_name(self, context: DbtContext) -> str:
    """
    Return the sqlmesh model name, rendering it on the first call and caching it afterwards.

    Args:
        context: The dbt context used to create the relation and to look up the target database.

    Returns:
        The sqlmesh model name
    """
    if self._canonical_name:
        return self._canonical_name

    info = self.relation_info
    # Render every component of the name unquoted.
    unquoted = Policy(database=False, schema=False, identifier=False)
    relation = context.create_relation(info, quote_policy=unquoted)

    # Leave the database out of the name when it is the target's own database.
    if relation.database == context.target.database:
        relation = relation.include(database=False)

    self._canonical_name = relation.render()
    return self._canonical_name
Get the sqlmesh model name
Returns:
The sqlmesh model name
@property
def relation_info(self) -> AttributeDict[str, t.Any]:
    """
    Return the attributes describing the model's relation.

    The relation type is View for VIEW models, CTE for EPHEMERAL models and Table otherwise.
    With dbt >= 1.9 and an `event_time` set, an `event_time_filter` entry is added as well.
    """
    materialization = self.model_materialization
    if materialization == Materialization.EPHEMERAL:
        relation_type = RelationType.CTE
    elif materialization == Materialization.VIEW:
        relation_type = RelationType.View
    else:
        relation_type = RelationType.Table

    info = {
        "database": self.database,
        "schema": self.table_schema,
        "identifier": self.table_name,
        "type": relation_type.value,
        "quote_policy": AttributeDict(self.quoting),
    }

    if DBT_VERSION >= (1, 9, 0) and self.event_time:
        info["event_time_filter"] = {"field_name": self.event_time}

    return AttributeDict(info)
@property
def tests_ref_source_dependencies(self) -> Dependencies:
    """
    Return the merged dependencies of all tests, without this model's own ref and without macros.
    """
    merged = Dependencies()
    for model_test in self.tests:
        merged = merged.union(model_test.dependencies)

    # The model's own name is not kept among the refs.
    own_name = self.name
    if own_name in merged.refs:
        merged.refs.remove(own_name)

    merged.macros = []
    return merged
def remove_tests_with_invalid_refs(self, context: DbtContext) -> None:
    """
    Drop every test that references a model or source missing from the context, to match dbt behavior.

    Args:
        context: The dbt context this model resides within.

    Returns:
        None
    """
    valid_tests = []
    for candidate in self.tests:
        if any(ref not in context.refs for ref in candidate.dependencies.refs):
            continue
        if any(source not in context.sources for source in candidate.dependencies.sources):
            continue
        valid_tests.append(candidate)
    self.tests = valid_tests
Removes tests that reference models or sources that do not exist in the context in order to match dbt behavior.
Arguments:
- context: The dbt context this model resides within.
Returns:
None
@property
def sqlmesh_config_fields(self) -> t.Set[str]:
    """
    Return the SQLMesh config fields that can be set in dbt projects.
    """
    config_fields: t.Set[str] = {"description", "owner", "stamp", "storage_format"}
    return config_fields
SQLMesh config fields that can be set in dbt projects.
Returns:
A set of SQLMesh config fields that can be set in dbt projects.
def sqlmesh_model_kwargs(
    self,
    context: DbtContext,
    column_types_override: t.Optional[t.Dict[str, ColumnConfig]] = None,
) -> t.Dict[str, t.Any]:
    """Get common sqlmesh model parameters.

    Args:
        context: The dbt context this model resides within.
        column_types_override: Optional column configs. When truthy, they are converted
            with `column_types_to_sqlmesh` and returned under the "columns" key.

    Returns:
        A dictionary of keyword arguments shared by the sqlmesh models built from this config.
    """
    # Runs first on purpose: it prunes self.tests, which feeds both the "audits" entry and
    # tests_ref_source_dependencies further down.
    self.remove_tests_with_invalid_refs(context)

    deps = self.dependencies.copy()
    if deps.has_dynamic_var_names:
        # Include ALL variables as dependencies since we couldn't determine
        # precisely which variables are referenced in the model
        deps.variables |= set(context.variables)

    uses_custom_materialization = (
        getattr(self, "model_materialization", None) == Materialization.CUSTOM
    )
    if uses_custom_materialization and hasattr(self, "_get_custom_materialization"):
        custom_mat = self._get_custom_materialization(context)
        if custom_mat:
            # include custom materialization dependencies as they might use macros
            deps = deps.union(custom_mat.dependencies)

    model_dialect = self.dialect(context)

    # Only keep refs and sources that exist in the context to match dbt behavior
    deps.refs.intersection_update(context.refs)
    deps.sources.intersection_update(context.sources)

    model_context = context.context_for_dependencies(
        deps.union(self.tests_ref_source_dependencies)
    )
    jinja_macros = model_context.jinja_macros.trim(deps.macros, package=self.package_name)
    jinja_macros.add_globals(self._model_jinja_context(model_context, deps))

    def _hook_statements(hooks: t.Any) -> t.List[t.Any]:
        # Wrap each hook's SQL as a jinja statement, keeping its transaction setting.
        return [
            ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
            for hook in hooks
        ]

    audits = [(test.canonical_name, {}) for test in self.tests]
    column_descriptions = column_descriptions_to_sqlmesh(self.columns) or None

    ref_names = {model.canonical_name(context) for model in model_context.refs.values()}
    source_names = {
        source.canonical_name(context)
        for source in model_context.sources.values()
        # Allow dbt projects to reference a model as a source without causing a cycle
        if source.fqn not in context.model_fqns
    }

    model_kwargs: t.Dict[str, t.Any] = {
        "audits": audits,
        "column_descriptions": column_descriptions,
        "depends_on": ref_names | source_names,
        "jinja_macros": jinja_macros,
        "path": self.path,
        "pre_statements": _hook_statements(self.pre_hook),
        "post_statements": _hook_statements(self.post_hook),
        "tags": self.tags,
        "physical_schema_mapping": context.sqlmesh_config.physical_schema_mapping,
        "default_catalog": context.target.database,
        "grain": [d.parse_one(g, dialect=model_dialect) for g in ensure_list(self.grain)],
        # Expanded last so these entries take precedence over the keys above.
        **self.sqlmesh_config_kwargs,
    }

    # dbt doesn't respect the data_type field for DDL statements - instead, it optionally uses
    # it to validate the actual data types at runtime through contracts or external plugins.
    # Only the `columns_types` config of seed models is actually respected. We don't set the
    # columns attribute to self.columns intentionally in all other cases, as that could result
    # in unfaithful types when models are materialized.
    #
    # See:
    # - https://docs.getdbt.com/reference/resource-properties/columns
    # - https://docs.getdbt.com/reference/resource-configs/contract
    # - https://docs.getdbt.com/reference/resource-configs/column_types
    if column_types_override:
        model_kwargs["columns"] = (
            column_types_to_sqlmesh(column_types_override, self.dialect(context)) or None
        )

    return model_kwargs
Get common sqlmesh model parameters
@abstractmethod
def to_sqlmesh(
    self,
    context: DbtContext,
    audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
    virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
) -> Model:
    """Convert DBT model into sqlmesh Model.

    Abstract: there is no implementation here, so concrete configs must supply one.

    Args:
        context: The dbt context this model resides within.
        audit_definitions: Optional mapping of string keys to ModelAudit objects
            (presumably keyed by audit name; confirm against the implementations).
        virtual_environment_mode: The virtual environment mode to use; defaults to
            `VirtualEnvironmentMode.default`.

    Returns:
        The sqlmesh Model built from this config.
    """
Convert DBT model into sqlmesh Model
Configuration for the model. It should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance.

    This function is meant to behave like a BaseModel method. It accepts `context` because
    that is what pydantic-core passes when calling it; the value itself is not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Nothing to do when the private attributes have already been populated.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    resolved_defaults = (
        (name, private_attr.get_default())
        for name, private_attr in self.__private_attributes__.items()
    )
    # Attributes without a default (PydanticUndefined) are left out.
    pydantic_private = {
        name: default for name, default in resolved_defaults if default is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs