Edit on GitHub

sqlmesh.core.engine_adapter.mixins

  1from __future__ import annotations
  2
  3import abc
  4import logging
  5import typing as t
  6from dataclasses import dataclass
  7
  8from sqlglot import exp, parse_one
  9from sqlglot.helper import seq_get
 10from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
 11
 12from sqlmesh.core.engine_adapter.base import EngineAdapter
 13from sqlmesh.core.engine_adapter.shared import DataObjectType
 14from sqlmesh.core.node import IntervalUnit
 15from sqlmesh.core.dialect import schema_
 16from sqlmesh.core.schema_diff import TableAlterOperation
 17from sqlmesh.utils.errors import SQLMeshError
 18
 19if t.TYPE_CHECKING:
 20    from sqlmesh.core._typing import TableName
 21    from sqlmesh.core.engine_adapter._typing import (
 22        DCL,
 23        DF,
 24        GrantsConfig,
 25        QueryOrDF,
 26    )
 27    from sqlmesh.core.engine_adapter.base import QueryOrDF
 28
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# strftime-style patterns passed to TimeToStr when temporal values are rendered as strings.
NORMALIZED_DATE_FORMAT = "%Y-%m-%d"
NORMALIZED_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
 33
 34
 35class LogicalMergeMixin(EngineAdapter):
 36    def merge(
 37        self,
 38        target_table: TableName,
 39        source_table: QueryOrDF,
 40        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
 41        unique_key: t.Sequence[exp.Expr],
 42        when_matched: t.Optional[exp.Whens] = None,
 43        merge_filter: t.Optional[exp.Expr] = None,
 44        source_columns: t.Optional[t.List[str]] = None,
 45        **kwargs: t.Any,
 46    ) -> None:
 47        logical_merge(
 48            self,
 49            target_table,
 50            source_table,
 51            target_columns_to_types,
 52            unique_key,
 53            when_matched=when_matched,
 54            merge_filter=merge_filter,
 55            source_columns=source_columns,
 56        )
 57
 58
 59class PandasNativeFetchDFSupportMixin(EngineAdapter):
 60    def _fetch_native_df(
 61        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
 62    ) -> DF:
 63        """Fetches a Pandas DataFrame from a SQL query."""
 64        from warnings import catch_warnings, filterwarnings
 65
 66        from pandas.io.sql import read_sql_query
 67
 68        sql = self._to_sql(query, quote=quote_identifiers) if isinstance(query, exp.Expr) else query
 69        logger.debug(f"Executing SQL:\n{sql}")
 70        with catch_warnings(), self.transaction():
 71            filterwarnings(
 72                "ignore",
 73                category=UserWarning,
 74                message=".*pandas only supports SQLAlchemy connectable.*",
 75            )
 76            df = read_sql_query(sql, self._connection_pool.get())
 77        return df
 78
 79
 80class HiveMetastoreTablePropertiesMixin(EngineAdapter):
 81    MAX_TABLE_COMMENT_LENGTH = 4000
 82    MAX_COLUMN_COMMENT_LENGTH = 4000
 83
 84    def _build_partitioned_by_exp(
 85        self,
 86        partitioned_by: t.List[exp.Expr],
 87        *,
 88        catalog_name: t.Optional[str] = None,
 89        **kwargs: t.Any,
 90    ) -> t.Union[exp.PartitionedByProperty, exp.Property]:
 91        if (
 92            self.dialect == "trino"
 93            and self.get_catalog_type(catalog_name or self.get_current_catalog()) == "iceberg"
 94        ):
 95            # On the Trino Iceberg catalog, the table property is called "partitioning" - not "partitioned_by"
 96            # In addition, partition column transform expressions like `day(col)` or `bucket(col, 5)` are allowed
 97            # Also, column names and transforms need to be strings and supplied as an ARRAY[varchar]
 98            # ref: https://trino.io/docs/current/connector/iceberg.html#table-properties
 99            return exp.Property(
100                this=exp.var("PARTITIONING"),
101                value=exp.array(
102                    *(exp.Literal.string(e.sql(dialect=self.dialect)) for e in partitioned_by)
103                ),
104            )
105        for expr in partitioned_by:
106            if not isinstance(expr, exp.Column):
107                raise SQLMeshError(
108                    f"PARTITIONED BY contains non-column value '{expr.sql(dialect=self.dialect)}'."
109                )
110        return exp.PartitionedByProperty(
111            this=exp.Schema(expressions=partitioned_by),
112        )
113
114    def _build_table_properties_exp(
115        self,
116        catalog_name: t.Optional[str] = None,
117        table_format: t.Optional[str] = None,
118        storage_format: t.Optional[str] = None,
119        partitioned_by: t.Optional[t.List[exp.Expr]] = None,
120        partition_interval_unit: t.Optional[IntervalUnit] = None,
121        clustered_by: t.Optional[t.List[exp.Expr]] = None,
122        table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
123        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
124        table_description: t.Optional[str] = None,
125        table_kind: t.Optional[str] = None,
126        **kwargs: t.Any,
127    ) -> t.Optional[exp.Properties]:
128        properties: t.List[exp.Expr] = []
129
130        if table_format and self.dialect == "spark":
131            properties.append(exp.FileFormatProperty(this=exp.Var(this=table_format)))
132            if storage_format:
133                properties.append(
134                    exp.Property(
135                        this="write.format.default", value=exp.Literal.string(storage_format)
136                    )
137                )
138        elif storage_format:
139            properties.append(exp.FileFormatProperty(this=exp.Var(this=storage_format)))
140
141        if partitioned_by:
142            properties.append(
143                self._build_partitioned_by_exp(
144                    partitioned_by,
145                    partition_interval_unit=partition_interval_unit,
146                    catalog_name=catalog_name,
147                )
148            )
149
150        if table_description:
151            properties.append(
152                exp.SchemaCommentProperty(
153                    this=exp.Literal.string(self._truncate_table_comment(table_description))
154                )
155            )
156
157        properties.extend(self._table_or_view_properties_to_expressions(table_properties))
158
159        if properties:
160            return exp.Properties(expressions=properties)
161        return None
162
163    def _build_view_properties_exp(
164        self,
165        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
166        table_description: t.Optional[str] = None,
167        **kwargs: t.Any,
168    ) -> t.Optional[exp.Properties]:
169        """Creates a SQLGlot table properties expression for view"""
170        properties: t.List[exp.Expr] = []
171
172        if table_description:
173            properties.append(
174                exp.SchemaCommentProperty(
175                    this=exp.Literal.string(self._truncate_table_comment(table_description))
176                )
177            )
178
179        properties.extend(self._table_or_view_properties_to_expressions(view_properties))
180
181        if properties:
182            return exp.Properties(expressions=properties)
183        return None
184
185    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
186        # iceberg and delta do not have a comment length limit
187        if self.current_catalog_type in ("iceberg", "delta_lake"):
188            return comment
189        return super()._truncate_comment(comment, length)
190
191
class GetCurrentCatalogFromFunctionMixin(EngineAdapter):
    """Mixin that reads the current catalog by selecting `CURRENT_CATALOG_EXPRESSION`."""

    CURRENT_CATALOG_EXPRESSION: exp.Expr = exp.func("current_catalog")

    def get_current_catalog(self) -> t.Optional[str]:
        """Returns the catalog name of the current connection, or None if no row comes back."""
        row = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
        return row[0] if row else None
201
202
class NonTransactionalTruncateMixin(EngineAdapter):
    """Mixin that avoids TRUNCATE while a transaction is open."""

    def _truncate_table(self, table_name: TableName) -> None:
        """Empty `table_name`, using DELETE inside a transaction and TRUNCATE otherwise."""
        # Truncate forces a commit of the current transaction, so it is only used when
        # no transaction is active.
        if not self._connection_pool.is_transaction_active:
            super()._truncate_table(table_name)
            return None
        # An unconditional delete preserves the surrounding transaction.
        return self.execute(exp.Delete(this=exp.to_table(table_name)))
210
211
class VarcharSizeWorkaroundMixin(EngineAdapter):
    """Works around unreliable column types produced by `CREATE TABLE AS ... LIMIT 0`."""

    def _default_precision_to_max(
        self, columns_to_types: t.Dict[str, exp.DataType]
    ) -> t.Dict[str, exp.DataType]:
        """Promote types that carry their default length to "max" length.

        The data types are modified in place and the same mapping is returned.

        Args:
            columns_to_types: Column name to data type mapping to adjust.

        Returns:
            The `columns_to_types` mapping that was passed in.
        """
        differ = self.schema_differ

        # get default lengths for types that support "max" length
        types_with_max_default_param = {
            k: [differ.parameterized_type_defaults[k][0][0]]
            for k in differ.max_parameter_length
            if k in differ.parameterized_type_defaults
        }

        # Redshift and MSSQL have a bug where CTAS statements have non-deterministic types. If a LIMIT
        # is applied to a CTAS statement, VARCHAR (and possibly other) types sometimes revert to their
        # default length of 256 (Redshift) or 1 (MSSQL). If we detect that a type has its default length
        # and supports "max" length, we convert it to "max" length to prevent inadvertent data truncation.
        for col_type in columns_to_types.values():
            if col_type.this in types_with_max_default_param and col_type.expressions:
                parameter = differ.get_type_parameters(col_type)
                type_default = types_with_max_default_param[col_type.this]
                if parameter == type_default:
                    col_type.set("expressions", [exp.DataTypeParam(this=exp.var("max"))])

        return columns_to_types

    def _build_create_table_exp(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expr],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> exp.Create:
        """Build a CREATE TABLE statement, replacing `LIMIT 0` CTAS with an explicit schema.

        When the statement built by the parent is a CTAS whose query ends in `LIMIT 0`,
        the query is materialised as a temporary view, its column types are read back
        (with default lengths promoted to "max"), and a schema-based CREATE TABLE is
        returned instead. The temporary view is always dropped afterwards.

        Returns:
            The CREATE statement to execute.
        """
        statement = super()._build_create_table_exp(
            table_name_or_schema,
            expression=expression,
            exists=exists,
            replace=replace,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            table_kind=table_kind,
            **kwargs,
        )

        if (
            statement.expression
            and statement.expression.args.get("limit") is not None
            and statement.expression.args["limit"].expression.this == "0"
        ):
            assert not isinstance(table_name_or_schema, exp.Schema)

            # redshift and mssql have a bug where CTAS statements have non-deterministic types. if a limit
            # is applied to a ctas statement, VARCHAR types default to 1 in some instances.
            select_statement = statement.expression.copy()
            for select_or_union in select_statement.find_all(exp.Select, exp.SetOperation):
                limit = select_or_union.args.get("limit")
                if limit is not None and limit.expression.this == "0":
                    limit.pop()

                select_or_union.set("where", None)

            temp_view_name = self._get_temp_table("ctas")

            self.create_view(temp_view_name, select_statement, replace=False)
            try:
                columns_to_types_from_view = self._default_precision_to_max(
                    self.columns(temp_view_name)
                )

                schema = self._build_schema_exp(
                    exp.to_table(table_name_or_schema),
                    columns_to_types_from_view,
                )
                # Forward the same options as the first build, including table_kind, so the
                # schema-based statement creates the same kind of object that was requested.
                statement = super()._build_create_table_exp(
                    schema,
                    None,
                    exists=exists,
                    replace=replace,
                    target_columns_to_types=columns_to_types_from_view,
                    table_description=table_description,
                    table_kind=table_kind,
                    **kwargs,
                )
            finally:
                self.drop_view(temp_view_name)

        return statement
300
301
@dataclass(frozen=True)
class TableAlterClusterByOperation(TableAlterOperation, abc.ABC):
    """Common base for the clustering-related alter operations defined in this module."""
305
306
@dataclass(frozen=True)
class TableAlterChangeClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that sets a table's clustering key."""

    # Clustering key as SQL text, e.g. "(col_a, col_b)".
    clustering_key: str
    # Dialect used to parse `clustering_key`.
    dialect: str

    @property
    def is_additive(self) -> bool:
        """Always False for this operation."""
        return False

    @property
    def is_destructive(self) -> bool:
        """Always False for this operation."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        """A single Cluster expression built from the parsed clustering key."""
        cluster = exp.Cluster(expressions=self.cluster_key_expressions)
        return [cluster]

    @property
    def cluster_key_expressions(self) -> t.List[exp.Expr]:
        """Parse `clustering_key` into the list of expressions that make up the key."""
        # Note: Assumes `clustering_key` as a string like:
        # - "(col_a)"
        # - "(col_a, col_b)"
        # - "func(col_a, transform(col_b))"
        parsed = parse_one(self.clustering_key, dialect=self.dialect)
        members = parsed.expressions
        if members:
            return members
        # Nothing under `expressions`: fall back to the single wrapped expression.
        return [parsed.this]
332
333
@dataclass(frozen=True)
class TableAlterDropClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that removes a table's clustering key."""

    @property
    def is_destructive(self) -> bool:
        """Always False for this operation."""
        return False

    @property
    def is_additive(self) -> bool:
        """Always False for this operation."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        """A single raw `DROP CLUSTERING KEY` command."""
        drop_command = exp.Command(this="DROP", expression="CLUSTERING KEY")
        return [drop_command]
347
348
class ClusteredByMixin(EngineAdapter):
    """Mixin adding clustering-key support to table creation and alteration."""

    def _build_clustered_by_exp(
        self,
        clustered_by: t.List[exp.Expr],
        **kwargs: t.Any,
    ) -> t.Optional[exp.Cluster]:
        """Build a Cluster expression from copies of the given expressions."""
        copied_keys = [key.copy() for key in clustered_by]
        return exp.Cluster(expressions=copied_keys)

    def get_alter_operations(
        self,
        current_table_name: TableName,
        target_table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterOperation]:
        """Extend the parent's alter operations with any clustering-key change.

        A change operation is added when the target is clustered with a key that differs
        from the current one; a drop operation is added when only the current table is
        clustered. Nothing is added unless both tables are found.
        """
        operations = super().get_alter_operations(
            current_table_name,
            target_table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

        # check for a change in clustering
        current_table = exp.to_table(current_table_name)
        target_table = exp.to_table(target_table_name)

        current_schema = schema_(current_table.db, catalog=current_table.catalog)
        target_schema = schema_(target_table.db, catalog=target_table.catalog)

        current_info = seq_get(self.get_data_objects(current_schema, {current_table.name}), 0)
        target_info = seq_get(self.get_data_objects(target_schema, {target_table.name}), 0)

        if not (current_info and target_info):
            return operations

        if not target_info.is_clustered:
            # The target has no clustering, so drop it if the current table still has one.
            if current_info.is_clustered:
                operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))
            return operations

        desired_key = target_info.clustering_key
        if desired_key and current_info.clustering_key != desired_key:
            operations.append(
                TableAlterChangeClusterKeyOperation(
                    target_table=current_table,
                    clustering_key=desired_key,
                    dialect=self.dialect,
                )
            )

        return operations
402
403
def logical_merge(
    engine_adapter: EngineAdapter,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expr],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expr] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """
    Merge implementation for engine adapters that do not support merge natively.

    The work is handed to the adapter's `_replace_by_key` with `is_unique_key=True`.
    The intended sequence is:
    1. Create a temporary table containing the new data to merge.
    2. Delete rows from the target table whose unique_key columns match a row in the temporary table.
    3. Insert the temporary table contents into the target table. Any duplicate, non-unique rows
       within the temporary table are omitted.
    4. Drop the temporary table.

    Raises:
        SQLMeshError: If `when_matched` or `merge_filter` is supplied.
    """
    # Neither option can be honoured without a native MERGE; name the first one supplied.
    unsupported: t.Optional[str]
    if when_matched:
        unsupported = "when_matched"
    elif merge_filter:
        unsupported = "merge_filter"
    else:
        unsupported = None

    if unsupported is not None:
        raise SQLMeshError(
            f"This engine does not support MERGE expressions and therefore `{unsupported}` is not supported."
        )

    engine_adapter._replace_by_key(
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        is_unique_key=True,
        source_columns=source_columns,
    )
438
439
class RowDiffMixin(EngineAdapter):
    """Mixin that renders column values as normalized strings for row comparison."""

    # The maximum supported value for n in timestamp(n).
    # Most databases are microsecond (6) but some can only handle millisecond (3) while others go to nanosecond (9)
    MAX_TIMESTAMP_PRECISION = 6

    def concat_columns(
        self,
        columns_to_types: t.Dict[str, exp.DataType],
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
        delimiter: str = ",",
    ) -> exp.Expr:
        """
        Produce an expression that renders a whole record as one string.

        Every column is normalized via `normalize_value`, NULLs are coalesced to an empty
        string, and the pieces are joined into a single CONCAT call using :delimiter.
        """
        pieces: t.List[exp.Expr] = []
        for column_name, data_type in columns_to_types.items():
            # Put the delimiter before every column except the first.
            if pieces:
                pieces.append(exp.Literal.string(delimiter))
            normalized = self.normalize_value(
                exp.to_column(column_name), data_type, decimal_precision, timestamp_precision
            )
            pieces.append(exp.func("COALESCE", normalized, exp.Literal.string("")))

        return exp.func("CONCAT", *pieces)

    def normalize_value(
        self,
        expr: exp.Expr,
        type: exp.DataType,
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
    ) -> exp.Expr:
        """
        Return an expression that converts `expr` to a normalized string (VARCHAR).

        The result is meant to be comparable across database engines, eg:
            - `date` columns -> YYYY-MM-DD string
            - `datetime`/`timestamp`/`timestamptz` columns -> timestamp string with :timestamp_precision digits of subsecond precision
            - `float` / `double` / `decimal` -> cast to a decimal with :decimal_precision decimal places
            - integer columns, and decimals declared with a scale of 0 -> cast to BIGINT
            - `boolean` columns -> cast to INT
            - anything else -> cast to VARCHAR as-is
        """
        data_types = exp.DataType

        normalized: exp.Expr
        if type.is_type(data_types.Type.BOOLEAN):
            normalized = self._normalize_boolean_value(expr)
        elif type.is_type(*data_types.INTEGER_TYPES):
            normalized = self._normalize_integer_value(expr)
        elif type.is_type(*data_types.REAL_TYPES):
            # If there is no scale on the decimal type, treat it like an integer when comparing
            # Some databases like Snowflake deliberately create all integer types as NUMERIC(<size>, 0)
            # and they should be treated as integers and not decimals
            params = list(type.find_all(exp.DataTypeParam))
            has_zero_scale = len(params) == 2 and params[-1].this.to_py() == 0
            if has_zero_scale:
                normalized = self._normalize_integer_value(expr)
            else:
                normalized = self._normalize_decimal_value(expr, decimal_precision)
        elif type.is_type(*data_types.TEMPORAL_TYPES):
            normalized = self._normalize_timestamp_value(expr, type, timestamp_precision)
        elif type.is_type(*data_types.NESTED_TYPES):
            normalized = self._normalize_nested_value(expr)
        else:
            normalized = expr

        return exp.cast(normalized, to=data_types.build("VARCHAR"))

    def _normalize_nested_value(self, expr: exp.Expr) -> exp.Expr:
        """Return nested values unchanged."""
        return expr

    def _normalize_timestamp_value(
        self, expr: exp.Expr, type: exp.DataType, precision: int
    ) -> exp.Expr:
        """Render a temporal value as a string, trimmed to `precision` subsecond digits.

        Raises:
            ValueError: If `precision` exceeds `MAX_TIMESTAMP_PRECISION`.
        """
        if precision > self.MAX_TIMESTAMP_PRECISION:
            raise ValueError(
                f"Requested timestamp precision '{precision}' exceeds maximum supported precision: {self.MAX_TIMESTAMP_PRECISION}"
            )

        type_enum = exp.DataType.Type

        if type.is_type(type_enum.DATE, type_enum.DATE32):
            pattern = NORMALIZED_DATE_FORMAT
        else:
            pattern = NORMALIZED_TIMESTAMP_FORMAT

        if type.is_type(type_enum.TIMESTAMPTZ, type_enum.TIMESTAMPLTZ, type_enum.TIMESTAMPNTZ):
            # Convert all timezone-aware values to UTC for comparison
            expr = exp.AtTimeZone(this=expr, zone=exp.Literal.string("UTC"))

        rendered = exp.TimeToStr(this=expr, format=exp.Literal.string(pattern))

        # 6 = max precision across all adapters and also the max amount of digits TimeToStr
        # will render since it is based on `strftime` and `%f` only renders to microseconds
        surplus_digits = 6 - precision
        if surplus_digits <= 0:
            return rendered

        full_width = len("2023-01-01 12:13:14.000000")
        return exp.func("SUBSTRING", rendered, 1, full_width - surplus_digits)

    def _normalize_integer_value(self, expr: exp.Expr) -> exp.Expr:
        """Cast the value to BIGINT."""
        return exp.cast(expr, to="BIGINT")

    def _normalize_decimal_value(self, expr: exp.Expr, precision: int) -> exp.Expr:
        """Cast the value to DECIMAL(38, precision)."""
        return exp.cast(expr, to=f"DECIMAL(38,{precision})")

    def _normalize_boolean_value(self, expr: exp.Expr) -> exp.Expr:
        """Cast the value to INT."""
        return exp.cast(expr, to="INT")
555
556
class GrantsFromInfoSchemaMixin(EngineAdapter):
    """Mixin that builds GRANT/REVOKE statements and reads grants from information_schema."""

    # Expression compared against the `grantor` / `grantee` columns when reading grants.
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.func("current_user")
    # When False, one statement is emitted per principal instead of one per privilege.
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
    # When True, the grants query is qualified with and filtered by the catalog.
    USE_CATALOG_IN_GRANTS = False
    # Name of the information_schema table that is queried for grants.
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "table_privileges"

    @staticmethod
    @abc.abstractmethod
    def _grant_object_kind(table_type: DataObjectType) -> t.Optional[str]:
        """Return the object kind to put in the statement for `table_type`, or None for none."""
        pass

    @abc.abstractmethod
    def _get_current_schema(self) -> str:
        """Return the schema name used when a table carries no schema of its own."""
        pass

    def _dcl_grants_config_expr(
        self,
        dcl_cmd: t.Type[DCL],
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Build one `dcl_cmd` expression per privilege, or per privilege/principal pair.

        Args:
            dcl_cmd: Expression class to instantiate (Grant or Revoke).
            table: The securable; each statement receives its own copy.
            grants_config: Mapping of privilege to the principals it applies to.
            table_type: Used to look up the object kind for the statement.

        Returns:
            The statements, in the iteration order of `grants_config`.
        """
        expressions: t.List[exp.Expr] = []
        if not grants_config:
            return expressions

        object_kind = self._grant_object_kind(table_type)

        def build_statement(privilege: str, principal_names: t.Iterable[str]) -> exp.Expr:
            # Every statement gets freshly built nodes so that no AST node is shared
            # between two statements.
            args: t.Dict[str, t.Any] = {
                "privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
                "securable": table.copy(),
            }
            if object_kind:
                args["kind"] = exp.Var(this=object_kind)
            args["principals"] = [
                normalize_identifiers(
                    parse_one(name, into=exp.GrantPrincipal, dialect=self.dialect),
                    dialect=self.dialect,
                )
                for name in principal_names
            ]
            return dcl_cmd(**args)  # type: ignore[arg-type]

        for privilege, principals in grants_config.items():
            if self.SUPPORTS_MULTIPLE_GRANT_PRINCIPALS:
                expressions.append(build_statement(privilege, principals))
            else:
                expressions.extend(
                    build_statement(privilege, [principal]) for principal in principals
                )

        return expressions

    def _apply_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Build the GRANT statements for `grants_config`."""
        return self._dcl_grants_config_expr(exp.Grant, table, grants_config, table_type)

    def _revoke_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Build the REVOKE statements for `grants_config`."""
        return self._dcl_grants_config_expr(exp.Revoke, table, grants_config, table_type)

    def _get_grant_expression(self, table: exp.Table) -> exp.Expr:
        """Build the query that lists (privilege_type, grantee) rows for `table`.

        Rows are restricted to grants whose grantor is the current user/role and whose
        grantee is someone else. The schema falls back to `_get_current_schema()` when
        the table has none; the catalog is only involved when `USE_CATALOG_IN_GRANTS`
        is set.

        Raises:
            SQLMeshError: If a catalog is required but cannot be determined.
        """
        schema_identifier = table.args.get("db") or normalize_identifiers(
            exp.to_identifier(self._get_current_schema(), quoted=True), dialect=self.dialect
        )
        schema_name = schema_identifier.this
        table_name = table.args.get("this").this  # type: ignore

        grant_conditions = [
            exp.column("table_schema").eq(exp.Literal.string(schema_name)),
            exp.column("table_name").eq(exp.Literal.string(table_name)),
            exp.column("grantor").eq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
            exp.column("grantee").neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
        ]

        info_schema_table = normalize_identifiers(
            exp.table_(self.GRANT_INFORMATION_SCHEMA_TABLE_NAME, db="information_schema"),
            dialect=self.dialect,
        )
        if self.USE_CATALOG_IN_GRANTS:
            catalog_identifier = table.args.get("catalog")
            if not catalog_identifier:
                catalog_name = self.get_current_catalog()
                if not catalog_name:
                    raise SQLMeshError(
                        "Current catalog could not be determined for fetching grants. This is unexpected."
                    )
                catalog_identifier = normalize_identifiers(
                    exp.to_identifier(catalog_name, quoted=True), dialect=self.dialect
                )
            catalog_name = catalog_identifier.this
            info_schema_table.set("catalog", catalog_identifier.copy())
            # The catalog filter goes first in the WHERE clause.
            grant_conditions.insert(
                0, exp.column("table_catalog").eq(exp.Literal.string(catalog_name))
            )

        return (
            exp.select("privilege_type", "grantee")
            .from_(info_schema_table)
            .where(exp.and_(*grant_conditions))
        )

    def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
        """Fetch the current grants on `table` as a privilege -> grantees mapping.

        Rows with a NULL or empty privilege/grantee are skipped and duplicate grantees
        are recorded once, in the order they are first seen.
        """
        results = self.fetchall(self._get_grant_expression(table))

        grants_dict: GrantsConfig = {}
        for privilege_raw, grantee_raw in results:
            if privilege_raw is None or grantee_raw is None:
                continue

            privilege = str(privilege_raw)
            grantee = str(grantee_raw)
            if not (privilege and grantee):
                continue

            grantees = grants_dict.setdefault(privilege, [])
            if grantee not in grantees:
                grantees.append(grantee)

        return grants_dict
logger = <Logger sqlmesh.core.engine_adapter.mixins (WARNING)>
NORMALIZED_DATE_FORMAT = '%Y-%m-%d'
NORMALIZED_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f'
class LogicalMergeMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class LogicalMergeMixin(EngineAdapter):
    """Mixin whose `merge` is implemented by the module-level `logical_merge` helper."""

    def merge(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        unique_key: t.Sequence[exp.Expr],
        when_matched: t.Optional[exp.Whens] = None,
        merge_filter: t.Optional[exp.Expr] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Merge `source_table` into `target_table` by delegating to `logical_merge`.

        Any extra keyword arguments are accepted but not forwarded.
        """
        logical_merge(
            engine_adapter=self,
            target_table=target_table,
            source_table=source_table,
            target_columns_to_types=target_columns_to_types,
            unique_key=unique_key,
            when_matched=when_matched,
            merge_filter=merge_filter,
            source_columns=source_columns,
        )

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
def merge( self, target_table: Union[str, sqlglot.expressions.query.Table], source_table: QueryOrDF, target_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]], unique_key: Sequence[sqlglot.expressions.core.Expr], when_matched: Optional[sqlglot.expressions.dml.Whens] = None, merge_filter: Optional[sqlglot.expressions.core.Expr] = None, source_columns: Optional[List[str]] = None, **kwargs: Any) -> None:
37    def merge(
38        self,
39        target_table: TableName,
40        source_table: QueryOrDF,
41        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
42        unique_key: t.Sequence[exp.Expr],
43        when_matched: t.Optional[exp.Whens] = None,
44        merge_filter: t.Optional[exp.Expr] = None,
45        source_columns: t.Optional[t.List[str]] = None,
46        **kwargs: t.Any,
47    ) -> None:
48        logical_merge(
49            self,
50            target_table,
51            source_table,
52            target_columns_to_types,
53            unique_key,
54            when_matched=when_matched,
55            merge_filter=merge_filter,
56            source_columns=source_columns,
57        )
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class PandasNativeFetchDFSupportMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class PandasNativeFetchDFSupportMixin(EngineAdapter):
    """Adapter mixin that reads query results straight into a Pandas DataFrame."""

    def _fetch_native_df(
        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
    ) -> DF:
        """Fetches a Pandas DataFrame from a SQL query."""
        from warnings import catch_warnings, filterwarnings

        from pandas.io.sql import read_sql_query

        # Expressions are rendered to SQL text; plain strings are used as given.
        if isinstance(query, exp.Expr):
            sql = self._to_sql(query, quote=quote_identifiers)
        else:
            sql = query

        logger.debug(f"Executing SQL:\n{sql}")
        with catch_warnings():
            with self.transaction():
                # Silence the pandas warning about non-SQLAlchemy connections.
                filterwarnings(
                    "ignore",
                    category=UserWarning,
                    message=".*pandas only supports SQLAlchemy connectable.*",
                )
                return read_sql_query(sql, self._connection_pool.get())

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class HiveMetastoreTablePropertiesMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
 81class HiveMetastoreTablePropertiesMixin(EngineAdapter):
 82    MAX_TABLE_COMMENT_LENGTH = 4000
 83    MAX_COLUMN_COMMENT_LENGTH = 4000
 84
 85    def _build_partitioned_by_exp(
 86        self,
 87        partitioned_by: t.List[exp.Expr],
 88        *,
 89        catalog_name: t.Optional[str] = None,
 90        **kwargs: t.Any,
 91    ) -> t.Union[exp.PartitionedByProperty, exp.Property]:
 92        if (
 93            self.dialect == "trino"
 94            and self.get_catalog_type(catalog_name or self.get_current_catalog()) == "iceberg"
 95        ):
 96            # On the Trino Iceberg catalog, the table property is called "partitioning" - not "partitioned_by"
 97            # In addition, partition column transform expressions like `day(col)` or `bucket(col, 5)` are allowed
 98            # Also, column names and transforms need to be strings and supplied as an ARRAY[varchar]
 99            # ref: https://trino.io/docs/current/connector/iceberg.html#table-properties
100            return exp.Property(
101                this=exp.var("PARTITIONING"),
102                value=exp.array(
103                    *(exp.Literal.string(e.sql(dialect=self.dialect)) for e in partitioned_by)
104                ),
105            )
106        for expr in partitioned_by:
107            if not isinstance(expr, exp.Column):
108                raise SQLMeshError(
109                    f"PARTITIONED BY contains non-column value '{expr.sql(dialect=self.dialect)}'."
110                )
111        return exp.PartitionedByProperty(
112            this=exp.Schema(expressions=partitioned_by),
113        )
114
115    def _build_table_properties_exp(
116        self,
117        catalog_name: t.Optional[str] = None,
118        table_format: t.Optional[str] = None,
119        storage_format: t.Optional[str] = None,
120        partitioned_by: t.Optional[t.List[exp.Expr]] = None,
121        partition_interval_unit: t.Optional[IntervalUnit] = None,
122        clustered_by: t.Optional[t.List[exp.Expr]] = None,
123        table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
124        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
125        table_description: t.Optional[str] = None,
126        table_kind: t.Optional[str] = None,
127        **kwargs: t.Any,
128    ) -> t.Optional[exp.Properties]:
129        properties: t.List[exp.Expr] = []
130
131        if table_format and self.dialect == "spark":
132            properties.append(exp.FileFormatProperty(this=exp.Var(this=table_format)))
133            if storage_format:
134                properties.append(
135                    exp.Property(
136                        this="write.format.default", value=exp.Literal.string(storage_format)
137                    )
138                )
139        elif storage_format:
140            properties.append(exp.FileFormatProperty(this=exp.Var(this=storage_format)))
141
142        if partitioned_by:
143            properties.append(
144                self._build_partitioned_by_exp(
145                    partitioned_by,
146                    partition_interval_unit=partition_interval_unit,
147                    catalog_name=catalog_name,
148                )
149            )
150
151        if table_description:
152            properties.append(
153                exp.SchemaCommentProperty(
154                    this=exp.Literal.string(self._truncate_table_comment(table_description))
155                )
156            )
157
158        properties.extend(self._table_or_view_properties_to_expressions(table_properties))
159
160        if properties:
161            return exp.Properties(expressions=properties)
162        return None
163
164    def _build_view_properties_exp(
165        self,
166        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
167        table_description: t.Optional[str] = None,
168        **kwargs: t.Any,
169    ) -> t.Optional[exp.Properties]:
170        """Creates a SQLGlot table properties expression for view"""
171        properties: t.List[exp.Expr] = []
172
173        if table_description:
174            properties.append(
175                exp.SchemaCommentProperty(
176                    this=exp.Literal.string(self._truncate_table_comment(table_description))
177                )
178            )
179
180        properties.extend(self._table_or_view_properties_to_expressions(view_properties))
181
182        if properties:
183            return exp.Properties(expressions=properties)
184        return None
185
186    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
187        # iceberg and delta do not have a comment length limit
188        if self.current_catalog_type in ("iceberg", "delta_lake"):
189            return comment
190        return super()._truncate_comment(comment, length)

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
MAX_TABLE_COMMENT_LENGTH = 4000
MAX_COLUMN_COMMENT_LENGTH = 4000
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class GetCurrentCatalogFromFunctionMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class GetCurrentCatalogFromFunctionMixin(EngineAdapter):
    """Adapter mixin that looks up the current catalog by selecting a SQL expression."""

    # Expression selected to obtain the current catalog.
    CURRENT_CATALOG_EXPRESSION: exp.Expr = exp.func("current_catalog")

    def get_current_catalog(self) -> t.Optional[str]:
        """Returns the catalog name of the current connection."""
        row = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
        # A falsy result (no row) means the catalog is unknown.
        return row[0] if row else None

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
CURRENT_CATALOG_EXPRESSION: sqlglot.expressions.core.Expr = CurrentCatalog()
def get_current_catalog(self) -> Optional[str]:
196    def get_current_catalog(self) -> t.Optional[str]:
197        """Returns the catalog name of the current connection."""
198        result = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
199        if result:
200            return result[0]
201        return None

Returns the catalog name of the current connection.

Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class NonTransactionalTruncateMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class NonTransactionalTruncateMixin(EngineAdapter):
    """Adapter mixin that avoids TRUNCATE while a transaction is active."""

    def _truncate_table(self, table_name: TableName) -> None:
        """Empty `table_name`, using DELETE inside a transaction and the parent truncate otherwise."""
        # Truncate forces a commit of the current transaction so we want to do an unconditional delete to
        # preserve the transaction if one exists otherwise we can truncate
        if not self._connection_pool.is_transaction_active:
            super()._truncate_table(table_name)
            return None
        delete_all_rows = exp.Delete(this=exp.to_table(table_name))
        return self.execute(delete_all_rows)

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class VarcharSizeWorkaroundMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class VarcharSizeWorkaroundMixin(EngineAdapter):
    """Adapter mixin working around unreliable column types from `CREATE TABLE AS ... LIMIT 0`."""

    def _default_precision_to_max(
        self, columns_to_types: t.Dict[str, exp.DataType]
    ) -> t.Dict[str, exp.DataType]:
        """Promote types that sit at their default length to "max" length.

        The `DataType` objects are modified in place and the same mapping is returned.
        """
        # get default lengths for types that support "max" length
        types_with_max_default_param = {
            k: [self.schema_differ.parameterized_type_defaults[k][0][0]]
            for k in self.schema_differ.max_parameter_length
            if k in self.schema_differ.parameterized_type_defaults
        }

        # Redshift and MSSQL have a bug where CTAS statements have non-deterministic types. If a LIMIT
        # is applied to a CTAS statement, VARCHAR (and possibly other) types sometimes revert to their
        # default length of 256 (Redshift) or 1 (MSSQL). If we detect that a type has its default length
        # and supports "max" length, we convert it to "max" length to prevent inadvertent data truncation.
        for col_type in columns_to_types.values():
            if col_type.this in types_with_max_default_param and col_type.expressions:
                parameter = self.schema_differ.get_type_parameters(col_type)
                type_default = types_with_max_default_param[col_type.this]
                if parameter == type_default:
                    col_type.set("expressions", [exp.DataTypeParam(this=exp.var("max"))])

        return columns_to_types

    def _build_create_table_exp(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expr],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> exp.Create:
        """Build a CREATE TABLE expression, replacing `LIMIT 0` CTAS with an explicit schema.

        When the parent-built statement selects with `LIMIT 0`, the query (minus
        its `LIMIT 0` and WHERE clauses) is materialized as a temporary view, the
        view's column types are read back, and a CREATE TABLE with that explicit
        schema is returned instead. The temporary view is always dropped.
        """
        statement = super()._build_create_table_exp(
            table_name_or_schema,
            expression=expression,
            exists=exists,
            replace=replace,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            table_kind=table_kind,
            **kwargs,
        )

        if (
            statement.expression
            and statement.expression.args.get("limit") is not None
            and statement.expression.args["limit"].expression.this == "0"
        ):
            assert not isinstance(table_name_or_schema, exp.Schema)

            # redshift and mssql have a bug where CTAS statements have non-deterministic types. if a limit
            # is applied to a ctas statement, VARCHAR types default to 1 in some instances.
            select_statement = statement.expression.copy()
            for select_or_union in select_statement.find_all(exp.Select, exp.SetOperation):
                limit = select_or_union.args.get("limit")
                if limit is not None and limit.expression.this == "0":
                    limit.pop()

                select_or_union.set("where", None)

            temp_view_name = self._get_temp_table("ctas")

            self.create_view(temp_view_name, select_statement, replace=False)
            try:
                columns_to_types_from_view = self._default_precision_to_max(
                    self.columns(temp_view_name)
                )

                schema = self._build_schema_exp(
                    exp.to_table(table_name_or_schema),
                    columns_to_types_from_view,
                )
                # Forward table_kind here as well so the rebuilt statement is the
                # same kind of table as the one requested by the caller.
                statement = super()._build_create_table_exp(
                    schema,
                    None,
                    exists=exists,
                    replace=replace,
                    target_columns_to_types=columns_to_types_from_view,
                    table_description=table_description,
                    table_kind=table_kind,
                    **kwargs,
                )
            finally:
                self.drop_view(temp_view_name)

        return statement

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
@dataclass(frozen=True)
class TableAlterClusterByOperation(sqlmesh.core.schema_diff.TableAlterOperation, abc.ABC):
@dataclass(frozen=True)
class TableAlterClusterByOperation(TableAlterOperation, abc.ABC):
    """Common abstract parent for the clustering-related alter operations."""
@dataclass(frozen=True)
class TableAlterChangeClusterKeyOperation(TableAlterClusterByOperation):
@dataclass(frozen=True)
class TableAlterChangeClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that sets a table's clustering key."""

    # Clustering key as SQL text, parsed with `dialect` when the actions are built.
    clustering_key: str
    dialect: str

    @property
    def is_additive(self) -> bool:
        """Always False for this operation."""
        return False

    @property
    def is_destructive(self) -> bool:
        """Always False for this operation."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        """A single `Cluster` action holding the parsed key expressions."""
        cluster = exp.Cluster(expressions=self.cluster_key_expressions)
        return [cluster]

    @property
    def cluster_key_expressions(self) -> t.List[exp.Expr]:
        """Parse `clustering_key` and return its component expressions."""
        # Note: Assumes `clustering_key` as a string like:
        # - "(col_a)"
        # - "(col_a, col_b)"
        # - "func(col_a, transform(col_b))"
        parsed = parse_one(self.clustering_key, dialect=self.dialect)
        key_parts = parsed.expressions
        if key_parts:
            return key_parts
        # No expression list on the parsed node: fall back to its single `this`.
        return [parsed.this]
TableAlterChangeClusterKeyOperation( target_table: sqlglot.expressions.query.Table, clustering_key: str, dialect: str)
clustering_key: str
dialect: str
is_additive: bool
313    @property
314    def is_additive(self) -> bool:
315        return False
is_destructive: bool
317    @property
318    def is_destructive(self) -> bool:
319        return False
cluster_key_expressions: List[sqlglot.expressions.core.Expr]
325    @property
326    def cluster_key_expressions(self) -> t.List[exp.Expr]:
327        # Note: Assumes `clustering_key` as a string like:
328        # - "(col_a)"
329        # - "(col_a, col_b)"
330        # - "func(col_a, transform(col_b))"
331        parsed_cluster_key = parse_one(self.clustering_key, dialect=self.dialect)
332        return parsed_cluster_key.expressions or [parsed_cluster_key.this]
@dataclass(frozen=True)
class TableAlterDropClusterKeyOperation(TableAlterClusterByOperation):
@dataclass(frozen=True)
class TableAlterDropClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that emits a `DROP CLUSTERING KEY` command."""

    @property
    def is_additive(self) -> bool:
        """Always False for this operation."""
        return False

    @property
    def is_destructive(self) -> bool:
        """Always False for this operation."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        """A single raw command action: DROP CLUSTERING KEY."""
        drop_command = exp.Command(this="DROP", expression="CLUSTERING KEY")
        return [drop_command]
TableAlterDropClusterKeyOperation(target_table: sqlglot.expressions.query.Table)
is_additive: bool
337    @property
338    def is_additive(self) -> bool:
339        return False
is_destructive: bool
341    @property
342    def is_destructive(self) -> bool:
343        return False
class ClusteredByMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class ClusteredByMixin(EngineAdapter):
    """Adapter mixin adding clustering support to table creation and alter-operation diffing."""

    def _build_clustered_by_exp(
        self,
        clustered_by: t.List[exp.Expr],
        **kwargs: t.Any,
    ) -> t.Optional[exp.Cluster]:
        """Wrap copies of the `clustered_by` expressions in a `Cluster` node."""
        copied_keys = [key.copy() for key in clustered_by]
        return exp.Cluster(expressions=copied_keys)

    def get_alter_operations(
        self,
        current_table_name: TableName,
        target_table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterOperation]:
        """Return the parent's alter operations plus any clustering change.

        A change-key operation is appended when the target table is clustered
        with a key that differs from the current table's key; a drop-key
        operation is appended when only the current table is clustered. Nothing
        is appended unless data objects are found for both tables.
        """
        ops = super().get_alter_operations(
            current_table_name,
            target_table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

        # check for a change in clustering
        current = exp.to_table(current_table_name)
        target = exp.to_table(target_table_name)

        current_schema = schema_(current.db, catalog=current.catalog)
        target_schema = schema_(target.db, catalog=target.catalog)

        current_info = seq_get(self.get_data_objects(current_schema, {current.name}), 0)
        target_info = seq_get(self.get_data_objects(target_schema, {target.name}), 0)

        if not (current_info and target_info):
            return ops

        if not target_info.is_clustered:
            # Target is unclustered: drop the key if the current table still has one.
            if current_info.is_clustered:
                ops.append(TableAlterDropClusterKeyOperation(target_table=current))
            return ops

        if target_info.clustering_key and (
            current_info.clustering_key != target_info.clustering_key
        ):
            change_key = TableAlterChangeClusterKeyOperation(
                target_table=current,
                clustering_key=target_info.clustering_key,
                dialect=self.dialect,
            )
            ops.append(change_key)

        return ops

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
def get_alter_operations( self, current_table_name: Union[str, sqlglot.expressions.query.Table], target_table_name: Union[str, sqlglot.expressions.query.Table], *, ignore_destructive: bool = False, ignore_additive: bool = False) -> List[sqlmesh.core.schema_diff.TableAlterOperation]:
358    def get_alter_operations(
359        self,
360        current_table_name: TableName,
361        target_table_name: TableName,
362        *,
363        ignore_destructive: bool = False,
364        ignore_additive: bool = False,
365    ) -> t.List[TableAlterOperation]:
366        operations = super().get_alter_operations(
367            current_table_name,
368            target_table_name,
369            ignore_destructive=ignore_destructive,
370            ignore_additive=ignore_additive,
371        )
372
373        # check for a change in clustering
374        current_table = exp.to_table(current_table_name)
375        target_table = exp.to_table(target_table_name)
376
377        current_table_schema = schema_(current_table.db, catalog=current_table.catalog)
378        target_table_schema = schema_(target_table.db, catalog=target_table.catalog)
379
380        current_table_info = seq_get(
381            self.get_data_objects(current_table_schema, {current_table.name}), 0
382        )
383        target_table_info = seq_get(
384            self.get_data_objects(target_table_schema, {target_table.name}), 0
385        )
386
387        if current_table_info and target_table_info:
388            if target_table_info.is_clustered:
389                if target_table_info.clustering_key and (
390                    current_table_info.clustering_key != target_table_info.clustering_key
391                ):
392                    operations.append(
393                        TableAlterChangeClusterKeyOperation(
394                            target_table=current_table,
395                            clustering_key=target_table_info.clustering_key,
396                            dialect=self.dialect,
397                        )
398                    )
399            elif current_table_info.is_clustered:
400                operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))
401
402        return operations

Determines the alter statements needed to change the current table into the structure of the target table.

Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
def logical_merge( engine_adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, target_table: Union[str, sqlglot.expressions.query.Table], source_table: QueryOrDF, target_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]], unique_key: Sequence[sqlglot.expressions.core.Expr], when_matched: Optional[sqlglot.expressions.dml.Whens] = None, merge_filter: Optional[sqlglot.expressions.core.Expr] = None, source_columns: Optional[List[str]] = None) -> None:
def logical_merge(
    engine_adapter: EngineAdapter,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expr],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expr] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """
    Merge implementation for engine adapters that do not support merge natively.

    The work is delegated to `engine_adapter._replace_by_key` with `is_unique_key=True`.
    The intended strategy is:
    1. Create a temporary table containing the new data to merge.
    2. Delete rows from target table where unique_key cols match a row in the temporary table.
    3. Insert the temporary table contents into the target table. Any duplicate, non-unique rows
       within the temporary table are omitted.
    4. Drop the temporary table.

    Raises:
        SQLMeshError: If `when_matched` or `merge_filter` is provided, since neither can be
            honored without a native MERGE statement.
    """
    # `when_matched` is checked first so it is the one reported when both are supplied.
    for prop, value in (("when_matched", when_matched), ("merge_filter", merge_filter)):
        if value:
            raise SQLMeshError(
                f"This engine does not support MERGE expressions and therefore `{prop}` is not supported."
            )

    engine_adapter._replace_by_key(
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        is_unique_key=True,
        source_columns=source_columns,
    )

Merge implementation for engine adapters that do not support merge natively.

The merge is executed as follows:

  1. Create a temporary table containing the new data to merge.
  2. Delete rows from target table where unique_key cols match a row in the temporary table.
  3. Insert the temporary table contents into the target table. Any duplicate, non-unique rows within the temporary table are omitted.
  4. Drop the temporary table.
class RowDiffMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class RowDiffMixin(EngineAdapter):
    """Builds expressions that convert column values into normalized strings."""

    # The maximum supported value for n in timestamp(n).
    # Most databases are microsecond (6) but some can only handle millisecond (3) while others go to nanosecond (9)
    MAX_TIMESTAMP_PRECISION = 6

    def concat_columns(
        self,
        columns_to_types: t.Dict[str, exp.DataType],
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
        delimiter: str = ",",
    ) -> exp.Expr:
        """
        Build an expression that renders a whole record as a single string.

        Each column is normalized via `normalize_value`, NULLs are replaced with an empty
        string using COALESCE, and the pieces are joined with :delimiter inside one CONCAT call.

        Args:
            columns_to_types: Column names mapped to their data types, in output order.
            decimal_precision: Passed through to `normalize_value`.
            timestamp_precision: Passed through to `normalize_value`.
            delimiter: The string literal placed between consecutive columns.

        Returns:
            A CONCAT function expression over the normalized columns.
        """
        pieces: t.List[exp.Expr] = []
        for column_name, column_type in columns_to_types.items():
            # A delimiter goes between columns only, so skip it ahead of the first one.
            if pieces:
                pieces.append(exp.Literal.string(delimiter))
            normalized = self.normalize_value(
                exp.to_column(column_name), column_type, decimal_precision, timestamp_precision
            )
            pieces.append(exp.func("COALESCE", normalized, exp.Literal.string("")))

        return exp.func("CONCAT", *pieces)

    def normalize_value(
        self,
        expr: exp.Expr,
        type: exp.DataType,
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
    ) -> exp.Expr:
        """
        Return an expression that converts the value of `expr` to a normalized string.

        The string is meant to be comparable across database engines. Dispatch is on `type`:
            - boolean -> cast to INT (so the string form is expected to be '1' or '0')
            - integer types -> cast to BIGINT
            - real / decimal types -> cast to DECIMAL(38, :decimal_precision), except a
              two-parameter type whose scale is 0, which is handled like an integer
            - temporal types -> formatted string, see `_normalize_timestamp_value`
            - nested types -> `_normalize_nested_value`
            - anything else -> left as is

        The result is always wrapped in a cast to VARCHAR. NULLs are not replaced here;
        `concat_columns` wraps the result in COALESCE(..., '').
        """
        normalized: exp.Expr
        if type.is_type(exp.DataType.Type.BOOLEAN):
            normalized = self._normalize_boolean_value(expr)
        elif type.is_type(*exp.DataType.INTEGER_TYPES):
            normalized = self._normalize_integer_value(expr)
        elif type.is_type(*exp.DataType.REAL_TYPES):
            # If there is no scale on the decimal type, treat it like an integer when comparing
            # Some databases like Snowflake deliberately create all integer types as NUMERIC(<size>, 0)
            # and they should be treated as integers and not decimals
            params = list(type.find_all(exp.DataTypeParam))
            has_zero_scale = len(params) == 2 and params[-1].this.to_py() == 0
            normalized = (
                self._normalize_integer_value(expr)
                if has_zero_scale
                else self._normalize_decimal_value(expr, decimal_precision)
            )
        elif type.is_type(*exp.DataType.TEMPORAL_TYPES):
            normalized = self._normalize_timestamp_value(expr, type, timestamp_precision)
        elif type.is_type(*exp.DataType.NESTED_TYPES):
            normalized = self._normalize_nested_value(expr)
        else:
            normalized = expr

        return exp.cast(normalized, to=exp.DataType.build("VARCHAR"))

    def _normalize_nested_value(self, expr: exp.Expr) -> exp.Expr:
        """Return `expr` unchanged; this is the hook used for nested types."""
        return expr

    def _normalize_timestamp_value(
        self, expr: exp.Expr, type: exp.DataType, precision: int
    ) -> exp.Expr:
        """
        Render a temporal expression as a string with :precision subsecond digits.

        Raises:
            ValueError: If :precision exceeds `MAX_TIMESTAMP_PRECISION`.
        """
        if precision > self.MAX_TIMESTAMP_PRECISION:
            raise ValueError(
                f"Requested timestamp precision '{precision}' exceeds maximum supported precision: {self.MAX_TIMESTAMP_PRECISION}"
            )

        if type.is_type(exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
            time_format = NORMALIZED_DATE_FORMAT
        else:
            time_format = NORMALIZED_TIMESTAMP_FORMAT

        utc_converted_types = (
            exp.DataType.Type.TIMESTAMPTZ,
            exp.DataType.Type.TIMESTAMPLTZ,
            exp.DataType.Type.TIMESTAMPNTZ,
        )
        if type.is_type(*utc_converted_types):
            # Convert all timezone-aware values to UTC for comparison
            # NOTE(review): TIMESTAMPNTZ is included here as well - confirm that is intended.
            expr = exp.AtTimeZone(this=expr, zone=exp.Literal.string("UTC"))

        rendered = exp.TimeToStr(this=expr, format=exp.Literal.string(time_format))

        # 6 = max precision across all adapters and also the max amount of digits TimeToStr will
        # render since its based on `strftime` and `%f` only renders to microseconds
        surplus_digits = 6 - precision
        if surplus_digits <= 0:
            return rendered

        # Trim the rendered string so that only :precision subsecond digits remain.
        return exp.func(
            "SUBSTRING", rendered, 1, len("2023-01-01 12:13:14.000000") - surplus_digits
        )

    def _normalize_integer_value(self, expr: exp.Expr) -> exp.Expr:
        """Cast `expr` to BIGINT."""
        return exp.cast(expr, "BIGINT")

    def _normalize_decimal_value(self, expr: exp.Expr, precision: int) -> exp.Expr:
        """Cast `expr` to DECIMAL(38, :precision)."""
        return exp.cast(expr, f"DECIMAL(38,{precision})")

    def _normalize_boolean_value(self, expr: exp.Expr) -> exp.Expr:
        """Cast `expr` to INT."""
        return exp.cast(expr, "INT")

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
MAX_TIMESTAMP_PRECISION = 6
def concat_columns( self, columns_to_types: Dict[str, sqlglot.expressions.datatypes.DataType], decimal_precision: int = 3, timestamp_precision: int = 6, delimiter: str = ',') -> sqlglot.expressions.core.Expr:
446    def concat_columns(
447        self,
448        columns_to_types: t.Dict[str, exp.DataType],
449        decimal_precision: int = 3,
450        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
451        delimiter: str = ",",
452    ) -> exp.Expr:
453        """
454        Produce an expression that generates a string version of a record, that is:
455            - Every column converted to a string representation, joined together into a single string using the specified :delimiter
456        """
457        expressions_to_concat: t.List[exp.Expr] = []
458        for idx, (column, type) in enumerate(columns_to_types.items()):
459            expressions_to_concat.append(
460                exp.func(
461                    "COALESCE",
462                    self.normalize_value(
463                        exp.to_column(column), type, decimal_precision, timestamp_precision
464                    ),
465                    exp.Literal.string(""),
466                )
467            )
468            if idx < len(columns_to_types) - 1:
469                expressions_to_concat.append(exp.Literal.string(delimiter))
470
471        return exp.func("CONCAT", *expressions_to_concat)

Produce an expression that generates a string version of a record: every column is converted to a string representation, and the results are joined together into a single string using the specified :delimiter.

def normalize_value( self, expr: sqlglot.expressions.core.Expr, type: sqlglot.expressions.datatypes.DataType, decimal_precision: int = 3, timestamp_precision: int = 6) -> sqlglot.expressions.core.Expr:
473    def normalize_value(
474        self,
475        expr: exp.Expr,
476        type: exp.DataType,
477        decimal_precision: int = 3,
478        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
479    ) -> exp.Expr:
480        """
481        Return an expression that converts the values inside the column `col` to a normalized string
482
483        This string should be comparable across database engines, eg:
484            - `date` columns -> YYYY-MM-DD string
485            - `datetime`/`timestamp`/`timestamptz` columns -> ISO-8601 string to :timestamp_precision digits of subsecond precision
486            - `float` / `double` / `decimal` -> Value formatted to :decimal_precision decimal places
487            - `boolean` columns -> '1' or '0'
488            - NULLS -> "" (empty string)
489        """
490        value: exp.Expr
491        if type.is_type(exp.DataType.Type.BOOLEAN):
492            value = self._normalize_boolean_value(expr)
493        elif type.is_type(*exp.DataType.INTEGER_TYPES):
494            value = self._normalize_integer_value(expr)
495        elif type.is_type(*exp.DataType.REAL_TYPES):
496            # If there is no scale on the decimal type, treat it like an integer when comparing
497            # Some databases like Snowflake deliberately create all integer types as NUMERIC(<size>, 0)
498            # and they should be treated as integers and not decimals
499            type_params = list(type.find_all(exp.DataTypeParam))
500            if len(type_params) == 2 and type_params[-1].this.to_py() == 0:
501                value = self._normalize_integer_value(expr)
502            else:
503                value = self._normalize_decimal_value(expr, decimal_precision)
504        elif type.is_type(*exp.DataType.TEMPORAL_TYPES):
505            value = self._normalize_timestamp_value(expr, type, timestamp_precision)
506        elif type.is_type(*exp.DataType.NESTED_TYPES):
507            value = self._normalize_nested_value(expr)
508        else:
509            value = expr
510
511        return exp.cast(value, to=exp.DataType.build("VARCHAR"))

Return an expression that converts the values of the expression `expr` to a normalized string.

This string should be comparable across database engines, e.g.:
  • date columns -> YYYY-MM-DD string
  • datetime/timestamp/timestamptz columns -> ISO-8601 string to :timestamp_precision digits of subsecond precision
  • float / double / decimal -> value formatted to :decimal_precision decimal places
  • boolean columns -> '1' or '0'
  • NULLs -> "" (empty string)

Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class GrantsFromInfoSchemaMixin(sqlmesh.core.engine_adapter.base.EngineAdapter):
class GrantsFromInfoSchemaMixin(EngineAdapter):
    """Builds GRANT / REVOKE expressions and reads existing grants from an information_schema table."""

    # Expression compared against the `grantor` and `grantee` columns when reading grants.
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.func("current_user")
    # When True, all principals of a privilege share one statement; otherwise one statement per principal.
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
    # When True, the grants query is catalog-qualified and also filters on `table_catalog`.
    USE_CATALOG_IN_GRANTS = False
    # Name of the table under `information_schema` that is queried for grants.
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "table_privileges"

    @staticmethod
    @abc.abstractmethod
    def _grant_object_kind(table_type: DataObjectType) -> t.Optional[str]:
        """Return the object kind to put in DCL statements for `table_type`, or a falsy value to omit it."""

    @abc.abstractmethod
    def _get_current_schema(self) -> str:
        """Return the current schema name; used when a table carries no schema of its own."""

    def _dcl_grants_config_expr(
        self,
        dcl_cmd: t.Type[DCL],
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """
        Build one `dcl_cmd` expression (e.g. Grant or Revoke) per privilege / principal group.

        Args:
            dcl_cmd: The expression class to instantiate for every statement.
            table: The securable; a copy is made once per privilege.
            grants_config: Mapping of privilege name to the principals it applies to.
            table_type: Used to look up the object kind via `_grant_object_kind`.

        Returns:
            The generated expressions, or an empty list when `grants_config` is empty.
        """
        if not grants_config:
            return []

        statements: t.List[exp.Expr] = []
        object_kind = self._grant_object_kind(table_type)

        for privilege, principals in grants_config.items():
            args: t.Dict[str, t.Any] = {
                "privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
                "securable": table.copy(),
            }
            if object_kind:
                args["kind"] = exp.Var(this=object_kind)

            # Either a single group holding every principal, or one group per principal.
            if self.SUPPORTS_MULTIPLE_GRANT_PRINCIPALS:
                principal_groups = [list(principals)]
            else:
                principal_groups = [[principal] for principal in principals]

            for group in principal_groups:
                args["principals"] = [
                    normalize_identifiers(
                        parse_one(principal, into=exp.GrantPrincipal, dialect=self.dialect),
                        dialect=self.dialect,
                    )
                    for principal in group
                ]
                statements.append(dcl_cmd(**args))  # type: ignore[arg-type]

        return statements

    def _apply_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Build the `exp.Grant` expressions for `grants_config` on `table`."""
        return self._dcl_grants_config_expr(exp.Grant, table, grants_config, table_type)

    def _revoke_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Build the `exp.Revoke` expressions for `grants_config` on `table`."""
        return self._dcl_grants_config_expr(exp.Revoke, table, grants_config, table_type)

    def _get_grant_expression(self, table: exp.Table) -> exp.Expr:
        """
        Build the query that selects `privilege_type` and `grantee` for `table`.

        Rows are restricted to the table's schema and name, to grants whose grantor equals
        `CURRENT_USER_OR_ROLE_EXPRESSION` and whose grantee does not. With
        `USE_CATALOG_IN_GRANTS` enabled the information schema table is catalog-qualified and
        a `table_catalog` filter is placed first.

        Raises:
            SQLMeshError: If a catalog is required but neither the table nor
                `get_current_catalog()` provides one.
        """
        schema_identifier = table.args.get("db")
        if not schema_identifier:
            # Fall back to the current schema when the table is not schema-qualified.
            schema_identifier = normalize_identifiers(
                exp.to_identifier(self._get_current_schema(), quoted=True), dialect=self.dialect
            )
        schema_name = schema_identifier.this
        table_name = table.args.get("this").this  # type: ignore

        current_principal = self.CURRENT_USER_OR_ROLE_EXPRESSION
        grant_conditions = [
            exp.column("table_schema").eq(exp.Literal.string(schema_name)),
            exp.column("table_name").eq(exp.Literal.string(table_name)),
            exp.column("grantor").eq(current_principal),
            exp.column("grantee").neq(current_principal),
        ]

        info_schema_table = normalize_identifiers(
            exp.table_(self.GRANT_INFORMATION_SCHEMA_TABLE_NAME, db="information_schema"),
            dialect=self.dialect,
        )

        if self.USE_CATALOG_IN_GRANTS:
            catalog_identifier = table.args.get("catalog")
            if not catalog_identifier:
                catalog_name = self.get_current_catalog()
                if not catalog_name:
                    raise SQLMeshError(
                        "Current catalog could not be determined for fetching grants. This is unexpected."
                    )
                catalog_identifier = normalize_identifiers(
                    exp.to_identifier(catalog_name, quoted=True), dialect=self.dialect
                )
            catalog_name = catalog_identifier.this
            info_schema_table.set("catalog", catalog_identifier.copy())
            # The catalog filter goes ahead of the schema / table filters.
            grant_conditions.insert(
                0, exp.column("table_catalog").eq(exp.Literal.string(catalog_name))
            )

        query = exp.select("privilege_type", "grantee").from_(info_schema_table)
        return query.where(exp.and_(*grant_conditions))

    def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
        """
        Fetch the grants on `table` and group the grantees by privilege.

        Rows with a NULL or empty privilege / grantee are skipped, and each grantee is listed
        at most once per privilege, in the order the rows were returned.
        """
        grants: GrantsConfig = {}
        for raw_privilege, raw_grantee in self.fetchall(self._get_grant_expression(table)):
            if raw_privilege is None or raw_grantee is None:
                continue

            privilege, grantee = str(raw_privilege), str(raw_grantee)
            if not (privilege and grantee):
                continue

            known_grantees = grants.setdefault(privilege, [])
            if grantee not in known_grantees:
                known_grantees.append(grantee)

        return grants

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
CURRENT_USER_OR_ROLE_EXPRESSION: sqlglot.expressions.core.Expr = CurrentUser()
SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
USE_CATALOG_IN_GRANTS = False
GRANT_INFORMATION_SCHEMA_TABLE_NAME = 'table_privileges'
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts