Edit on GitHub

sqlmesh.dbt.seed

  1from __future__ import annotations
  2
  3import typing as t
  4
  5import agate
  6
  7from sqlmesh.dbt.util import DBT_VERSION
  8
# agate_helper is imported from dbt_common for dbt >= 1.8.0 and from dbt.clients
# for older versions.
if DBT_VERSION >= (1, 8, 0):
    from dbt_common.clients import agate_helper  # type: ignore

    # When set, SeedConfig.to_sqlmesh passes the seed's `delimiter` to
    # agate_helper.from_csv.
    SUPPORTS_DELIMITER = True
else:
    from dbt.clients import agate_helper  # type: ignore

    # When unset, agate_helper.from_csv is called without a `delimiter` argument.
    SUPPORTS_DELIMITER = False
 17from sqlglot import exp
 18
 19from sqlmesh.core.config.common import VirtualEnvironmentMode
 20from sqlmesh.core.model import Model, SeedKind, create_seed_model
 21from sqlmesh.core.model.seed import CsvSettings
 22from sqlmesh.dbt.basemodel import BaseModelConfig
 23from sqlmesh.dbt.column import ColumnConfig
 24
 25if t.TYPE_CHECKING:
 26    from sqlmesh.core.audit.definition import ModelAudit
 27    from sqlmesh.dbt.context import DbtContext
 28
 29
 30class SeedConfig(BaseModelConfig):
 31    """
 32    seedConfig contains all config parameters available to DBT seeds
 33
 34    See https://docs.getdbt.com/reference/configs-and-properties for
 35    a more detailed description of each config parameter under the
 36    General propreties, General configs, and For seeds sections.
 37    """
 38
 39    delimiter: str = ","
 40    column_types: t.Optional[t.Dict[str, str]] = None
 41    quote_columns: t.Optional[bool] = False
 42
 43    def to_sqlmesh(
 44        self,
 45        context: DbtContext,
 46        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
 47        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
 48    ) -> Model:
 49        """Converts the dbt seed into a SQLMesh model."""
 50        seed_path = self.path.absolute().as_posix()
 51
 52        column_types_override = {
 53            name: ColumnConfig(name=name, data_type=data_type, quote=self.quote_columns)
 54            for name, data_type in (self.column_types or {}).items()
 55        }
 56        kwargs = self.sqlmesh_model_kwargs(context, column_types_override)
 57
 58        columns = kwargs.get("columns") or {}
 59
 60        agate_table = (
 61            agate_helper.from_csv(seed_path, [], delimiter=self.delimiter)
 62            if SUPPORTS_DELIMITER
 63            else agate_helper.from_csv(seed_path, [])
 64        )
 65        inferred_types = {
 66            name: AGATE_TYPE_MAPPING[tpe.__class__]
 67            for name, tpe in zip(agate_table.column_names, agate_table.column_types)
 68        }
 69
 70        # The columns list built from the mixture of supplied and inferred types needs to
 71        # be in the same order as the data for assumptions elsewhere in the codebase to hold true
 72        new_columns = {}
 73        for column_name in agate_table.column_names:
 74            if column_name not in columns:
 75                new_columns[column_name] = inferred_types[column_name]
 76            else:
 77                new_columns[column_name] = columns[column_name]
 78
 79        kwargs["columns"] = new_columns
 80
 81        # dbt treats single whitespace as a null value
 82        csv_settings = CsvSettings(
 83            delimiter=self.delimiter,
 84            na_values=[" "],
 85            keep_default_na=True,
 86        )
 87
 88        return create_seed_model(
 89            self.canonical_name(context),
 90            SeedKind(path=seed_path, csv_settings=csv_settings),
 91            dialect=self.dialect(context),
 92            audit_definitions=audit_definitions,
 93            virtual_environment_mode=virtual_environment_mode,
 94            start=self.start or context.sqlmesh_config.model_defaults.start,
 95            dbt_node_info=self.node_info,
 96            **kwargs,
 97        )
 98
 99
# Maps the class of an agate column type to the SQL data type used for that
# column when the type is inferred from the seed CSV. SeedConfig.to_sqlmesh
# looks entries up by `column_type.__class__` at call time, which is why the
# mapping can be defined after the class. An entry for the patched Integer type
# is added below for dbt >= 1.7.0.
AGATE_TYPE_MAPPING: t.Dict[type, exp.DataType] = {
    agate_helper.Number: exp.DataType.build("double"),
    agate_helper.ISODateTime: exp.DataType.build("datetime"),
    agate.Date: exp.DataType.build("date"),
    agate.DateTime: exp.DataType.build("datetime"),
    agate.Boolean: exp.DataType.build("boolean"),
    agate.Text: exp.DataType.build("text"),
}
108
109
if DBT_VERSION >= (1, 7, 0):

    class Integer(agate_helper.Integer):
        """Replacement for dbt's ``Integer`` agate type that also accepts strings."""

        def cast(self, d: t.Any) -> t.Optional[int]:
            """Coerce a value into an integer.

            Args:
                d: The raw value. Strings are handled here; anything else is
                    delegated to the parent implementation.

            Returns:
                ``None`` when the stripped, lower-cased string is one of
                ``self.null_values``, otherwise the parsed integer (or whatever
                the parent ``cast`` returns for non-string input).

            Raises:
                agate.exceptions.CastError: If the string cannot be parsed by
                    ``int``. The original ``ValueError`` is attached as the cause.
            """
            if isinstance(d, str):
                # The dbt's implementation doesn't support coercion of strings to integers.
                if d.strip().lower() in self.null_values:
                    return None
                try:
                    return int(d)
                except ValueError as err:
                    # Chain explicitly so the underlying parse failure stays visible.
                    raise agate.exceptions.CastError(
                        'Can not parse value "%s" as Integer.' % d
                    ) from err
            return super().cast(d)

        def jsonify(self, d: t.Any) -> t.Optional[int]:
            """Return the native value unchanged for JSON serialization.

            NOTE(review): the annotation assumes ``d`` is a value produced by
            ``cast`` (an ``int`` or ``None``) - confirm against callers.
            """
            return d

    # Swap dbt's Integer for the string-aware subclass defined above.
    agate_helper.Integer = Integer  # type: ignore

    AGATE_TYPE_MAPPING[agate_helper.Integer] = exp.DataType.build("int")
class SeedConfig(sqlmesh.dbt.basemodel.BaseModelConfig):
class SeedConfig(BaseModelConfig):
    """
    SeedConfig contains all config parameters available to dbt seeds.

    See https://docs.getdbt.com/reference/configs-and-properties for
    a more detailed description of each config parameter under the
    General properties, General configs, and For seeds sections.
    """

    delimiter: str = ","
    column_types: t.Optional[t.Dict[str, str]] = None
    quote_columns: t.Optional[bool] = False

    def to_sqlmesh(
        self,
        context: DbtContext,
        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
    ) -> Model:
        """Converts the dbt seed into a SQLMesh model.

        The seed CSV is parsed with agate to infer a type for every column. A
        column that is present in the ``columns`` entry of the kwargs returned
        by ``sqlmesh_model_kwargs`` keeps that entry; any other CSV column uses
        its inferred type. Only columns found in the CSV header are kept.

        Args:
            context: The dbt context used to resolve the model kwargs, name,
                dialect and the default start.
            audit_definitions: Forwarded unchanged to ``create_seed_model``.
            virtual_environment_mode: Forwarded unchanged to ``create_seed_model``.

        Returns:
            The model produced by ``create_seed_model``.
        """
        seed_path = self.path.absolute().as_posix()

        overrides = {}
        for name, data_type in (self.column_types or {}).items():
            overrides[name] = ColumnConfig(
                name=name, data_type=data_type, quote=self.quote_columns
            )
        kwargs = self.sqlmesh_model_kwargs(context, overrides)

        declared = kwargs.get("columns") or {}

        # The delimiter is only handed to agate when the installed dbt supports it.
        if SUPPORTS_DELIMITER:
            table = agate_helper.from_csv(seed_path, [], delimiter=self.delimiter)
        else:
            table = agate_helper.from_csv(seed_path, [])

        inferred = {}
        for name, agate_type in zip(table.column_names, table.column_types):
            inferred[name] = AGATE_TYPE_MAPPING[agate_type.__class__]

        # The columns list built from the mixture of supplied and inferred types needs to
        # be in the same order as the data for assumptions elsewhere in the codebase to
        # hold true, so the mapping is built by walking the CSV header.
        kwargs["columns"] = {
            name: (declared[name] if name in declared else inferred[name])
            for name in table.column_names
        }

        # dbt treats single whitespace as a null value
        csv_settings = CsvSettings(
            delimiter=self.delimiter,
            na_values=[" "],
            keep_default_na=True,
        )
        seed_kind = SeedKind(path=seed_path, csv_settings=csv_settings)

        return create_seed_model(
            self.canonical_name(context),
            seed_kind,
            dialect=self.dialect(context),
            audit_definitions=audit_definitions,
            virtual_environment_mode=virtual_environment_mode,
            start=self.start or context.sqlmesh_config.model_defaults.start,
            dbt_node_info=self.node_info,
            **kwargs,
        )

SeedConfig contains all config parameters available to dbt seeds.

See https://docs.getdbt.com/reference/configs-and-properties for a more detailed description of each config parameter under the General properties, General configs, and For seeds sections.

delimiter: str
column_types: Optional[Dict[str, str]]
quote_columns: Optional[bool]
44    def to_sqlmesh(
45        self,
46        context: DbtContext,
47        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
48        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
49    ) -> Model:
50        """Converts the dbt seed into a SQLMesh model."""
51        seed_path = self.path.absolute().as_posix()
52
53        column_types_override = {
54            name: ColumnConfig(name=name, data_type=data_type, quote=self.quote_columns)
55            for name, data_type in (self.column_types or {}).items()
56        }
57        kwargs = self.sqlmesh_model_kwargs(context, column_types_override)
58
59        columns = kwargs.get("columns") or {}
60
61        agate_table = (
62            agate_helper.from_csv(seed_path, [], delimiter=self.delimiter)
63            if SUPPORTS_DELIMITER
64            else agate_helper.from_csv(seed_path, [])
65        )
66        inferred_types = {
67            name: AGATE_TYPE_MAPPING[tpe.__class__]
68            for name, tpe in zip(agate_table.column_names, agate_table.column_types)
69        }
70
71        # The columns list built from the mixture of supplied and inferred types needs to
72        # be in the same order as the data for assumptions elsewhere in the codebase to hold true
73        new_columns = {}
74        for column_name in agate_table.column_names:
75            if column_name not in columns:
76                new_columns[column_name] = inferred_types[column_name]
77            else:
78                new_columns[column_name] = columns[column_name]
79
80        kwargs["columns"] = new_columns
81
82        # dbt treats single whitespace as a null value
83        csv_settings = CsvSettings(
84            delimiter=self.delimiter,
85            na_values=[" "],
86            keep_default_na=True,
87        )
88
89        return create_seed_model(
90            self.canonical_name(context),
91            SeedKind(path=seed_path, csv_settings=csv_settings),
92            dialect=self.dialect(context),
93            audit_definitions=audit_definitions,
94            virtual_environment_mode=virtual_environment_mode,
95            start=self.start or context.sqlmesh_config.model_defaults.start,
96            dbt_node_info=self.node_info,
97            **kwargs,
98        )

Converts the dbt seed into a SQLMesh model.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance.

    This function is meant to behave like a BaseModel method. It takes context
    as an argument since that's what pydantic-core passes when calling it.

    Nothing happens when `__pydantic_private__` is already set to a non-None
    value. Otherwise it is set to a dict holding the default of every private
    attribute whose default is not `PydanticUndefined`.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Guard clause: private attributes were already initialised.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    resolved_defaults = (
        (attr_name, attr.get_default())
        for attr_name, attr in self.__private_attributes__.items()
    )
    pydantic_private = {
        attr_name: value
        for attr_name, value in resolved_defaults
        if value is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
AGATE_TYPE_MAPPING = {<class 'dbt_common.clients.agate_helper.Number'>: DataType(this=DType.DOUBLE, nested=False), <class 'dbt_common.clients.agate_helper.ISODateTime'>: DataType(this=DType.DATETIME, nested=False), <class 'agate.data_types.date.Date'>: DataType(this=DType.DATE, nested=False), <class 'agate.data_types.date_time.DateTime'>: DataType(this=DType.DATETIME, nested=False), <class 'agate.data_types.boolean.Boolean'>: DataType(this=DType.BOOLEAN, nested=False), <class 'agate.data_types.text.Text'>: DataType(this=DType.TEXT, nested=False), <class 'Integer'>: DataType(this=DType.INT, nested=False)}
class Integer(dbt_common.clients.agate_helper.Integer):
113    class Integer(agate_helper.Integer):
114        def cast(self, d: t.Any) -> t.Optional[int]:
115            if isinstance(d, str):
116                # The dbt's implementation doesn't support coercion of strings to integers.
117                if d.strip().lower() in self.null_values:
118                    return None
119                try:
120                    return int(d)
121                except ValueError:
122                    raise agate.exceptions.CastError('Can not parse value "%s" as Integer.' % d)
123            return super().cast(d)
124
125        def jsonify(self, d: t.Any) -> str:
126            return d

Specifies how values should be parsed when creating a `Table`.

Parameters
  • null_values: A sequence of values which should be cast to `None` when encountered by this data type.
def cast(self, d: Any) -> Optional[int]:
114        def cast(self, d: t.Any) -> t.Optional[int]:
115            if isinstance(d, str):
116                # The dbt's implementation doesn't support coercion of strings to integers.
117                if d.strip().lower() in self.null_values:
118                    return None
119                try:
120                    return int(d)
121                except ValueError:
122                    raise agate.exceptions.CastError('Can not parse value "%s" as Integer.' % d)
123            return super().cast(d)

Coerce a given string value into this column's data type.

def jsonify(self, d: Any) -> str:
125        def jsonify(self, d: t.Any) -> str:
126            return d

Format a given native value for JSON serialization.

Inherited Members
agate.data_types.base.DataType
DataType
null_values
test
csvify