Edit on GitHub

sqlmesh.core.engine_adapter.mixins

  1from __future__ import annotations
  2
  3import abc
  4import logging
  5import typing as t
  6from dataclasses import dataclass
  7
  8from sqlglot import exp, parse_one
  9from sqlglot.helper import seq_get
 10from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
 11
 12from sqlmesh.core.engine_adapter.base import EngineAdapter
 13from sqlmesh.core.engine_adapter.shared import DataObjectType
 14from sqlmesh.core.node import IntervalUnit
 15from sqlmesh.core.dialect import schema_
 16from sqlmesh.core.schema_diff import TableAlterOperation
 17from sqlmesh.utils.errors import SQLMeshError
 18
 19if t.TYPE_CHECKING:
 20    from sqlmesh.core._typing import TableName
 21    from sqlmesh.core.engine_adapter._typing import (
 22        DCL,
 23        DF,
 24        GrantsConfig,
 25        QueryOrDF,
 26    )
 27    from sqlmesh.core.engine_adapter.base import QueryOrDF
 28
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# strftime-style format used to render DATE values as normalized strings.
NORMALIZED_DATE_FORMAT = "%Y-%m-%d"
# strftime-style format used to render every other temporal value (%f = microseconds).
NORMALIZED_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
 33
 34
 35class LogicalMergeMixin(EngineAdapter):
 36    def merge(
 37        self,
 38        target_table: TableName,
 39        source_table: QueryOrDF,
 40        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
 41        unique_key: t.Sequence[exp.Expression],
 42        when_matched: t.Optional[exp.Whens] = None,
 43        merge_filter: t.Optional[exp.Expression] = None,
 44        source_columns: t.Optional[t.List[str]] = None,
 45        **kwargs: t.Any,
 46    ) -> None:
 47        logical_merge(
 48            self,
 49            target_table,
 50            source_table,
 51            target_columns_to_types,
 52            unique_key,
 53            when_matched=when_matched,
 54            merge_filter=merge_filter,
 55            source_columns=source_columns,
 56        )
 57
 58
 59class PandasNativeFetchDFSupportMixin(EngineAdapter):
 60    def _fetch_native_df(
 61        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
 62    ) -> DF:
 63        """Fetches a Pandas DataFrame from a SQL query."""
 64        from warnings import catch_warnings, filterwarnings
 65
 66        from pandas.io.sql import read_sql_query
 67
 68        sql = (
 69            self._to_sql(query, quote=quote_identifiers)
 70            if isinstance(query, exp.Expression)
 71            else query
 72        )
 73        logger.debug(f"Executing SQL:\n{sql}")
 74        with catch_warnings(), self.transaction():
 75            filterwarnings(
 76                "ignore",
 77                category=UserWarning,
 78                message=".*pandas only supports SQLAlchemy connectable.*",
 79            )
 80            df = read_sql_query(sql, self._connection_pool.get())
 81        return df
 82
 83
 84class HiveMetastoreTablePropertiesMixin(EngineAdapter):
 85    MAX_TABLE_COMMENT_LENGTH = 4000
 86    MAX_COLUMN_COMMENT_LENGTH = 4000
 87
 88    def _build_partitioned_by_exp(
 89        self,
 90        partitioned_by: t.List[exp.Expression],
 91        *,
 92        catalog_name: t.Optional[str] = None,
 93        **kwargs: t.Any,
 94    ) -> t.Union[exp.PartitionedByProperty, exp.Property]:
 95        if (
 96            self.dialect == "trino"
 97            and self.get_catalog_type(catalog_name or self.get_current_catalog()) == "iceberg"
 98        ):
 99            # On the Trino Iceberg catalog, the table property is called "partitioning" - not "partitioned_by"
100            # In addition, partition column transform expressions like `day(col)` or `bucket(col, 5)` are allowed
101            # Also, column names and transforms need to be strings and supplied as an ARRAY[varchar]
102            # ref: https://trino.io/docs/current/connector/iceberg.html#table-properties
103            return exp.Property(
104                this=exp.var("PARTITIONING"),
105                value=exp.array(
106                    *(exp.Literal.string(e.sql(dialect=self.dialect)) for e in partitioned_by)
107                ),
108            )
109        for expr in partitioned_by:
110            if not isinstance(expr, exp.Column):
111                raise SQLMeshError(
112                    f"PARTITIONED BY contains non-column value '{expr.sql(dialect=self.dialect)}'."
113                )
114        return exp.PartitionedByProperty(
115            this=exp.Schema(expressions=partitioned_by),
116        )
117
118    def _build_table_properties_exp(
119        self,
120        catalog_name: t.Optional[str] = None,
121        table_format: t.Optional[str] = None,
122        storage_format: t.Optional[str] = None,
123        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
124        partition_interval_unit: t.Optional[IntervalUnit] = None,
125        clustered_by: t.Optional[t.List[exp.Expression]] = None,
126        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
127        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
128        table_description: t.Optional[str] = None,
129        table_kind: t.Optional[str] = None,
130        **kwargs: t.Any,
131    ) -> t.Optional[exp.Properties]:
132        properties: t.List[exp.Expression] = []
133
134        if table_format and self.dialect == "spark":
135            properties.append(exp.FileFormatProperty(this=exp.Var(this=table_format)))
136            if storage_format:
137                properties.append(
138                    exp.Property(
139                        this="write.format.default", value=exp.Literal.string(storage_format)
140                    )
141                )
142        elif storage_format:
143            properties.append(exp.FileFormatProperty(this=exp.Var(this=storage_format)))
144
145        if partitioned_by:
146            properties.append(
147                self._build_partitioned_by_exp(
148                    partitioned_by,
149                    partition_interval_unit=partition_interval_unit,
150                    catalog_name=catalog_name,
151                )
152            )
153
154        if table_description:
155            properties.append(
156                exp.SchemaCommentProperty(
157                    this=exp.Literal.string(self._truncate_table_comment(table_description))
158                )
159            )
160
161        properties.extend(self._table_or_view_properties_to_expressions(table_properties))
162
163        if properties:
164            return exp.Properties(expressions=properties)
165        return None
166
167    def _build_view_properties_exp(
168        self,
169        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
170        table_description: t.Optional[str] = None,
171        **kwargs: t.Any,
172    ) -> t.Optional[exp.Properties]:
173        """Creates a SQLGlot table properties expression for view"""
174        properties: t.List[exp.Expression] = []
175
176        if table_description:
177            properties.append(
178                exp.SchemaCommentProperty(
179                    this=exp.Literal.string(self._truncate_table_comment(table_description))
180                )
181            )
182
183        properties.extend(self._table_or_view_properties_to_expressions(view_properties))
184
185        if properties:
186            return exp.Properties(expressions=properties)
187        return None
188
189    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
190        # iceberg and delta do not have a comment length limit
191        if self.current_catalog_type in ("iceberg", "delta_lake"):
192            return comment
193        return super()._truncate_comment(comment, length)
194
195
class GetCurrentCatalogFromFunctionMixin(EngineAdapter):
    """Mixin that resolves the current catalog by selecting `CURRENT_CATALOG_EXPRESSION`."""

    # Expression that is selected to obtain the catalog name.
    CURRENT_CATALOG_EXPRESSION: exp.Expression = exp.func("current_catalog")

    def get_current_catalog(self) -> t.Optional[str]:
        """Returns the catalog name of the current connection, or None if no row came back."""
        row = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
        return row[0] if row else None
205
206
class NonTransactionalTruncateMixin(EngineAdapter):
    """Mixin that avoids TRUNCATE while a transaction is active."""

    def _truncate_table(self, table_name: TableName) -> None:
        """Empties `table_name`, using DELETE instead of TRUNCATE inside a transaction."""
        # Truncate forces a commit of the current transaction, so it is only used when
        # no transaction is active.
        if not self._connection_pool.is_transaction_active:
            super()._truncate_table(table_name)
            return None
        # An unconditional delete preserves the surrounding transaction.
        return self.execute(exp.Delete(this=exp.to_table(table_name)))
214
215
class VarcharSizeWorkaroundMixin(EngineAdapter):
    """Works around CTAS statements whose resulting column types are non-deterministic."""

    def _default_precision_to_max(
        self, columns_to_types: t.Dict[str, exp.DataType]
    ) -> t.Dict[str, exp.DataType]:
        """Switches default-length parameterized types to "max" length.

        The data types are modified in place; the same mapping object is returned.
        """
        # get default lengths for types that support "max" length
        types_with_max_default_param = {
            k: [self.schema_differ.parameterized_type_defaults[k][0][0]]
            for k in self.schema_differ.max_parameter_length
            if k in self.schema_differ.parameterized_type_defaults
        }

        # Redshift and MSSQL have a bug where CTAS statements have non-deterministic types. If a LIMIT
        # is applied to a CTAS statement, VARCHAR (and possibly other) types sometimes revert to their
        # default length of 256 (Redshift) or 1 (MSSQL). If we detect that a type has its default length
        # and supports "max" length, we convert it to "max" length to prevent inadvertent data truncation.
        for col_type in columns_to_types.values():
            if col_type.this in types_with_max_default_param and col_type.expressions:
                parameter = self.schema_differ.get_type_parameters(col_type)
                type_default = types_with_max_default_param[col_type.this]
                if parameter == type_default:
                    col_type.set("expressions", [exp.DataTypeParam(this=exp.var("max"))])

        return columns_to_types

    def _build_create_table_exp(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expression],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> exp.Create:
        """Builds the CREATE TABLE expression, replacing `LIMIT 0` CTAS with an explicit schema.

        When the statement built by the parent class is a CTAS whose query ends in
        `LIMIT 0`, the column types are read from a temporary view over the query
        (with the `LIMIT 0` and all WHERE clauses removed) and the statement is rebuilt
        as a plain CREATE TABLE with those columns. The temporary view is always dropped.
        Any other statement is returned as built by the parent class.
        """
        statement = super()._build_create_table_exp(
            table_name_or_schema,
            expression=expression,
            exists=exists,
            replace=replace,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            table_kind=table_kind,
            **kwargs,
        )

        if (
            statement.expression
            and statement.expression.args.get("limit") is not None
            and statement.expression.args["limit"].expression.this == "0"
        ):
            assert not isinstance(table_name_or_schema, exp.Schema)

            # redshift and mssql have a bug where CTAS statements have non-deterministic types. if a limit
            # is applied to a ctas statement, VARCHAR types default to 1 in some instances.
            select_statement = statement.expression.copy()
            for select_or_union in select_statement.find_all(exp.Select, exp.SetOperation):
                limit = select_or_union.args.get("limit")
                if limit is not None and limit.expression.this == "0":
                    limit.pop()

                select_or_union.set("where", None)

            temp_view_name = self._get_temp_table("ctas")

            self.create_view(temp_view_name, select_statement, replace=False)
            try:
                columns_to_types_from_view = self._default_precision_to_max(
                    self.columns(temp_view_name)
                )

                schema = self._build_schema_exp(
                    exp.to_table(table_name_or_schema),
                    columns_to_types_from_view,
                )
                # `table_kind` is a named parameter and therefore not part of `kwargs`;
                # forward it explicitly so the rebuilt statement keeps the requested kind.
                statement = super()._build_create_table_exp(
                    schema,
                    None,
                    exists=exists,
                    replace=replace,
                    target_columns_to_types=columns_to_types_from_view,
                    table_description=table_description,
                    table_kind=table_kind,
                    **kwargs,
                )
            finally:
                self.drop_view(temp_view_name)

        return statement
304
305
@dataclass(frozen=True)
class TableAlterClusterByOperation(TableAlterOperation, abc.ABC):
    """Common base class of the alter operations that change or drop a clustering key."""
309
310
@dataclass(frozen=True)
class TableAlterChangeClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that sets a table's clustering key to `clustering_key`."""

    # Clustering key as a SQL string; parsed with `dialect` when the alter action is built.
    clustering_key: str
    dialect: str

    @property
    def is_additive(self) -> bool:
        """Always False."""
        return False

    @property
    def is_destructive(self) -> bool:
        """Always False."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        """A single `exp.Cluster` action built from the parsed clustering key."""
        return [exp.Cluster(expressions=self.cluster_key_expressions)]

    @property
    def cluster_key_expressions(self) -> t.List[exp.Expression]:
        """The expressions of the parsed clustering key.

        Note: Assumes `clustering_key` is a string like:
            - "(col_a)"
            - "(col_a, col_b)"
            - "func(col_a, transform(col_b))"
        """
        parsed_key = parse_one(self.clustering_key, dialect=self.dialect)
        # Use the parsed node's `expressions` when it has any, otherwise its single `this`.
        if parsed_key.expressions:
            return parsed_key.expressions
        return [parsed_key.this]
336
337
@dataclass(frozen=True)
class TableAlterDropClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that drops a table's clustering key."""

    @property
    def is_additive(self) -> bool:
        """Always False."""
        return False

    @property
    def is_destructive(self) -> bool:
        """Always False."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        """A single raw `DROP CLUSTERING KEY` command."""
        drop_clustering_key = exp.Command(this="DROP", expression="CLUSTERING KEY")
        return [drop_clustering_key]
351
352
class ClusteredByMixin(EngineAdapter):
    """Adds clustering-key support to table creation and to alter-operation discovery."""

    def _build_clustered_by_exp(
        self,
        clustered_by: t.List[exp.Expression],
        **kwargs: t.Any,
    ) -> t.Optional[exp.Cluster]:
        """Builds an `exp.Cluster` from copies of the `clustered_by` expressions."""
        cluster_columns = [column.copy() for column in clustered_by]
        return exp.Cluster(expressions=cluster_columns)

    def get_alter_operations(
        self,
        current_table_name: TableName,
        target_table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterOperation]:
        """Returns the parent's alter operations plus any clustering-key change.

        A change operation is appended when the target table is clustered with a
        non-empty key that differs from the current table's key. A drop operation is
        appended when only the current table is clustered. Nothing is appended unless
        data objects are found for both tables.
        """
        operations = super().get_alter_operations(
            current_table_name,
            target_table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

        # check for a change in clustering
        current_table = exp.to_table(current_table_name)
        target_table = exp.to_table(target_table_name)

        current_schema = schema_(current_table.db, catalog=current_table.catalog)
        target_schema = schema_(target_table.db, catalog=target_table.catalog)

        current_info = seq_get(self.get_data_objects(current_schema, {current_table.name}), 0)
        target_info = seq_get(self.get_data_objects(target_schema, {target_table.name}), 0)

        if not (current_info and target_info):
            return operations

        if not target_info.is_clustered:
            # The target is unclustered: drop the key if the current table still has one.
            if current_info.is_clustered:
                operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))
            return operations

        desired_key = target_info.clustering_key
        if desired_key and current_info.clustering_key != desired_key:
            operations.append(
                TableAlterChangeClusterKeyOperation(
                    target_table=current_table,
                    clustering_key=desired_key,
                    dialect=self.dialect,
                )
            )

        return operations
406
407
def logical_merge(
    engine_adapter: EngineAdapter,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expression],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expression] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """
    Merge implementation for engine adapters that do not support merge natively.

    The work is delegated to the adapter's `_replace_by_key` with `is_unique_key=True`.
    The merge is executed as follows:
    1. Create a temporary table containing the new data to merge.
    2. Delete rows from target table where unique_key cols match a row in the temporary table.
    3. Insert the temporary table contents into the target table. Any duplicate, non-unique rows
       within the temporary table are omitted.
    4. Drop the temporary table.

    Raises:
        SQLMeshError: If `when_matched` or `merge_filter` is provided, since neither can be
            honoured without a native MERGE.
    """
    # `when_matched` is checked first so it is the one reported when both are supplied.
    for prop, value in (("when_matched", when_matched), ("merge_filter", merge_filter)):
        if value:
            raise SQLMeshError(
                f"This engine does not support MERGE expressions and therefore `{prop}` is not supported."
            )

    engine_adapter._replace_by_key(
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        is_unique_key=True,
        source_columns=source_columns,
    )
442
443
class RowDiffMixin(EngineAdapter):
    """Helpers that turn column values into normalized strings so rows can be compared."""

    # The maximum supported value for n in timestamp(n).
    # Most databases are microsecond (6) but some can only handle millisecond (3) while others go to nanosecond (9)
    MAX_TIMESTAMP_PRECISION = 6

    def concat_columns(
        self,
        columns_to_types: t.Dict[str, exp.DataType],
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
        delimiter: str = ",",
    ) -> exp.Expression:
        """
        Produce an expression that generates a string version of a record, that is:
            - Every column normalized via `normalize_value` (NULLs coalesced to an empty string),
              joined together into a single string using the specified :delimiter
        """
        pieces: t.List[exp.Expression] = []
        for column, data_type in columns_to_types.items():
            if pieces:
                # Only separate consecutive columns; no leading or trailing delimiter.
                pieces.append(exp.Literal.string(delimiter))
            normalized = self.normalize_value(
                exp.to_column(column), data_type, decimal_precision, timestamp_precision
            )
            pieces.append(exp.func("COALESCE", normalized, exp.Literal.string("")))

        return exp.func("CONCAT", *pieces)

    def normalize_value(
        self,
        expr: exp.Expression,
        type: exp.DataType,
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
    ) -> exp.Expression:
        """
        Return an expression that converts the value of `expr` to a normalized string (VARCHAR cast)

        This string should be comparable across database engines, eg:
            - `date` columns -> YYYY-MM-DD string
            - `datetime`/`timestamp`/`timestamptz` columns -> ISO-8601 string to :timestamp_precision digits of subsecond precision
            - `float` / `double` / `decimal` -> Value formatted to :decimal_precision decimal places
            - `boolean` columns -> '1' or '0'

        Values of any other type are cast to VARCHAR unchanged.
        """
        kinds = exp.DataType
        if type.is_type(kinds.Type.BOOLEAN):
            normalized = self._normalize_boolean_value(expr)
        elif type.is_type(*kinds.INTEGER_TYPES):
            normalized = self._normalize_integer_value(expr)
        elif type.is_type(*kinds.REAL_TYPES):
            # If there is no scale on the decimal type, treat it like an integer when comparing
            # Some databases like Snowflake deliberately create all integer types as NUMERIC(<size>, 0)
            # and they should be treated as integers and not decimals
            params = list(type.find_all(exp.DataTypeParam))
            has_zero_scale = len(params) == 2 and params[-1].this.to_py() == 0
            if has_zero_scale:
                normalized = self._normalize_integer_value(expr)
            else:
                normalized = self._normalize_decimal_value(expr, decimal_precision)
        elif type.is_type(*kinds.TEMPORAL_TYPES):
            normalized = self._normalize_timestamp_value(expr, type, timestamp_precision)
        elif type.is_type(*kinds.NESTED_TYPES):
            normalized = self._normalize_nested_value(expr)
        else:
            normalized = expr

        return exp.cast(normalized, to=exp.DataType.build("VARCHAR"))

    def _normalize_nested_value(self, expr: exp.Expression) -> exp.Expression:
        """Returns `expr` unchanged."""
        return expr

    def _normalize_timestamp_value(
        self, expr: exp.Expression, type: exp.DataType, precision: int
    ) -> exp.Expression:
        """Renders a temporal value as a string with `precision` sub-second digits.

        Raises:
            ValueError: If `precision` exceeds `MAX_TIMESTAMP_PRECISION`.
        """
        if precision > self.MAX_TIMESTAMP_PRECISION:
            raise ValueError(
                f"Requested timestamp precision '{precision}' exceeds maximum supported precision: {self.MAX_TIMESTAMP_PRECISION}"
            )

        if type.is_type(exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
            time_format = NORMALIZED_DATE_FORMAT
        else:
            time_format = NORMALIZED_TIMESTAMP_FORMAT

        needs_utc_conversion = type.is_type(
            exp.DataType.Type.TIMESTAMPTZ,
            exp.DataType.Type.TIMESTAMPLTZ,
            exp.DataType.Type.TIMESTAMPNTZ,
        )
        if needs_utc_conversion:
            # Convert all timezone-aware values to UTC for comparison
            expr = exp.AtTimeZone(this=expr, zone=exp.Literal.string("UTC"))

        rendered = exp.TimeToStr(this=expr, format=exp.Literal.string(time_format))

        # 6 = max precision across all adapters and also the max amount of digits TimeToStr will
        # render since its based on `strftime` and `%f` only renders to microseconds
        surplus_digits = 6 - precision
        if surplus_digits <= 0:
            return rendered

        # Keep only the leading part of the fully rendered timestamp.
        kept_length = len("2023-01-01 12:13:14.000000") - surplus_digits
        return exp.func("SUBSTRING", rendered, 1, kept_length)

    def _normalize_integer_value(self, expr: exp.Expression) -> exp.Expression:
        """Casts `expr` to BIGINT."""
        return exp.cast(expr, "BIGINT")

    def _normalize_decimal_value(self, expr: exp.Expression, precision: int) -> exp.Expression:
        """Casts `expr` to DECIMAL(38, precision)."""
        return exp.cast(expr, f"DECIMAL(38,{precision})")

    def _normalize_boolean_value(self, expr: exp.Expression) -> exp.Expression:
        """Casts `expr` to INT."""
        return exp.cast(expr, "INT")
558
559
class GrantsFromInfoSchemaMixin(EngineAdapter):
    """Builds GRANT / REVOKE statements and reads current grants from `information_schema`."""

    # Expression compared against the `grantor` / `grantee` columns when reading grants.
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expression = exp.func("current_user")
    # When False, one statement is emitted per (privilege, principal) pair.
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
    # When True, the grants query is qualified with, and filtered by, the catalog.
    USE_CATALOG_IN_GRANTS = False
    # Name of the `information_schema` table that is queried for grants.
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "table_privileges"

    @staticmethod
    @abc.abstractmethod
    def _grant_object_kind(table_type: DataObjectType) -> t.Optional[str]:
        """Returns the object kind to put in GRANT / REVOKE statements, or None to omit it."""
        pass

    @abc.abstractmethod
    def _get_current_schema(self) -> str:
        """Returns the schema name used when a table has no explicit schema."""
        pass

    def _dcl_grants_config_expr(
        self,
        dcl_cmd: t.Type[DCL],
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """Builds one `dcl_cmd` expression per privilege (or per privilege / principal pair).

        Args:
            dcl_cmd: The expression class to instantiate (`exp.Grant` or `exp.Revoke`).
            table: The securable; every statement receives its own copy.
            grants_config: Mapping of privilege name to principals.
            table_type: Used to determine the object kind via `_grant_object_kind`.

        Returns:
            The statements in `grants_config` order. Privileges without principals
            produce no statement.
        """
        expressions: t.List[exp.Expression] = []
        if not grants_config:
            return expressions

        object_kind = self._grant_object_kind(table_type)

        def parse_principal(principal: str) -> exp.Expression:
            return normalize_identifiers(
                parse_one(principal, into=exp.GrantPrincipal, dialect=self.dialect),
                dialect=self.dialect,
            )

        def build_statement(privilege: str, principals: t.List[exp.Expression]) -> exp.Expression:
            # Every statement gets freshly built child nodes: sharing one node between
            # several parent expressions would leave its parent pointer referring to
            # only the last statement.
            args: t.Dict[str, t.Any] = {
                "privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
                "securable": table.copy(),
            }
            if object_kind:
                args["kind"] = exp.Var(this=object_kind)
            args["principals"] = principals
            return dcl_cmd(**args)  # type: ignore[arg-type]

        for privilege, principals in grants_config.items():
            parsed_principals = [parse_principal(principal) for principal in principals]
            if not parsed_principals:
                # A statement without principals would not be valid, so emit nothing.
                continue
            if self.SUPPORTS_MULTIPLE_GRANT_PRINCIPALS:
                expressions.append(build_statement(privilege, parsed_principals))
            else:
                expressions.extend(
                    build_statement(privilege, [principal]) for principal in parsed_principals
                )

        return expressions

    def _apply_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """Builds the GRANT statements for `grants_config`."""
        return self._dcl_grants_config_expr(exp.Grant, table, grants_config, table_type)

    def _revoke_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """Builds the REVOKE statements for `grants_config`."""
        return self._dcl_grants_config_expr(exp.Revoke, table, grants_config, table_type)

    def _get_grant_expression(self, table: exp.Table) -> exp.Expression:
        """Builds the query selecting `privilege_type` and `grantee` for `table`.

        Only rows granted by the current user / role to a different grantee are selected.

        Raises:
            SQLMeshError: If `USE_CATALOG_IN_GRANTS` is set, the table has no catalog and
                the current catalog cannot be determined.
        """
        # Fall back to the current schema when the table is not schema-qualified.
        schema_identifier = table.args.get("db") or normalize_identifiers(
            exp.to_identifier(self._get_current_schema(), quoted=True), dialect=self.dialect
        )
        schema_name = schema_identifier.this
        table_name = table.args.get("this").this  # type: ignore

        grant_conditions = [
            exp.column("table_schema").eq(exp.Literal.string(schema_name)),
            exp.column("table_name").eq(exp.Literal.string(table_name)),
            exp.column("grantor").eq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
            exp.column("grantee").neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
        ]

        info_schema_table = normalize_identifiers(
            exp.table_(self.GRANT_INFORMATION_SCHEMA_TABLE_NAME, db="information_schema"),
            dialect=self.dialect,
        )
        if self.USE_CATALOG_IN_GRANTS:
            catalog_identifier = table.args.get("catalog")
            if not catalog_identifier:
                current_catalog = self.get_current_catalog()
                if not current_catalog:
                    raise SQLMeshError(
                        "Current catalog could not be determined for fetching grants. This is unexpected."
                    )
                catalog_identifier = normalize_identifiers(
                    exp.to_identifier(current_catalog, quoted=True), dialect=self.dialect
                )
            catalog_name = catalog_identifier.this
            info_schema_table.set("catalog", catalog_identifier.copy())
            # The catalog filter goes first, ahead of the schema / table filters.
            grant_conditions.insert(
                0, exp.column("table_catalog").eq(exp.Literal.string(catalog_name))
            )

        return (
            exp.select("privilege_type", "grantee")
            .from_(info_schema_table)
            .where(exp.and_(*grant_conditions))
        )

    def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
        """Fetches the current grants on `table` as a privilege -> grantees mapping.

        Rows with a NULL or empty privilege / grantee are skipped, and each grantee is
        listed at most once per privilege, in the order rows were returned.
        """
        grant_expr = self._get_grant_expression(table)

        results = self.fetchall(grant_expr)

        grants_dict: GrantsConfig = {}
        for privilege_raw, grantee_raw in results:
            if privilege_raw is None or grantee_raw is None:
                continue

            privilege = str(privilege_raw)
            grantee = str(grantee_raw)
            if not privilege or not grantee:
                continue

            grantees = grants_dict.setdefault(privilege, [])
            if grantee not in grantees:
                grantees.append(grantee)

        return grants_dict
logger = <Logger sqlmesh.core.engine_adapter.mixins (WARNING)>
NORMALIZED_DATE_FORMAT = '%Y-%m-%d'
NORMALIZED_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f'
class LogicalMergeMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
36class LogicalMergeMixin(EngineAdapter):
37    def merge(
38        self,
39        target_table: TableName,
40        source_table: QueryOrDF,
41        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
42        unique_key: t.Sequence[exp.Expression],
43        when_matched: t.Optional[exp.Whens] = None,
44        merge_filter: t.Optional[exp.Expression] = None,
45        source_columns: t.Optional[t.List[str]] = None,
46        **kwargs: t.Any,
47    ) -> None:
48        logical_merge(
49            self,
50            target_table,
51            source_table,
52            target_columns_to_types,
53            unique_key,
54            when_matched=when_matched,
55            merge_filter=merge_filter,
56            source_columns=source_columns,
57        )

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
def merge( self, target_table: Union[str, sqlglot.expressions.Table], source_table: QueryOrDF, target_columns_to_types: Optional[Dict[str, sqlglot.expressions.DataType]], unique_key: Sequence[sqlglot.expressions.Expression], when_matched: Optional[sqlglot.expressions.Whens] = None, merge_filter: Optional[sqlglot.expressions.Expression] = None, source_columns: Optional[List[str]] = None, **kwargs: Any) -> None:
37    def merge(
38        self,
39        target_table: TableName,
40        source_table: QueryOrDF,
41        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
42        unique_key: t.Sequence[exp.Expression],
43        when_matched: t.Optional[exp.Whens] = None,
44        merge_filter: t.Optional[exp.Expression] = None,
45        source_columns: t.Optional[t.List[str]] = None,
46        **kwargs: t.Any,
47    ) -> None:
48        logical_merge(
49            self,
50            target_table,
51            source_table,
52            target_columns_to_types,
53            unique_key,
54            when_matched=when_matched,
55            merge_filter=merge_filter,
56            source_columns=source_columns,
57        )
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class PandasNativeFetchDFSupportMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class PandasNativeFetchDFSupportMixin(EngineAdapter):
    """Mixin that fetches query results with ``pandas.io.sql.read_sql_query``."""

    def _fetch_native_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> DF:
        """Fetches a Pandas DataFrame from a SQL query.

        Args:
            query: Either a SQLGlot expression, which is rendered to SQL with
                ``self._to_sql``, or a SQL string, which is used as given.
            quote_identifiers: Passed as ``quote`` to ``self._to_sql`` when
                ``query`` is an expression; not used for string queries.

        Returns:
            The object returned by ``read_sql_query`` for the rendered SQL.
        """
        from warnings import catch_warnings, filterwarnings

        from pandas.io.sql import read_sql_query

        sql = (
            self._to_sql(query, quote=quote_identifiers)
            if isinstance(query, exp.Expression)
            else query
        )
        # Lazy %-style arguments: the message is only formatted if DEBUG is enabled.
        logger.debug("Executing SQL:\n%s", sql)
        with catch_warnings(), self.transaction():
            # Ignore only the UserWarning matching the pattern below; the
            # catch_warnings context restores the previous filters on exit.
            filterwarnings(
                "ignore",
                category=UserWarning,
                message=".*pandas only supports SQLAlchemy connectable.*",
            )
            df = read_sql_query(sql, self._connection_pool.get())
        return df

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class HiveMetastoreTablePropertiesMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
 85class HiveMetastoreTablePropertiesMixin(EngineAdapter):
 86    MAX_TABLE_COMMENT_LENGTH = 4000
 87    MAX_COLUMN_COMMENT_LENGTH = 4000
 88
 89    def _build_partitioned_by_exp(
 90        self,
 91        partitioned_by: t.List[exp.Expression],
 92        *,
 93        catalog_name: t.Optional[str] = None,
 94        **kwargs: t.Any,
 95    ) -> t.Union[exp.PartitionedByProperty, exp.Property]:
 96        if (
 97            self.dialect == "trino"
 98            and self.get_catalog_type(catalog_name or self.get_current_catalog()) == "iceberg"
 99        ):
100            # On the Trino Iceberg catalog, the table property is called "partitioning" - not "partitioned_by"
101            # In addition, partition column transform expressions like `day(col)` or `bucket(col, 5)` are allowed
102            # Also, column names and transforms need to be strings and supplied as an ARRAY[varchar]
103            # ref: https://trino.io/docs/current/connector/iceberg.html#table-properties
104            return exp.Property(
105                this=exp.var("PARTITIONING"),
106                value=exp.array(
107                    *(exp.Literal.string(e.sql(dialect=self.dialect)) for e in partitioned_by)
108                ),
109            )
110        for expr in partitioned_by:
111            if not isinstance(expr, exp.Column):
112                raise SQLMeshError(
113                    f"PARTITIONED BY contains non-column value '{expr.sql(dialect=self.dialect)}'."
114                )
115        return exp.PartitionedByProperty(
116            this=exp.Schema(expressions=partitioned_by),
117        )
118
119    def _build_table_properties_exp(
120        self,
121        catalog_name: t.Optional[str] = None,
122        table_format: t.Optional[str] = None,
123        storage_format: t.Optional[str] = None,
124        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
125        partition_interval_unit: t.Optional[IntervalUnit] = None,
126        clustered_by: t.Optional[t.List[exp.Expression]] = None,
127        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
128        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
129        table_description: t.Optional[str] = None,
130        table_kind: t.Optional[str] = None,
131        **kwargs: t.Any,
132    ) -> t.Optional[exp.Properties]:
133        properties: t.List[exp.Expression] = []
134
135        if table_format and self.dialect == "spark":
136            properties.append(exp.FileFormatProperty(this=exp.Var(this=table_format)))
137            if storage_format:
138                properties.append(
139                    exp.Property(
140                        this="write.format.default", value=exp.Literal.string(storage_format)
141                    )
142                )
143        elif storage_format:
144            properties.append(exp.FileFormatProperty(this=exp.Var(this=storage_format)))
145
146        if partitioned_by:
147            properties.append(
148                self._build_partitioned_by_exp(
149                    partitioned_by,
150                    partition_interval_unit=partition_interval_unit,
151                    catalog_name=catalog_name,
152                )
153            )
154
155        if table_description:
156            properties.append(
157                exp.SchemaCommentProperty(
158                    this=exp.Literal.string(self._truncate_table_comment(table_description))
159                )
160            )
161
162        properties.extend(self._table_or_view_properties_to_expressions(table_properties))
163
164        if properties:
165            return exp.Properties(expressions=properties)
166        return None
167
168    def _build_view_properties_exp(
169        self,
170        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
171        table_description: t.Optional[str] = None,
172        **kwargs: t.Any,
173    ) -> t.Optional[exp.Properties]:
174        """Creates a SQLGlot table properties expression for view"""
175        properties: t.List[exp.Expression] = []
176
177        if table_description:
178            properties.append(
179                exp.SchemaCommentProperty(
180                    this=exp.Literal.string(self._truncate_table_comment(table_description))
181                )
182            )
183
184        properties.extend(self._table_or_view_properties_to_expressions(view_properties))
185
186        if properties:
187            return exp.Properties(expressions=properties)
188        return None
189
190    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
191        # iceberg and delta do not have a comment length limit
192        if self.current_catalog_type in ("iceberg", "delta_lake"):
193            return comment
194        return super()._truncate_comment(comment, length)

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
MAX_TABLE_COMMENT_LENGTH = 4000
MAX_COLUMN_COMMENT_LENGTH = 4000
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class GetCurrentCatalogFromFunctionMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class GetCurrentCatalogFromFunctionMixin(EngineAdapter):
    """Mixin that reads the current catalog by selecting ``CURRENT_CATALOG_EXPRESSION``."""

    # Expression selected to obtain the catalog name.
    CURRENT_CATALOG_EXPRESSION: exp.Expression = exp.func("current_catalog")

    def get_current_catalog(self) -> t.Optional[str]:
        """Returns the catalog name of the current connection."""
        row = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
        # A falsy result from fetchone yields None.
        return row[0] if row else None

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
CURRENT_CATALOG_EXPRESSION: sqlglot.expressions.Expression = CurrentCatalog()
def get_current_catalog(self) -> Optional[str]:
200    def get_current_catalog(self) -> t.Optional[str]:
201        """Returns the catalog name of the current connection."""
202        result = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
203        if result:
204            return result[0]
205        return None

Returns the catalog name of the current connection.

Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class NonTransactionalTruncateMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class NonTransactionalTruncateMixin(EngineAdapter):
    """Mixin that avoids truncating a table while a transaction is active."""

    def _truncate_table(self, table_name: TableName) -> None:
        """Empty ``table_name``, using an unconditional DELETE inside a transaction."""
        # Truncate forces a commit of the current transaction so we want to do an unconditional delete to
        # preserve the transaction if one exists otherwise we can truncate
        if not self._connection_pool.is_transaction_active:
            super()._truncate_table(table_name)
            return
        return self.execute(exp.Delete(this=exp.to_table(table_name)))

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class VarcharSizeWorkaroundMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class VarcharSizeWorkaroundMixin(EngineAdapter):
    """Mixin working around non-deterministic column types in ``CTAS ... LIMIT 0`` statements."""

    def _default_precision_to_max(
        self, columns_to_types: t.Dict[str, exp.DataType]
    ) -> t.Dict[str, exp.DataType]:
        """Widen types that carry their default length to "max" length.

        The ``exp.DataType`` values are modified in place and the same mapping
        object is returned.

        Args:
            columns_to_types: Mapping of column name to data type.

        Returns:
            ``columns_to_types``, after any in-place widening.
        """
        # get default lengths for types that support "max" length
        types_with_max_default_param = {
            k: [self.schema_differ.parameterized_type_defaults[k][0][0]]
            for k in self.schema_differ.max_parameter_length
            if k in self.schema_differ.parameterized_type_defaults
        }

        # Redshift and MSSQL have a bug where CTAS statements have non-deterministic types. If a LIMIT
        # is applied to a CTAS statement, VARCHAR (and possibly other) types sometimes revert to their
        # default length of 256 (Redshift) or 1 (MSSQL). If we detect that a type has its default length
        # and supports "max" length, we convert it to "max" length to prevent inadvertent data truncation.
        for col_type in columns_to_types.values():
            if col_type.this in types_with_max_default_param and col_type.expressions:
                parameter = self.schema_differ.get_type_parameters(col_type)
                type_default = types_with_max_default_param[col_type.this]
                if parameter == type_default:
                    col_type.set("expressions", [exp.DataTypeParam(this=exp.var("max"))])

        return columns_to_types

    def _build_create_table_exp(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expression],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> exp.Create:
        """Build the CREATE TABLE expression, replacing ``CTAS ... LIMIT 0`` with an explicit schema.

        The statement is first built by the parent class. If its query carries a
        ``LIMIT 0``, a temporary view is created over a copy of the query with the
        ``LIMIT 0`` and WHERE clauses removed, the view's column types are read
        (and widened by ``_default_precision_to_max``), and the statement is
        rebuilt from that explicit schema with no query. The temporary view is
        always dropped afterwards.

        Returns:
            The ``exp.Create`` statement to execute.
        """
        statement = super()._build_create_table_exp(
            table_name_or_schema,
            expression=expression,
            exists=exists,
            replace=replace,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            table_kind=table_kind,
            **kwargs,
        )

        if (
            statement.expression
            and statement.expression.args.get("limit") is not None
            and statement.expression.args["limit"].expression.this == "0"
        ):
            assert not isinstance(table_name_or_schema, exp.Schema)

            # redshift and mssql have a bug where CTAS statements have non-deterministic types. if a limit
            # is applied to a ctas statement, VARCHAR types default to 1 in some instances.
            select_statement = statement.expression.copy()
            for select_or_union in select_statement.find_all(exp.Select, exp.SetOperation):
                limit = select_or_union.args.get("limit")
                if limit is not None and limit.expression.this == "0":
                    limit.pop()

                select_or_union.set("where", None)

            temp_view_name = self._get_temp_table("ctas")

            self.create_view(temp_view_name, select_statement, replace=False)
            try:
                columns_to_types_from_view = self._default_precision_to_max(
                    self.columns(temp_view_name)
                )

                schema = self._build_schema_exp(
                    exp.to_table(table_name_or_schema),
                    columns_to_types_from_view,
                )
                # Forward table_kind here as well, so the rebuilt statement keeps the
                # same kind as the one built above.
                statement = super()._build_create_table_exp(
                    schema,
                    None,
                    exists=exists,
                    replace=replace,
                    target_columns_to_types=columns_to_types_from_view,
                    table_description=table_description,
                    table_kind=table_kind,
                    **kwargs,
                )
            finally:
                # Drop the temporary view whether or not the rebuild succeeded.
                self.drop_view(temp_view_name)

        return statement

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
@dataclass(frozen=True)
class TableAlterClusterByOperation(sqlmesh.core.schema_diff.TableAlterOperation, abc.ABC):
@dataclass(frozen=True)
class TableAlterClusterByOperation(TableAlterOperation, abc.ABC):
    """Abstract base type for alter operations that concern a table's cluster key."""
@dataclass(frozen=True)
class TableAlterChangeClusterKeyOperation(TableAlterClusterByOperation):
@dataclass(frozen=True)
class TableAlterChangeClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation whose action is a ``Cluster`` node built from ``clustering_key``."""

    # Clustering key text; parsed with ``parse_one`` in ``cluster_key_expressions``.
    clustering_key: str
    # Dialect passed to ``parse_one`` when parsing ``clustering_key``.
    dialect: str

    @property
    def is_additive(self) -> bool:
        """Always ``False`` for this operation."""
        return False

    @property
    def is_destructive(self) -> bool:
        """Always ``False`` for this operation."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        """A single ``exp.Cluster`` action over the parsed cluster key expressions."""
        cluster = exp.Cluster(expressions=self.cluster_key_expressions)
        return [cluster]

    @property
    def cluster_key_expressions(self) -> t.List[exp.Expression]:
        """Parse ``clustering_key`` and return its expressions."""
        # Note: Assumes `clustering_key` as a string like:
        # - "(col_a)"
        # - "(col_a, col_b)"
        # - "func(col_a, transform(col_b))"
        parsed = parse_one(self.clustering_key, dialect=self.dialect)
        inner_expressions = parsed.expressions
        if inner_expressions:
            return inner_expressions
        # No `expressions` on the parsed node: fall back to its `this` argument.
        return [parsed.this]
TableAlterChangeClusterKeyOperation( target_table: sqlglot.expressions.Table, clustering_key: str, dialect: str)
clustering_key: str
dialect: str
is_additive: bool
317    @property
318    def is_additive(self) -> bool:
319        return False
is_destructive: bool
321    @property
322    def is_destructive(self) -> bool:
323        return False
cluster_key_expressions: List[sqlglot.expressions.Expression]
329    @property
330    def cluster_key_expressions(self) -> t.List[exp.Expression]:
331        # Note: Assumes `clustering_key` as a string like:
332        # - "(col_a)"
333        # - "(col_a, col_b)"
334        # - "func(col_a, transform(col_b))"
335        parsed_cluster_key = parse_one(self.clustering_key, dialect=self.dialect)
336        return parsed_cluster_key.expressions or [parsed_cluster_key.this]
@dataclass(frozen=True)
class TableAlterDropClusterKeyOperation(TableAlterClusterByOperation):
@dataclass(frozen=True)
class TableAlterDropClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation whose action is the command ``DROP CLUSTERING KEY``."""

    @property
    def is_additive(self) -> bool:
        """Always ``False`` for this operation."""
        return False

    @property
    def is_destructive(self) -> bool:
        """Always ``False`` for this operation."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        """A single ``exp.Command`` action: ``DROP`` with expression ``CLUSTERING KEY``."""
        drop_command = exp.Command(this="DROP", expression="CLUSTERING KEY")
        return [drop_command]
TableAlterDropClusterKeyOperation(target_table: sqlglot.expressions.Table)
is_additive: bool
341    @property
342    def is_additive(self) -> bool:
343        return False
is_destructive: bool
345    @property
346    def is_destructive(self) -> bool:
347        return False
class ClusteredByMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class ClusteredByMixin(EngineAdapter):
    """Mixin adding cluster-key handling to table creation and alter operations."""

    def _build_clustered_by_exp(
        self,
        clustered_by: t.List[exp.Expression],
        **kwargs: t.Any,
    ) -> t.Optional[exp.Cluster]:
        """Return an ``exp.Cluster`` over copies of the ``clustered_by`` expressions."""
        copies = [column.copy() for column in clustered_by]
        return exp.Cluster(expressions=copies)

    def get_alter_operations(
        self,
        current_table_name: TableName,
        target_table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterOperation]:
        """Return the parent's alter operations plus any cluster-key operation.

        A cluster-key operation is only considered when a data object is found
        for both tables. If the target is clustered with a non-empty key that
        differs from the current table's key, a change operation is appended;
        if the target is not clustered but the current table is, a drop
        operation is appended. Both are applied to the current table.
        """
        operations = super().get_alter_operations(
            current_table_name,
            target_table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

        # check for a change in clustering
        current_table = exp.to_table(current_table_name)
        target_table = exp.to_table(target_table_name)

        current_schema = schema_(current_table.db, catalog=current_table.catalog)
        target_schema = schema_(target_table.db, catalog=target_table.catalog)

        current_objects = self.get_data_objects(current_schema, {current_table.name})
        current_info = seq_get(current_objects, 0)
        target_objects = self.get_data_objects(target_schema, {target_table.name})
        target_info = seq_get(target_objects, 0)

        # Without info on both tables there is nothing to compare.
        if not (current_info and target_info):
            return operations

        if target_info.is_clustered:
            key_changed = target_info.clustering_key and (
                current_info.clustering_key != target_info.clustering_key
            )
            if key_changed:
                change_op = TableAlterChangeClusterKeyOperation(
                    target_table=current_table,
                    clustering_key=target_info.clustering_key,
                    dialect=self.dialect,
                )
                operations.append(change_op)
        elif current_info.is_clustered:
            operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))

        return operations

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
def get_alter_operations( self, current_table_name: Union[str, sqlglot.expressions.Table], target_table_name: Union[str, sqlglot.expressions.Table], *, ignore_destructive: bool = False, ignore_additive: bool = False) -> List[sqlmesh.core.schema_diff.TableAlterOperation]:
362    def get_alter_operations(
363        self,
364        current_table_name: TableName,
365        target_table_name: TableName,
366        *,
367        ignore_destructive: bool = False,
368        ignore_additive: bool = False,
369    ) -> t.List[TableAlterOperation]:
370        operations = super().get_alter_operations(
371            current_table_name,
372            target_table_name,
373            ignore_destructive=ignore_destructive,
374            ignore_additive=ignore_additive,
375        )
376
377        # check for a change in clustering
378        current_table = exp.to_table(current_table_name)
379        target_table = exp.to_table(target_table_name)
380
381        current_table_schema = schema_(current_table.db, catalog=current_table.catalog)
382        target_table_schema = schema_(target_table.db, catalog=target_table.catalog)
383
384        current_table_info = seq_get(
385            self.get_data_objects(current_table_schema, {current_table.name}), 0
386        )
387        target_table_info = seq_get(
388            self.get_data_objects(target_table_schema, {target_table.name}), 0
389        )
390
391        if current_table_info and target_table_info:
392            if target_table_info.is_clustered:
393                if target_table_info.clustering_key and (
394                    current_table_info.clustering_key != target_table_info.clustering_key
395                ):
396                    operations.append(
397                        TableAlterChangeClusterKeyOperation(
398                            target_table=current_table,
399                            clustering_key=target_table_info.clustering_key,
400                            dialect=self.dialect,
401                        )
402                    )
403            elif current_table_info.is_clustered:
404                operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))
405
406        return operations

Determines the alter statements needed to change the current table into the structure of the target table.

Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
def logical_merge( engine_adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, target_table: Union[str, sqlglot.expressions.Table], source_table: QueryOrDF, target_columns_to_types: Optional[Dict[str, sqlglot.expressions.DataType]], unique_key: Sequence[sqlglot.expressions.Expression], when_matched: Optional[sqlglot.expressions.Whens] = None, merge_filter: Optional[sqlglot.expressions.Expression] = None, source_columns: Optional[List[str]] = None) -> None:
def logical_merge(
    engine_adapter: EngineAdapter,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expression],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expression] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """
    Emulate MERGE for engine adapters without native MERGE support.

    The work is delegated to the adapter's `_replace_by_key` with
    `is_unique_key=True`. The intended sequence is:
    1. Create a temporary table containing the new data to merge.
    2. Delete rows from the target table whose unique_key columns match a row in the temporary table.
    3. Insert the temporary table contents into the target table. Any duplicate, non-unique rows
       within the temporary table are omitted.
    4. Drop the temporary table.

    Raises:
        SQLMeshError: If `when_matched` or `merge_filter` is provided, since neither can be
            honoured without a real MERGE statement. When both are given, the message names
            `when_matched`.
    """
    # Work out which (if any) MERGE-only option the caller asked for.
    unsupported_option: t.Optional[str] = None
    if when_matched:
        unsupported_option = "when_matched"
    elif merge_filter:
        unsupported_option = "merge_filter"

    if unsupported_option is not None:
        raise SQLMeshError(
            f"This engine does not support MERGE expressions and therefore `{unsupported_option}` is not supported."
        )

    engine_adapter._replace_by_key(
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        is_unique_key=True,
        source_columns=source_columns,
    )

Merge implementation for engine adapters that do not support merge natively.

The merge is executed as follows:

  1. Create a temporary table containing the new data to merge.
  2. Delete rows from target table where unique_key cols match a row in the temporary table.
  3. Insert the temporary table contents into the target table. Any duplicate, non-unique rows within the temporary table are omitted.
  4. Drop the temporary table.
class RowDiffMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class RowDiffMixin(EngineAdapter):
    """Mixin that builds expressions rendering column values as normalized strings."""

    # The maximum supported value for n in timestamp(n).
    # Most databases are microsecond (6) but some can only handle millisecond (3) while others go to nanosecond (9)
    MAX_TIMESTAMP_PRECISION = 6

    def concat_columns(
        self,
        columns_to_types: t.Dict[str, exp.DataType],
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
        delimiter: str = ",",
    ) -> exp.Expression:
        """
        Build a CONCAT expression that renders a whole record as one string.

        Each column is normalized with `normalize_value`, wrapped in COALESCE so
        that NULL becomes the empty string, and the pieces are separated by a
        string literal holding `delimiter`.
        """
        pieces: t.List[exp.Expression] = []
        for column_name, data_type in columns_to_types.items():
            # A delimiter goes between columns, never before the first or after the last.
            if pieces:
                pieces.append(exp.Literal.string(delimiter))
            normalized = self.normalize_value(
                exp.to_column(column_name), data_type, decimal_precision, timestamp_precision
            )
            pieces.append(exp.func("COALESCE", normalized, exp.Literal.string("")))

        return exp.func("CONCAT", *pieces)

    def normalize_value(
        self,
        expr: exp.Expression,
        type: exp.DataType,
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
    ) -> exp.Expression:
        """
        Return an expression that converts `expr` to a normalized VARCHAR value.

        The intent is a string that is comparable across database engines. The
        per-type handling, applied before the final cast to VARCHAR, is:
            - boolean -> `_normalize_boolean_value` (cast to INT)
            - integer types -> `_normalize_integer_value` (cast to BIGINT)
            - real types -> cast to a decimal with :decimal_precision digits of scale, unless the
              declared type has two parameters and the last one is 0, in which case it is
              treated as an integer
            - temporal types -> `_normalize_timestamp_value` (formatted date / timestamp string)
            - nested types -> `_normalize_nested_value` (unchanged by default)
            - anything else -> used as-is
        """
        if type.is_type(exp.DataType.Type.BOOLEAN):
            normalized = self._normalize_boolean_value(expr)
        elif type.is_type(*exp.DataType.INTEGER_TYPES):
            normalized = self._normalize_integer_value(expr)
        elif type.is_type(*exp.DataType.REAL_TYPES):
            # A type declared with a scale of 0 is compared as an integer.
            # Some databases like Snowflake deliberately create all integer types as
            # NUMERIC(<size>, 0) and those should not be treated as decimals.
            params = list(type.find_all(exp.DataTypeParam))
            zero_scale = len(params) == 2 and params[-1].this.to_py() == 0
            normalized = (
                self._normalize_integer_value(expr)
                if zero_scale
                else self._normalize_decimal_value(expr, decimal_precision)
            )
        elif type.is_type(*exp.DataType.TEMPORAL_TYPES):
            normalized = self._normalize_timestamp_value(expr, type, timestamp_precision)
        elif type.is_type(*exp.DataType.NESTED_TYPES):
            normalized = self._normalize_nested_value(expr)
        else:
            normalized = expr

        return exp.cast(normalized, to=exp.DataType.build("VARCHAR"))

    def _normalize_nested_value(self, expr: exp.Expression) -> exp.Expression:
        """Return `expr` unchanged; a hook for adapters needing special nested handling."""
        return expr

    def _normalize_timestamp_value(
        self, expr: exp.Expression, type: exp.DataType, precision: int
    ) -> exp.Expression:
        """
        Render a temporal value as text using the module's normalized formats.

        DATE / DATE32 use NORMALIZED_DATE_FORMAT, everything else uses
        NORMALIZED_TIMESTAMP_FORMAT. When fewer than 6 sub-second digits are
        requested, the rendered string is truncated with SUBSTRING.

        Raises:
            ValueError: If `precision` exceeds `MAX_TIMESTAMP_PRECISION`.
        """
        if precision > self.MAX_TIMESTAMP_PRECISION:
            raise ValueError(
                f"Requested timestamp precision '{precision}' exceeds maximum supported precision: {self.MAX_TIMESTAMP_PRECISION}"
            )

        if type.is_type(exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
            time_format = NORMALIZED_DATE_FORMAT
        else:
            time_format = NORMALIZED_TIMESTAMP_FORMAT

        # Values of these types are converted to UTC before formatting.
        # NOTE(review): TIMESTAMPNTZ is listed alongside the zone-aware types - confirm intended.
        converted_to_utc = type.is_type(
            exp.DataType.Type.TIMESTAMPTZ,
            exp.DataType.Type.TIMESTAMPLTZ,
            exp.DataType.Type.TIMESTAMPNTZ,
        )
        source = (
            exp.AtTimeZone(this=expr, zone=exp.Literal.string("UTC")) if converted_to_utc else expr
        )

        rendered = exp.TimeToStr(this=source, format=exp.Literal.string(time_format))

        # 6 = max precision across all adapters and also the most digits TimeToStr will render,
        # since it is based on `strftime` and `%f` only renders to microseconds.
        surplus_digits = 6 - precision
        if surplus_digits <= 0:
            return rendered

        full_width = len("2023-01-01 12:13:14.000000")
        return exp.func("SUBSTRING", rendered, 1, full_width - surplus_digits)

    def _normalize_integer_value(self, expr: exp.Expression) -> exp.Expression:
        """Cast `expr` to BIGINT."""
        return exp.cast(expr, "BIGINT")

    def _normalize_decimal_value(self, expr: exp.Expression, precision: int) -> exp.Expression:
        """Cast `expr` to DECIMAL(38, precision)."""
        return exp.cast(expr, f"DECIMAL(38,{precision})")

    def _normalize_boolean_value(self, expr: exp.Expression) -> exp.Expression:
        """Cast `expr` to INT."""
        return exp.cast(expr, "INT")

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
MAX_TIMESTAMP_PRECISION = 6
def concat_columns( self, columns_to_types: Dict[str, sqlglot.expressions.DataType], decimal_precision: int = 3, timestamp_precision: int = 6, delimiter: str = ',') -> sqlglot.expressions.Expression:
450    def concat_columns(
451        self,
452        columns_to_types: t.Dict[str, exp.DataType],
453        decimal_precision: int = 3,
454        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
455        delimiter: str = ",",
456    ) -> exp.Expression:
457        """
458        Produce an expression that generates a string version of a record, that is:
459            - Every column converted to a string representation, joined together into a single string using the specified :delimiter
460        """
461        expressions_to_concat: t.List[exp.Expression] = []
462        for idx, (column, type) in enumerate(columns_to_types.items()):
463            expressions_to_concat.append(
464                exp.func(
465                    "COALESCE",
466                    self.normalize_value(
467                        exp.to_column(column), type, decimal_precision, timestamp_precision
468                    ),
469                    exp.Literal.string(""),
470                )
471            )
472            if idx < len(columns_to_types) - 1:
473                expressions_to_concat.append(exp.Literal.string(delimiter))
474
475        return exp.func("CONCAT", *expressions_to_concat)

Produce an expression that generates a string version of a record: every column is converted to a string representation, and the results are joined together into a single string using the specified `delimiter`.

def normalize_value( self, expr: sqlglot.expressions.Expression, type: sqlglot.expressions.DataType, decimal_precision: int = 3, timestamp_precision: int = 6) -> sqlglot.expressions.Expression:
477    def normalize_value(
478        self,
479        expr: exp.Expression,
480        type: exp.DataType,
481        decimal_precision: int = 3,
482        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
483    ) -> exp.Expression:
484        """
485        Return an expression that converts the values inside the column `col` to a normalized string
486
487        This string should be comparable across database engines, eg:
488            - `date` columns -> YYYY-MM-DD string
489            - `datetime`/`timestamp`/`timestamptz` columns -> ISO-8601 string to :timestamp_precision digits of subsecond precision
490            - `float` / `double` / `decimal` -> Value formatted to :decimal_precision decimal places
491            - `boolean` columns -> '1' or '0'
492            - NULLS -> "" (empty string)
493        """
494        if type.is_type(exp.DataType.Type.BOOLEAN):
495            value = self._normalize_boolean_value(expr)
496        elif type.is_type(*exp.DataType.INTEGER_TYPES):
497            value = self._normalize_integer_value(expr)
498        elif type.is_type(*exp.DataType.REAL_TYPES):
499            # If there is no scale on the decimal type, treat it like an integer when comparing
500            # Some databases like Snowflake deliberately create all integer types as NUMERIC(<size>, 0)
501            # and they should be treated as integers and not decimals
502            type_params = list(type.find_all(exp.DataTypeParam))
503            if len(type_params) == 2 and type_params[-1].this.to_py() == 0:
504                value = self._normalize_integer_value(expr)
505            else:
506                value = self._normalize_decimal_value(expr, decimal_precision)
507        elif type.is_type(*exp.DataType.TEMPORAL_TYPES):
508            value = self._normalize_timestamp_value(expr, type, timestamp_precision)
509        elif type.is_type(*exp.DataType.NESTED_TYPES):
510            value = self._normalize_nested_value(expr)
511        else:
512            value = expr
513
514        return exp.cast(value, to=exp.DataType.build("VARCHAR"))

Return an expression that converts the value of `expr` to a normalized string.

This string should be comparable across database engines, e.g.:

  • `date` columns -> YYYY-MM-DD string
  • `datetime`/`timestamp`/`timestamptz` columns -> ISO-8601 string to `timestamp_precision` digits of subsecond precision
  • `float` / `double` / `decimal` -> value formatted to `decimal_precision` decimal places
  • `boolean` columns -> '1' or '0'
  • NULLs -> "" (empty string)

Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class GrantsFromInfoSchemaMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class GrantsFromInfoSchemaMixin(EngineAdapter):
    """Mixin that builds GRANT/REVOKE expressions and reads grants from information_schema."""

    # Expression compared against the `grantor` / `grantee` columns when reading grants.
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expression = exp.func("current_user")
    # When True, one statement per privilege lists all principals; otherwise one per principal.
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
    # When True, the grants query is qualified by catalog and filtered on `table_catalog`.
    USE_CATALOG_IN_GRANTS = False
    # Name of the information_schema table queried for grants.
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "table_privileges"

    @staticmethod
    @abc.abstractmethod
    def _grant_object_kind(table_type: DataObjectType) -> t.Optional[str]:
        """Return the object kind to put in GRANT/REVOKE for `table_type`, or None to omit it."""

    @abc.abstractmethod
    def _get_current_schema(self) -> str:
        """Return the schema name to use when a table reference carries no schema."""

    def _dcl_grants_config_expr(
        self,
        dcl_cmd: t.Type[DCL],
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """
        Build `dcl_cmd` expressions (e.g. Grant or Revoke) for every entry of `grants_config`.

        `grants_config` maps a privilege to its principals. Depending on
        `SUPPORTS_MULTIPLE_GRANT_PRINCIPALS`, each privilege yields either a single
        expression listing all principals or one expression per principal. An empty
        config yields an empty list.
        """
        if not grants_config:
            return []

        object_kind = self._grant_object_kind(table_type)

        def as_principal(raw_principal: str) -> exp.Expression:
            # Parse the configured principal and normalize its identifiers for this dialect.
            return normalize_identifiers(
                parse_one(raw_principal, into=exp.GrantPrincipal, dialect=self.dialect),
                dialect=self.dialect,
            )

        statements: t.List[exp.Expression] = []
        for privilege, principals in grants_config.items():
            shared_args: t.Dict[str, t.Any] = {
                "privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
                "securable": table.copy(),
            }
            if object_kind:
                shared_args["kind"] = exp.Var(this=object_kind)

            # Each group becomes one statement: all principals together, or one at a time.
            if self.SUPPORTS_MULTIPLE_GRANT_PRINCIPALS:
                principal_groups = [principals]
            else:
                principal_groups = [[principal] for principal in principals]

            for group in principal_groups:
                statements.append(
                    dcl_cmd(**shared_args, principals=[as_principal(p) for p in group])  # type: ignore[arg-type]
                )

        return statements

    def _apply_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """Build the GRANT expressions for `grants_config` on `table`."""
        return self._dcl_grants_config_expr(exp.Grant, table, grants_config, table_type)

    def _revoke_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """Build the REVOKE expressions for `grants_config` on `table`."""
        return self._dcl_grants_config_expr(exp.Revoke, table, grants_config, table_type)

    def _get_grant_expression(self, table: exp.Table) -> exp.Expression:
        """
        Build the SELECT that lists (privilege_type, grantee) rows for `table`.

        Rows are filtered to the table's schema and name, to grants whose grantor is
        `CURRENT_USER_OR_ROLE_EXPRESSION` and whose grantee is not. With
        `USE_CATALOG_IN_GRANTS`, the information_schema table is catalog-qualified and a
        `table_catalog` filter is placed first.

        Raises:
            SQLMeshError: If a catalog is required but neither the table nor the adapter
                provides one.
        """
        schema_identifier = table.args.get("db")
        if not schema_identifier:
            # No schema on the table reference: fall back to the current schema.
            schema_identifier = normalize_identifiers(
                exp.to_identifier(self._get_current_schema(), quoted=True), dialect=self.dialect
            )

        filters = [
            exp.column("table_schema").eq(exp.Literal.string(schema_identifier.this)),
            exp.column("table_name").eq(exp.Literal.string(table.args.get("this").this)),  # type: ignore
            exp.column("grantor").eq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
            exp.column("grantee").neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
        ]

        privileges_table = normalize_identifiers(
            exp.table_(self.GRANT_INFORMATION_SCHEMA_TABLE_NAME, db="information_schema"),
            dialect=self.dialect,
        )

        if self.USE_CATALOG_IN_GRANTS:
            catalog_identifier = table.args.get("catalog")
            if not catalog_identifier:
                current_catalog = self.get_current_catalog()
                if not current_catalog:
                    raise SQLMeshError(
                        "Current catalog could not be determined for fetching grants. This is unexpected."
                    )
                catalog_identifier = normalize_identifiers(
                    exp.to_identifier(current_catalog, quoted=True), dialect=self.dialect
                )
            privileges_table.set("catalog", catalog_identifier.copy())
            filters.insert(
                0, exp.column("table_catalog").eq(exp.Literal.string(catalog_identifier.this))
            )

        query = exp.select("privilege_type", "grantee").from_(privileges_table)
        return query.where(exp.and_(*filters))

    def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
        """
        Fetch the grants on `table` and group them as privilege -> list of grantees.

        Rows with a NULL or empty privilege or grantee are skipped. Grantees keep
        first-seen order and are not repeated within a privilege.
        """
        current_grants: GrantsConfig = {}
        for raw_privilege, raw_grantee in self.fetchall(self._get_grant_expression(table)):
            if raw_privilege is None or raw_grantee is None:
                continue

            privilege, grantee = str(raw_privilege), str(raw_grantee)
            if privilege and grantee:
                known_grantees = current_grants.setdefault(privilege, [])
                if grantee not in known_grantees:
                    known_grantees.append(grantee)

        return current_grants

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
CURRENT_USER_OR_ROLE_EXPRESSION: sqlglot.expressions.Expression = CurrentUser()
SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
USE_CATALOG_IN_GRANTS = False
GRANT_INFORMATION_SCHEMA_TABLE_NAME = 'table_privileges'
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts