Edit on GitHub

sqlmesh.dbt.source

  1from __future__ import annotations
  2
  3import typing as t
  4
  5from pydantic import Field
  6
  7from sqlmesh.core.config.base import UpdateStrategy
  8from sqlmesh.dbt.column import ColumnConfig
  9from sqlmesh.dbt.common import GeneralConfig
 10from sqlmesh.dbt.relation import RelationType
 11from sqlmesh.dbt.util import DBT_VERSION
 12from sqlmesh.utils import AttributeDict
 13from sqlmesh.utils.errors import ConfigError
 14
 15if t.TYPE_CHECKING:
 16    from sqlmesh.dbt.context import DbtContext
 17
 18
 19class SourceConfig(GeneralConfig):
 20    """
 21    Args:
 22        name: The name of the table
 23        source_name: The name of the source that defines the table
 24        database: Name of the database where the table is stored. By default, the project's target database is used.
  25        schema: The schema name as stored in the database. If not specified, the source name is used.
 26        identifier: The table name as stored in the database. If not specified, the source table name is used
 27        loader: Describes the tool that loads the source into the warehouse
 28        overrides: Override a source defined in the specified package
 29        freshness: Dictionary specifying maximum time, since the most recent record, to consider the source fresh
 30        loaded_at_field: Column name or expression that returns a timestamp indicating freshness
 31        quoting: Define which components of the qualified name (database, schema, identifier) to quote when resolving the source() method
 32        external: Dictionary of metadata properties specific to sources that point to external tables
 33        columns: Columns within the source
 34    """
 35
 36    # DBT configuration fields
 37    name: str = ""
 38    source_name_: str = Field("", alias="source_name")
 39    fqn_: t.List[str] = Field(default_factory=list, alias="fqn")
 40    database: t.Optional[str] = None
 41    schema_: t.Optional[str] = Field(None, alias="schema")
 42    identifier: t.Optional[str] = None
 43    loader: t.Optional[str] = None
 44    overrides: t.Optional[str] = None
 45    freshness: t.Optional[t.Dict[str, t.Any]] = {}
 46    loaded_at_field: t.Optional[str] = None
 47    quoting: t.Dict[str, t.Optional[bool]] = {}
 48    external: t.Optional[t.Dict[str, t.Any]] = {}
 49    source_meta: t.Optional[t.Dict[str, t.Any]] = {}
 50    columns: t.Dict[str, ColumnConfig] = {}
 51    event_time: t.Optional[str] = None
 52
 53    _canonical_name: t.Optional[str] = None
 54
 55    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
 56        **GeneralConfig._FIELD_UPDATE_STRATEGY,
 57        **{"columns": UpdateStrategy.KEY_EXTEND},
 58    }
 59
 60    @property
 61    def table_name(self) -> t.Optional[str]:
 62        return self.identifier or self.name
 63
 64    @property
 65    def config_name(self) -> str:
 66        return f"{self.source_name_}.{self.name}"
 67
 68    @property
 69    def fqn(self) -> str:
 70        return ".".join(self.fqn_)
 71
 72    def canonical_name(self, context: DbtContext) -> str:
 73        if self._canonical_name is None:
 74            source = context.get_callable_macro("source")
 75            if not source:
 76                raise ConfigError("'source' macro not found.")
 77
 78            try:
 79                relation = source(self.source_name_, self.name)
 80            except Exception as e:
 81                raise ConfigError(
 82                    f"'source' macro failed for '{self.config_name}' with exception '{e}'."
 83                )
 84
 85            relation = relation.quote(
 86                database=False,
 87                schema=False,
 88                identifier=False,
 89            )
 90            if relation.database == context.target.database:
 91                relation = relation.include(database=False)
 92            self._canonical_name = relation.render()
 93        return self._canonical_name
 94
 95    @property
 96    def relation_info(self) -> AttributeDict:
 97        extras = {}
 98        external_location = (
 99            self.source_meta.get("external_location", None) if self.source_meta else None
100        )
101        if external_location:
102            extras["external"] = external_location.replace("{name}", self.table_name)
103
104        if DBT_VERSION >= (1, 9, 0) and self.event_time:
105            extras["event_time_filter"] = {
106                "field_name": self.event_time,
107            }
108
109        return AttributeDict(
110            {
111                "database": self.database,
112                "schema": self.schema_,
113                "identifier": self.table_name,
114                "type": RelationType.External.value,
115                "quote_policy": AttributeDict(self.quoting),
116                **extras,
117            }
118        )
class SourceConfig(sqlmesh.dbt.common.GeneralConfig):
 20class SourceConfig(GeneralConfig):
 21    """
 22    Args:
 23        name: The name of the table
 24        source_name: The name of the source that defines the table
 25        database: Name of the database where the table is stored. By default, the project's target database is used.
  26        schema: The schema name as stored in the database. If not specified, the source name is used.
 27        identifier: The table name as stored in the database. If not specified, the source table name is used
 28        loader: Describes the tool that loads the source into the warehouse
 29        overrides: Override a source defined in the specified package
 30        freshness: Dictionary specifying maximum time, since the most recent record, to consider the source fresh
 31        loaded_at_field: Column name or expression that returns a timestamp indicating freshness
 32        quoting: Define which components of the qualified name (database, schema, identifier) to quote when resolving the source() method
 33        external: Dictionary of metadata properties specific to sources that point to external tables
 34        columns: Columns within the source
 35    """
 36
 37    # DBT configuration fields
 38    name: str = ""
 39    source_name_: str = Field("", alias="source_name")
 40    fqn_: t.List[str] = Field(default_factory=list, alias="fqn")
 41    database: t.Optional[str] = None
 42    schema_: t.Optional[str] = Field(None, alias="schema")
 43    identifier: t.Optional[str] = None
 44    loader: t.Optional[str] = None
 45    overrides: t.Optional[str] = None
 46    freshness: t.Optional[t.Dict[str, t.Any]] = {}
 47    loaded_at_field: t.Optional[str] = None
 48    quoting: t.Dict[str, t.Optional[bool]] = {}
 49    external: t.Optional[t.Dict[str, t.Any]] = {}
 50    source_meta: t.Optional[t.Dict[str, t.Any]] = {}
 51    columns: t.Dict[str, ColumnConfig] = {}
 52    event_time: t.Optional[str] = None
 53
 54    _canonical_name: t.Optional[str] = None
 55
 56    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
 57        **GeneralConfig._FIELD_UPDATE_STRATEGY,
 58        **{"columns": UpdateStrategy.KEY_EXTEND},
 59    }
 60
 61    @property
 62    def table_name(self) -> t.Optional[str]:
 63        return self.identifier or self.name
 64
 65    @property
 66    def config_name(self) -> str:
 67        return f"{self.source_name_}.{self.name}"
 68
 69    @property
 70    def fqn(self) -> str:
 71        return ".".join(self.fqn_)
 72
 73    def canonical_name(self, context: DbtContext) -> str:
 74        if self._canonical_name is None:
 75            source = context.get_callable_macro("source")
 76            if not source:
 77                raise ConfigError("'source' macro not found.")
 78
 79            try:
 80                relation = source(self.source_name_, self.name)
 81            except Exception as e:
 82                raise ConfigError(
 83                    f"'source' macro failed for '{self.config_name}' with exception '{e}'."
 84                )
 85
 86            relation = relation.quote(
 87                database=False,
 88                schema=False,
 89                identifier=False,
 90            )
 91            if relation.database == context.target.database:
 92                relation = relation.include(database=False)
 93            self._canonical_name = relation.render()
 94        return self._canonical_name
 95
 96    @property
 97    def relation_info(self) -> AttributeDict:
 98        extras = {}
 99        external_location = (
100            self.source_meta.get("external_location", None) if self.source_meta else None
101        )
102        if external_location:
103            extras["external"] = external_location.replace("{name}", self.table_name)
104
105        if DBT_VERSION >= (1, 9, 0) and self.event_time:
106            extras["event_time_filter"] = {
107                "field_name": self.event_time,
108            }
109
110        return AttributeDict(
111            {
112                "database": self.database,
113                "schema": self.schema_,
114                "identifier": self.table_name,
115                "type": RelationType.External.value,
116                "quote_policy": AttributeDict(self.quoting),
117                **extras,
118            }
119        )
Arguments:
  • name: The name of the table.
  • source_name: The name of the source that defines the table.
  • database: Name of the database where the table is stored. By default, the project's target database is used.
  • schema: The schema name as stored in the database. If not specified, the source name is used.
  • identifier: The table name as stored in the database. If not specified, the source table name is used.
  • loader: Describes the tool that loads the source into the warehouse.
  • overrides: Overrides a source defined in the specified package.
  • freshness: Dictionary specifying the maximum time since the most recent record for which the source is still considered fresh.
  • loaded_at_field: Column name or expression that returns a timestamp indicating freshness.
  • quoting: Defines which components of the qualified name (database, schema, identifier) to quote when resolving the source() macro.
  • external: Dictionary of metadata properties specific to sources that point to external tables.
  • columns: Columns within the source.
name: str
source_name_: str
fqn_: List[str]
database: Optional[str]
schema_: Optional[str]
identifier: Optional[str]
loader: Optional[str]
overrides: Optional[str]
freshness: Optional[Dict[str, Any]]
loaded_at_field: Optional[str]
quoting: Dict[str, Optional[bool]]
external: Optional[Dict[str, Any]]
source_meta: Optional[Dict[str, Any]]
columns: Dict[str, sqlmesh.dbt.column.ColumnConfig]
event_time: Optional[str]
table_name: Optional[str]
61    @property
62    def table_name(self) -> t.Optional[str]:
63        return self.identifier or self.name
config_name: str
65    @property
66    def config_name(self) -> str:
67        return f"{self.source_name_}.{self.name}"
fqn: str
69    @property
70    def fqn(self) -> str:
71        return ".".join(self.fqn_)
def canonical_name(self, context: sqlmesh.dbt.context.DbtContext) -> str:
73    def canonical_name(self, context: DbtContext) -> str:
74        if self._canonical_name is None:
75            source = context.get_callable_macro("source")
76            if not source:
77                raise ConfigError("'source' macro not found.")
78
79            try:
80                relation = source(self.source_name_, self.name)
81            except Exception as e:
82                raise ConfigError(
83                    f"'source' macro failed for '{self.config_name}' with exception '{e}'."
84                )
85
86            relation = relation.quote(
87                database=False,
88                schema=False,
89                identifier=False,
90            )
91            if relation.database == context.target.database:
92                relation = relation.include(database=False)
93            self._canonical_name = relation.render()
94        return self._canonical_name
relation_info: sqlmesh.utils.AttributeDict
 96    @property
 97    def relation_info(self) -> AttributeDict:
 98        extras = {}
 99        external_location = (
100            self.source_meta.get("external_location", None) if self.source_meta else None
101        )
102        if external_location:
103            extras["external"] = external_location.replace("{name}", self.table_name)
104
105        if DBT_VERSION >= (1, 9, 0) and self.event_time:
106            extras["event_time_filter"] = {
107                "field_name": self.event_time,
108            }
109
110        return AttributeDict(
111            {
112                "database": self.database,
113                "schema": self.schema_,
114                "identifier": self.table_name,
115                "type": RelationType.External.value,
116                "quote_policy": AttributeDict(self.quoting),
117                **extras,
118            }
119        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model; should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.dbt.common.GeneralConfig
start
description
enabled
docs
persist_docs
tags
meta
config_attribute_dict
replace
sqlmesh_config_kwargs
sqlmesh_config_fields
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields