Edit on GitHub

sqlmesh.dbt.basemodel

  1from __future__ import annotations
  2
  3import typing as t
  4from abc import abstractmethod
  5from enum import Enum
  6from pathlib import Path
  7import logging
  8
  9from pydantic import Field
 10from sqlglot.helper import ensure_list
 11
 12from sqlmesh.core import dialect as d
 13from sqlmesh.core.config.base import UpdateStrategy
 14from sqlmesh.core.config.common import VirtualEnvironmentMode
 15from sqlmesh.core.model import Model
 16from sqlmesh.core.model.common import ParsableSql
 17from sqlmesh.core.node import DbtNodeInfo
 18from sqlmesh.dbt.column import (
 19    ColumnConfig,
 20    column_descriptions_to_sqlmesh,
 21    column_types_to_sqlmesh,
 22)
 23from sqlmesh.dbt.common import (
 24    DbtConfig,
 25    Dependencies,
 26    GeneralConfig,
 27    RAW_CODE_KEY,
 28    SqlStr,
 29    sql_str_validator,
 30)
 31from sqlmesh.dbt.relation import Policy, RelationType
 32from sqlmesh.dbt.test import TestConfig
 33from sqlmesh.dbt.util import DBT_VERSION
 34from sqlmesh.utils import AttributeDict
 35from sqlmesh.utils.errors import ConfigError
 36from sqlmesh.utils.pydantic import field_validator
 37
 38if t.TYPE_CHECKING:
 39    from sqlmesh.core.audit.definition import ModelAudit
 40    from sqlmesh.dbt.context import DbtContext
 41
 42
 43BMC = t.TypeVar("BMC", bound="BaseModelConfig")
 44
 45
 46logger = logging.getLogger(__name__)
 47
 48
 49class Materialization(str, Enum):
 50    """DBT model materializations"""
 51
 52    TABLE = "table"
 53    VIEW = "view"
 54    INCREMENTAL = "incremental"
 55    EPHEMERAL = "ephemeral"
 56    SNAPSHOT = "snapshot"
 57
 58    # Snowflake, https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables
 59    DYNAMIC_TABLE = "dynamic_table"
 60
 61    CUSTOM = "custom"
 62
 63    @classmethod
 64    def _missing_(cls, value):  # type: ignore
 65        return cls.CUSTOM
 66
 67
 68class SnapshotStrategy(str, Enum):
 69    """DBT snapshot strategies"""
 70
 71    TIMESTAMP = "timestamp"
 72    CHECK = "check"
 73
 74    @property
 75    def is_timestamp(self) -> bool:
 76        return self == SnapshotStrategy.TIMESTAMP
 77
 78    @property
 79    def is_check(self) -> bool:
 80        return self == SnapshotStrategy.CHECK
 81
 82
 83class Hook(DbtConfig):
 84    """
 85    Args:
 86        sql: The sql to execute.
 87        transaction: bool indicating if the hook is executed in the same transaction as the model query.
 88    """
 89
 90    sql: SqlStr
 91    transaction: bool = True
 92
 93    _sql_validator = sql_str_validator
 94
 95
 96class BaseModelConfig(GeneralConfig):
 97    """
 98    Args:
 99        owner: The owner of the model.
100        stamp: An optional arbitrary string sequence used to create new model versions without making
101            changes to any of the functional components of the definition.
102        storage_format: The storage format used to store the physical table, only applicable in certain engines.
103            (eg. 'parquet')
104        path: The file path of the model
105        dependencies: The macro, source, var, and ref dependencies used to execute the model and its hooks
106        name: Name of the model.
107        package_name: Name of the package that defines the model.
108        database: Database the model is stored in
109        schema: Custom schema name added to the model schema name
110        alias: Relation identifier for this model instead of the filename
111        pre-hook: List of SQL statements to run before the model is built.
112        post-hook: List of SQL statements to run after the model is built.
113        full_refresh: Forces the model to always do a full refresh or never do a full refresh
114        grants: Set or revoke permissions to the database object for this model
115        columns: Column information for the model
116        quoting: Define which components of the qualified name (database, schema, identifier) to quote when resolving the ref() method
117    """
118
119    # sqlmesh fields
120    owner: t.Optional[str] = None
121    stamp: t.Optional[str] = None
122    table_format: t.Optional[str] = None
123    storage_format: t.Optional[str] = None
124    path: Path = Path()
125    dependencies: Dependencies = Dependencies()
126    tests: t.List[TestConfig] = []
127    dialect_: t.Optional[str] = Field(None, alias="dialect")
128    grain: t.Union[str, t.List[str]] = []
129
130    # DBT configuration fields
131    unique_id: str = ""
132    name: str = ""
133    package_name: str = ""
134    fqn_: t.List[str] = Field(default_factory=list, alias="fqn")
135    schema_: str = Field("", alias="schema")
136    database: t.Optional[str] = None
137    alias: t.Optional[str] = None
138    pre_hook: t.List[Hook] = Field([], alias="pre-hook")
139    post_hook: t.List[Hook] = Field([], alias="post-hook")
140    full_refresh: t.Optional[bool] = None
141    grants: t.Dict[str, t.List[str]] = {}
142    columns: t.Dict[str, ColumnConfig] = {}
143    quoting: t.Dict[str, t.Optional[bool]] = {}
144    event_time: t.Optional[str] = None
145
146    version: t.Optional[int] = None
147    latest_version: t.Optional[int] = None
148
149    _canonical_name: t.Optional[str] = None
150
151    @field_validator("pre_hook", "post_hook", mode="before")
152    @classmethod
153    def _validate_hooks(cls, v: t.Union[str, t.List[t.Union[SqlStr, str]]]) -> t.List[Hook]:
154        hooks = []
155        for hook in ensure_list(v):
156            if isinstance(hook, Hook):
157                hooks.append(hook)
158            elif isinstance(hook, str):
159                hooks.append(Hook(sql=hook))
160            elif isinstance(hook, dict):
161                hooks.append(Hook(**hook))
162            else:
163                raise ConfigError(f"Invalid hook data: {hook}")
164
165        return hooks
166
167    @field_validator("grants", mode="before")
168    @classmethod
169    def _validate_grants(
170        cls, v: t.Optional[t.Dict[str, str]]
171    ) -> t.Optional[t.Dict[str, t.List[str]]]:
172        if v is None:
173            return None
174        return {key: ensure_list(value) for key, value in v.items()}
175
176    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
177        **GeneralConfig._FIELD_UPDATE_STRATEGY,
178        **{
179            "grants": UpdateStrategy.KEY_EXTEND,
180            "path": UpdateStrategy.IMMUTABLE,
181            "pre-hook": UpdateStrategy.EXTEND,
182            "post-hook": UpdateStrategy.EXTEND,
183            "columns": UpdateStrategy.KEY_EXTEND,
184        },
185    }
186
187    @property
188    def table_schema(self) -> str:
189        """
190        Get the full schema name
191        """
192        return self.schema_
193
194    @property
195    def table_name(self) -> str:
196        """
197        Get the table name
198        """
199        return self.alias or self.path.stem
200
201    @property
202    def config_name(self) -> str:
203        """
204        Get the model's config name (package_name.name)
205        """
206        return f"{self.package_name}.{self.name}"
207
208    def dialect(self, context: DbtContext) -> str:
209        return self.dialect_ or context.default_dialect
210
211    def canonical_name(self, context: DbtContext) -> str:
212        """
213        Get the sqlmesh model name
214
215        Returns:
216            The sqlmesh model name
217        """
218        if not self._canonical_name:
219            relation = context.create_relation(
220                self.relation_info,
221                quote_policy=Policy(database=False, schema=False, identifier=False),
222            )
223            if relation.database == context.target.database:
224                relation = relation.include(database=False)
225            self._canonical_name = relation.render()
226        return self._canonical_name
227
228    @property
229    def model_materialization(self) -> Materialization:
230        return Materialization.TABLE
231
232    @property
233    def relation_info(self) -> AttributeDict[str, t.Any]:
234        if self.model_materialization == Materialization.VIEW:
235            relation_type = RelationType.View
236        elif self.model_materialization == Materialization.EPHEMERAL:
237            relation_type = RelationType.CTE
238        else:
239            relation_type = RelationType.Table
240
241        extras = {}
242        if DBT_VERSION >= (1, 9, 0) and self.event_time:
243            extras["event_time_filter"] = {
244                "field_name": self.event_time,
245            }
246
247        return AttributeDict(
248            {
249                "database": self.database,
250                "schema": self.table_schema,
251                "identifier": self.table_name,
252                "type": relation_type.value,
253                "quote_policy": AttributeDict(self.quoting),
254                **extras,
255            }
256        )
257
258    @property
259    def tests_ref_source_dependencies(self) -> Dependencies:
260        dependencies = Dependencies()
261        for test in self.tests:
262            dependencies = dependencies.union(test.dependencies)
263        if self.name in dependencies.refs:
264            dependencies.refs.remove(self.name)
265        dependencies.macros = []
266        return dependencies
267
268    def remove_tests_with_invalid_refs(self, context: DbtContext) -> None:
269        """
270        Removes tests that reference models or sources that do not exist in the context in order to match dbt behavior.
271
272        Args:
273            context: The dbt context this model resides within.
274
275        Returns:
276            None
277        """
278        self.tests = [
279            test
280            for test in self.tests
281            if all(ref in context.refs for ref in test.dependencies.refs)
282            and all(source in context.sources for source in test.dependencies.sources)
283        ]
284
285    @property
286    def fqn(self) -> str:
287        return ".".join(self.fqn_)
288
289    @property
290    def sqlmesh_config_fields(self) -> t.Set[str]:
291        return {"description", "owner", "stamp", "storage_format"}
292
293    @property
294    def node_info(self) -> DbtNodeInfo:
295        return DbtNodeInfo(unique_id=self.unique_id, name=self.name, fqn=self.fqn, alias=self.alias)
296
297    def sqlmesh_model_kwargs(
298        self,
299        context: DbtContext,
300        column_types_override: t.Optional[t.Dict[str, ColumnConfig]] = None,
301    ) -> t.Dict[str, t.Any]:
302        """Get common sqlmesh model parameters"""
303        self.remove_tests_with_invalid_refs(context)
304
305        dependencies = self.dependencies.copy()
306        if dependencies.has_dynamic_var_names:
307            # Include ALL variables as dependencies since we couldn't determine
308            # precisely which variables are referenced in the model
309            dependencies.variables |= set(context.variables)
310
311        if (
312            getattr(self, "model_materialization", None) == Materialization.CUSTOM
313            and hasattr(self, "_get_custom_materialization")
314            and (custom_mat := self._get_custom_materialization(context))
315        ):
316            # include custom materialization dependencies as they might use macros
317            dependencies = dependencies.union(custom_mat.dependencies)
318
319        model_dialect = self.dialect(context)
320
321        # Only keep refs and sources that exist in the context to match dbt behavior
322        dependencies.refs.intersection_update(context.refs)
323        dependencies.sources.intersection_update(context.sources)
324        model_context = context.context_for_dependencies(
325            dependencies.union(self.tests_ref_source_dependencies)
326        )
327        jinja_macros = model_context.jinja_macros.trim(
328            dependencies.macros, package=self.package_name
329        )
330        jinja_macros.add_globals(self._model_jinja_context(model_context, dependencies))
331
332        model_kwargs = {
333            "audits": [(test.canonical_name, {}) for test in self.tests],
334            "column_descriptions": column_descriptions_to_sqlmesh(self.columns) or None,
335            "depends_on": {
336                model.canonical_name(context) for model in model_context.refs.values()
337            }.union(
338                {
339                    source.canonical_name(context)
340                    for source in model_context.sources.values()
341                    if source.fqn not in context.model_fqns
342                    # Allow dbt projects to reference a model as a source without causing a cycle
343                },
344            ),
345            "jinja_macros": jinja_macros,
346            "path": self.path,
347            "pre_statements": [
348                ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
349                for hook in self.pre_hook
350            ],
351            "post_statements": [
352                ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
353                for hook in self.post_hook
354            ],
355            "tags": self.tags,
356            "physical_schema_mapping": context.sqlmesh_config.physical_schema_mapping,
357            "default_catalog": context.target.database,
358            "grain": [d.parse_one(g, dialect=model_dialect) for g in ensure_list(self.grain)],
359            **self.sqlmesh_config_kwargs,
360        }
361
362        # dbt doesn't respect the data_type field for DDL statements– instead, it optionally uses
363        # it to validate the actual data types at runtime through contracts or external plugins.
364        # Only the `columns_types` config of seed models is actually respected. We don't set the
365        # columns attribute to self.columns intentionally in all other cases, as that could result
366        # in unfaithful types when models are materialized.
367        #
368        # See:
369        # - https://docs.getdbt.com/reference/resource-properties/columns
370        # - https://docs.getdbt.com/reference/resource-configs/contract
371        # - https://docs.getdbt.com/reference/resource-configs/column_types
372        if column_types_override:
373            model_kwargs["columns"] = (
374                column_types_to_sqlmesh(column_types_override, self.dialect(context)) or None
375            )
376
377        return model_kwargs
378
379    @abstractmethod
380    def to_sqlmesh(
381        self,
382        context: DbtContext,
383        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
384        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
385    ) -> Model:
386        """Convert DBT model into sqlmesh Model"""
387
388    def _model_jinja_context(
389        self, context: DbtContext, dependencies: Dependencies
390    ) -> t.Dict[str, t.Any]:
391        if context._manifest and self.unique_id in context._manifest._manifest.nodes:
392            attributes = context._manifest._manifest.nodes[self.unique_id].to_dict()
393            if dependencies.model_attrs.all_attrs:
394                model_node: AttributeDict[str, t.Any] = AttributeDict(attributes)
395            else:
396                model_node = AttributeDict(
397                    filter(lambda kv: kv[0] in dependencies.model_attrs.attrs, attributes.items())
398                )
399
400            # We exclude the raw SQL code to reduce the payload size. It's still accessible through
401            # the JinjaQuery instance stored in the resulting SQLMesh model's `query` field.
402            model_node.pop(RAW_CODE_KEY, None)
403        else:
404            model_node = AttributeDict({})
405
406        return {
407            "this": self.relation_info,
408            "model": model_node,
409            "schema": self.table_schema,
410            "config": self.config_attribute_dict,
411            **context.jinja_globals,
412        }
logger = <Logger sqlmesh.dbt.basemodel (WARNING)>
class Materialization(builtins.str, enum.Enum):
class Materialization(str, Enum):
    """Materializations a dbt model can be configured with.

    Unrecognized values are not rejected: looking one up resolves to ``CUSTOM``
    through ``_missing_``.
    """

    TABLE = "table"
    VIEW = "view"
    INCREMENTAL = "incremental"
    EPHEMERAL = "ephemeral"
    SNAPSHOT = "snapshot"

    # Snowflake dynamic tables:
    # https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables
    DYNAMIC_TABLE = "dynamic_table"

    # Fallback member for any materialization name not listed above.
    CUSTOM = "custom"

    @classmethod
    def _missing_(cls, value):  # type: ignore
        # Enum calls this when ``value`` matches no member; map it to CUSTOM instead of raising.
        fallback = cls.CUSTOM
        return fallback

DBT model materializations

TABLE = <Materialization.TABLE: 'table'>
VIEW = <Materialization.VIEW: 'view'>
INCREMENTAL = <Materialization.INCREMENTAL: 'incremental'>
EPHEMERAL = <Materialization.EPHEMERAL: 'ephemeral'>
SNAPSHOT = <Materialization.SNAPSHOT: 'snapshot'>
DYNAMIC_TABLE = <Materialization.DYNAMIC_TABLE: 'dynamic_table'>
CUSTOM = <Materialization.CUSTOM: 'custom'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class SnapshotStrategy(builtins.str, enum.Enum):
class SnapshotStrategy(str, Enum):
    """Strategies a dbt snapshot can be configured with (``timestamp`` or ``check``)."""

    TIMESTAMP = "timestamp"
    CHECK = "check"

    @property
    def is_timestamp(self) -> bool:
        """Whether this is the ``timestamp`` strategy."""
        return self is SnapshotStrategy.TIMESTAMP

    @property
    def is_check(self) -> bool:
        """Whether this is the ``check`` strategy."""
        return self is SnapshotStrategy.CHECK

DBT snapshot strategies

TIMESTAMP = <SnapshotStrategy.TIMESTAMP: 'timestamp'>
CHECK = <SnapshotStrategy.CHECK: 'check'>
is_timestamp: bool
75    @property
76    def is_timestamp(self) -> bool:
77        return self == SnapshotStrategy.TIMESTAMP
is_check: bool
79    @property
80    def is_check(self) -> bool:
81        return self == SnapshotStrategy.CHECK
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Hook(sqlmesh.dbt.common.DbtConfig):
class Hook(DbtConfig):
    """A SQL statement attached to a dbt model as a pre-hook or post-hook.

    Args:
        sql: The SQL statement to execute.
        transaction: Whether the hook is executed in the same transaction as the model query.
            Defaults to ``True``.
    """

    sql: SqlStr
    transaction: bool = Field(default=True)

    # Shared validator from ``sqlmesh.dbt.common``; presumably it normalizes the ``sql``
    # field — see ``sql_str_validator`` there to confirm.
    _sql_validator = sql_str_validator
Arguments:
  • sql: The SQL statement to execute.
  • transaction: Whether the hook is executed in the same transaction as the model query.
transaction: bool
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class BaseModelConfig(sqlmesh.dbt.common.GeneralConfig):
 97class BaseModelConfig(GeneralConfig):
 98    """
 99    Args:
100        owner: The owner of the model.
101        stamp: An optional arbitrary string sequence used to create new model versions without making
102            changes to any of the functional components of the definition.
103        storage_format: The storage format used to store the physical table, only applicable in certain engines.
104            (eg. 'parquet')
105        path: The file path of the model
106        dependencies: The macro, source, var, and ref dependencies used to execute the model and its hooks
107        name: Name of the model.
108        package_name: Name of the package that defines the model.
109        database: Database the model is stored in
110        schema: Custom schema name added to the model schema name
111        alias: Relation identifier for this model instead of the filename
112        pre-hook: List of SQL statements to run before the model is built.
113        post-hook: List of SQL statements to run after the model is built.
114        full_refresh: Forces the model to always do a full refresh or never do a full refresh
115        grants: Set or revoke permissions to the database object for this model
116        columns: Column information for the model
117        quoting: Define which components of the qualified name (database, schema, identifier) to quote when resolving the ref() method
118    """
119
120    # sqlmesh fields
121    owner: t.Optional[str] = None
122    stamp: t.Optional[str] = None
123    table_format: t.Optional[str] = None
124    storage_format: t.Optional[str] = None
125    path: Path = Path()
126    dependencies: Dependencies = Dependencies()
127    tests: t.List[TestConfig] = []
128    dialect_: t.Optional[str] = Field(None, alias="dialect")
129    grain: t.Union[str, t.List[str]] = []
130
131    # DBT configuration fields
132    unique_id: str = ""
133    name: str = ""
134    package_name: str = ""
135    fqn_: t.List[str] = Field(default_factory=list, alias="fqn")
136    schema_: str = Field("", alias="schema")
137    database: t.Optional[str] = None
138    alias: t.Optional[str] = None
139    pre_hook: t.List[Hook] = Field([], alias="pre-hook")
140    post_hook: t.List[Hook] = Field([], alias="post-hook")
141    full_refresh: t.Optional[bool] = None
142    grants: t.Dict[str, t.List[str]] = {}
143    columns: t.Dict[str, ColumnConfig] = {}
144    quoting: t.Dict[str, t.Optional[bool]] = {}
145    event_time: t.Optional[str] = None
146
147    version: t.Optional[int] = None
148    latest_version: t.Optional[int] = None
149
150    _canonical_name: t.Optional[str] = None
151
152    @field_validator("pre_hook", "post_hook", mode="before")
153    @classmethod
154    def _validate_hooks(cls, v: t.Union[str, t.List[t.Union[SqlStr, str]]]) -> t.List[Hook]:
155        hooks = []
156        for hook in ensure_list(v):
157            if isinstance(hook, Hook):
158                hooks.append(hook)
159            elif isinstance(hook, str):
160                hooks.append(Hook(sql=hook))
161            elif isinstance(hook, dict):
162                hooks.append(Hook(**hook))
163            else:
164                raise ConfigError(f"Invalid hook data: {hook}")
165
166        return hooks
167
168    @field_validator("grants", mode="before")
169    @classmethod
170    def _validate_grants(
171        cls, v: t.Optional[t.Dict[str, str]]
172    ) -> t.Optional[t.Dict[str, t.List[str]]]:
173        if v is None:
174            return None
175        return {key: ensure_list(value) for key, value in v.items()}
176
177    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
178        **GeneralConfig._FIELD_UPDATE_STRATEGY,
179        **{
180            "grants": UpdateStrategy.KEY_EXTEND,
181            "path": UpdateStrategy.IMMUTABLE,
182            "pre-hook": UpdateStrategy.EXTEND,
183            "post-hook": UpdateStrategy.EXTEND,
184            "columns": UpdateStrategy.KEY_EXTEND,
185        },
186    }
187
188    @property
189    def table_schema(self) -> str:
190        """
191        Get the full schema name
192        """
193        return self.schema_
194
195    @property
196    def table_name(self) -> str:
197        """
198        Get the table name
199        """
200        return self.alias or self.path.stem
201
202    @property
203    def config_name(self) -> str:
204        """
205        Get the model's config name (package_name.name)
206        """
207        return f"{self.package_name}.{self.name}"
208
209    def dialect(self, context: DbtContext) -> str:
210        return self.dialect_ or context.default_dialect
211
212    def canonical_name(self, context: DbtContext) -> str:
213        """
214        Get the sqlmesh model name
215
216        Returns:
217            The sqlmesh model name
218        """
219        if not self._canonical_name:
220            relation = context.create_relation(
221                self.relation_info,
222                quote_policy=Policy(database=False, schema=False, identifier=False),
223            )
224            if relation.database == context.target.database:
225                relation = relation.include(database=False)
226            self._canonical_name = relation.render()
227        return self._canonical_name
228
229    @property
230    def model_materialization(self) -> Materialization:
231        return Materialization.TABLE
232
233    @property
234    def relation_info(self) -> AttributeDict[str, t.Any]:
235        if self.model_materialization == Materialization.VIEW:
236            relation_type = RelationType.View
237        elif self.model_materialization == Materialization.EPHEMERAL:
238            relation_type = RelationType.CTE
239        else:
240            relation_type = RelationType.Table
241
242        extras = {}
243        if DBT_VERSION >= (1, 9, 0) and self.event_time:
244            extras["event_time_filter"] = {
245                "field_name": self.event_time,
246            }
247
248        return AttributeDict(
249            {
250                "database": self.database,
251                "schema": self.table_schema,
252                "identifier": self.table_name,
253                "type": relation_type.value,
254                "quote_policy": AttributeDict(self.quoting),
255                **extras,
256            }
257        )
258
259    @property
260    def tests_ref_source_dependencies(self) -> Dependencies:
261        dependencies = Dependencies()
262        for test in self.tests:
263            dependencies = dependencies.union(test.dependencies)
264        if self.name in dependencies.refs:
265            dependencies.refs.remove(self.name)
266        dependencies.macros = []
267        return dependencies
268
269    def remove_tests_with_invalid_refs(self, context: DbtContext) -> None:
270        """
271        Removes tests that reference models or sources that do not exist in the context in order to match dbt behavior.
272
273        Args:
274            context: The dbt context this model resides within.
275
276        Returns:
277            None
278        """
279        self.tests = [
280            test
281            for test in self.tests
282            if all(ref in context.refs for ref in test.dependencies.refs)
283            and all(source in context.sources for source in test.dependencies.sources)
284        ]
285
286    @property
287    def fqn(self) -> str:
288        return ".".join(self.fqn_)
289
290    @property
291    def sqlmesh_config_fields(self) -> t.Set[str]:
292        return {"description", "owner", "stamp", "storage_format"}
293
294    @property
295    def node_info(self) -> DbtNodeInfo:
296        return DbtNodeInfo(unique_id=self.unique_id, name=self.name, fqn=self.fqn, alias=self.alias)
297
298    def sqlmesh_model_kwargs(
299        self,
300        context: DbtContext,
301        column_types_override: t.Optional[t.Dict[str, ColumnConfig]] = None,
302    ) -> t.Dict[str, t.Any]:
303        """Get common sqlmesh model parameters"""
304        self.remove_tests_with_invalid_refs(context)
305
306        dependencies = self.dependencies.copy()
307        if dependencies.has_dynamic_var_names:
308            # Include ALL variables as dependencies since we couldn't determine
309            # precisely which variables are referenced in the model
310            dependencies.variables |= set(context.variables)
311
312        if (
313            getattr(self, "model_materialization", None) == Materialization.CUSTOM
314            and hasattr(self, "_get_custom_materialization")
315            and (custom_mat := self._get_custom_materialization(context))
316        ):
317            # include custom materialization dependencies as they might use macros
318            dependencies = dependencies.union(custom_mat.dependencies)
319
320        model_dialect = self.dialect(context)
321
322        # Only keep refs and sources that exist in the context to match dbt behavior
323        dependencies.refs.intersection_update(context.refs)
324        dependencies.sources.intersection_update(context.sources)
325        model_context = context.context_for_dependencies(
326            dependencies.union(self.tests_ref_source_dependencies)
327        )
328        jinja_macros = model_context.jinja_macros.trim(
329            dependencies.macros, package=self.package_name
330        )
331        jinja_macros.add_globals(self._model_jinja_context(model_context, dependencies))
332
333        model_kwargs = {
334            "audits": [(test.canonical_name, {}) for test in self.tests],
335            "column_descriptions": column_descriptions_to_sqlmesh(self.columns) or None,
336            "depends_on": {
337                model.canonical_name(context) for model in model_context.refs.values()
338            }.union(
339                {
340                    source.canonical_name(context)
341                    for source in model_context.sources.values()
342                    if source.fqn not in context.model_fqns
343                    # Allow dbt projects to reference a model as a source without causing a cycle
344                },
345            ),
346            "jinja_macros": jinja_macros,
347            "path": self.path,
348            "pre_statements": [
349                ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
350                for hook in self.pre_hook
351            ],
352            "post_statements": [
353                ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
354                for hook in self.post_hook
355            ],
356            "tags": self.tags,
357            "physical_schema_mapping": context.sqlmesh_config.physical_schema_mapping,
358            "default_catalog": context.target.database,
359            "grain": [d.parse_one(g, dialect=model_dialect) for g in ensure_list(self.grain)],
360            **self.sqlmesh_config_kwargs,
361        }
362
363        # dbt doesn't respect the data_type field for DDL statements– instead, it optionally uses
364        # it to validate the actual data types at runtime through contracts or external plugins.
365        # Only the `columns_types` config of seed models is actually respected. We don't set the
366        # columns attribute to self.columns intentionally in all other cases, as that could result
367        # in unfaithful types when models are materialized.
368        #
369        # See:
370        # - https://docs.getdbt.com/reference/resource-properties/columns
371        # - https://docs.getdbt.com/reference/resource-configs/contract
372        # - https://docs.getdbt.com/reference/resource-configs/column_types
373        if column_types_override:
374            model_kwargs["columns"] = (
375                column_types_to_sqlmesh(column_types_override, self.dialect(context)) or None
376            )
377
378        return model_kwargs
379
380    @abstractmethod
381    def to_sqlmesh(
382        self,
383        context: DbtContext,
384        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
385        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
386    ) -> Model:
387        """Convert DBT model into sqlmesh Model"""
388
389    def _model_jinja_context(
390        self, context: DbtContext, dependencies: Dependencies
391    ) -> t.Dict[str, t.Any]:
392        if context._manifest and self.unique_id in context._manifest._manifest.nodes:
393            attributes = context._manifest._manifest.nodes[self.unique_id].to_dict()
394            if dependencies.model_attrs.all_attrs:
395                model_node: AttributeDict[str, t.Any] = AttributeDict(attributes)
396            else:
397                model_node = AttributeDict(
398                    filter(lambda kv: kv[0] in dependencies.model_attrs.attrs, attributes.items())
399                )
400
401            # We exclude the raw SQL code to reduce the payload size. It's still accessible through
402            # the JinjaQuery instance stored in the resulting SQLMesh model's `query` field.
403            model_node.pop(RAW_CODE_KEY, None)
404        else:
405            model_node = AttributeDict({})
406
407        return {
408            "this": self.relation_info,
409            "model": model_node,
410            "schema": self.table_schema,
411            "config": self.config_attribute_dict,
412            **context.jinja_globals,
413        }
Arguments:
  • owner: The owner of the model.
  • stamp: An optional arbitrary string sequence used to create new model versions without making changes to any of the functional components of the definition.
  • storage_format: The storage format used to store the physical table; only applicable in certain engines (e.g. 'parquet').
  • path: The file path of the model.
  • dependencies: The macro, source, var, and ref dependencies used to execute the model and its hooks.
  • name: Name of the model.
  • package_name: Name of the package that defines the model.
  • database: Database the model is stored in.
  • schema: Custom schema name added to the model schema name.
  • alias: Relation identifier to use for this model instead of the filename.
  • pre-hook: List of SQL statements to run before the model is built.
  • post-hook: List of SQL statements to run after the model is built.
  • full_refresh: Forces the model to either always or never do a full refresh.
  • grants: Permissions to set or revoke on the database object for this model.
  • columns: Column information for the model.
  • quoting: Defines which components of the qualified name (database, schema, identifier) to quote when resolving the ref() method.
owner: Optional[str]
stamp: Optional[str]
table_format: Optional[str]
storage_format: Optional[str]
path: pathlib.Path
dialect_: Optional[str]
grain: Union[str, List[str]]
unique_id: str
name: str
package_name: str
fqn_: List[str]
schema_: str
database: Optional[str]
alias: Optional[str]
pre_hook: List[Hook]
post_hook: List[Hook]
full_refresh: Optional[bool]
grants: Dict[str, List[str]]
columns: Dict[str, sqlmesh.dbt.column.ColumnConfig]
quoting: Dict[str, Optional[bool]]
event_time: Optional[str]
version: Optional[int]
latest_version: Optional[int]
table_schema: str
188    @property
189    def table_schema(self) -> str:
190        """
191        Get the full schema name
192        """
193        return self.schema_

Get the full schema name

table_name: str
195    @property
196    def table_name(self) -> str:
197        """
198        Get the table name
199        """
200        return self.alias or self.path.stem

Get the table name

config_name: str
202    @property
203    def config_name(self) -> str:
204        """
205        Get the model's config name (package_name.name)
206        """
207        return f"{self.package_name}.{self.name}"

Get the model's config name (package_name.name)

def dialect(self, context: sqlmesh.dbt.context.DbtContext) -> str:
209    def dialect(self, context: DbtContext) -> str:
210        return self.dialect_ or context.default_dialect
def canonical_name(self, context: sqlmesh.dbt.context.DbtContext) -> str:
212    def canonical_name(self, context: DbtContext) -> str:
213        """
214        Get the sqlmesh model name
215
216        Returns:
217            The sqlmesh model name
218        """
219        if not self._canonical_name:
220            relation = context.create_relation(
221                self.relation_info,
222                quote_policy=Policy(database=False, schema=False, identifier=False),
223            )
224            if relation.database == context.target.database:
225                relation = relation.include(database=False)
226            self._canonical_name = relation.render()
227        return self._canonical_name

Get the sqlmesh model name

Returns:

The sqlmesh model name

model_materialization: Materialization
229    @property
230    def model_materialization(self) -> Materialization:
231        return Materialization.TABLE
relation_info: sqlmesh.utils.AttributeDict[str, typing.Any]
233    @property
234    def relation_info(self) -> AttributeDict[str, t.Any]:
235        if self.model_materialization == Materialization.VIEW:
236            relation_type = RelationType.View
237        elif self.model_materialization == Materialization.EPHEMERAL:
238            relation_type = RelationType.CTE
239        else:
240            relation_type = RelationType.Table
241
242        extras = {}
243        if DBT_VERSION >= (1, 9, 0) and self.event_time:
244            extras["event_time_filter"] = {
245                "field_name": self.event_time,
246            }
247
248        return AttributeDict(
249            {
250                "database": self.database,
251                "schema": self.table_schema,
252                "identifier": self.table_name,
253                "type": relation_type.value,
254                "quote_policy": AttributeDict(self.quoting),
255                **extras,
256            }
257        )
tests_ref_source_dependencies: sqlmesh.dbt.common.Dependencies
259    @property
260    def tests_ref_source_dependencies(self) -> Dependencies:
261        dependencies = Dependencies()
262        for test in self.tests:
263            dependencies = dependencies.union(test.dependencies)
264        if self.name in dependencies.refs:
265            dependencies.refs.remove(self.name)
266        dependencies.macros = []
267        return dependencies
def remove_tests_with_invalid_refs(self, context: sqlmesh.dbt.context.DbtContext) -> None:
269    def remove_tests_with_invalid_refs(self, context: DbtContext) -> None:
270        """
271        Removes tests that reference models or sources that do not exist in the context in order to match dbt behavior.
272
273        Args:
274            context: The dbt context this model resides within.
275
276        Returns:
277            None
278        """
279        self.tests = [
280            test
281            for test in self.tests
282            if all(ref in context.refs for ref in test.dependencies.refs)
283            and all(source in context.sources for source in test.dependencies.sources)
284        ]

Removes tests that reference models or sources that do not exist in the context in order to match dbt behavior.

Arguments:
  • context: The dbt context this model resides within.
Returns:

None

fqn: str
286    @property
287    def fqn(self) -> str:
288        return ".".join(self.fqn_)
sqlmesh_config_fields: Set[str]
290    @property
291    def sqlmesh_config_fields(self) -> t.Set[str]:
292        return {"description", "owner", "stamp", "storage_format"}

SQLMesh config fields that can be set in dbt projects.

Returns:

A set of SQLMesh config fields that can be set in dbt projects.

node_info: sqlmesh.core.node.DbtNodeInfo
294    @property
295    def node_info(self) -> DbtNodeInfo:
296        return DbtNodeInfo(unique_id=self.unique_id, name=self.name, fqn=self.fqn, alias=self.alias)
def sqlmesh_model_kwargs( self, context: sqlmesh.dbt.context.DbtContext, column_types_override: Optional[Dict[str, sqlmesh.dbt.column.ColumnConfig]] = None) -> Dict[str, Any]:
298    def sqlmesh_model_kwargs(
299        self,
300        context: DbtContext,
301        column_types_override: t.Optional[t.Dict[str, ColumnConfig]] = None,
302    ) -> t.Dict[str, t.Any]:
303        """Get common sqlmesh model parameters"""
304        self.remove_tests_with_invalid_refs(context)
305
306        dependencies = self.dependencies.copy()
307        if dependencies.has_dynamic_var_names:
308            # Include ALL variables as dependencies since we couldn't determine
309            # precisely which variables are referenced in the model
310            dependencies.variables |= set(context.variables)
311
312        if (
313            getattr(self, "model_materialization", None) == Materialization.CUSTOM
314            and hasattr(self, "_get_custom_materialization")
315            and (custom_mat := self._get_custom_materialization(context))
316        ):
317            # include custom materialization dependencies as they might use macros
318            dependencies = dependencies.union(custom_mat.dependencies)
319
320        model_dialect = self.dialect(context)
321
322        # Only keep refs and sources that exist in the context to match dbt behavior
323        dependencies.refs.intersection_update(context.refs)
324        dependencies.sources.intersection_update(context.sources)
325        model_context = context.context_for_dependencies(
326            dependencies.union(self.tests_ref_source_dependencies)
327        )
328        jinja_macros = model_context.jinja_macros.trim(
329            dependencies.macros, package=self.package_name
330        )
331        jinja_macros.add_globals(self._model_jinja_context(model_context, dependencies))
332
333        model_kwargs = {
334            "audits": [(test.canonical_name, {}) for test in self.tests],
335            "column_descriptions": column_descriptions_to_sqlmesh(self.columns) or None,
336            "depends_on": {
337                model.canonical_name(context) for model in model_context.refs.values()
338            }.union(
339                {
340                    source.canonical_name(context)
341                    for source in model_context.sources.values()
342                    if source.fqn not in context.model_fqns
343                    # Allow dbt projects to reference a model as a source without causing a cycle
344                },
345            ),
346            "jinja_macros": jinja_macros,
347            "path": self.path,
348            "pre_statements": [
349                ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
350                for hook in self.pre_hook
351            ],
352            "post_statements": [
353                ParsableSql(sql=d.jinja_statement(hook.sql).sql(), transaction=hook.transaction)
354                for hook in self.post_hook
355            ],
356            "tags": self.tags,
357            "physical_schema_mapping": context.sqlmesh_config.physical_schema_mapping,
358            "default_catalog": context.target.database,
359            "grain": [d.parse_one(g, dialect=model_dialect) for g in ensure_list(self.grain)],
360            **self.sqlmesh_config_kwargs,
361        }
362
363        # dbt doesn't respect the data_type field for DDL statements– instead, it optionally uses
364        # it to validate the actual data types at runtime through contracts or external plugins.
365        # Only the `columns_types` config of seed models is actually respected. We don't set the
366        # columns attribute to self.columns intentionally in all other cases, as that could result
367        # in unfaithful types when models are materialized.
368        #
369        # See:
370        # - https://docs.getdbt.com/reference/resource-properties/columns
371        # - https://docs.getdbt.com/reference/resource-configs/contract
372        # - https://docs.getdbt.com/reference/resource-configs/column_types
373        if column_types_override:
374            model_kwargs["columns"] = (
375                column_types_to_sqlmesh(column_types_override, self.dialect(context)) or None
376            )
377
378        return model_kwargs

Get common sqlmesh model parameters

380    @abstractmethod
381    def to_sqlmesh(
382        self,
383        context: DbtContext,
384        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
385        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
386    ) -> Model:
387        """Convert DBT model into sqlmesh Model"""

Convert DBT model into sqlmesh Model

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.dbt.common.GeneralConfig
start
description
enabled
docs
persist_docs
tags
meta
config_attribute_dict
replace
sqlmesh_config_kwargs
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields