sqlmesh.dbt.source
1from __future__ import annotations 2 3import typing as t 4 5from pydantic import Field 6 7from sqlmesh.core.config.base import UpdateStrategy 8from sqlmesh.dbt.column import ColumnConfig 9from sqlmesh.dbt.common import GeneralConfig 10from sqlmesh.dbt.relation import RelationType 11from sqlmesh.dbt.util import DBT_VERSION 12from sqlmesh.utils import AttributeDict 13from sqlmesh.utils.errors import ConfigError 14 15if t.TYPE_CHECKING: 16 from sqlmesh.dbt.context import DbtContext 17 18 19class SourceConfig(GeneralConfig): 20 """ 21 Args: 22 name: The name of the table 23 source_name: The name of the source that defines the table 24 database: Name of the database where the table is stored. By default, the project's target database is used. 25 schema: The scehma name as stored in the database. If not specified, the source name is used. 26 identifier: The table name as stored in the database. If not specified, the source table name is used 27 loader: Describes the tool that loads the source into the warehouse 28 overrides: Override a source defined in the specified package 29 freshness: Dictionary specifying maximum time, since the most recent record, to consider the source fresh 30 loaded_at_field: Column name or expression that returns a timestamp indicating freshness 31 quoting: Define which components of the qualified name (database, schema, identifier) to quote when resolving the source() method 32 external: Dictionary of metadata properties specific to sources that point to external tables 33 columns: Columns within the source 34 """ 35 36 # DBT configuration fields 37 name: str = "" 38 source_name_: str = Field("", alias="source_name") 39 fqn_: t.List[str] = Field(default_factory=list, alias="fqn") 40 database: t.Optional[str] = None 41 schema_: t.Optional[str] = Field(None, alias="schema") 42 identifier: t.Optional[str] = None 43 loader: t.Optional[str] = None 44 overrides: t.Optional[str] = None 45 freshness: t.Optional[t.Dict[str, t.Any]] = {} 46 loaded_at_field: t.Optional[str] = None 47 quoting: t.Dict[str, t.Optional[bool]] = {} 48 external: t.Optional[t.Dict[str, t.Any]] = {} 49 source_meta: t.Optional[t.Dict[str, t.Any]] = {} 50 columns: t.Dict[str, ColumnConfig] = {} 51 event_time: t.Optional[str] = None 52 53 _canonical_name: t.Optional[str] = None 54 55 _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { 56 **GeneralConfig._FIELD_UPDATE_STRATEGY, 57 **{"columns": UpdateStrategy.KEY_EXTEND}, 58 } 59 60 @property 61 def table_name(self) -> t.Optional[str]: 62 return self.identifier or self.name 63 64 @property 65 def config_name(self) -> str: 66 return f"{self.source_name_}.{self.name}" 67 68 @property 69 def fqn(self) -> str: 70 return ".".join(self.fqn_) 71 72 def canonical_name(self, context: DbtContext) -> str: 73 if self._canonical_name is None: 74 source = context.get_callable_macro("source") 75 if not source: 76 raise ConfigError("'source' macro not found.") 77 78 try: 79 relation = source(self.source_name_, self.name) 80 except Exception as e: 81 raise ConfigError( 82 f"'source' macro failed for '{self.config_name}' with exception '{e}'." 83 ) 84 85 relation = relation.quote( 86 database=False, 87 schema=False, 88 identifier=False, 89 ) 90 if relation.database == context.target.database: 91 relation = relation.include(database=False) 92 self._canonical_name = relation.render() 93 return self._canonical_name 94 95 @property 96 def relation_info(self) -> AttributeDict: 97 extras = {} 98 external_location = ( 99 self.source_meta.get("external_location", None) if self.source_meta else None 100 ) 101 if external_location: 102 extras["external"] = external_location.replace("{name}", self.table_name) 103 104 if DBT_VERSION >= (1, 9, 0) and self.event_time: 105 extras["event_time_filter"] = { 106 "field_name": self.event_time, 107 } 108 109 return AttributeDict( 110 { 111 "database": self.database, 112 "schema": self.schema_, 113 "identifier": self.table_name, 114 "type": RelationType.External.value, 115 "quote_policy": AttributeDict(self.quoting), 116 **extras, 117 } 118 )
20class SourceConfig(GeneralConfig): 21 """ 22 Args: 23 name: The name of the table 24 source_name: The name of the source that defines the table 25 database: Name of the database where the table is stored. By default, the project's target database is used. 26 schema: The scehma name as stored in the database. If not specified, the source name is used. 27 identifier: The table name as stored in the database. If not specified, the source table name is used 28 loader: Describes the tool that loads the source into the warehouse 29 overrides: Override a source defined in the specified package 30 freshness: Dictionary specifying maximum time, since the most recent record, to consider the source fresh 31 loaded_at_field: Column name or expression that returns a timestamp indicating freshness 32 quoting: Define which components of the qualified name (database, schema, identifier) to quote when resolving the source() method 33 external: Dictionary of metadata properties specific to sources that point to external tables 34 columns: Columns within the source 35 """ 36 37 # DBT configuration fields 38 name: str = "" 39 source_name_: str = Field("", alias="source_name") 40 fqn_: t.List[str] = Field(default_factory=list, alias="fqn") 41 database: t.Optional[str] = None 42 schema_: t.Optional[str] = Field(None, alias="schema") 43 identifier: t.Optional[str] = None 44 loader: t.Optional[str] = None 45 overrides: t.Optional[str] = None 46 freshness: t.Optional[t.Dict[str, t.Any]] = {} 47 loaded_at_field: t.Optional[str] = None 48 quoting: t.Dict[str, t.Optional[bool]] = {} 49 external: t.Optional[t.Dict[str, t.Any]] = {} 50 source_meta: t.Optional[t.Dict[str, t.Any]] = {} 51 columns: t.Dict[str, ColumnConfig] = {} 52 event_time: t.Optional[str] = None 53 54 _canonical_name: t.Optional[str] = None 55 56 _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { 57 **GeneralConfig._FIELD_UPDATE_STRATEGY, 58 **{"columns": UpdateStrategy.KEY_EXTEND}, 59 } 60 61 @property 62 def table_name(self) -> t.Optional[str]: 63 return self.identifier or self.name 64 65 @property 66 def config_name(self) -> str: 67 return f"{self.source_name_}.{self.name}" 68 69 @property 70 def fqn(self) -> str: 71 return ".".join(self.fqn_) 72 73 def canonical_name(self, context: DbtContext) -> str: 74 if self._canonical_name is None: 75 source = context.get_callable_macro("source") 76 if not source: 77 raise ConfigError("'source' macro not found.") 78 79 try: 80 relation = source(self.source_name_, self.name) 81 except Exception as e: 82 raise ConfigError( 83 f"'source' macro failed for '{self.config_name}' with exception '{e}'." 84 ) 85 86 relation = relation.quote( 87 database=False, 88 schema=False, 89 identifier=False, 90 ) 91 if relation.database == context.target.database: 92 relation = relation.include(database=False) 93 self._canonical_name = relation.render() 94 return self._canonical_name 95 96 @property 97 def relation_info(self) -> AttributeDict: 98 extras = {} 99 external_location = ( 100 self.source_meta.get("external_location", None) if self.source_meta else None 101 ) 102 if external_location: 103 extras["external"] = external_location.replace("{name}", self.table_name) 104 105 if DBT_VERSION >= (1, 9, 0) and self.event_time: 106 extras["event_time_filter"] = { 107 "field_name": self.event_time, 108 } 109 110 return AttributeDict( 111 { 112 "database": self.database, 113 "schema": self.schema_, 114 "identifier": self.table_name, 115 "type": RelationType.External.value, 116 "quote_policy": AttributeDict(self.quoting), 117 **extras, 118 } 119 )
Arguments:
- name: The name of the table
- source_name: The name of the source that defines the table
- database: Name of the database where the table is stored. By default, the project's target database is used.
- schema: The scehma name as stored in the database. If not specified, the source name is used.
- identifier: The table name as stored in the database. If not specified, the source table name is used
- loader: Describes the tool that loads the source into the warehouse
- overrides: Override a source defined in the specified package
- freshness: Dictionary specifying maximum time, since the most recent record, to consider the source fresh
- loaded_at_field: Column name or expression that returns a timestamp indicating freshness
- quoting: Define which components of the qualified name (database, schema, identifier) to quote when resolving the source() method
- external: Dictionary of metadata properties specific to sources that point to external tables
- columns: Columns within the source
columns: Dict[str, sqlmesh.dbt.column.ColumnConfig]
73 def canonical_name(self, context: DbtContext) -> str: 74 if self._canonical_name is None: 75 source = context.get_callable_macro("source") 76 if not source: 77 raise ConfigError("'source' macro not found.") 78 79 try: 80 relation = source(self.source_name_, self.name) 81 except Exception as e: 82 raise ConfigError( 83 f"'source' macro failed for '{self.config_name}' with exception '{e}'." 84 ) 85 86 relation = relation.quote( 87 database=False, 88 schema=False, 89 identifier=False, 90 ) 91 if relation.database == context.target.database: 92 relation = relation.include(database=False) 93 self._canonical_name = relation.render() 94 return self._canonical_name
relation_info: sqlmesh.utils.AttributeDict
96 @property 97 def relation_info(self) -> AttributeDict: 98 extras = {} 99 external_location = ( 100 self.source_meta.get("external_location", None) if self.source_meta else None 101 ) 102 if external_location: 103 extras["external"] = external_location.replace("{name}", self.table_name) 104 105 if DBT_VERSION >= (1, 9, 0) and self.event_time: 106 extras["event_time_filter"] = { 107 "field_name": self.event_time, 108 } 109 110 return AttributeDict( 111 { 112 "database": self.database, 113 "schema": self.schema_, 114 "identifier": self.table_name, 115 "type": RelationType.External.value, 116 "quote_policy": AttributeDict(self.quoting), 117 **extras, 118 } 119 )
model_config =
{'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def
model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None: 359 """This function is meant to behave like a BaseModel method to initialise private attributes. 360 361 It takes context as an argument since that's what pydantic-core passes when calling it. 362 363 Args: 364 self: The BaseModel instance. 365 context: The context. 366 """ 367 if getattr(self, '__pydantic_private__', None) is None: 368 pydantic_private = {} 369 for name, private_attr in self.__private_attributes__.items(): 370 default = private_attr.get_default() 371 if default is not PydanticUndefined: 372 pydantic_private[name] = default 373 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs