sqlmesh.dbt.seed
from __future__ import annotations

import typing as t

import agate

from sqlmesh.dbt.util import DBT_VERSION

# The agate helper module is imported from a different package depending on the
# dbt version; only the dbt >= 1.8 variant is called with a ``delimiter`` keyword.
if DBT_VERSION >= (1, 8, 0):
    from dbt_common.clients import agate_helper  # type: ignore

    SUPPORTS_DELIMITER = True
else:
    from dbt.clients import agate_helper  # type: ignore

    SUPPORTS_DELIMITER = False
from sqlglot import exp

from sqlmesh.core.config.common import VirtualEnvironmentMode
from sqlmesh.core.model import Model, SeedKind, create_seed_model
from sqlmesh.core.model.seed import CsvSettings
from sqlmesh.dbt.basemodel import BaseModelConfig
from sqlmesh.dbt.column import ColumnConfig

if t.TYPE_CHECKING:
    from sqlmesh.core.audit.definition import ModelAudit
    from sqlmesh.dbt.context import DbtContext


class SeedConfig(BaseModelConfig):
    """
    SeedConfig contains all config parameters available to dbt seeds

    See https://docs.getdbt.com/reference/configs-and-properties for
    a more detailed description of each config parameter under the
    General properties, General configs, and For seeds sections.
    """

    # Field separator used when reading the seed CSV.
    delimiter: str = ","
    # Optional mapping of column name -> data type; turned into ColumnConfig overrides.
    column_types: t.Optional[t.Dict[str, str]] = None
    # Forwarded as ``quote`` to every ColumnConfig built from ``column_types``.
    quote_columns: t.Optional[bool] = False

    def to_sqlmesh(
        self,
        context: DbtContext,
        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
    ) -> Model:
        """Converts the dbt seed into a SQLMesh model.

        Args:
            context: The dbt context; used to resolve the model name, dialect,
                model kwargs and the default start.
            audit_definitions: Optional audit definitions forwarded to ``create_seed_model``.
            virtual_environment_mode: Forwarded unchanged to ``create_seed_model``.

        Returns:
            The model produced by ``create_seed_model``.

        Raises:
            KeyError: If agate infers a column type that has no entry in
                ``AGATE_TYPE_MAPPING``.
        """
        seed_path = self.path.absolute().as_posix()

        column_types_override = {
            name: ColumnConfig(name=name, data_type=data_type, quote=self.quote_columns)
            for name, data_type in (self.column_types or {}).items()
        }
        kwargs = self.sqlmesh_model_kwargs(context, column_types_override)

        columns = kwargs.get("columns") or {}

        # Parse the CSV with agate so that column types can be inferred from the data.
        agate_table = (
            agate_helper.from_csv(seed_path, [], delimiter=self.delimiter)
            if SUPPORTS_DELIMITER
            else agate_helper.from_csv(seed_path, [])
        )
        inferred_types = {
            name: AGATE_TYPE_MAPPING[tpe.__class__]
            for name, tpe in zip(agate_table.column_names, agate_table.column_types)
        }

        # The columns list built from the mixture of supplied and inferred types needs to
        # be in the same order as the data for assumptions elsewhere in the codebase to hold true
        new_columns = {}
        for column_name in agate_table.column_names:
            if column_name not in columns:
                new_columns[column_name] = inferred_types[column_name]
            else:
                new_columns[column_name] = columns[column_name]

        # Only columns present in the CSV header survive; supplied types win over inferred ones.
        kwargs["columns"] = new_columns

        # dbt treats single whitespace as a null value
        csv_settings = CsvSettings(
            delimiter=self.delimiter,
            na_values=[" "],
            keep_default_na=True,
        )

        return create_seed_model(
            self.canonical_name(context),
            SeedKind(path=seed_path, csv_settings=csv_settings),
            dialect=self.dialect(context),
            audit_definitions=audit_definitions,
            virtual_environment_mode=virtual_environment_mode,
            start=self.start or context.sqlmesh_config.model_defaults.start,
            dbt_node_info=self.node_info,
            **kwargs,
        )


# Maps the agate type class inferred for a CSV column to the SQL data type used for it.
AGATE_TYPE_MAPPING = {
    agate_helper.Number: exp.DataType.build("double"),
    agate_helper.ISODateTime: exp.DataType.build("datetime"),
    agate.Date: exp.DataType.build("date"),
    agate.DateTime: exp.DataType.build("datetime"),
    agate.Boolean: exp.DataType.build("boolean"),
    agate.Text: exp.DataType.build("text"),
}


if DBT_VERSION >= (1, 7, 0):

    class Integer(agate_helper.Integer):
        """Integer agate type that additionally accepts string input."""

        def cast(self, d: t.Any) -> t.Optional[int]:
            """Cast ``d`` to an int, treating configured null strings as ``None``.

            Raises:
                agate.exceptions.CastError: If ``d`` is a string that ``int`` cannot parse.
            """
            if isinstance(d, str):
                # The dbt's implementation doesn't support coercion of strings to integers.
                if d.strip().lower() in self.null_values:
                    return None
                try:
                    return int(d)
                except ValueError as err:
                    # Chain explicitly so the original parse failure is kept as the cause.
                    raise agate.exceptions.CastError(
                        'Can not parse value "%s" as Integer.' % d
                    ) from err
            return super().cast(d)

        def jsonify(self, d: t.Any) -> str:
            """Return ``d`` unchanged."""
            # NOTE(review): annotated as ``str`` although the value is passed through as-is.
            return d

    # Replace the helper module's Integer with the subclass above.
    agate_helper.Integer = Integer  # type: ignore

    AGATE_TYPE_MAPPING[agate_helper.Integer] = exp.DataType.build("int")
class SeedConfig(BaseModelConfig):
    """
    Holds the configuration parameters that can be set on a dbt seed.

    The parameters are described in more detail at
    https://docs.getdbt.com/reference/configs-and-properties under the
    General properties, General configs, and For seeds sections.
    """

    # Field separator used when reading the seed CSV.
    delimiter: str = ","
    # Optional column name -> data type mapping used to build column overrides.
    column_types: t.Optional[t.Dict[str, str]] = None
    # Passed as ``quote`` to each ColumnConfig created from ``column_types``.
    quote_columns: t.Optional[bool] = False

    def to_sqlmesh(
        self,
        context: DbtContext,
        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
    ) -> Model:
        """Build the SQLMesh seed model that corresponds to this dbt seed."""
        csv_path = self.path.absolute().as_posix()

        # One ColumnConfig per explicitly typed column.
        overrides = {}
        for col_name, col_type in (self.column_types or {}).items():
            overrides[col_name] = ColumnConfig(
                name=col_name, data_type=col_type, quote=self.quote_columns
            )
        model_kwargs = self.sqlmesh_model_kwargs(context, overrides)

        declared = model_kwargs.get("columns") or {}

        # The delimiter keyword is only passed when the helper supports it.
        reader_options = {"delimiter": self.delimiter} if SUPPORTS_DELIMITER else {}
        table = agate_helper.from_csv(csv_path, [], **reader_options)

        inferred = {}
        for col_name, agate_type in zip(table.column_names, table.column_types):
            inferred[col_name] = AGATE_TYPE_MAPPING[agate_type.__class__]

        # Follow the CSV's column order; a declared type takes precedence over an inferred one.
        model_kwargs["columns"] = {
            col_name: declared[col_name] if col_name in declared else inferred[col_name]
            for col_name in table.column_names
        }

        # dbt reads a lone space as null, so it is registered as an extra NA value.
        settings = CsvSettings(delimiter=self.delimiter, na_values=[" "], keep_default_na=True)

        model_name = self.canonical_name(context)
        seed_kind = SeedKind(path=csv_path, csv_settings=settings)
        return create_seed_model(
            model_name,
            seed_kind,
            dialect=self.dialect(context),
            audit_definitions=audit_definitions,
            virtual_environment_mode=virtual_environment_mode,
            start=self.start or context.sqlmesh_config.model_defaults.start,
            dbt_node_info=self.node_info,
            **model_kwargs,
        )
SeedConfig contains all config parameters available to dbt seeds.
See https://docs.getdbt.com/reference/configs-and-properties for a more detailed description of each config parameter under the General properties, General configs, and For seeds sections.
def
to_sqlmesh( self, context: sqlmesh.dbt.context.DbtContext, audit_definitions: Optional[Dict[str, sqlmesh.core.audit.definition.ModelAudit]] = None, virtual_environment_mode: sqlmesh.core.config.common.VirtualEnvironmentMode = FULL) -> Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]:
def to_sqlmesh(
    self,
    context: DbtContext,
    audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
    virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
) -> Model:
    """Turn this dbt seed into the equivalent SQLMesh model."""
    csv_path = self.path.absolute().as_posix()

    # One ColumnConfig per explicitly typed column.
    overrides = {}
    for col_name, col_type in (self.column_types or {}).items():
        overrides[col_name] = ColumnConfig(
            name=col_name, data_type=col_type, quote=self.quote_columns
        )
    model_kwargs = self.sqlmesh_model_kwargs(context, overrides)

    declared = model_kwargs.get("columns") or {}

    # The delimiter keyword is only passed when the helper supports it.
    if SUPPORTS_DELIMITER:
        table = agate_helper.from_csv(csv_path, [], delimiter=self.delimiter)
    else:
        table = agate_helper.from_csv(csv_path, [])

    inferred = {}
    for col_name, agate_type in zip(table.column_names, table.column_types):
        inferred[col_name] = AGATE_TYPE_MAPPING[agate_type.__class__]

    # Keep the CSV's column order; declared types take precedence over inferred ones.
    model_kwargs["columns"] = {
        col_name: declared[col_name] if col_name in declared else inferred[col_name]
        for col_name in table.column_names
    }

    # dbt reads a lone space as null, so it is registered as an extra NA value.
    settings = CsvSettings(delimiter=self.delimiter, na_values=[" "], keep_default_na=True)

    model_name = self.canonical_name(context)
    seed_kind = SeedKind(path=csv_path, csv_settings=settings)
    return create_seed_model(
        model_name,
        seed_kind,
        dialect=self.dialect(context),
        audit_definitions=audit_definitions,
        virtual_environment_mode=virtual_environment_mode,
        start=self.start or context.sqlmesh_config.model_defaults.start,
        dbt_node_info=self.node_info,
        **model_kwargs,
    )
Converts the dbt seed into a SQLMesh model.
model_config =
{'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def
model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Populate ``__pydantic_private__`` with private-attribute defaults, BaseModel-method style.

    ``context`` is accepted because pydantic-core supplies it when invoking this
    function; it is not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Nothing to do when the private storage has already been set.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    resolved = {}
    for attr_name, attr in self.__private_attributes__.items():
        value = attr.get_default()
        if value is PydanticUndefined:
            # Attributes without a default are left out.
            continue
        resolved[attr_name] = value
    object_setattr(self, '__pydantic_private__', resolved)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- sqlmesh.dbt.basemodel.BaseModelConfig
- owner
- stamp
- table_format
- storage_format
- path
- dependencies
- tests
- dialect_
- grain
- unique_id
- name
- package_name
- fqn_
- schema_
- database
- alias
- pre_hook
- post_hook
- full_refresh
- grants
- columns
- quoting
- event_time
- version
- latest_version
- table_schema
- table_name
- config_name
- dialect
- canonical_name
- model_materialization
- relation_info
- tests_ref_source_dependencies
- remove_tests_with_invalid_refs
- fqn
- sqlmesh_config_fields
- node_info
- sqlmesh_model_kwargs
AGATE_TYPE_MAPPING =
{<class 'dbt_common.clients.agate_helper.Number'>: DataType(this=DType.DOUBLE, nested=False), <class 'dbt_common.clients.agate_helper.ISODateTime'>: DataType(this=DType.DATETIME, nested=False), <class 'agate.data_types.date.Date'>: DataType(this=DType.DATE, nested=False), <class 'agate.data_types.date_time.DateTime'>: DataType(this=DType.DATETIME, nested=False), <class 'agate.data_types.boolean.Boolean'>: DataType(this=DType.BOOLEAN, nested=False), <class 'agate.data_types.text.Text'>: DataType(this=DType.TEXT, nested=False), <class 'Integer'>: DataType(this=DType.INT, nested=False)}
class
Integer(dbt_common.clients.agate_helper.Integer):
class Integer(agate_helper.Integer):
    """Integer agate type that additionally accepts string input."""

    def cast(self, d: t.Any) -> t.Optional[int]:
        """Cast ``d`` to an int, treating configured null strings as ``None``.

        Non-string values are delegated to the parent implementation.

        Raises:
            agate.exceptions.CastError: If ``d`` is a string that ``int`` cannot parse.
        """
        if isinstance(d, str):
            # The dbt's implementation doesn't support coercion of strings to integers.
            if d.strip().lower() in self.null_values:
                return None
            try:
                return int(d)
            except ValueError as err:
                # Chain explicitly so the original parse failure is kept as the cause.
                raise agate.exceptions.CastError(
                    'Can not parse value "%s" as Integer.' % d
                ) from err
        return super().cast(d)

    def jsonify(self, d: t.Any) -> str:
        """Return ``d`` unchanged."""
        # NOTE(review): annotated as ``str`` although the value is passed through as-is.
        return d
Specifies how values should be parsed when creating a `Table`.
Parameters
- null_values: A sequence of values which should be cast to `None` when encountered by this data type.
def
cast(self, d: Any) -> Optional[int]:
def cast(self, d: t.Any) -> t.Optional[int]:
    """Cast ``d`` to an int, treating configured null strings as ``None``.

    Strings are stripped and lower-cased before being compared with
    ``self.null_values``; any other string is parsed with ``int``.
    Non-string values are delegated to the parent implementation.

    Raises:
        agate.exceptions.CastError: If ``d`` is a string that ``int`` cannot parse.
    """
    if isinstance(d, str):
        # The dbt's implementation doesn't support coercion of strings to integers.
        if d.strip().lower() in self.null_values:
            return None
        try:
            return int(d)
        except ValueError as err:
            # Chain explicitly so the original parse failure is kept as the cause.
            raise agate.exceptions.CastError(
                'Can not parse value "%s" as Integer.' % d
            ) from err
    return super().cast(d)
Coerce a given string value into this column's data type.
Inherited Members
- agate.data_types.base.DataType
- DataType
- null_values
- test
- csvify