EngineAdapter

Engine adapters are how SQLMesh connects and interacts with various data stores. They allow SQLMesh to generalize its functionality to different engines that have Python Database API 2.0-compliant connections. Rather than executing queries directly against your data stores, SQLMesh components such as the SnapshotEvaluator delegate them to engine adapters so these components can be engine-agnostic.
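For example, a concrete adapter needs only a connection factory and a dialect name. The following is a minimal sketch, assuming an in-memory DuckDB connection as the Database API-compliant source (in practice SQLMesh instantiates a dialect-specific subclass for you):

    import duckdb

    from sqlmesh.core.engine_adapter import EngineAdapter

    # Any zero-argument callable returning a DB API 2.0-compliant connection works.
    adapter = EngineAdapter(lambda: duckdb.connect(), dialect="duckdb")

    adapter.create_schema("analytics")  # emits CREATE SCHEMA IF NOT EXISTS "analytics"
    adapter.close()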

   1"""
   2# EngineAdapter
   3
   4Engine adapters are how SQLMesh connects and interacts with various data stores. They allow SQLMesh to
   5generalize its functionality to different engines that have Python Database API 2.0-compliant
   6connections. Rather than executing queries directly against your data stores, SQLMesh components such as
   7the SnapshotEvaluator delegate them to engine adapters so these components can be engine-agnostic.
   8"""
   9
  10from __future__ import annotations
  11
  12import contextlib
  13import itertools
  14import logging
  15import sys
  16import typing as t
  17from functools import partial
  18
  19import pandas as pd
  20from sqlglot import Dialect, exp
  21from sqlglot.errors import ErrorLevel
  22from sqlglot.helper import ensure_list
  23from sqlglot.optimizer.qualify_columns import quote_identifiers
  24
  25from sqlmesh.core.dialect import (
  26    add_table,
  27    schema_,
  28    select_from_values_for_batch_range,
  29    to_schema,
  30)
  31from sqlmesh.core.engine_adapter.shared import (
  32    CatalogSupport,
  33    CommentCreationTable,
  34    CommentCreationView,
  35    DataObject,
  36    InsertOverwriteStrategy,
  37    SourceQuery,
  38    set_catalog,
  39)
  40from sqlmesh.core.model.kind import TimeColumn
  41from sqlmesh.core.schema_diff import SchemaDiffer
  42from sqlmesh.utils import columns_to_types_all_known, random_id
  43from sqlmesh.utils.connection_pool import create_connection_pool
  44from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column
  45from sqlmesh.utils.errors import SQLMeshError, UnsupportedCatalogOperationError
  46from sqlmesh.utils.pandas import columns_to_types_from_df
  47
  48if t.TYPE_CHECKING:
  49    from sqlmesh.core._typing import SchemaName, SessionProperties, TableName
  50    from sqlmesh.core.engine_adapter._typing import (
  51        DF,
  52        PySparkDataFrame,
  53        PySparkSession,
  54        Query,
  55        QueryOrDF,
  56    )
  57    from sqlmesh.core.node import IntervalUnit
  58
  59logger = logging.getLogger(__name__)
  60
  61MERGE_TARGET_ALIAS = "__MERGE_TARGET__"
  62MERGE_SOURCE_ALIAS = "__MERGE_SOURCE__"
  63
  64
  65@set_catalog()
  66class EngineAdapter:
  67    """Base class wrapping a Database API compliant connection.
  68
  69    The EngineAdapter is an easily-subclassable interface that interacts
  70    with the underlying engine and data store.
  71
  72    Args:
  73        connection_factory: a callable which produces a new Database API-compliant
  74            connection on every call.
  75        dialect: The dialect with which this adapter is associated.
  76        multithreaded: Indicates whether this adapter will be used by more than one thread.
  77    """
  78
  79    DIALECT = ""
  80    DEFAULT_BATCH_SIZE = 10000
  81    DATA_OBJECT_FILTER_BATCH_SIZE = 4000
  82    SUPPORTS_TRANSACTIONS = True
  83    SUPPORTS_INDEXES = False
  84    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_CTAS
  85    COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS
  86    MAX_TABLE_COMMENT_LENGTH: t.Optional[int] = None
  87    MAX_COLUMN_COMMENT_LENGTH: t.Optional[int] = None
  88    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
  89    SUPPORTS_MATERIALIZED_VIEWS = False
  90    SUPPORTS_MATERIALIZED_VIEW_SCHEMA = False
  91    SUPPORTS_CLONING = False
  92    SCHEMA_DIFFER = SchemaDiffer()
  93    SUPPORTS_TUPLE_IN = True
  94    CATALOG_SUPPORT = CatalogSupport.UNSUPPORTED
  95    SUPPORTS_ROW_LEVEL_OP = True
  96    HAS_VIEW_BINDING = False
  97    SUPPORTS_REPLACE_TABLE = True
  98    DEFAULT_CATALOG_TYPE = DIALECT
  99    QUOTE_IDENTIFIERS_IN_VIEWS = True
 100
 101    def __init__(
 102        self,
 103        connection_factory: t.Callable[[], t.Any],
 104        dialect: str = "",
 105        sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
 106        multithreaded: bool = False,
 107        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
 108        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
 109        default_catalog: t.Optional[str] = None,
 110        execute_log_level: int = logging.DEBUG,
 111        register_comments: bool = True,
 112        **kwargs: t.Any,
 113    ):
 114        self.dialect = dialect.lower() or self.DIALECT
 115        self._connection_pool = create_connection_pool(
 116            connection_factory, multithreaded, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init
 117        )
 118        self.sql_gen_kwargs = sql_gen_kwargs or {}
 119        self._default_catalog = default_catalog
 120        self._execute_log_level = execute_log_level
 121        self._extra_config = kwargs
 122        self.register_comments = register_comments
 123
 124    def with_log_level(self, level: int) -> EngineAdapter:
 125        adapter = self.__class__(
 126            lambda: None,
 127            dialect=self.dialect,
 128            sql_gen_kwargs=self.sql_gen_kwargs,
 129            default_catalog=self._default_catalog,
 130            execute_log_level=level,
 131            register_comments=self.register_comments,
 132            **self._extra_config,
 133        )
 134
 135        adapter._connection_pool = self._connection_pool
 136
 137        return adapter
 138
 139    @property
 140    def cursor(self) -> t.Any:
 141        return self._connection_pool.get_cursor()
 142
 143    @property
 144    def spark(self) -> t.Optional[PySparkSession]:
 145        return None
 146
 147    @property
 148    def comments_enabled(self) -> bool:
 149        return self.register_comments and self.COMMENT_CREATION_TABLE.is_supported
 150
 151    @classmethod
 152    def is_pandas_df(cls, value: t.Any) -> bool:
 153        return isinstance(value, pd.DataFrame)
 154
 155    @classmethod
 156    def _casted_columns(cls, columns_to_types: t.Dict[str, exp.DataType]) -> t.List[exp.Alias]:
 157        return [
 158            exp.alias_(exp.cast(exp.column(column), to=kind), column, copy=False)
 159            for column, kind in columns_to_types.items()
 160        ]
 161
 162    @property
 163    def default_catalog(self) -> t.Optional[str]:
 164        if self.CATALOG_SUPPORT.is_unsupported:
 165            return None
 166        default_catalog = self._default_catalog or self.get_current_catalog()
 167        if not default_catalog:
 168            raise SQLMeshError("Could not determine a default catalog despite it being supported.")
 169        return default_catalog
 170
 171    def _get_source_queries(
 172        self,
 173        query_or_df: QueryOrDF,
 174        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
 175        target_table: TableName,
 176        *,
 177        batch_size: t.Optional[int] = None,
 178    ) -> t.List[SourceQuery]:
 179        batch_size = self.DEFAULT_BATCH_SIZE if batch_size is None else batch_size
 180        if isinstance(query_or_df, (exp.Query, exp.DerivedTable)):
 181            return [SourceQuery(query_factory=lambda: query_or_df)]  # type: ignore
 182        if not columns_to_types:
 183            raise SQLMeshError(
 184                "It is expected that if a DF is passed in then columns_to_types is set"
 185            )
 186        return self._df_to_source_queries(
 187            query_or_df, columns_to_types, batch_size, target_table=target_table
 188        )
 189
 190    def _df_to_source_queries(
 191        self,
 192        df: DF,
 193        columns_to_types: t.Dict[str, exp.DataType],
 194        batch_size: int,
 195        target_table: TableName,
 196    ) -> t.List[SourceQuery]:
 197        assert isinstance(df, pd.DataFrame)
 198        num_rows = len(df.index)
 199        batch_size = sys.maxsize if batch_size == 0 else batch_size
 200        values = list(df.itertuples(index=False, name=None))
 201        return [
 202            SourceQuery(
 203                query_factory=partial(
 204                    self._values_to_sql,
 205                    values=values,
 206                    columns_to_types=columns_to_types,
 207                    batch_start=i,
 208                    batch_end=min(i + batch_size, num_rows),
 209                ),
 210            )
 211            for i in range(0, num_rows, batch_size)
 212        ]
 213
 214    def _get_source_queries_and_columns_to_types(
 215        self,
 216        query_or_df: QueryOrDF,
 217        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
 218        target_table: TableName,
 219        *,
 220        batch_size: t.Optional[int] = None,
 221    ) -> t.Tuple[t.List[SourceQuery], t.Optional[t.Dict[str, exp.DataType]]]:
 222        columns_to_types = self._columns_to_types(query_or_df, columns_to_types)
 223        return (
 224            self._get_source_queries(
 225                query_or_df, columns_to_types, target_table=target_table, batch_size=batch_size
 226            ),
 227            columns_to_types,
 228        )
 229
 230    @t.overload
 231    def _columns_to_types(
 232        self, query_or_df: DF, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
 233    ) -> t.Dict[str, exp.DataType]: ...
 234
 235    @t.overload
 236    def _columns_to_types(
 237        self, query_or_df: Query, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
 238    ) -> t.Optional[t.Dict[str, exp.DataType]]: ...
 239
 240    def _columns_to_types(
 241        self, query_or_df: QueryOrDF, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
 242    ) -> t.Optional[t.Dict[str, exp.DataType]]:
 243        if columns_to_types:
 244            return columns_to_types
 245        if self.is_pandas_df(query_or_df):
 246            return columns_to_types_from_df(t.cast(pd.DataFrame, query_or_df))
 247        return columns_to_types
 248
 249    def recycle(self) -> None:
 250        """Closes all open connections and releases all allocated resources associated with any thread
 251        except the calling one."""
 252        self._connection_pool.close_all(exclude_calling_thread=True)
 253
 254    def close(self) -> t.Any:
 255        """Closes all open connections and releases all allocated resources."""
 256        self._connection_pool.close_all()
 257
 258    def get_current_catalog(self) -> t.Optional[str]:
 259        """Returns the catalog name of the current connection."""
 260        raise NotImplementedError()
 261
 262    def set_current_catalog(self, catalog: str) -> None:
 263        """Sets the catalog name of the current connection."""
 264        raise NotImplementedError()
 265
 266    def get_catalog_type(self, catalog: t.Optional[str]) -> str:
 267        """Intended to be overridden for data virtualization systems like Trino that,
 268        depending on the target catalog, require slightly different properties to be set when creating / updating tables
 269        """
 270        if self.CATALOG_SUPPORT.is_unsupported:
 271            raise UnsupportedCatalogOperationError(
 272                f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}"
 273            )
 274        return self.DEFAULT_CATALOG_TYPE
 275
 276    @property
 277    def current_catalog_type(self) -> str:
 278        return self.get_catalog_type(self.get_current_catalog())
 279
 280    def replace_query(
 281        self,
 282        table_name: TableName,
 283        query_or_df: QueryOrDF,
 284        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 285        table_description: t.Optional[str] = None,
 286        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 287        **kwargs: t.Any,
 288    ) -> None:
 289        """Replaces an existing table with a query.
 290
  291        For partition based engines (hive, spark), insert overwrite is used. For other systems, create or replace is used.
 292
 293        Args:
 294            table_name: The name of the table (eg. prod.table)
 295            query_or_df: The SQL query to run or a dataframe.
 296            columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type.
 297                Expected to be ordered to match the order of values in the dataframe.
 298            kwargs: Optional create table properties.
 299        """
 300        target_table = exp.to_table(table_name)
 301        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
 302            query_or_df, columns_to_types, target_table=target_table
 303        )
 304        columns_to_types = columns_to_types or self.columns(target_table)
 305        query = source_queries[0].query_factory()
 306        self_referencing = any(
 307            quote_identifiers(table) == quote_identifiers(target_table)
 308            for table in query.find_all(exp.Table)
 309        )
  310        # If the query references the target table itself, the table must be created up front regardless of the approach used.
 311        if self_referencing:
 312            self._create_table_from_columns(
 313                target_table,
 314                columns_to_types,
 315                exists=True,
 316                table_description=table_description,
 317                column_descriptions=column_descriptions,
 318            )
  319        # All engines support `CREATE TABLE AS`, so we use that if the table doesn't already exist;
  320        # we use `CREATE OR REPLACE TABLE AS` instead if the engine supports it.
 321        if self.SUPPORTS_REPLACE_TABLE or not self.table_exists(target_table):
 322            return self._create_table_from_source_queries(
 323                target_table,
 324                source_queries,
 325                columns_to_types,
 326                replace=self.SUPPORTS_REPLACE_TABLE,
 327                table_description=table_description,
 328                column_descriptions=column_descriptions,
 329                **kwargs,
 330            )
 331        else:
 332            if self_referencing:
 333                with self.temp_table(
 334                    self._select_columns(columns_to_types).from_(target_table),
 335                    name=target_table,
 336                    columns_to_types=columns_to_types,
 337                    **kwargs,
 338                ) as temp_table:
 339                    for source_query in source_queries:
 340                        source_query.add_transform(
 341                            lambda node: (  # type: ignore
 342                                temp_table  # type: ignore
 343                                if isinstance(node, exp.Table)
 344                                and quote_identifiers(node) == quote_identifiers(target_table)
 345                                else node
 346                            )
 347                        )
 348                    return self._insert_overwrite_by_condition(
 349                        target_table,
 350                        source_queries,
 351                        columns_to_types,
 352                    )
 353            return self._insert_overwrite_by_condition(
 354                target_table,
 355                source_queries,
 356                columns_to_types,
 357            )
 358
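    # Editor's example (a sketch, not part of the source): replacing a table's
    # contents from a small pandas DataFrame. `adapter` stands for a concrete
    # EngineAdapter instance; the table and column names are illustrative.
    #
    #   df = pd.DataFrame({"id": [1, 2]})
    #   adapter.replace_query(
    #       "db.items", df, columns_to_types={"id": exp.DataType.build("int")}
    #   )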
 359    def create_index(
 360        self,
 361        table_name: TableName,
 362        index_name: str,
 363        columns: t.Tuple[str, ...],
 364        exists: bool = True,
 365    ) -> None:
 366        """Creates a new index for the given table if supported
 367
 368        Args:
 369            table_name: The name of the target table.
 370            index_name: The name of the index.
 371            columns: The list of columns that constitute the index.
 372            exists: Indicates whether to include the IF NOT EXISTS check.
 373        """
 374        if not self.SUPPORTS_INDEXES:
 375            return
 376
 377        expression = exp.Create(
 378            this=exp.Index(
 379                this=exp.to_identifier(index_name),
 380                table=exp.to_table(table_name),
 381                params=exp.IndexParameters(columns=[exp.to_column(c) for c in columns]),
 382            ),
 383            kind="INDEX",
 384            exists=exists,
 385        )
 386        self.execute(expression)
 387
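    # Editor's example (a sketch): on an adapter with SUPPORTS_INDEXES = True
    # this emits CREATE INDEX IF NOT EXISTS idx_events_ts ON db.events (ts);
    # on other engines the call is a silent no-op.
    #
    #   adapter.create_index("db.events", "idx_events_ts", ("ts",))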
 388    def create_table(
 389        self,
 390        table_name: TableName,
 391        columns_to_types: t.Dict[str, exp.DataType],
 392        primary_key: t.Optional[t.Tuple[str, ...]] = None,
 393        exists: bool = True,
 394        table_description: t.Optional[str] = None,
 395        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 396        **kwargs: t.Any,
 397    ) -> None:
 398        """Create a table using a DDL statement
 399
 400        Args:
 401            table_name: The name of the table to create. Can be fully qualified or just table name.
 402            columns_to_types: A mapping between the column name and its data type.
 403            primary_key: Determines the table primary key.
 404            exists: Indicates whether to include the IF NOT EXISTS check.
 405            table_description: Optional table description from MODEL DDL.
 406            column_descriptions: Optional column descriptions from model query.
 407            kwargs: Optional create table properties.
 408        """
 409        self._create_table_from_columns(
 410            table_name,
 411            columns_to_types,
 412            primary_key,
 413            exists,
 414            table_description,
 415            column_descriptions,
 416            **kwargs,
 417        )
 418
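    # Editor's example (a sketch): column types are sqlglot DataType
    # expressions, typically built with exp.DataType.build; all names are
    # illustrative.
    #
    #   adapter.create_table(
    #       "db.users",
    #       {"id": exp.DataType.build("int"), "name": exp.DataType.build("text")},
    #       primary_key=("id",),
    #   )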
 419    def ctas(
 420        self,
 421        table_name: TableName,
 422        query_or_df: QueryOrDF,
 423        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 424        exists: bool = True,
 425        table_description: t.Optional[str] = None,
 426        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 427        **kwargs: t.Any,
 428    ) -> None:
 429        """Create a table using a CTAS statement
 430
 431        Args:
 432            table_name: The name of the table to create. Can be fully qualified or just table name.
 433            query_or_df: The SQL query to run or a dataframe for the CTAS.
 434            columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame.
 435            exists: Indicates whether to include the IF NOT EXISTS check.
 436            table_description: Optional table description from MODEL DDL.
 437            column_descriptions: Optional column descriptions from model query.
 438            kwargs: Optional create table properties.
 439        """
 440        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
 441            query_or_df, columns_to_types, target_table=table_name
 442        )
 443        return self._create_table_from_source_queries(
 444            table_name,
 445            source_queries,
 446            columns_to_types,
 447            exists,
 448            table_description=table_description,
 449            column_descriptions=column_descriptions,
 450            **kwargs,
 451        )
 452
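    # Editor's example (a sketch): the query can be anything sqlglot parses
    # into an exp.Query (assumes `from sqlglot import parse_one`).
    #
    #   adapter.ctas(
    #       "db.active_users", parse_one("SELECT id FROM db.users WHERE active")
    #   )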
 453    def create_state_table(
 454        self,
 455        table_name: str,
 456        columns_to_types: t.Dict[str, exp.DataType],
 457        primary_key: t.Optional[t.Tuple[str, ...]] = None,
 458    ) -> None:
 459        """Create a table to store SQLMesh internal state.
 460
 461        Args:
 462            table_name: The name of the table to create. Can be fully qualified or just table name.
 463            columns_to_types: A mapping between the column name and its data type.
 464            primary_key: Determines the table primary key.
 465        """
 466        self.create_table(
 467            table_name,
 468            columns_to_types,
 469            primary_key=primary_key,
 470        )
 471
 472    def _create_table_from_columns(
 473        self,
 474        table_name: TableName,
 475        columns_to_types: t.Dict[str, exp.DataType],
 476        primary_key: t.Optional[t.Tuple[str, ...]] = None,
 477        exists: bool = True,
 478        table_description: t.Optional[str] = None,
 479        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 480        **kwargs: t.Any,
 481    ) -> None:
 482        """
 483        Create a table using a DDL statement.
 484
 485        Args:
 486            table_name: The name of the table to create. Can be fully qualified or just table name.
 487            columns_to_types: Mapping between the column name and its data type.
 488            primary_key: Determines the table primary key.
 489            exists: Indicates whether to include the IF NOT EXISTS check.
 490            table_description: Optional table description from MODEL DDL.
 491            column_descriptions: Optional column descriptions from model query.
 492            kwargs: Optional create table properties.
 493        """
 494        table = exp.to_table(table_name)
 495
 496        if not columns_to_types_all_known(columns_to_types):
  497            # It is ok if the column types are not known, provided the table already exists and IF NOT EXISTS is set
 498            if exists and self.table_exists(table_name):
 499                return
 500            raise SQLMeshError(
 501                "Cannot create a table without knowing the column types. "
 502                "Try casting the columns to an expected type or defining the columns in the model metadata. "
 503                f"Columns to types: {columns_to_types}"
 504            )
 505
 506        primary_key_expression = (
 507            [exp.PrimaryKey(expressions=[exp.to_column(k) for k in primary_key])]
 508            if primary_key and self.SUPPORTS_INDEXES
 509            else []
 510        )
 511
 512        schema = self._build_schema_exp(
 513            table,
 514            columns_to_types,
 515            column_descriptions,
 516            primary_key_expression,
 517        )
 518
 519        self._create_table(
 520            schema,
 521            None,
 522            exists=exists,
 523            columns_to_types=columns_to_types,
 524            table_description=table_description,
 525            **kwargs,
 526        )
 527
 528        # Register comments with commands if the engine doesn't support comments in the schema or CREATE
 529        if (
 530            table_description
 531            and self.COMMENT_CREATION_TABLE.is_comment_command_only
 532            and self.comments_enabled
 533        ):
 534            self._create_table_comment(table_name, table_description)
 535        if (
 536            column_descriptions
 537            and self.COMMENT_CREATION_TABLE.is_comment_command_only
 538            and self.comments_enabled
 539        ):
 540            self._create_column_comments(table_name, column_descriptions)
 541
 542    def _build_schema_exp(
 543        self,
 544        table: exp.Table,
 545        columns_to_types: t.Dict[str, exp.DataType],
 546        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 547        expressions: t.Optional[t.List[exp.PrimaryKey]] = None,
 548        is_view: bool = False,
 549    ) -> exp.Schema:
 550        """
 551        Build a schema expression for a table, columns, column comments, and additional schema properties.
 552        """
 553        expressions = expressions or []
 554        engine_supports_schema_comments = (
 555            self.COMMENT_CREATION_VIEW.supports_schema_def
 556            if is_view
 557            else self.COMMENT_CREATION_TABLE.supports_schema_def
 558        )
 559        return exp.Schema(
 560            this=table,
 561            expressions=[
 562                exp.ColumnDef(
 563                    this=exp.to_identifier(column),
 564                    kind=None if is_view else kind,  # don't include column data type for views
 565                    constraints=(
 566                        self._build_col_comment_exp(column, column_descriptions)
 567                        if column_descriptions
 568                        and engine_supports_schema_comments
 569                        and self.comments_enabled
 570                        else None
 571                    ),
 572                )
 573                for column, kind in columns_to_types.items()
 574            ]
 575            + expressions,
 576        )
 577
 578    def _build_col_comment_exp(
 579        self, col_name: str, column_descriptions: t.Dict[str, str]
 580    ) -> t.List[exp.ColumnConstraint]:
 581        comment = column_descriptions.get(col_name, None)
 582        if comment:
 583            return [
 584                exp.ColumnConstraint(
 585                    kind=exp.CommentColumnConstraint(
 586                        this=exp.Literal.string(self._truncate_column_comment(comment))
 587                    )
 588                )
 589            ]
 590        return []
 591
 592    def _create_table_from_source_queries(
 593        self,
 594        table_name: TableName,
 595        source_queries: t.List[SourceQuery],
 596        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 597        exists: bool = True,
 598        replace: bool = False,
 599        table_description: t.Optional[str] = None,
 600        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 601        **kwargs: t.Any,
 602    ) -> None:
 603        table = exp.to_table(table_name)
 604
 605        # CTAS calls do not usually include a schema expression. However, most engines
 606        # permit them in CTAS expressions, and they allow us to register all column comments
 607        # in a single call rather than in a separate comment command call for each column.
 608        #
 609        # This block conditionally builds a schema expression with column comments if the engine
  610        # supports it and we have columns_to_types. columns_to_types is required because the
 611        # schema expression must include at least column name, data type, and the comment -
 612        # for example, `(colname INTEGER COMMENT 'comment')`.
 613        #
  614        # columns_to_types will be available when loading from a DataFrame (by converting from
 615        # pandas to SQL types), when a model is "annotated" by explicitly specifying column
 616        # types, and for evaluation methods like `LogicalReplaceQueryMixin.replace_query()`
 617        # calls and SCD Type 2 model calls.
 618        schema = None
 619        columns_to_types_known = columns_to_types and columns_to_types_all_known(columns_to_types)
 620        if (
 621            column_descriptions
 622            and columns_to_types_known
 623            and self.COMMENT_CREATION_TABLE.is_in_schema_def_ctas
 624            and self.comments_enabled
 625        ):
 626            schema = self._build_schema_exp(table, columns_to_types, column_descriptions)  # type: ignore
 627
 628        with self.transaction(condition=len(source_queries) > 1):
 629            for i, source_query in enumerate(source_queries):
 630                with source_query as query:
 631                    if i == 0:
 632                        self._create_table(
 633                            schema if schema else table,
 634                            query,
 635                            columns_to_types=columns_to_types,
 636                            exists=exists,
 637                            replace=replace,
 638                            table_description=table_description,
 639                            **kwargs,
 640                        )
 641                    else:
 642                        self._insert_append_query(
 643                            table_name, query, columns_to_types or self.columns(table)
 644                        )
 645
 646        # Register comments with commands if the engine supports comments and we weren't able to
 647        # register them with the CTAS call's schema expression.
 648        if (
 649            table_description
 650            and self.COMMENT_CREATION_TABLE.is_comment_command_only
 651            and self.comments_enabled
 652        ):
 653            self._create_table_comment(table_name, table_description)
 654        if column_descriptions and schema is None and self.comments_enabled:
 655            self._create_column_comments(table_name, column_descriptions)
 656
 657    def _create_table(
 658        self,
 659        table_name_or_schema: t.Union[exp.Schema, TableName],
 660        expression: t.Optional[exp.Expression],
 661        exists: bool = True,
 662        replace: bool = False,
 663        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 664        table_description: t.Optional[str] = None,
 665        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 666        **kwargs: t.Any,
 667    ) -> None:
 668        self.execute(
 669            self._build_create_table_exp(
 670                table_name_or_schema,
 671                expression=expression,
 672                exists=exists,
 673                replace=replace,
 674                columns_to_types=columns_to_types,
 675                table_description=(
 676                    table_description
 677                    if self.COMMENT_CREATION_TABLE.supports_schema_def and self.comments_enabled
 678                    else None
 679                ),
 680                **kwargs,
 681            )
 682        )
 683
 684    def _build_create_table_exp(
 685        self,
 686        table_name_or_schema: t.Union[exp.Schema, TableName],
 687        expression: t.Optional[exp.Expression],
 688        exists: bool = True,
 689        replace: bool = False,
 690        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 691        **kwargs: t.Any,
 692    ) -> exp.Create:
 693        exists = False if replace else exists
 694        catalog_name = None
 695        if not isinstance(table_name_or_schema, exp.Schema):
 696            table_name_or_schema = exp.to_table(table_name_or_schema)
 697            catalog_name = table_name_or_schema.catalog
 698        else:
 699            if isinstance(table_name_or_schema.this, exp.Table):
 700                catalog_name = table_name_or_schema.this.catalog
 701
 702        properties = (
 703            self._build_table_properties_exp(
 704                **kwargs, catalog_name=catalog_name, columns_to_types=columns_to_types
 705            )
 706            if kwargs
 707            else None
 708        )
 709        return exp.Create(
 710            this=table_name_or_schema,
 711            kind="TABLE",
 712            replace=replace,
 713            exists=exists,
 714            expression=expression,
 715            properties=properties,
 716        )
 717
 718    def create_table_like(
 719        self,
 720        target_table_name: TableName,
 721        source_table_name: TableName,
 722        exists: bool = True,
 723    ) -> None:
 724        """
 725        Create a table like another table or view.
 726        """
 727        target_table = exp.to_table(target_table_name)
 728        source_table = exp.to_table(source_table_name)
 729        create_expression = exp.Create(
 730            this=target_table,
 731            kind="TABLE",
 732            exists=exists,
 733            properties=exp.Properties(
 734                expressions=[
 735                    exp.LikeProperty(this=source_table),
 736                ]
 737            ),
 738        )
 739        self.execute(create_expression)
 740
 741    def clone_table(
 742        self,
 743        target_table_name: TableName,
 744        source_table_name: TableName,
 745        replace: bool = False,
 746        clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
 747        **kwargs: t.Any,
 748    ) -> None:
 749        """Creates a table with the target name by cloning the source table.
 750
 751        Args:
 752            target_table_name: The name of the table that should be created.
 753            source_table_name: The name of the source table that should be cloned.
 754            replace: Whether or not to replace an existing table.
 755        """
 756        if not self.SUPPORTS_CLONING:
 757            raise NotImplementedError(f"Engine does not support cloning: {type(self)}")
 758        self.execute(
 759            exp.Create(
 760                this=exp.to_table(target_table_name),
 761                kind="TABLE",
 762                replace=replace,
 763                clone=exp.Clone(
 764                    this=exp.to_table(source_table_name),
 765                    **(clone_kwargs or {}),
 766                ),
 767                **kwargs,
 768            )
 769        )
 770
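    # Editor's example (a sketch): only meaningful on engines whose adapter
    # sets SUPPORTS_CLONING = True; elsewhere this raises NotImplementedError.
    #
    #   adapter.clone_table("db.users_copy", "db.users", replace=True)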
 771    def drop_table(self, table_name: TableName, exists: bool = True) -> None:
 772        """Drops a table.
 773
 774        Args:
 775            table_name: The name of the table to drop.
  776            exists: Indicates whether to include the IF EXISTS check. Defaults to True.
 777        """
 778        drop_expression = exp.Drop(this=exp.to_table(table_name), kind="TABLE", exists=exists)
 779        self.execute(drop_expression)
 780
 781    def alter_table(
 782        self,
 783        current_table_name: TableName,
 784        target_table_name: TableName,
 785    ) -> None:
 786        """
 787        Performs the required alter statements to change the current table into the structure of the target table.
 788        """
 789        with self.transaction():
 790            for alter_expression in self.SCHEMA_DIFFER.compare_columns(
 791                current_table_name,
 792                self.columns(current_table_name),
 793                self.columns(target_table_name),
 794            ):
 795                self.execute(alter_expression)
 796
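    # Editor's example (a sketch): alters `db.users` in place so its columns
    # match the target table, executing one ALTER statement per difference
    # computed by SCHEMA_DIFFER; both table names are illustrative.
    #
    #   adapter.alter_table("db.users", "db.users__target")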
 797    def create_view(
 798        self,
 799        view_name: TableName,
 800        query_or_df: QueryOrDF,
 801        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 802        replace: bool = True,
 803        materialized: bool = False,
 804        table_description: t.Optional[str] = None,
 805        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 806        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
 807        **create_kwargs: t.Any,
 808    ) -> None:
 809        """Create a view with a query or dataframe.
 810
 811        If a dataframe is passed in, it will be converted into a literal values statement.
 812        This should only be done if the dataframe is very small!
 813
 814        Args:
 815            view_name: The view name.
 816            query_or_df: A query or dataframe.
 817            columns_to_types: Columns to use in the view statement.
  818            replace: Whether or not to replace an existing view. Defaults to True.
  819            materialized: Whether to create a materialized view. Only used for engines that support this feature.
 820            table_description: Optional table description from MODEL DDL.
 821            column_descriptions: Optional column descriptions from model query.
 822            view_properties: Optional view properties to add to the view.
 823            create_kwargs: Additional kwargs to pass into the Create expression
 824        """
 825        if self.is_pandas_df(query_or_df):
 826            values = list(t.cast(pd.DataFrame, query_or_df).itertuples(index=False, name=None))
 827            columns_to_types = columns_to_types or self._columns_to_types(query_or_df)
 828            if not columns_to_types:
 829                raise SQLMeshError("columns_to_types must be provided for dataframes")
 830            query_or_df = self._values_to_sql(
 831                values,
 832                columns_to_types,
 833                batch_start=0,
 834                batch_end=len(values),
 835            )
 836
 837        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
 838            query_or_df, columns_to_types, batch_size=0, target_table=view_name
 839        )
 840        if len(source_queries) != 1:
 841            raise SQLMeshError("Only one source query is supported for creating views")
 842
 843        schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name)
 844        if columns_to_types:
 845            schema = self._build_schema_exp(
 846                exp.to_table(view_name), columns_to_types, column_descriptions, is_view=True
 847            )
 848
 849        properties = create_kwargs.pop("properties", None)
 850        if not properties:
 851            properties = exp.Properties(expressions=[])
 852
 853        if materialized and self.SUPPORTS_MATERIALIZED_VIEWS:
 854            properties.append("expressions", exp.MaterializedProperty())
 855
 856            if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema):
 857                schema = schema.this
 858
 859        create_view_properties = self._build_view_properties_exp(
 860            view_properties,
 861            (
 862                table_description
 863                if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled
 864                else None
 865            ),
 866        )
 867        if create_view_properties:
 868            for view_property in create_view_properties.expressions:
 869                properties.append("expressions", view_property)
 870
 871        if properties.expressions:
 872            create_kwargs["properties"] = properties
 873
 874        with source_queries[0] as query:
 875            self.execute(
 876                exp.Create(
 877                    this=schema,
 878                    kind="VIEW",
 879                    replace=replace,
 880                    expression=query,
 881                    **create_kwargs,
 882                ),
 883                quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
 884            )
 885
 886        # Register table comment with commands if the engine doesn't support doing it in CREATE
 887        if (
 888            table_description
 889            and self.COMMENT_CREATION_VIEW.is_comment_command_only
 890            and self.comments_enabled
 891        ):
 892            self._create_table_comment(view_name, table_description, "VIEW")
 893        # Register column comments with commands if the engine doesn't support doing it in
 894        # CREATE or we couldn't do it in the CREATE schema definition because we don't have
 895        # columns_to_types
 896        if (
 897            column_descriptions
 898            and (
 899                self.COMMENT_CREATION_VIEW.is_comment_command_only
 900                or (
 901                    self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands
 902                    and not columns_to_types
 903                )
 904            )
 905            and self.comments_enabled
 906        ):
 907            self._create_column_comments(view_name, column_descriptions, "VIEW")
 908
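    # Editor's example (a sketch): a view over a query, and a view over a
    # (small!) DataFrame rendered as a literal VALUES statement (assumes
    # `from sqlglot import parse_one`; names are illustrative).
    #
    #   adapter.create_view("db.v_users", parse_one("SELECT id FROM db.users"))
    #   adapter.create_view(
    #       "db.v_static",
    #       pd.DataFrame({"id": [1]}),
    #       columns_to_types={"id": exp.DataType.build("int")},
    #   )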
 909    @set_catalog()
 910    def create_schema(
 911        self,
 912        schema_name: SchemaName,
 913        ignore_if_exists: bool = True,
 914        warn_on_error: bool = True,
 915    ) -> None:
 916        """Create a schema from a name or qualified table name."""
 917        try:
 918            self.execute(
 919                exp.Create(
 920                    this=to_schema(schema_name),
 921                    kind="SCHEMA",
 922                    exists=ignore_if_exists,
 923                )
 924            )
 925        except Exception as e:
 926            if not warn_on_error:
 927                raise
 928            logger.warning("Failed to create schema '%s': %s", schema_name, e)
 929
 930    def drop_schema(
 931        self,
 932        schema_name: SchemaName,
 933        ignore_if_not_exists: bool = True,
 934        cascade: bool = False,
 935    ) -> None:
 936        self.execute(
 937            exp.Drop(
 938                this=to_schema(schema_name),
 939                kind="SCHEMA",
 940                exists=ignore_if_not_exists,
 941                cascade=cascade,
 942            )
 943        )
 944
 945    def drop_view(
 946        self,
 947        view_name: TableName,
 948        ignore_if_not_exists: bool = True,
 949        materialized: bool = False,
 950        **kwargs: t.Any,
 951    ) -> None:
 952        """Drop a view."""
 953        self.execute(
 954            exp.Drop(
 955                this=exp.to_table(view_name),
 956                exists=ignore_if_not_exists,
 957                materialized=materialized and self.SUPPORTS_MATERIALIZED_VIEWS,
 958                kind="VIEW",
 959                **kwargs,
 960            )
 961        )
 962
 963    def columns(
 964        self, table_name: TableName, include_pseudo_columns: bool = False
 965    ) -> t.Dict[str, exp.DataType]:
 966        """Fetches column names and types for the target table."""
 967        self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
 968        describe_output = self.cursor.fetchall()
 969        return {
  970            # Note: MySQL returns the column type as bytes.
 971            column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect)
 972            for column_name, column_type, *_ in itertools.takewhile(
 973                lambda t: not t[0].startswith("#"),
 974                describe_output,
 975            )
 976            if column_name and column_name.strip() and column_type and column_type.strip()
 977        }
 978
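    # Editor's example (a sketch): DESCRIBE output is parsed into sqlglot
    # types, e.g. for the table from the create_table example above:
    #
    #   adapter.columns("db.users")
    #   # {"id": exp.DataType.build("int"), "name": exp.DataType.build("text")}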
 979    def table_exists(self, table_name: TableName) -> bool:
 980        try:
 981            self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
 982            return True
 983        except Exception:
 984            return False
 985
 986    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
 987        self.execute(exp.delete(table_name, where))
 988
 989    def insert_append(
 990        self,
 991        table_name: TableName,
 992        query_or_df: QueryOrDF,
 993        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 994    ) -> None:
 995        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
 996            query_or_df, columns_to_types, target_table=table_name
 997        )
 998        self._insert_append_source_queries(table_name, source_queries, columns_to_types)
 999
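    # Editor's example (a sketch): appends rows without touching existing
    # data; DataFrames are split into DEFAULT_BATCH_SIZE-row batches.
    #
    #   adapter.insert_append(
    #       "db.items",
    #       pd.DataFrame({"id": [3, 4]}),
    #       columns_to_types={"id": exp.DataType.build("int")},
    #   )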
1000    def _insert_append_source_queries(
1001        self,
1002        table_name: TableName,
1003        source_queries: t.List[SourceQuery],
1004        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1005    ) -> None:
1006        with self.transaction(condition=len(source_queries) > 0):
1007            columns_to_types = columns_to_types or self.columns(table_name)
1008            for source_query in source_queries:
1009                with source_query as query:
1010                    self._insert_append_query(table_name, query, columns_to_types)
1011
1012    def _insert_append_query(
1013        self,
1014        table_name: TableName,
1015        query: Query,
1016        columns_to_types: t.Dict[str, exp.DataType],
1017        order_projections: bool = True,
1018    ) -> None:
1019        if order_projections:
1020            query = self._order_projections_and_filter(query, columns_to_types)
1021        self.execute(exp.insert(query, table_name, columns=list(columns_to_types)))
1022
1023    def insert_overwrite_by_partition(
1024        self,
1025        table_name: TableName,
1026        query_or_df: QueryOrDF,
1027        partitioned_by: t.List[exp.Expression],
1028        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1029    ) -> None:
1030        raise NotImplementedError(
1031            "Insert Overwrite by Partition (not time) is not supported by this engine"
1032        )
1033
1034    def insert_overwrite_by_time_partition(
1035        self,
1036        table_name: TableName,
1037        query_or_df: QueryOrDF,
1038        start: TimeLike,
1039        end: TimeLike,
1040        time_formatter: t.Callable[
1041            [TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expression
1042        ],
1043        time_column: TimeColumn | exp.Expression | str,
1044        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1045        **kwargs: t.Any,
1046    ) -> None:
1047        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1048            query_or_df, columns_to_types, target_table=table_name
1049        )
1050        columns_to_types = columns_to_types or self.columns(table_name)
1051        low, high = [time_formatter(dt, columns_to_types) for dt in make_inclusive(start, end)]
1052        if isinstance(time_column, TimeColumn):
1053            time_column = time_column.column
1054        where = exp.Between(
1055            this=exp.to_column(time_column) if isinstance(time_column, str) else time_column,
1056            low=low,
1057            high=high,
1058        )
1059        self._insert_overwrite_by_condition(table_name, source_queries, columns_to_types, where)
1060
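    # Editor's example (a sketch): rows whose `ds` value falls within the
    # inclusive [start, end] range are replaced by the query's rows, using the
    # strategy in INSERT_OVERWRITE_STRATEGY. `query` and the formatter are
    # illustrative.
    #
    #   adapter.insert_overwrite_by_time_partition(
    #       "db.events",
    #       query,
    #       start="2023-01-01",
    #       end="2023-01-31",
    #       time_formatter=lambda dt, _: exp.Literal.string(dt.strftime("%Y-%m-%d")),
    #       time_column="ds",
    #   )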
1061    def _values_to_sql(
1062        self,
1063        values: t.List[t.Tuple[t.Any, ...]],
1064        columns_to_types: t.Dict[str, exp.DataType],
1065        batch_start: int,
1066        batch_end: int,
1067        alias: str = "t",
1068    ) -> Query:
1069        return select_from_values_for_batch_range(
1070            values=values,
1071            columns_to_types=columns_to_types,
1072            batch_start=batch_start,
1073            batch_end=batch_end,
1074            alias=alias,
1075        )
1076
1077    def _insert_overwrite_by_condition(
1078        self,
1079        table_name: TableName,
1080        source_queries: t.List[SourceQuery],
1081        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1082        where: t.Optional[exp.Condition] = None,
1083        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
1084    ) -> None:
1085        table = exp.to_table(table_name)
1086        insert_overwrite_strategy = (
1087            insert_overwrite_strategy_override or self.INSERT_OVERWRITE_STRATEGY
1088        )
1089        with self.transaction(
1090            condition=len(source_queries) > 0 or insert_overwrite_strategy.is_delete_insert
1091        ):
1092            columns_to_types = columns_to_types or self.columns(table_name)
1093            for i, source_query in enumerate(source_queries):
1094                with source_query as query:
1095                    query = self._order_projections_and_filter(query, columns_to_types, where=where)
1096                    if i > 0 or insert_overwrite_strategy.is_delete_insert:
1097                        if i == 0:
1098                            self.delete_from(table_name, where=where or exp.true())
1099                        self._insert_append_query(
1100                            table_name,
1101                            query,
1102                            columns_to_types=columns_to_types,
1103                            order_projections=False,
1104                        )
1105                    else:
1106                        insert_exp = exp.insert(
1107                            query,
1108                            table,
1109                            columns=(
1110                                list(columns_to_types)
1111                                if not insert_overwrite_strategy.is_replace_where
1112                                else None
1113                            ),
1114                            overwrite=insert_overwrite_strategy.is_insert_overwrite,
1115                        )
1116                        if insert_overwrite_strategy.is_replace_where:
1117                            insert_exp.set("where", where or exp.true())
1118                        self.execute(insert_exp)
1119
1120    def update_table(
1121        self,
1122        table_name: TableName,
1123        properties: t.Dict[str, t.Any],
1124        where: t.Optional[str | exp.Condition] = None,
1125    ) -> None:
1126        self.execute(exp.update(table_name, properties, where=where))
1127
1128    def _merge(
1129        self,
1130        target_table: TableName,
1131        query: Query,
1132        on: exp.Expression,
1133        match_expressions: t.List[exp.When],
1134    ) -> None:
1135        this = exp.alias_(exp.to_table(target_table), alias=MERGE_TARGET_ALIAS, table=True)
1136        using = exp.alias_(
1137            exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
1138        )
1139        self.execute(
1140            exp.Merge(
1141                this=this,
1142                using=using,
1143                on=on,
1144                expressions=match_expressions,
1145            )
1146        )
1147
1148    def scd_type_2_by_time(
1149        self,
1150        target_table: TableName,
1151        source_table: QueryOrDF,
1152        unique_key: t.Sequence[exp.Expression],
1153        valid_from_col: exp.Column,
1154        valid_to_col: exp.Column,
1155        execution_time: TimeLike,
1156        updated_at_col: exp.Column,
1157        invalidate_hard_deletes: bool = True,
1158        updated_at_as_valid_from: bool = False,
1159        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1160        table_description: t.Optional[str] = None,
1161        column_descriptions: t.Optional[t.Dict[str, str]] = None,
1162        truncate: bool = False,
1163        **kwargs: t.Any,
1164    ) -> None:
1165        self._scd_type_2(
1166            target_table=target_table,
1167            source_table=source_table,
1168            unique_key=unique_key,
1169            valid_from_col=valid_from_col,
1170            valid_to_col=valid_to_col,
1171            execution_time=execution_time,
1172            updated_at_col=updated_at_col,
1173            invalidate_hard_deletes=invalidate_hard_deletes,
1174            updated_at_as_valid_from=updated_at_as_valid_from,
1175            columns_to_types=columns_to_types,
1176            table_description=table_description,
1177            column_descriptions=column_descriptions,
1178            truncate=truncate,
1179        )
1180
1181    def scd_type_2_by_column(
1182        self,
1183        target_table: TableName,
1184        source_table: QueryOrDF,
1185        unique_key: t.Sequence[exp.Expression],
1186        valid_from_col: exp.Column,
1187        valid_to_col: exp.Column,
1188        execution_time: TimeLike,
1189        check_columns: t.Union[exp.Star, t.Sequence[exp.Column]],
1190        invalidate_hard_deletes: bool = True,
1191        execution_time_as_valid_from: bool = False,
1192        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1193        table_description: t.Optional[str] = None,
1194        column_descriptions: t.Optional[t.Dict[str, str]] = None,
1195        truncate: bool = False,
1196        **kwargs: t.Any,
1197    ) -> None:
1198        self._scd_type_2(
1199            target_table=target_table,
1200            source_table=source_table,
1201            unique_key=unique_key,
1202            valid_from_col=valid_from_col,
1203            valid_to_col=valid_to_col,
1204            execution_time=execution_time,
1205            check_columns=check_columns,
1206            columns_to_types=columns_to_types,
1207            invalidate_hard_deletes=invalidate_hard_deletes,
1208            execution_time_as_valid_from=execution_time_as_valid_from,
1209            table_description=table_description,
1210            column_descriptions=column_descriptions,
1211            truncate=truncate,
1212        )
1213
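    # Editor's example (a sketch): maintain an SCD Type 2 table keyed on `id`,
    # closing and opening validity windows whenever `name` changes (assumes
    # `from sqlglot import parse_one`; all names are illustrative).
    #
    #   adapter.scd_type_2_by_column(
    #       "db.dim_users",
    #       parse_one("SELECT id, name FROM db.users"),
    #       unique_key=[exp.column("id")],
    #       valid_from_col=exp.column("valid_from"),
    #       valid_to_col=exp.column("valid_to"),
    #       execution_time="2023-01-01",
    #       check_columns=[exp.column("name")],
    #   )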
1214    def _scd_type_2(
1215        self,
1216        target_table: TableName,
1217        source_table: QueryOrDF,
1218        unique_key: t.Sequence[exp.Expression],
1219        valid_from_col: exp.Column,
1220        valid_to_col: exp.Column,
1221        execution_time: TimeLike,
1222        invalidate_hard_deletes: bool = True,
1223        updated_at_col: t.Optional[exp.Column] = None,
1224        check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Column]]] = None,
1225        updated_at_as_valid_from: bool = False,
1226        execution_time_as_valid_from: bool = False,
1227        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1228        table_description: t.Optional[str] = None,
1229        column_descriptions: t.Optional[t.Dict[str, str]] = None,
1230        truncate: bool = False,
1231    ) -> None:
1232        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1233            source_table, columns_to_types, target_table=target_table, batch_size=0
1234        )
1235        columns_to_types = columns_to_types or self.columns(target_table)
1236        valid_from_name = valid_from_col.name
1237        valid_to_name = valid_to_col.name
1238        updated_at_name = updated_at_col.name if updated_at_col else None
1239        if (
1240            valid_from_name not in columns_to_types
1241            or valid_to_name not in columns_to_types
1242            or not columns_to_types_all_known(columns_to_types)
1243        ):
1244            columns_to_types = self.columns(target_table)
1245        if not columns_to_types:
1246            raise SQLMeshError(f"Could not get columns_to_types. Does {target_table} exist?")
1247        if not unique_key:
1248            raise SQLMeshError("unique_key must be provided for SCD Type 2")
1249        if check_columns and updated_at_col:
1250            raise SQLMeshError(
1251                "Cannot use both `check_columns` and `updated_at_name` for SCD Type 2"
1252            )
1253        if check_columns and updated_at_as_valid_from:
1254            raise SQLMeshError(
1255                "Cannot use both `check_columns` and `updated_at_as_valid_from` for SCD Type 2"
1256            )
1257        if execution_time_as_valid_from and not check_columns:
1258            raise SQLMeshError(
1259                "Cannot use `execution_time_as_valid_from` without `check_columns` for SCD Type 2"
1260            )
1261        if updated_at_name and updated_at_name not in columns_to_types:
1262            raise SQLMeshError(
1263                f"Column {updated_at_name} not found in {target_table}. Table must contain an `updated_at` timestamp for SCD Type 2"
1264            )
1265
1266        unmanaged_columns = [
1267            col for col in columns_to_types if col not in {valid_from_name, valid_to_name}
1268        ]
1269        time_data_type = columns_to_types[valid_from_name]
1270        select_source_columns: t.List[t.Union[str, exp.Alias]] = [
1271            col for col in unmanaged_columns if col != updated_at_name
1272        ]
1273        table_columns = [exp.column(c, quoted=True) for c in columns_to_types]
1274        if updated_at_name:
1275            select_source_columns.append(
1276                exp.cast(updated_at_col, time_data_type).as_(updated_at_col.this)  # type: ignore
1277            )
1278
1279        # If a star is provided, we include all unmanaged columns in the check.
 1280        # This unnecessarily includes the unique key columns, but since they are used in the join we already
 1281        # know whether they are equal, so the extra check is harmless and the logic here stays simple.
 1282        # To change this, we would just need to extract the column names from the unique_key expressions
 1283        # and remove them from unmanaged_columns.
1284        if check_columns and check_columns == exp.Star():
1285            check_columns = [exp.column(col) for col in unmanaged_columns]
1286        execution_ts = to_time_column(execution_time, time_data_type)
1287        if updated_at_as_valid_from:
1288            if not updated_at_col:
1289                raise SQLMeshError(
1290                    "Cannot use `updated_at_as_valid_from` without `updated_at_name` for SCD Type 2"
1291                )
1292            update_valid_from_start: t.Union[str, exp.Expression] = updated_at_col
1293        elif execution_time_as_valid_from:
1294            update_valid_from_start = execution_ts
1295        else:
1296            update_valid_from_start = to_time_column("1970-01-01 00:00:00+00:00", time_data_type)
1297        insert_valid_from_start = execution_ts if check_columns else updated_at_col  # type: ignore
1298        # "joined._exists IS NULL" means the row no longer exists in the source, i.e. it was deleted
1299        delete_check = (
1300            exp.column("_exists", "joined").is_(exp.Null()) if invalidate_hard_deletes else None
1301        )
1302        prefixed_valid_to_col = valid_to_col.copy()
1303        prefixed_valid_to_col.this.set("this", f"t_{prefixed_valid_to_col.name}")
1304        prefixed_valid_from_col = valid_from_col.copy()
1305        prefixed_valid_from_col.this.set("this", f"t_{valid_from_col.name}")
1306        if check_columns:
1307            row_check_conditions = []
1308            for col in check_columns:
1309                t_col = col.copy()
1310                t_col.this.set("this", f"t_{col.name}")
1311                row_check_conditions.extend(
1312                    [
1313                        col.neq(t_col),
1314                        exp.and_(t_col.is_(exp.Null()), col.is_(exp.Null()).not_()),
1315                        exp.and_(t_col.is_(exp.Null()).not_(), col.is_(exp.Null())),
1316                    ]
1317                )
1318            row_value_check = exp.or_(*row_check_conditions)
1319            unique_key_conditions = []
1320            for col in unique_key:
1321                t_col = col.copy()
1322                t_col.this.set("this", f"t_{col.name}")
1323                unique_key_conditions.extend(
1324                    [t_col.is_(exp.Null()).not_(), col.is_(exp.Null()).not_()]
1325                )
1326            unique_key_check = exp.and_(*unique_key_conditions)
1327            # unique_key_check means "the row exists in both the source and the target"
1328            # row_value_check means "at least one checked column value has changed"
1329            updated_row_filter = exp.and_(unique_key_check, row_value_check)
1330            valid_to_case_stmt = (
1331                exp.Case()
1332                .when(
1333                    exp.and_(
1334                        exp.or_(
1335                            delete_check,
1336                            updated_row_filter,
1337                        )
1338                    ),
1339                    execution_ts,
1340                )
1341                .else_(prefixed_valid_to_col)
1342                .as_(valid_to_col.this)
1343            )
1344            valid_from_case_stmt = exp.func(
1345                "COALESCE",
1346                prefixed_valid_from_col,
1347                update_valid_from_start,
1348            ).as_(valid_from_col.this)
1349        else:
1350            assert updated_at_col is not None
1351            prefixed_updated_at_col = updated_at_col.copy()
1352            prefixed_updated_at_col.this.set("this", f"t_{updated_at_col.name}")
1353            updated_row_filter = updated_at_col > prefixed_updated_at_col
1354
1355            valid_to_case_stmt_builder = exp.Case().when(updated_row_filter, updated_at_col)
1356            if delete_check:
1357                valid_to_case_stmt_builder = valid_to_case_stmt_builder.when(
1358                    delete_check, execution_ts
1359                )
1360            valid_to_case_stmt = valid_to_case_stmt_builder.else_(prefixed_valid_to_col).as_(
1361                valid_to_col.this
1362            )
1363
1364            valid_from_case_stmt = (
1365                exp.Case()
1366                .when(
1367                    exp.and_(
1368                        prefixed_valid_from_col.is_(exp.Null()),
1369                        exp.column("_exists", "latest_deleted").is_(exp.Null()).not_(),
1370                    ),
1371                    exp.Case()
1372                    .when(
1373                        exp.column(valid_to_col.this, "latest_deleted") > updated_at_col,
1374                        exp.column(valid_to_col.this, "latest_deleted"),
1375                    )
1376                    .else_(updated_at_col),
1377                )
1378                .when(prefixed_valid_from_col.is_(exp.Null()), update_valid_from_start)
1379                .else_(prefixed_valid_from_col)
1380            ).as_(valid_from_col.this)
1381
1382        existing_rows_query = exp.select(*table_columns).from_(target_table)
1383        if truncate:
1384            existing_rows_query = existing_rows_query.limit(0)
1385
1386        with source_queries[0] as source_query:
1387            prefixed_columns_to_types = []
1388            for column in columns_to_types:
1389                prefixed_col = exp.column(column).copy()
1390                prefixed_col.this.set("this", f"t_{prefixed_col.name}")
1391                prefixed_columns_to_types.append(prefixed_col)
1392            prefixed_unmanaged_columns = []
1393            for column in unmanaged_columns:
1394                prefixed_col = exp.column(column).copy()
1395                prefixed_col.this.set("this", f"t_{prefixed_col.name}")
1396                prefixed_unmanaged_columns.append(prefixed_col)
1397            query = (
1398                exp.Select()  # type: ignore
1399                .with_(
1400                    "source",
1401                    exp.select(exp.true().as_("_exists"), *select_source_columns)
1402                    .distinct(*unique_key)
1403                    .from_(source_query.subquery("raw_source")),  # type: ignore
1404                )
1405                # Historical Records that Do Not Change
1406                .with_(
1407                    "static",
1408                    existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()),
1409                )
1410                # Latest Records that can be updated
1411                .with_(
1412                    "latest",
1413                    existing_rows_query.where(valid_to_col.is_(exp.Null())),
1414                )
1415                # Deleted records which can be used to determine `valid_from` for undeleted source records
1416                .with_(
1417                    "deleted",
1418                    exp.select(*[exp.column(col, "static") for col in columns_to_types])
1419                    .from_("static")
1420                    .join(
1421                        "latest",
1422                        on=exp.and_(
1423                            *[
1424                                add_table(key, "static").eq(add_table(key, "latest"))
1425                                for key in unique_key
1426                            ]
1427                        ),
1428                        join_type="left",
1429                    )
1430                    .where(exp.column(valid_to_col.this, "latest").is_(exp.Null())),
1431                )
1432                # Get the latest `valid_to` deleted record for each unique key
1433                .with_(
1434                    "latest_deleted",
1435                    exp.select(
1436                        exp.true().as_("_exists"),
1437                        *(part.as_(f"_key{i}") for i, part in enumerate(unique_key)),
1438                        exp.Max(this=valid_to_col).as_(valid_to_col.this),
1439                    )
1440                    .from_("deleted")
1441                    .group_by(*unique_key),
1442                )
1443                # Do a full join between the latest records and the source table to combine them.
1444                # MySQL doesn't support FULL JOIN, so we emulate it with a LEFT JOIN unioned with a RIGHT JOIN, which also removes duplicates.
1445                .with_(
1446                    "joined",
1447                    exp.select(
1448                        exp.column("_exists", table="source"),
1449                        *(
1450                            exp.column(col, table="latest").as_(prefixed_columns_to_types[i].this)
1451                            for i, col in enumerate(columns_to_types)
1452                        ),
1453                        *(exp.column(col, table="source").as_(col) for col in unmanaged_columns),
1454                    )
1455                    .from_("latest")
1456                    .join(
1457                        "source",
1458                        on=exp.and_(
1459                            *[
1460                                add_table(key, "latest").eq(add_table(key, "source"))
1461                                for key in unique_key
1462                            ]
1463                        ),
1464                        join_type="left",
1465                    )
1466                    .union(
1467                        exp.select(
1468                            exp.column("_exists", table="source"),
1469                            *(
1470                                exp.column(col, table="latest").as_(
1471                                    prefixed_columns_to_types[i].this
1472                                )
1473                                for i, col in enumerate(columns_to_types)
1474                            ),
1475                            *(
1476                                exp.column(col, table="source").as_(col)
1477                                for col in unmanaged_columns
1478                            ),
1479                        )
1480                        .from_("latest")
1481                        .join(
1482                            "source",
1483                            on=exp.and_(
1484                                *[
1485                                    add_table(key, "latest").eq(add_table(key, "source"))
1486                                    for key in unique_key
1487                                ]
1488                            ),
1489                            join_type="right",
1490                        )
1491                    ),
1492                )
1493                # Get deleted, new, no longer current, or unchanged records
1494                .with_(
1495                    "updated_rows",
1496                    exp.select(
1497                        *(
1498                            exp.func(
1499                                "COALESCE",
1500                                exp.column(prefixed_unmanaged_columns[i].this, table="joined"),
1501                                exp.column(col, table="joined"),
1502                            ).as_(col)
1503                            for i, col in enumerate(unmanaged_columns)
1504                        ),
1505                        valid_from_case_stmt,
1506                        valid_to_case_stmt,
1507                    )
1508                    .from_("joined")
1509                    .join(
1510                        "latest_deleted",
1511                        on=exp.and_(
1512                            *[
1513                                add_table(part, "joined").eq(
1514                                    exp.column(f"_key{i}", "latest_deleted")
1515                                )
1516                                for i, part in enumerate(unique_key)
1517                            ]
1518                        ),
1519                        join_type="left",
1520                    ),
1521                )
1522                # Get records that have been "updated", which means inserting a new record with the previous `valid_from`
1523                .with_(
1524                    "inserted_rows",
1525                    exp.select(
1526                        *unmanaged_columns,
1527                        insert_valid_from_start.as_(valid_from_col.this),  # type: ignore
1528                        to_time_column(exp.null(), time_data_type).as_(valid_to_col.this),
1529                    )
1530                    .from_("joined")
1531                    .where(updated_row_filter),
1532                )
1533                .select(*table_columns)
1534                .from_("static")
1535                .union(
1536                    exp.select(*table_columns).from_("updated_rows"),
1537                    distinct=False,
1538                )
1539                .union(
1540                    exp.select(*table_columns).from_("inserted_rows"),
1541                    distinct=False,
1542                )
1543            )
1544
1545            self.replace_query(
1546                target_table,
1547                query,
1548                columns_to_types=columns_to_types,
1549                table_description=table_description,
1550                column_descriptions=column_descriptions,
1551            )
1552
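For illustration only, here is a minimal sketch of the two mutually exclusive SCD Type 2 configurations that the validation above enforces. The dict keys mirror the parameters shown; the values are illustrative, not an exact call into this method:

    from sqlglot import exp

    # Mode 1: detect changes via an `updated_at` timestamp column.
    # `check_columns` must not be set; `updated_at_as_valid_from` optionally
    # starts `valid_from` at the source timestamp instead of the epoch.
    by_time = dict(
        unique_key=[exp.column("id")],
        updated_at_col=exp.column("updated_at"),
        updated_at_as_valid_from=True,
    )

    # Mode 2: detect changes by comparing column values.
    # `check_columns` is required (exp.Star() checks all unmanaged columns),
    # and only this mode permits `execution_time_as_valid_from`.
    by_column = dict(
        unique_key=[exp.column("id")],
        check_columns=exp.Star(),
        execution_time_as_valid_from=True,
    )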
1553    def merge(
1554        self,
1555        target_table: TableName,
1556        source_table: QueryOrDF,
1557        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
1558        unique_key: t.Sequence[exp.Expression],
1559        when_matched: t.Optional[exp.When] = None,
1560    ) -> None:
1561        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1562            source_table, columns_to_types, target_table=target_table
1563        )
1564        columns_to_types = columns_to_types or self.columns(target_table)
1565        on = exp.and_(
1566            *(
1567                add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS))
1568                for part in unique_key
1569            )
1570        )
1571        if not when_matched:
1572            when_matched = exp.When(
1573                matched=True,
1574                source=False,
1575                then=exp.Update(
1576                    expressions=[
1577                        exp.column(col, MERGE_TARGET_ALIAS).eq(exp.column(col, MERGE_SOURCE_ALIAS))
1578                        for col in columns_to_types
1579                    ],
1580                ),
1581            )
1582        when_not_matched = exp.When(
1583            matched=False,
1584            source=False,
1585            then=exp.Insert(
1586                this=exp.Tuple(expressions=[exp.column(col) for col in columns_to_types]),
1587                expression=exp.Tuple(
1588                    expressions=[exp.column(col, MERGE_SOURCE_ALIAS) for col in columns_to_types]
1589                ),
1590            ),
1591        )
1592        for source_query in source_queries:
1593            with source_query as query:
1594                self._merge(
1595                    target_table=target_table,
1596                    query=query,
1597                    on=on,
1598                    match_expressions=[when_matched, when_not_matched],
1599                )
1600
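A minimal usage sketch, assuming `adapter` is a concrete EngineAdapter for an engine with MERGE support and that the hypothetical tables `db.staging` and `db.target` exist:

    from sqlglot import exp, parse_one

    adapter.merge(
        target_table="db.target",
        source_table=parse_one("SELECT id, name, updated_at FROM db.staging"),
        columns_to_types={
            "id": exp.DataType.build("int"),
            "name": exp.DataType.build("text"),
            "updated_at": exp.DataType.build("timestamp"),
        },
        unique_key=[exp.column("id")],
    )

When `when_matched` is omitted, the generated MERGE updates every column on a key match and inserts the full row when there is no match, exactly as built above.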
1601    def rename_table(
1602        self,
1603        old_table_name: TableName,
1604        new_table_name: TableName,
1605    ) -> None:
1606        new_table = exp.to_table(new_table_name)
1607        if new_table.catalog:
1608            old_table = exp.to_table(old_table_name)
1609            catalog = old_table.catalog or self.get_current_catalog()
1610            if catalog != new_table.catalog:
1611                raise UnsupportedCatalogOperationError(
1612                    "Tried to rename table across catalogs which is not supported"
1613                )
1614        self._rename_table(old_table_name, new_table_name)
1615
1616    def get_data_objects(
1617        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
1618    ) -> t.List[DataObject]:
1619        """Lists all data objects in the target schema.
1620
1621        Args:
1622            schema_name: The name of the schema to list data objects from.
1623            object_names: If provided, only return data objects with these names.
1624
1625        Returns:
1626            A list of data objects in the target schema.
1627        """
1628        if object_names is not None:
1629            if not object_names:
1630                return []
1631            object_names_list = list(object_names)
1632            batches = [
1633                object_names_list[i : i + self.DATA_OBJECT_FILTER_BATCH_SIZE]
1634                for i in range(0, len(object_names_list), self.DATA_OBJECT_FILTER_BATCH_SIZE)
1635            ]
1636            return [
1637                obj for batch in batches for obj in self._get_data_objects(schema_name, set(batch))
1638            ]
1639        return self._get_data_objects(schema_name)
1640
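A sketch of the batching behavior, assuming `adapter` is a concrete subclass that implements `_get_data_objects`; the schema and table names are hypothetical:

    names = {f"table_{i}" for i in range(10_000)}
    # With DATA_OBJECT_FILTER_BATCH_SIZE = 4000, the name filter is split into
    # three batches (4000 + 4000 + 2000), each issued as a separate metadata query.
    objects = adapter.get_data_objects("my_schema", names)
    # An empty set short-circuits and returns [] without querying the engine.
    assert adapter.get_data_objects("my_schema", set()) == []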
1641    def fetchone(
1642        self,
1643        query: t.Union[exp.Expression, str],
1644        ignore_unsupported_errors: bool = False,
1645        quote_identifiers: bool = False,
1646    ) -> t.Tuple:
1647        with self.transaction():
1648            self.execute(
1649                query,
1650                ignore_unsupported_errors=ignore_unsupported_errors,
1651                quote_identifiers=quote_identifiers,
1652            )
1653            return self.cursor.fetchone()
1654
1655    def fetchall(
1656        self,
1657        query: t.Union[exp.Expression, str],
1658        ignore_unsupported_errors: bool = False,
1659        quote_identifiers: bool = False,
1660    ) -> t.List[t.Tuple]:
1661        with self.transaction():
1662            self.execute(
1663                query,
1664                ignore_unsupported_errors=ignore_unsupported_errors,
1665                quote_identifiers=quote_identifiers,
1666            )
1667            return self.cursor.fetchall()
1668
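A usage sketch, assuming `adapter` is a concrete EngineAdapter instance and the hypothetical table `db.t` exists; both methods execute the query inside a transaction and then fetch from the shared cursor:

    from sqlglot import exp

    row = adapter.fetchone(exp.select("COUNT(*)").from_("db.t"))
    total = row[0]
    rows = adapter.fetchall("SELECT id, name FROM db.t")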
1669    def _fetch_native_df(
1670        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
1671    ) -> DF:
1672        """Fetches a DataFrame, either pandas or PySpark, from the cursor"""
1673        with self.transaction():
1674            self.execute(query, quote_identifiers=quote_identifiers)
1675            return self.cursor.fetchdf()
1676
1677    def fetchdf(
1678        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
1679    ) -> pd.DataFrame:
1680        """Fetches a Pandas DataFrame from the cursor"""
1681        df = self._fetch_native_df(query, quote_identifiers=quote_identifiers)
1682        if not isinstance(df, pd.DataFrame):
1683            raise NotImplementedError(
1684                "`_fetch_native_df` did not return a pandas DataFrame; `fetchdf` must be updated to convert the native DataFrame to pandas"
1685            )
1686        return df
1687
1688    def fetch_pyspark_df(
1689        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
1690    ) -> PySparkDataFrame:
1691        """Fetches a PySpark DataFrame from the cursor"""
1692        raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}")
1693
1694    def wap_supported(self, table_name: TableName) -> bool:
1695        """Returns whether WAP for the target table is supported."""
1696        return False
1697
1698    def wap_table_name(self, table_name: TableName, wap_id: str) -> str:
1699        """Returns the updated table name for the given WAP ID.
1700
1701        Args:
1702            table_name: The name of the target table.
1703            wap_id: The WAP ID to prepare.
1704
1705        Returns:
1706            The updated table name that should be used for writing.
1707        """
1708        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")
1709
1710    def wap_prepare(self, table_name: TableName, wap_id: str) -> str:
1711        """Prepares the target table for WAP and returns the updated table name.
1712
1713        Args:
1714            table_name: The name of the target table.
1715            wap_id: The WAP ID to prepare.
1716
1717        Returns:
1718            The updated table name that should be used for writing.
1719        """
1720        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")
1721
1722    def wap_publish(self, table_name: TableName, wap_id: str) -> None:
1723        """Publishes changes with the given WAP ID to the target table.
1724
1725        Args:
1726            table_name: The name of the target table.
1727            wap_id: The WAP ID to publish.
1728        """
1729        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")
1730
1731    @contextlib.contextmanager
1732    def transaction(
1733        self,
1734        condition: t.Optional[bool] = None,
1735    ) -> t.Iterator[None]:
1736        """A transaction context manager."""
1737        if (
1738            self._connection_pool.is_transaction_active
1739            or not self.SUPPORTS_TRANSACTIONS
1740            or (condition is not None and not condition)
1741        ):
1742            yield
1743            return
1744        self._connection_pool.begin()
1745        try:
1746            yield
1747        except Exception as e:
1748            self._connection_pool.rollback()
1749            raise e
1750        else:
1751            self._connection_pool.commit()
1752
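A sketch of the nesting behavior, assuming `adapter` targets an engine with SUPPORTS_TRANSACTIONS; the table `db.t` is hypothetical:

    with adapter.transaction():
        adapter.execute("INSERT INTO db.t VALUES (1)")
        # The inner call detects the already-active transaction and simply
        # yields, so both inserts commit (or roll back) as a single unit.
        with adapter.transaction():
            adapter.execute("INSERT INTO db.t VALUES (2)")

Passing `condition=False` skips transaction management entirely; `_create_table_from_source_queries`, for example, passes `condition=len(source_queries) > 1` so a transaction is opened only for multi-batch inserts.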
1753    @contextlib.contextmanager
1754    def session(self, properties: SessionProperties) -> t.Iterator[None]:
1755        """A session context manager."""
1756        if self._is_session_active():
1757            yield
1758            return
1759
1760        self._begin_session(properties)
1761        try:
1762            yield
1763        finally:
1764            self._end_session()
1765
1766    def _begin_session(self, properties: SessionProperties) -> t.Any:
1767        """Begin a new session."""
1768
1769    def _end_session(self) -> None:
1770        """End the existing session."""
1771
1772    def _is_session_active(self) -> bool:
1773        """Indicates whether or not a session is active."""
1774        return False
1775
1776    def execute(
1777        self,
1778        expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
1779        ignore_unsupported_errors: bool = False,
1780        quote_identifiers: bool = True,
1781        **kwargs: t.Any,
1782    ) -> None:
1783        """Execute a sql query."""
1784        to_sql_kwargs = (
1785            {"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {}
1786        )
1787
1788        with self.transaction():
1789            for e in ensure_list(expressions):
1790                sql = t.cast(
1791                    str,
1792                    (
1793                        self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs)
1794                        if isinstance(e, exp.Expression)
1795                        else e
1796                    ),
1797                )
1798                self._log_sql(sql)
1799                self._execute(sql, **kwargs)
1800
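A usage sketch, assuming `adapter` is a concrete EngineAdapter instance and `db.t` is hypothetical; expressions are rendered via `_to_sql` (quoting identifiers by default), while raw strings are passed through unchanged:

    from sqlglot import parse_one

    adapter.execute(parse_one("DELETE FROM db.t WHERE id = 1"))
    adapter.execute(
        [parse_one("INSERT INTO db.t VALUES (1)"), "SELECT 1"],
        quote_identifiers=False,
    )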
1801    def _log_sql(self, sql: str) -> None:
1802        logger.log(self._execute_log_level, "Executing SQL: %s", sql)
1803
1804    def _execute(self, sql: str, **kwargs: t.Any) -> None:
1805        self.cursor.execute(sql, **kwargs)
1806
1807    @contextlib.contextmanager
1808    def temp_table(
1809        self,
1810        query_or_df: QueryOrDF,
1811        name: TableName = "diff",
1812        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1813        **kwargs: t.Any,
1814    ) -> t.Iterator[exp.Table]:
1815        """A context manager for working with a temp table.
1816
1817        The table is created with a random ID suffix and dropped when the block exits.
1818
1819        Args:
1820            query_or_df: The query or df to create a temp table for.
1821            name: The base name of the temp table.
1822            columns_to_types: A mapping between the column name and its data type.
1823
1824        Yields:
1825            The table expression
1826        """
1827        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1828            query_or_df, columns_to_types=columns_to_types, target_table=name
1829        )
1830
1831        with self.transaction():
1832            table = self._get_temp_table(name)
1833            if table.db:
1834                self.create_schema(schema_(table.args["db"], table.args.get("catalog")))
1835            self._create_table_from_source_queries(
1836                table,
1837                source_queries,
1838                columns_to_types,
1839                exists=True,
1840                table_description=None,
1841                column_descriptions=None,
1842                **kwargs,
1843            )
1844
1845            try:
1846                yield table
1847            finally:
1848                self.drop_table(table)
1849
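A usage sketch, assuming `adapter` is a concrete EngineAdapter instance and `db.source` is hypothetical:

    from sqlglot import parse_one

    query = parse_one("SELECT id, name FROM db.source")
    with adapter.temp_table(query, name="db.diff") as table:
        # `table` is an exp.Table named like `__temp_diff_<random id>` in schema `db`
        row = adapter.fetchone(f"SELECT COUNT(*) FROM {table.sql(dialect=adapter.dialect)}")
    # The temp table is dropped on exit, even if the block raises.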
1850    def _table_or_view_properties_to_expressions(
1851        self, table_or_view_properties: t.Optional[t.Dict[str, exp.Expression]] = None
1852    ) -> t.List[exp.Property]:
1853        """Converts model properties (either physical or virtual) to a list of property expressions."""
1854        if not table_or_view_properties:
1855            return []
1856        return [
1857            exp.Property(this=key, value=value.copy())
1858            for key, value in table_or_view_properties.items()
1859        ]
1860
1861    def _build_table_properties_exp(
1862        self,
1863        catalog_name: t.Optional[str] = None,
1864        storage_format: t.Optional[str] = None,
1865        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
1866        partition_interval_unit: t.Optional[IntervalUnit] = None,
1867        clustered_by: t.Optional[t.List[str]] = None,
1868        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
1869        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1870        table_description: t.Optional[str] = None,
1871    ) -> t.Optional[exp.Properties]:
1872        """Creates a SQLGlot table properties expression for DDL."""
1873        properties: t.List[exp.Expression] = []
1874
1875        if table_description:
1876            properties.append(
1877                exp.SchemaCommentProperty(
1878                    this=exp.Literal.string(self._truncate_table_comment(table_description))
1879                )
1880            )
1881
1882        if properties:
1883            return exp.Properties(expressions=properties)
1884        return None
1885
1886    def _build_view_properties_exp(
1887        self,
1888        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
1889        table_description: t.Optional[str] = None,
1890    ) -> t.Optional[exp.Properties]:
1891        """Creates a SQLGlot properties expression for a view."""
1892        properties: t.List[exp.Expression] = []
1893
1894        if table_description:
1895            properties.append(
1896                exp.SchemaCommentProperty(
1897                    this=exp.Literal.string(self._truncate_table_comment(table_description))
1898                )
1899            )
1900
1901        if properties:
1902            return exp.Properties(expressions=properties)
1903        return None
1904
1905    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
1906        return comment[:length] if length else comment
1907
1908    def _truncate_table_comment(self, comment: str) -> str:
1909        return self._truncate_comment(comment, self.MAX_TABLE_COMMENT_LENGTH)
1910
1911    def _truncate_column_comment(self, comment: str) -> str:
1912        return self._truncate_comment(comment, self.MAX_COLUMN_COMMENT_LENGTH)
1913
1914    def _to_sql(self, expression: exp.Expression, quote: bool = True, **kwargs: t.Any) -> str:
1915        """
1916        Converts an expression to a SQL string. Generation kwargs are layered in increasing
1917        precedence: this method's defaults, then defaults defined for the given dialect, then
1918        kwargs provided when the engine adapter was defined, and finally kwargs passed to this call.
1919        """
1920        sql_gen_kwargs = {
1921            "dialect": self.dialect,
1922            "pretty": False,
1923            "comments": False,
1924            **self.sql_gen_kwargs,
1925            **kwargs,
1926        }
1927
1928        expression = expression.copy()
1929
1930        if quote:
1931            quote_identifiers(expression)
1932
1933        return expression.sql(**sql_gen_kwargs, copy=False)  # type: ignore
1934
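A sketch of the kwarg layering, assuming an adapter constructed with `sql_gen_kwargs={"pretty": True}`:

    from sqlglot import exp

    expr = exp.select("a").from_("db.t")
    # Precedence, lowest to highest: the built-in defaults (dialect,
    # pretty=False, comments=False), the adapter's sql_gen_kwargs,
    # and finally per-call kwargs.
    sql = adapter._to_sql(expr, pretty=False)  # the per-call kwarg wins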
1935    def _get_data_objects(
1936        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
1937    ) -> t.List[DataObject]:
1938        """
1939        Returns all the data objects that exist in the given schema and optionally catalog.
1940        """
1941        raise NotImplementedError()
1942
1943    def _get_temp_table(self, table: TableName, table_only: bool = False) -> exp.Table:
1944        """
1945        Returns the name of the temp table that should be used for the given table name.
1946        """
1947        table = t.cast(exp.Table, exp.to_table(table).copy())
1948        table.set(
1949            "this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}", quoted=True)
1950        )
1951
1952        if table_only:
1953            table.set("db", None)
1954            table.set("catalog", None)
1955
1956        return table
1957
1958    def _order_projections_and_filter(
1959        self,
1960        query: Query,
1961        columns_to_types: t.Dict[str, exp.DataType],
1962        where: t.Optional[exp.Expression] = None,
1963    ) -> Query:
1964        if not isinstance(query, exp.Query) or (
1965            not where and query.named_selects == list(columns_to_types)
1966        ):
1967            return query
1968
1969        query = t.cast(exp.Query, query.copy())
1970        with_ = query.args.pop("with", None)
1971        query = self._select_columns(columns_to_types).from_(
1972            query.subquery("_subquery", copy=False), copy=False
1973        )
1974        if where:
1975            query = query.where(where, copy=False)
1976
1977        if with_:
1978            query.set("with", with_)
1979
1980        return query
1981
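A sketch of the reordering, assuming `adapter` is a concrete instance and `db.t` is hypothetical:

    from sqlglot import exp, parse_one

    query = parse_one("SELECT b, a FROM db.t")
    out = adapter._order_projections_and_filter(
        query,
        {"a": exp.DataType.build("int"), "b": exp.DataType.build("int")},
    )
    # -> SELECT "a", "b" FROM (SELECT b, a FROM db.t) AS _subquery
    # A query whose projections already match, with no `where`, is returned unchanged.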
1982    def _truncate_table(self, table_name: TableName) -> None:
1983        table = exp.to_table(table_name)
1984        self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}")
1985
1986    def _build_create_comment_table_exp(
1987        self, table: exp.Table, table_comment: str, table_kind: str
1988    ) -> exp.Comment | str:
1989        return exp.Comment(
1990            this=table,
1991            kind=table_kind,
1992            expression=exp.Literal.string(self._truncate_table_comment(table_comment)),
1993        )
1994
1995    def _create_table_comment(
1996        self, table_name: TableName, table_comment: str, table_kind: str = "TABLE"
1997    ) -> None:
1998        table = exp.to_table(table_name)
1999
2000        try:
2001            self.execute(self._build_create_comment_table_exp(table, table_comment, table_kind))
2002        except Exception:
2003            logger.warning(
2004                f"Table comment for '{table.alias_or_name}' not registered - this may be due to limited permissions.",
2005                exc_info=True,
2006            )
2007
2008    def _build_create_comment_column_exp(
2009        self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE"
2010    ) -> exp.Comment | str:
2011        return exp.Comment(
2012            this=exp.column(column_name, *reversed(table.parts)),  # type: ignore
2013            kind="COLUMN",
2014            expression=exp.Literal.string(self._truncate_column_comment(column_comment)),
2015        )
2016
2017    def _create_column_comments(
2018        self,
2019        table_name: TableName,
2020        column_comments: t.Dict[str, str],
2021        table_kind: str = "TABLE",
2022    ) -> None:
2023        table = exp.to_table(table_name)
2024
2025        for col, comment in column_comments.items():
2026            try:
2027                self.execute(self._build_create_comment_column_exp(table, col, comment, table_kind))
2028            except Exception:
2029                logger.warning(
2030                    f"Column comments for table '{table.alias_or_name}' not registered - this may be due to limited permissions.",
2031                    exc_info=True,
2032                )
2033
2034    def _rename_table(
2035        self,
2036        old_table_name: TableName,
2037        new_table_name: TableName,
2038    ) -> None:
2039        self.execute(exp.rename_table(old_table_name, new_table_name))
2040
2041    @classmethod
2042    def _select_columns(cls, columns: t.Iterable[str]) -> exp.Select:
2043        return exp.select(*(exp.column(c, quoted=True) for c in columns))
2044
2045
2046class EngineAdapterWithIndexSupport(EngineAdapter):
2047    SUPPORTS_INDEXES = True
2048
2049
2050def _decoded_str(value: t.Union[str, bytes]) -> str:
2051    if isinstance(value, bytes):
2052        return value.decode("utf-8")
2053    return value
class EngineAdapter:
  67class EngineAdapter:
  68    """Base class wrapping a Database API compliant connection.
  69
  70    The EngineAdapter is an easily-subclassable interface that interacts
  71    with the underlying engine and data store.
  72
  73    Args:
  74        connection_factory: a callable which produces a new Database API-compliant
  75            connection on every call.
  76        dialect: The dialect with which this adapter is associated.
  77        multithreaded: Indicates whether this adapter will be used by more than one thread.
  78    """
  79
  80    DIALECT = ""
  81    DEFAULT_BATCH_SIZE = 10000
  82    DATA_OBJECT_FILTER_BATCH_SIZE = 4000
  83    SUPPORTS_TRANSACTIONS = True
  84    SUPPORTS_INDEXES = False
  85    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_CTAS
  86    COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS
  87    MAX_TABLE_COMMENT_LENGTH: t.Optional[int] = None
  88    MAX_COLUMN_COMMENT_LENGTH: t.Optional[int] = None
  89    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
  90    SUPPORTS_MATERIALIZED_VIEWS = False
  91    SUPPORTS_MATERIALIZED_VIEW_SCHEMA = False
  92    SUPPORTS_CLONING = False
  93    SCHEMA_DIFFER = SchemaDiffer()
  94    SUPPORTS_TUPLE_IN = True
  95    CATALOG_SUPPORT = CatalogSupport.UNSUPPORTED
  96    SUPPORTS_ROW_LEVEL_OP = True
  97    HAS_VIEW_BINDING = False
  98    SUPPORTS_REPLACE_TABLE = True
  99    DEFAULT_CATALOG_TYPE = DIALECT
 100    QUOTE_IDENTIFIERS_IN_VIEWS = True
 101
 102    def __init__(
 103        self,
 104        connection_factory: t.Callable[[], t.Any],
 105        dialect: str = "",
 106        sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
 107        multithreaded: bool = False,
 108        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
 109        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
 110        default_catalog: t.Optional[str] = None,
 111        execute_log_level: int = logging.DEBUG,
 112        register_comments: bool = True,
 113        **kwargs: t.Any,
 114    ):
 115        self.dialect = dialect.lower() or self.DIALECT
 116        self._connection_pool = create_connection_pool(
 117            connection_factory, multithreaded, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init
 118        )
 119        self.sql_gen_kwargs = sql_gen_kwargs or {}
 120        self._default_catalog = default_catalog
 121        self._execute_log_level = execute_log_level
 122        self._extra_config = kwargs
 123        self.register_comments = register_comments
 124
 125    def with_log_level(self, level: int) -> EngineAdapter:
 126        adapter = self.__class__(
 127            lambda: None,
 128            dialect=self.dialect,
 129            sql_gen_kwargs=self.sql_gen_kwargs,
 130            default_catalog=self._default_catalog,
 131            execute_log_level=level,
 132            register_comments=self.register_comments,
 133            **self._extra_config,
 134        )
 135
 136        adapter._connection_pool = self._connection_pool
 137
 138        return adapter
 139
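A usage sketch; the returned copy shares the original connection pool, so only the level at which executed SQL is logged changes:

    import logging

    # assuming `adapter` is a concrete EngineAdapter instance
    verbose_adapter = adapter.with_log_level(logging.INFO)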
 140    @property
 141    def cursor(self) -> t.Any:
 142        return self._connection_pool.get_cursor()
 143
 144    @property
 145    def spark(self) -> t.Optional[PySparkSession]:
 146        return None
 147
 148    @property
 149    def comments_enabled(self) -> bool:
 150        return self.register_comments and self.COMMENT_CREATION_TABLE.is_supported
 151
 152    @classmethod
 153    def is_pandas_df(cls, value: t.Any) -> bool:
 154        return isinstance(value, pd.DataFrame)
 155
 156    @classmethod
 157    def _casted_columns(cls, columns_to_types: t.Dict[str, exp.DataType]) -> t.List[exp.Alias]:
 158        return [
 159            exp.alias_(exp.cast(exp.column(column), to=kind), column, copy=False)
 160            for column, kind in columns_to_types.items()
 161        ]
 162
 163    @property
 164    def default_catalog(self) -> t.Optional[str]:
 165        if self.CATALOG_SUPPORT.is_unsupported:
 166            return None
 167        default_catalog = self._default_catalog or self.get_current_catalog()
 168        if not default_catalog:
 169            raise SQLMeshError("Could not determine a default catalog despite it being supported.")
 170        return default_catalog
 171
 172    def _get_source_queries(
 173        self,
 174        query_or_df: QueryOrDF,
 175        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
 176        target_table: TableName,
 177        *,
 178        batch_size: t.Optional[int] = None,
 179    ) -> t.List[SourceQuery]:
 180        batch_size = self.DEFAULT_BATCH_SIZE if batch_size is None else batch_size
 181        if isinstance(query_or_df, (exp.Query, exp.DerivedTable)):
 182            return [SourceQuery(query_factory=lambda: query_or_df)]  # type: ignore
 183        if not columns_to_types:
 184            raise SQLMeshError(
 185                "columns_to_types must be provided when a DataFrame is passed in"
 186            )
 187        return self._df_to_source_queries(
 188            query_or_df, columns_to_types, batch_size, target_table=target_table
 189        )
 190
 191    def _df_to_source_queries(
 192        self,
 193        df: DF,
 194        columns_to_types: t.Dict[str, exp.DataType],
 195        batch_size: int,
 196        target_table: TableName,
 197    ) -> t.List[SourceQuery]:
 198        assert isinstance(df, pd.DataFrame)
 199        num_rows = len(df.index)
 200        batch_size = sys.maxsize if batch_size == 0 else batch_size
 201        values = list(df.itertuples(index=False, name=None))
 202        return [
 203            SourceQuery(
 204                query_factory=partial(
 205                    self._values_to_sql,
 206                    values=values,
 207                    columns_to_types=columns_to_types,
 208                    batch_start=i,
 209                    batch_end=min(i + batch_size, num_rows),
 210                ),
 211            )
 212            for i in range(0, num_rows, batch_size)
 213        ]
 214
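A sketch of the DataFrame batching, assuming `adapter` is a concrete instance and `db.t` is hypothetical:

    import pandas as pd
    from sqlglot import exp

    df = pd.DataFrame({"id": range(25_000)})
    queries = adapter._df_to_source_queries(
        df,
        columns_to_types={"id": exp.DataType.build("int")},
        batch_size=10_000,  # -> 3 SourceQuery batches: 10k, 10k, and 5k rows
        target_table="db.t",
    )
    # batch_size=0 disables batching; it is treated as sys.maxsize above.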
 215    def _get_source_queries_and_columns_to_types(
 216        self,
 217        query_or_df: QueryOrDF,
 218        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
 219        target_table: TableName,
 220        *,
 221        batch_size: t.Optional[int] = None,
 222    ) -> t.Tuple[t.List[SourceQuery], t.Optional[t.Dict[str, exp.DataType]]]:
 223        columns_to_types = self._columns_to_types(query_or_df, columns_to_types)
 224        return (
 225            self._get_source_queries(
 226                query_or_df, columns_to_types, target_table=target_table, batch_size=batch_size
 227            ),
 228            columns_to_types,
 229        )
 230
 231    @t.overload
 232    def _columns_to_types(
 233        self, query_or_df: DF, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
 234    ) -> t.Dict[str, exp.DataType]: ...
 235
 236    @t.overload
 237    def _columns_to_types(
 238        self, query_or_df: Query, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
 239    ) -> t.Optional[t.Dict[str, exp.DataType]]: ...
 240
 241    def _columns_to_types(
 242        self, query_or_df: QueryOrDF, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
 243    ) -> t.Optional[t.Dict[str, exp.DataType]]:
 244        if columns_to_types:
 245            return columns_to_types
 246        if self.is_pandas_df(query_or_df):
 247            return columns_to_types_from_df(t.cast(pd.DataFrame, query_or_df))
 248        return columns_to_types
 249
 250    def recycle(self) -> None:
 251        """Closes all open connections and releases all allocated resources associated with any thread
 252        except the calling one."""
 253        self._connection_pool.close_all(exclude_calling_thread=True)
 254
 255    def close(self) -> t.Any:
 256        """Closes all open connections and releases all allocated resources."""
 257        self._connection_pool.close_all()
 258
 259    def get_current_catalog(self) -> t.Optional[str]:
 260        """Returns the catalog name of the current connection."""
 261        raise NotImplementedError()
 262
 263    def set_current_catalog(self, catalog: str) -> None:
 264        """Sets the catalog name of the current connection."""
 265        raise NotImplementedError()
 266
 267    def get_catalog_type(self, catalog: t.Optional[str]) -> str:
 268        """Intended to be overridden for data virtualization systems like Trino that,
 269        depending on the target catalog, require slightly different properties to be set when creating or updating tables.
 270        """
 271        if self.CATALOG_SUPPORT.is_unsupported:
 272            raise UnsupportedCatalogOperationError(
 273                f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}"
 274            )
 275        return self.DEFAULT_CATALOG_TYPE
 276
 277    @property
 278    def current_catalog_type(self) -> str:
 279        return self.get_catalog_type(self.get_current_catalog())
 280
 281    def replace_query(
 282        self,
 283        table_name: TableName,
 284        query_or_df: QueryOrDF,
 285        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 286        table_description: t.Optional[str] = None,
 287        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 288        **kwargs: t.Any,
 289    ) -> None:
 290        """Replaces an existing table with a query.
 291
 292        For partition-based engines (Hive, Spark), insert overwrite is used. For other systems, create or replace is used.
 293
 294        Args:
 295            table_name: The name of the table (e.g. prod.table)
 296            query_or_df: The SQL query to run or a dataframe.
 297            columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type.
 298                Expected to be ordered to match the order of values in the dataframe.
 299            kwargs: Optional create table properties.
 300        """
 301        target_table = exp.to_table(table_name)
 302        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
 303            query_or_df, columns_to_types, target_table=target_table
 304        )
 305        columns_to_types = columns_to_types or self.columns(target_table)
 306        query = source_queries[0].query_factory()
 307        self_referencing = any(
 308            quote_identifiers(table) == quote_identifiers(target_table)
 309            for table in query.find_all(exp.Table)
 310        )
 311        # If a query references itself, then a table must be created first regardless of the approach used.
 312        if self_referencing:
 313            self._create_table_from_columns(
 314                target_table,
 315                columns_to_types,
 316                exists=True,
 317                table_description=table_description,
 318                column_descriptions=column_descriptions,
 319            )
 320        # All engines support `CREATE TABLE AS`, so we use that if the table doesn't already exist, and
 321        # we use `CREATE OR REPLACE TABLE AS` if the engine supports it.
 322        if self.SUPPORTS_REPLACE_TABLE or not self.table_exists(target_table):
 323            return self._create_table_from_source_queries(
 324                target_table,
 325                source_queries,
 326                columns_to_types,
 327                replace=self.SUPPORTS_REPLACE_TABLE,
 328                table_description=table_description,
 329                column_descriptions=column_descriptions,
 330                **kwargs,
 331            )
 332        else:
 333            if self_referencing:
 334                with self.temp_table(
 335                    self._select_columns(columns_to_types).from_(target_table),
 336                    name=target_table,
 337                    columns_to_types=columns_to_types,
 338                    **kwargs,
 339                ) as temp_table:
 340                    for source_query in source_queries:
 341                        source_query.add_transform(
 342                            lambda node: (  # type: ignore
 343                                temp_table  # type: ignore
 344                                if isinstance(node, exp.Table)
 345                                and quote_identifiers(node) == quote_identifiers(target_table)
 346                                else node
 347                            )
 348                        )
 349                    return self._insert_overwrite_by_condition(
 350                        target_table,
 351                        source_queries,
 352                        columns_to_types,
 353                    )
 354            return self._insert_overwrite_by_condition(
 355                target_table,
 356                source_queries,
 357                columns_to_types,
 358            )
 359
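A usage sketch, assuming `adapter` is a concrete EngineAdapter instance; the table names are hypothetical:

    from sqlglot import parse_one

    adapter.replace_query(
        "db.prices",
        parse_one("SELECT id, price FROM db.staging_prices"),
        table_description="Latest price per product",
    )

On engines with SUPPORTS_REPLACE_TABLE this becomes a `CREATE OR REPLACE TABLE AS`; otherwise the existing table is overwritten via `_insert_overwrite_by_condition`.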
 360    def create_index(
 361        self,
 362        table_name: TableName,
 363        index_name: str,
 364        columns: t.Tuple[str, ...],
 365        exists: bool = True,
 366    ) -> None:
 367        """Creates a new index for the given table if supported
 368
 369        Args:
 370            table_name: The name of the target table.
 371            index_name: The name of the index.
 372            columns: The list of columns that constitute the index.
 373            exists: Indicates whether to include the IF NOT EXISTS check.
 374        """
 375        if not self.SUPPORTS_INDEXES:
 376            return
 377
 378        expression = exp.Create(
 379            this=exp.Index(
 380                this=exp.to_identifier(index_name),
 381                table=exp.to_table(table_name),
 382                params=exp.IndexParameters(columns=[exp.to_column(c) for c in columns]),
 383            ),
 384            kind="INDEX",
 385            exists=exists,
 386        )
 387        self.execute(expression)
 388
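A usage sketch; this is a no-op unless the adapter sets SUPPORTS_INDEXES = True (e.g. subclasses of EngineAdapterWithIndexSupport), and the names are hypothetical:

    adapter.create_index("db.t", "idx_t_id_ts", ("id", "ts"))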
 389    def create_table(
 390        self,
 391        table_name: TableName,
 392        columns_to_types: t.Dict[str, exp.DataType],
 393        primary_key: t.Optional[t.Tuple[str, ...]] = None,
 394        exists: bool = True,
 395        table_description: t.Optional[str] = None,
 396        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 397        **kwargs: t.Any,
 398    ) -> None:
 399        """Create a table using a DDL statement
 400
 401        Args:
 402            table_name: The name of the table to create. Can be fully qualified or just table name.
 403            columns_to_types: A mapping between the column name and its data type.
 404            primary_key: Determines the table primary key.
 405            exists: Indicates whether to include the IF NOT EXISTS check.
 406            table_description: Optional table description from MODEL DDL.
 407            column_descriptions: Optional column descriptions from model query.
 408            kwargs: Optional create table properties.
 409        """
 410        self._create_table_from_columns(
 411            table_name,
 412            columns_to_types,
 413            primary_key,
 414            exists,
 415            table_description,
 416            column_descriptions,
 417            **kwargs,
 418        )
 419
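A usage sketch, assuming `adapter` is a concrete EngineAdapter instance and the table name is hypothetical:

    from sqlglot import exp

    adapter.create_table(
        "db.events",
        columns_to_types={
            "id": exp.DataType.build("int"),
            "ts": exp.DataType.build("timestamp"),
            "payload": exp.DataType.build("text"),
        },
        primary_key=("id",),  # only emitted when SUPPORTS_INDEXES is True
        table_description="Raw event stream",
    )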
 420    def ctas(
 421        self,
 422        table_name: TableName,
 423        query_or_df: QueryOrDF,
 424        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 425        exists: bool = True,
 426        table_description: t.Optional[str] = None,
 427        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 428        **kwargs: t.Any,
 429    ) -> None:
 430        """Create a table using a CTAS statement
 431
 432        Args:
 433            table_name: The name of the table to create. Can be fully qualified or just table name.
 434            query_or_df: The SQL query to run or a dataframe for the CTAS.
 435            columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame.
 436            exists: Indicates whether to include the IF NOT EXISTS check.
 437            table_description: Optional table description from MODEL DDL.
 438            column_descriptions: Optional column descriptions from model query.
 439            kwargs: Optional create table properties.
 440        """
 441        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
 442            query_or_df, columns_to_types, target_table=table_name
 443        )
 444        return self._create_table_from_source_queries(
 445            table_name,
 446            source_queries,
 447            columns_to_types,
 448            exists,
 449            table_description=table_description,
 450            column_descriptions=column_descriptions,
 451            **kwargs,
 452        )
 453
 454    def create_state_table(
 455        self,
 456        table_name: str,
 457        columns_to_types: t.Dict[str, exp.DataType],
 458        primary_key: t.Optional[t.Tuple[str, ...]] = None,
 459    ) -> None:
 460        """Create a table to store SQLMesh internal state.
 461
 462        Args:
 463            table_name: The name of the table to create. Can be fully qualified or just table name.
 464            columns_to_types: A mapping between the column name and its data type.
 465            primary_key: Determines the table primary key.
 466        """
 467        self.create_table(
 468            table_name,
 469            columns_to_types,
 470            primary_key=primary_key,
 471        )
 472
 473    def _create_table_from_columns(
 474        self,
 475        table_name: TableName,
 476        columns_to_types: t.Dict[str, exp.DataType],
 477        primary_key: t.Optional[t.Tuple[str, ...]] = None,
 478        exists: bool = True,
 479        table_description: t.Optional[str] = None,
 480        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 481        **kwargs: t.Any,
 482    ) -> None:
 483        """
 484        Create a table using a DDL statement.
 485
 486        Args:
 487            table_name: The name of the table to create. Can be fully qualified or just table name.
 488            columns_to_types: Mapping between the column name and its data type.
 489            primary_key: Determines the table primary key.
 490            exists: Indicates whether to include the IF NOT EXISTS check.
 491            table_description: Optional table description from MODEL DDL.
 492            column_descriptions: Optional column descriptions from model query.
 493            kwargs: Optional create table properties.
 494        """
 495        table = exp.to_table(table_name)
 496
 497        if not columns_to_types_all_known(columns_to_types):
 498            # It is ok if the column types are not known, provided the table already exists and IF NOT EXISTS is set
 499            if exists and self.table_exists(table_name):
 500                return
 501            raise SQLMeshError(
 502                "Cannot create a table without knowing the column types. "
 503                "Try casting the columns to an expected type or defining the columns in the model metadata. "
 504                f"Columns to types: {columns_to_types}"
 505            )
 506
 507        primary_key_expression = (
 508            [exp.PrimaryKey(expressions=[exp.to_column(k) for k in primary_key])]
 509            if primary_key and self.SUPPORTS_INDEXES
 510            else []
 511        )
 512
 513        schema = self._build_schema_exp(
 514            table,
 515            columns_to_types,
 516            column_descriptions,
 517            primary_key_expression,
 518        )
 519
 520        self._create_table(
 521            schema,
 522            None,
 523            exists=exists,
 524            columns_to_types=columns_to_types,
 525            table_description=table_description,
 526            **kwargs,
 527        )
 528
 529        # Register comments with explicit commands if the engine doesn't support comments in the schema definition or CREATE statement
 530        if (
 531            table_description
 532            and self.COMMENT_CREATION_TABLE.is_comment_command_only
 533            and self.comments_enabled
 534        ):
 535            self._create_table_comment(table_name, table_description)
 536        if (
 537            column_descriptions
 538            and self.COMMENT_CREATION_TABLE.is_comment_command_only
 539            and self.comments_enabled
 540        ):
 541            self._create_column_comments(table_name, column_descriptions)
 542
 543    def _build_schema_exp(
 544        self,
 545        table: exp.Table,
 546        columns_to_types: t.Dict[str, exp.DataType],
 547        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 548        expressions: t.Optional[t.List[exp.PrimaryKey]] = None,
 549        is_view: bool = False,
 550    ) -> exp.Schema:
 551        """
 552        Build a schema expression for a table, columns, column comments, and additional schema properties.
 553        """
 554        expressions = expressions or []
 555        engine_supports_schema_comments = (
 556            self.COMMENT_CREATION_VIEW.supports_schema_def
 557            if is_view
 558            else self.COMMENT_CREATION_TABLE.supports_schema_def
 559        )
 560        return exp.Schema(
 561            this=table,
 562            expressions=[
 563                exp.ColumnDef(
 564                    this=exp.to_identifier(column),
 565                    kind=None if is_view else kind,  # don't include column data type for views
 566                    constraints=(
 567                        self._build_col_comment_exp(column, column_descriptions)
 568                        if column_descriptions
 569                        and engine_supports_schema_comments
 570                        and self.comments_enabled
 571                        else None
 572                    ),
 573                )
 574                for column, kind in columns_to_types.items()
 575            ]
 576            + expressions,
 577        )
 578
 579    def _build_col_comment_exp(
 580        self, col_name: str, column_descriptions: t.Dict[str, str]
 581    ) -> t.List[exp.ColumnConstraint]:
 582        comment = column_descriptions.get(col_name, None)
 583        if comment:
 584            return [
 585                exp.ColumnConstraint(
 586                    kind=exp.CommentColumnConstraint(
 587                        this=exp.Literal.string(self._truncate_column_comment(comment))
 588                    )
 589                )
 590            ]
 591        return []
 592
 593    def _create_table_from_source_queries(
 594        self,
 595        table_name: TableName,
 596        source_queries: t.List[SourceQuery],
 597        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 598        exists: bool = True,
 599        replace: bool = False,
 600        table_description: t.Optional[str] = None,
 601        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 602        **kwargs: t.Any,
 603    ) -> None:
 604        table = exp.to_table(table_name)
 605
 606        # CTAS calls do not usually include a schema expression. However, most engines
 607        # permit them in CTAS expressions, and they allow us to register all column comments
 608        # in a single call rather than in a separate comment command call for each column.
 609        #
 610        # This block conditionally builds a schema expression with column comments if the engine
 611        # supports it and we have columns_to_types. columns_to_types is required because the
 612        # schema expression must include at least column name, data type, and the comment -
 613        # for example, `(colname INTEGER COMMENT 'comment')`.
 614        #
 615        # columns_to_types will be available when loading from a DataFrame (by converting from
 616        # pandas to SQL types), when a model is "annotated" by explicitly specifying column
 617        # types, and for evaluation methods like `LogicalReplaceQueryMixin.replace_query()`
 618        # calls and SCD Type 2 model calls.
 619        schema = None
 620        columns_to_types_known = columns_to_types and columns_to_types_all_known(columns_to_types)
 621        if (
 622            column_descriptions
 623            and columns_to_types_known
 624            and self.COMMENT_CREATION_TABLE.is_in_schema_def_ctas
 625            and self.comments_enabled
 626        ):
 627            schema = self._build_schema_exp(table, columns_to_types, column_descriptions)  # type: ignore
 628
 629        with self.transaction(condition=len(source_queries) > 1):
 630            for i, source_query in enumerate(source_queries):
 631                with source_query as query:
 632                    if i == 0:
 633                        self._create_table(
 634                            schema if schema else table,
 635                            query,
 636                            columns_to_types=columns_to_types,
 637                            exists=exists,
 638                            replace=replace,
 639                            table_description=table_description,
 640                            **kwargs,
 641                        )
 642                    else:
 643                        self._insert_append_query(
 644                            table_name, query, columns_to_types or self.columns(table)
 645                        )
 646
 647        # Register comments with commands if the engine supports comments and we weren't able to
 648        # register them with the CTAS call's schema expression.
 649        if (
 650            table_description
 651            and self.COMMENT_CREATION_TABLE.is_comment_command_only
 652            and self.comments_enabled
 653        ):
 654            self._create_table_comment(table_name, table_description)
 655        if column_descriptions and schema is None and self.comments_enabled:
 656            self._create_column_comments(table_name, column_descriptions)
 657
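# Illustrative sketch (not part of the adapter source): when the engine
# supports schema definitions in a CTAS, the CREATE assembled above renders
# roughly like this (names hypothetical, plain sqlglot):
from sqlglot import exp

schema = exp.Schema(
    this=exp.to_table("db.example"),
    expressions=[exp.ColumnDef(this=exp.to_identifier("id"), kind=exp.DataType.build("int"))],
)
ctas = exp.Create(this=schema, kind="TABLE", expression=exp.select("1 AS id"))
print(ctas.sql())
# CREATE TABLE db.example (id INT) AS SELECT 1 AS id
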
 658    def _create_table(
 659        self,
 660        table_name_or_schema: t.Union[exp.Schema, TableName],
 661        expression: t.Optional[exp.Expression],
 662        exists: bool = True,
 663        replace: bool = False,
 664        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 665        table_description: t.Optional[str] = None,
 666        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 667        **kwargs: t.Any,
 668    ) -> None:
 669        self.execute(
 670            self._build_create_table_exp(
 671                table_name_or_schema,
 672                expression=expression,
 673                exists=exists,
 674                replace=replace,
 675                columns_to_types=columns_to_types,
 676                table_description=(
 677                    table_description
 678                    if self.COMMENT_CREATION_TABLE.supports_schema_def and self.comments_enabled
 679                    else None
 680                ),
 681                **kwargs,
 682            )
 683        )
 684
 685    def _build_create_table_exp(
 686        self,
 687        table_name_or_schema: t.Union[exp.Schema, TableName],
 688        expression: t.Optional[exp.Expression],
 689        exists: bool = True,
 690        replace: bool = False,
 691        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 692        **kwargs: t.Any,
 693    ) -> exp.Create:
 694        exists = False if replace else exists
 695        catalog_name = None
 696        if not isinstance(table_name_or_schema, exp.Schema):
 697            table_name_or_schema = exp.to_table(table_name_or_schema)
 698            catalog_name = table_name_or_schema.catalog
 699        else:
 700            if isinstance(table_name_or_schema.this, exp.Table):
 701                catalog_name = table_name_or_schema.this.catalog
 702
 703        properties = (
 704            self._build_table_properties_exp(
 705                **kwargs, catalog_name=catalog_name, columns_to_types=columns_to_types
 706            )
 707            if kwargs
 708            else None
 709        )
 710        return exp.Create(
 711            this=table_name_or_schema,
 712            kind="TABLE",
 713            replace=replace,
 714            exists=exists,
 715            expression=expression,
 716            properties=properties,
 717        )
 718
 719    def create_table_like(
 720        self,
 721        target_table_name: TableName,
 722        source_table_name: TableName,
 723        exists: bool = True,
 724    ) -> None:
 725        """
 726        Create a table like another table or view.
 727        """
 728        target_table = exp.to_table(target_table_name)
 729        source_table = exp.to_table(source_table_name)
 730        create_expression = exp.Create(
 731            this=target_table,
 732            kind="TABLE",
 733            exists=exists,
 734            properties=exp.Properties(
 735                expressions=[
 736                    exp.LikeProperty(this=source_table),
 737                ]
 738            ),
 739        )
 740        self.execute(create_expression)
 741
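# Illustrative sketch (not part of the adapter source): the statement
# `create_table_like` emits, rendered with plain sqlglot (names hypothetical):
from sqlglot import exp

create = exp.Create(
    this=exp.to_table("db.target"),
    kind="TABLE",
    exists=True,
    properties=exp.Properties(expressions=[exp.LikeProperty(this=exp.to_table("db.source"))]),
)
print(create.sql())
# CREATE TABLE IF NOT EXISTS db.target LIKE db.source
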
 742    def clone_table(
 743        self,
 744        target_table_name: TableName,
 745        source_table_name: TableName,
 746        replace: bool = False,
 747        clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
 748        **kwargs: t.Any,
 749    ) -> None:
 750        """Creates a table with the target name by cloning the source table.
 751
 752        Args:
 753            target_table_name: The name of the table that should be created.
 754            source_table_name: The name of the source table that should be cloned.
 755            replace: Whether or not to replace an existing table.
 756        """
 757        if not self.SUPPORTS_CLONING:
 758            raise NotImplementedError(f"Engine does not support cloning: {type(self)}")
 759        self.execute(
 760            exp.Create(
 761                this=exp.to_table(target_table_name),
 762                kind="TABLE",
 763                replace=replace,
 764                clone=exp.Clone(
 765                    this=exp.to_table(source_table_name),
 766                    **(clone_kwargs or {}),
 767                ),
 768                **kwargs,
 769            )
 770        )
 771
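# Illustrative sketch (not part of the adapter source): the CLONE statement
# built by `clone_table`, rendered for an engine that supports cloning
# (Snowflake shown here; names hypothetical):
from sqlglot import exp

clone = exp.Create(
    this=exp.to_table("db.target"),
    kind="TABLE",
    clone=exp.Clone(this=exp.to_table("db.source")),
)
print(clone.sql(dialect="snowflake"))
# CREATE TABLE db.target CLONE db.source
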
 772    def drop_table(self, table_name: TableName, exists: bool = True) -> None:
 773        """Drops a table.
 774
 775        Args:
 776            table_name: The name of the table to drop.
 777            exists: Whether to use IF EXISTS. Defaults to True.
 778        """
 779        drop_expression = exp.Drop(this=exp.to_table(table_name), kind="TABLE", exists=exists)
 780        self.execute(drop_expression)
 781
 782    def alter_table(
 783        self,
 784        current_table_name: TableName,
 785        target_table_name: TableName,
 786    ) -> None:
 787        """
 788        Performs the required alter statements to change the current table into the structure of the target table.
 789        """
 790        with self.transaction():
 791            for alter_expression in self.SCHEMA_DIFFER.compare_columns(
 792                current_table_name,
 793                self.columns(current_table_name),
 794                self.columns(target_table_name),
 795            ):
 796                self.execute(alter_expression)
 797
 798    def create_view(
 799        self,
 800        view_name: TableName,
 801        query_or_df: QueryOrDF,
 802        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 803        replace: bool = True,
 804        materialized: bool = False,
 805        table_description: t.Optional[str] = None,
 806        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 807        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
 808        **create_kwargs: t.Any,
 809    ) -> None:
 810        """Create a view with a query or dataframe.
 811
 812        If a dataframe is passed in, it will be converted into a literal values statement.
 813        This should only be done if the dataframe is very small!
 814
 815        Args:
 816            view_name: The view name.
 817            query_or_df: A query or dataframe.
 818            columns_to_types: Columns to use in the view statement.
 819            replace: Whether or not to replace an existing view. Defaults to True.
 820            materialized: Whether to create a materialized view. Only used for engines that support this feature.
 821            table_description: Optional table description from MODEL DDL.
 822            column_descriptions: Optional column descriptions from model query.
 823            view_properties: Optional view properties to add to the view.
 824            create_kwargs: Additional kwargs to pass into the Create expression
 825        """
 826        if self.is_pandas_df(query_or_df):
 827            values = list(t.cast(pd.DataFrame, query_or_df).itertuples(index=False, name=None))
 828            columns_to_types = columns_to_types or self._columns_to_types(query_or_df)
 829            if not columns_to_types:
 830                raise SQLMeshError("columns_to_types must be provided for dataframes")
 831            query_or_df = self._values_to_sql(
 832                values,
 833                columns_to_types,
 834                batch_start=0,
 835                batch_end=len(values),
 836            )
 837
 838        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
 839            query_or_df, columns_to_types, batch_size=0, target_table=view_name
 840        )
 841        if len(source_queries) != 1:
 842            raise SQLMeshError("Only one source query is supported for creating views")
 843
 844        schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name)
 845        if columns_to_types:
 846            schema = self._build_schema_exp(
 847                exp.to_table(view_name), columns_to_types, column_descriptions, is_view=True
 848            )
 849
 850        properties = create_kwargs.pop("properties", None)
 851        if not properties:
 852            properties = exp.Properties(expressions=[])
 853
 854        if materialized and self.SUPPORTS_MATERIALIZED_VIEWS:
 855            properties.append("expressions", exp.MaterializedProperty())
 856
 857            if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema):
 858                schema = schema.this
 859
 860        create_view_properties = self._build_view_properties_exp(
 861            view_properties,
 862            (
 863                table_description
 864                if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled
 865                else None
 866            ),
 867        )
 868        if create_view_properties:
 869            for view_property in create_view_properties.expressions:
 870                properties.append("expressions", view_property)
 871
 872        if properties.expressions:
 873            create_kwargs["properties"] = properties
 874
 875        with source_queries[0] as query:
 876            self.execute(
 877                exp.Create(
 878                    this=schema,
 879                    kind="VIEW",
 880                    replace=replace,
 881                    expression=query,
 882                    **create_kwargs,
 883                ),
 884                quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
 885            )
 886
 887        # Register table comment with commands if the engine doesn't support doing it in CREATE
 888        if (
 889            table_description
 890            and self.COMMENT_CREATION_VIEW.is_comment_command_only
 891            and self.comments_enabled
 892        ):
 893            self._create_table_comment(view_name, table_description, "VIEW")
 894        # Register column comments with commands if the engine doesn't support doing it in
 895        # CREATE or we couldn't do it in the CREATE schema definition because we don't have
 896        # columns_to_types
 897        if (
 898            column_descriptions
 899            and (
 900                self.COMMENT_CREATION_VIEW.is_comment_command_only
 901                or (
 902                    self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands
 903                    and not columns_to_types
 904                )
 905            )
 906            and self.comments_enabled
 907        ):
 908            self._create_column_comments(view_name, column_descriptions, "VIEW")
 909
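# Illustrative sketch (not part of the adapter source): a materialized view
# CREATE like the one assembled above, rendered with plain sqlglot (names
# hypothetical):
from sqlglot import exp

create = exp.Create(
    this=exp.to_table("db.example_view"),
    kind="VIEW",
    replace=True,
    expression=exp.select("1 AS id"),
    properties=exp.Properties(expressions=[exp.MaterializedProperty()]),
)
print(create.sql())
# CREATE OR REPLACE MATERIALIZED VIEW db.example_view AS SELECT 1 AS id
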
 910    @set_catalog()
 911    def create_schema(
 912        self,
 913        schema_name: SchemaName,
 914        ignore_if_exists: bool = True,
 915        warn_on_error: bool = True,
 916    ) -> None:
 917        """Create a schema from a name or qualified table name."""
 918        try:
 919            self.execute(
 920                exp.Create(
 921                    this=to_schema(schema_name),
 922                    kind="SCHEMA",
 923                    exists=ignore_if_exists,
 924                )
 925            )
 926        except Exception as e:
 927            if not warn_on_error:
 928                raise
 929            logger.warning("Failed to create schema '%s': %s", schema_name, e)
 930
 931    def drop_schema(
 932        self,
 933        schema_name: SchemaName,
 934        ignore_if_not_exists: bool = True,
 935        cascade: bool = False,
 936    ) -> None:
 937        self.execute(
 938            exp.Drop(
 939                this=to_schema(schema_name),
 940                kind="SCHEMA",
 941                exists=ignore_if_not_exists,
 942                cascade=cascade,
 943            )
 944        )
 945
 946    def drop_view(
 947        self,
 948        view_name: TableName,
 949        ignore_if_not_exists: bool = True,
 950        materialized: bool = False,
 951        **kwargs: t.Any,
 952    ) -> None:
 953        """Drop a view."""
 954        self.execute(
 955            exp.Drop(
 956                this=exp.to_table(view_name),
 957                exists=ignore_if_not_exists,
 958                materialized=materialized and self.SUPPORTS_MATERIALIZED_VIEWS,
 959                kind="VIEW",
 960                **kwargs,
 961            )
 962        )
 963
 964    def columns(
 965        self, table_name: TableName, include_pseudo_columns: bool = False
 966    ) -> t.Dict[str, exp.DataType]:
 967        """Fetches column names and types for the target table."""
 968        self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
 969        describe_output = self.cursor.fetchall()
 970        return {
 971            # Note: MySQL returns the column type as bytes.
 972            column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect)
 973            for column_name, column_type, *_ in itertools.takewhile(
 974                lambda row: not row[0].startswith("#"),
 975                describe_output,
 976            )
 977            if column_name and column_name.strip() and column_type and column_type.strip()
 978        }
 979
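# Illustrative sketch (not part of the adapter source): how DESCRIBE output
# folds into a column-to-type mapping. The rows below are made up;
# `_decoded_str` (used above) additionally handles engines such as MySQL
# that return the type as bytes:
import itertools

from sqlglot import exp

describe_output = [("id", "int"), ("ds", "date"), ("# Partition Information", "")]
columns = {
    name: exp.DataType.build(dtype)
    for name, dtype, *_ in itertools.takewhile(
        lambda row: not row[0].startswith("#"), describe_output
    )
    if name and name.strip() and dtype and dtype.strip()
}
print(columns)
# -> {'id': DataType(INT), 'ds': DataType(DATE)} (repr abbreviated)
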
 980    def table_exists(self, table_name: TableName) -> bool:
 981        try:
 982            self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
 983            return True
 984        except Exception:
 985            return False
 986
 987    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
 988        self.execute(exp.delete(table_name, where))
 989
 990    def insert_append(
 991        self,
 992        table_name: TableName,
 993        query_or_df: QueryOrDF,
 994        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 995    ) -> None:
 996        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
 997            query_or_df, columns_to_types, target_table=table_name
 998        )
 999        self._insert_append_source_queries(table_name, source_queries, columns_to_types)
1000
1001    def _insert_append_source_queries(
1002        self,
1003        table_name: TableName,
1004        source_queries: t.List[SourceQuery],
1005        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1006    ) -> None:
1007        with self.transaction(condition=len(source_queries) > 0):
1008            columns_to_types = columns_to_types or self.columns(table_name)
1009            for source_query in source_queries:
1010                with source_query as query:
1011                    self._insert_append_query(table_name, query, columns_to_types)
1012
1013    def _insert_append_query(
1014        self,
1015        table_name: TableName,
1016        query: Query,
1017        columns_to_types: t.Dict[str, exp.DataType],
1018        order_projections: bool = True,
1019    ) -> None:
1020        if order_projections:
1021            query = self._order_projections_and_filter(query, columns_to_types)
1022        self.execute(exp.insert(query, table_name, columns=list(columns_to_types)))
1023
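# Illustrative sketch (not part of the adapter source): the INSERT built by
# `_insert_append_query`, rendered with plain sqlglot (names hypothetical):
from sqlglot import exp

query = exp.select("id", "ds").from_("staging.example")
print(exp.insert(query, "db.example", columns=["id", "ds"]).sql())
# INSERT INTO db.example (id, ds) SELECT id, ds FROM staging.example
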
1024    def insert_overwrite_by_partition(
1025        self,
1026        table_name: TableName,
1027        query_or_df: QueryOrDF,
1028        partitioned_by: t.List[exp.Expression],
1029        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1030    ) -> None:
1031        raise NotImplementedError(
1032            "Insert Overwrite by Partition (not time) is not supported by this engine"
1033        )
1034
1035    def insert_overwrite_by_time_partition(
1036        self,
1037        table_name: TableName,
1038        query_or_df: QueryOrDF,
1039        start: TimeLike,
1040        end: TimeLike,
1041        time_formatter: t.Callable[
1042            [TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expression
1043        ],
1044        time_column: TimeColumn | exp.Expression | str,
1045        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1046        **kwargs: t.Any,
1047    ) -> None:
1048        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1049            query_or_df, columns_to_types, target_table=table_name
1050        )
1051        columns_to_types = columns_to_types or self.columns(table_name)
1052        low, high = [time_formatter(dt, columns_to_types) for dt in make_inclusive(start, end)]
1053        if isinstance(time_column, TimeColumn):
1054            time_column = time_column.column
1055        where = exp.Between(
1056            this=exp.to_column(time_column) if isinstance(time_column, str) else time_column,
1057            low=low,
1058            high=high,
1059        )
1060        self._insert_overwrite_by_condition(table_name, source_queries, columns_to_types, where)
1061
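# Illustrative sketch (not part of the adapter source): the inclusive time
# range condition passed down to `_insert_overwrite_by_condition`, with
# string literals standing in for the output of `time_formatter`:
from sqlglot import exp

where = exp.Between(
    this=exp.to_column("ds"),
    low=exp.Literal.string("2023-01-01"),
    high=exp.Literal.string("2023-01-02"),
)
print(where.sql())
# ds BETWEEN '2023-01-01' AND '2023-01-02'
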
1062    def _values_to_sql(
1063        self,
1064        values: t.List[t.Tuple[t.Any, ...]],
1065        columns_to_types: t.Dict[str, exp.DataType],
1066        batch_start: int,
1067        batch_end: int,
1068        alias: str = "t",
1069    ) -> Query:
1070        return select_from_values_for_batch_range(
1071            values=values,
1072            columns_to_types=columns_to_types,
1073            batch_start=batch_start,
1074            batch_end=batch_end,
1075            alias=alias,
1076        )
1077
1078    def _insert_overwrite_by_condition(
1079        self,
1080        table_name: TableName,
1081        source_queries: t.List[SourceQuery],
1082        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1083        where: t.Optional[exp.Condition] = None,
1084        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
1085    ) -> None:
1086        table = exp.to_table(table_name)
1087        insert_overwrite_strategy = (
1088            insert_overwrite_strategy_override or self.INSERT_OVERWRITE_STRATEGY
1089        )
1090        with self.transaction(
1091            condition=len(source_queries) > 0 or insert_overwrite_strategy.is_delete_insert
1092        ):
1093            columns_to_types = columns_to_types or self.columns(table_name)
1094            for i, source_query in enumerate(source_queries):
1095                with source_query as query:
1096                    query = self._order_projections_and_filter(query, columns_to_types, where=where)
1097                    if i > 0 or insert_overwrite_strategy.is_delete_insert:
1098                        if i == 0:
1099                            self.delete_from(table_name, where=where or exp.true())
1100                        self._insert_append_query(
1101                            table_name,
1102                            query,
1103                            columns_to_types=columns_to_types,
1104                            order_projections=False,
1105                        )
1106                    else:
1107                        insert_exp = exp.insert(
1108                            query,
1109                            table,
1110                            columns=(
1111                                list(columns_to_types)
1112                                if not insert_overwrite_strategy.is_replace_where
1113                                else None
1114                            ),
1115                            overwrite=insert_overwrite_strategy.is_insert_overwrite,
1116                        )
1117                        if insert_overwrite_strategy.is_replace_where:
1118                            insert_exp.set("where", where or exp.true())
1119                        self.execute(insert_exp)
1120
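# Illustrative sketch (not part of the adapter source): the two main insert
# overwrite strategies handled above, rendered with plain sqlglot (names
# hypothetical):
from sqlglot import exp

query = exp.select("id", "ds").from_("staging.example")
# DELETE_INSERT: delete the affected rows, then append.
print(exp.delete("db.example", "ds = '2023-01-01'").sql())
print(exp.insert(query, "db.example", columns=["id", "ds"]).sql())
# INSERT_OVERWRITE: a single statement on engines such as Spark.
print(exp.insert(query, "db.example", overwrite=True).sql(dialect="spark"))
# INSERT OVERWRITE TABLE db.example SELECT id, ds FROM staging.example
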
1121    def update_table(
1122        self,
1123        table_name: TableName,
1124        properties: t.Dict[str, t.Any],
1125        where: t.Optional[str | exp.Condition] = None,
1126    ) -> None:
1127        self.execute(exp.update(table_name, properties, where=where))
1128
1129    def _merge(
1130        self,
1131        target_table: TableName,
1132        query: Query,
1133        on: exp.Expression,
1134        match_expressions: t.List[exp.When],
1135    ) -> None:
1136        this = exp.alias_(exp.to_table(target_table), alias=MERGE_TARGET_ALIAS, table=True)
1137        using = exp.alias_(
1138            exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
1139        )
1140        self.execute(
1141            exp.Merge(
1142                this=this,
1143                using=using,
1144                on=on,
1145                expressions=match_expressions,
1146            )
1147        )
1148
1149    def scd_type_2_by_time(
1150        self,
1151        target_table: TableName,
1152        source_table: QueryOrDF,
1153        unique_key: t.Sequence[exp.Expression],
1154        valid_from_col: exp.Column,
1155        valid_to_col: exp.Column,
1156        execution_time: TimeLike,
1157        updated_at_col: exp.Column,
1158        invalidate_hard_deletes: bool = True,
1159        updated_at_as_valid_from: bool = False,
1160        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1161        table_description: t.Optional[str] = None,
1162        column_descriptions: t.Optional[t.Dict[str, str]] = None,
1163        truncate: bool = False,
1164        **kwargs: t.Any,
1165    ) -> None:
1166        self._scd_type_2(
1167            target_table=target_table,
1168            source_table=source_table,
1169            unique_key=unique_key,
1170            valid_from_col=valid_from_col,
1171            valid_to_col=valid_to_col,
1172            execution_time=execution_time,
1173            updated_at_col=updated_at_col,
1174            invalidate_hard_deletes=invalidate_hard_deletes,
1175            updated_at_as_valid_from=updated_at_as_valid_from,
1176            columns_to_types=columns_to_types,
1177            table_description=table_description,
1178            column_descriptions=column_descriptions,
1179            truncate=truncate,
1180        )
1181
1182    def scd_type_2_by_column(
1183        self,
1184        target_table: TableName,
1185        source_table: QueryOrDF,
1186        unique_key: t.Sequence[exp.Expression],
1187        valid_from_col: exp.Column,
1188        valid_to_col: exp.Column,
1189        execution_time: TimeLike,
1190        check_columns: t.Union[exp.Star, t.Sequence[exp.Column]],
1191        invalidate_hard_deletes: bool = True,
1192        execution_time_as_valid_from: bool = False,
1193        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1194        table_description: t.Optional[str] = None,
1195        column_descriptions: t.Optional[t.Dict[str, str]] = None,
1196        truncate: bool = False,
1197        **kwargs: t.Any,
1198    ) -> None:
1199        self._scd_type_2(
1200            target_table=target_table,
1201            source_table=source_table,
1202            unique_key=unique_key,
1203            valid_from_col=valid_from_col,
1204            valid_to_col=valid_to_col,
1205            execution_time=execution_time,
1206            check_columns=check_columns,
1207            columns_to_types=columns_to_types,
1208            invalidate_hard_deletes=invalidate_hard_deletes,
1209            execution_time_as_valid_from=execution_time_as_valid_from,
1210            table_description=table_description,
1211            column_descriptions=column_descriptions,
1212            truncate=truncate,
1213        )
1214
1215    def _scd_type_2(
1216        self,
1217        target_table: TableName,
1218        source_table: QueryOrDF,
1219        unique_key: t.Sequence[exp.Expression],
1220        valid_from_col: exp.Column,
1221        valid_to_col: exp.Column,
1222        execution_time: TimeLike,
1223        invalidate_hard_deletes: bool = True,
1224        updated_at_col: t.Optional[exp.Column] = None,
1225        check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Column]]] = None,
1226        updated_at_as_valid_from: bool = False,
1227        execution_time_as_valid_from: bool = False,
1228        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1229        table_description: t.Optional[str] = None,
1230        column_descriptions: t.Optional[t.Dict[str, str]] = None,
1231        truncate: bool = False,
1232    ) -> None:
1233        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1234            source_table, columns_to_types, target_table=target_table, batch_size=0
1235        )
1236        columns_to_types = columns_to_types or self.columns(target_table)
1237        valid_from_name = valid_from_col.name
1238        valid_to_name = valid_to_col.name
1239        updated_at_name = updated_at_col.name if updated_at_col else None
1240        if (
1241            valid_from_name not in columns_to_types
1242            or valid_to_name not in columns_to_types
1243            or not columns_to_types_all_known(columns_to_types)
1244        ):
1245            columns_to_types = self.columns(target_table)
1246        if not columns_to_types:
1247            raise SQLMeshError(f"Could not get columns_to_types. Does {target_table} exist?")
1248        if not unique_key:
1249            raise SQLMeshError("unique_key must be provided for SCD Type 2")
1250        if check_columns and updated_at_col:
1251            raise SQLMeshError(
1252                "Cannot use both `check_columns` and `updated_at_name` for SCD Type 2"
1253            )
1254        if check_columns and updated_at_as_valid_from:
1255            raise SQLMeshError(
1256                "Cannot use both `check_columns` and `updated_at_as_valid_from` for SCD Type 2"
1257            )
1258        if execution_time_as_valid_from and not check_columns:
1259            raise SQLMeshError(
1260                "Cannot use `execution_time_as_valid_from` without `check_columns` for SCD Type 2"
1261            )
1262        if updated_at_name and updated_at_name not in columns_to_types:
1263            raise SQLMeshError(
1264                f"Column {updated_at_name} not found in {target_table}. Table must contain an `updated_at` timestamp for SCD Type 2"
1265            )
1266
1267        unmanaged_columns = [
1268            col for col in columns_to_types if col not in {valid_from_name, valid_to_name}
1269        ]
1270        time_data_type = columns_to_types[valid_from_name]
1271        select_source_columns: t.List[t.Union[str, exp.Alias]] = [
1272            col for col in unmanaged_columns if col != updated_at_name
1273        ]
1274        table_columns = [exp.column(c, quoted=True) for c in columns_to_types]
1275        if updated_at_name:
1276            select_source_columns.append(
1277                exp.cast(updated_at_col, time_data_type).as_(updated_at_col.this)  # type: ignore
1278            )
1279
1280        # If a star is provided, we include all unmanaged columns in the check.
1281        # This unnecessarily includes the unique key columns, but since they are used in the join we
1282        # already know whether they are equal, so the extra check is harmless and keeps the logic simple.
1283        # If we wanted to change this, we would just need to extract the column names from the unique_key
1284        # expressions and remove them from unmanaged_columns.
1285        if check_columns and check_columns == exp.Star():
1286            check_columns = [exp.column(col) for col in unmanaged_columns]
1287        execution_ts = to_time_column(execution_time, time_data_type)
1288        if updated_at_as_valid_from:
1289            if not updated_at_col:
1290                raise SQLMeshError(
1291                    "Cannot use `updated_at_as_valid_from` without `updated_at_name` for SCD Type 2"
1292                )
1293            update_valid_from_start: t.Union[str, exp.Expression] = updated_at_col
1294        elif execution_time_as_valid_from:
1295            update_valid_from_start = execution_ts
1296        else:
1297            update_valid_from_start = to_time_column("1970-01-01 00:00:00+00:00", time_data_type)
1298        insert_valid_from_start = execution_ts if check_columns else updated_at_col  # type: ignore
1299        # joined._exists IS NULL is saying "if the row is deleted"
1300        delete_check = (
1301            exp.column("_exists", "joined").is_(exp.Null()) if invalidate_hard_deletes else None
1302        )
1303        prefixed_valid_to_col = valid_to_col.copy()
1304        prefixed_valid_to_col.this.set("this", f"t_{prefixed_valid_to_col.name}")
1305        prefixed_valid_from_col = valid_from_col.copy()
1306        prefixed_valid_from_col.this.set("this", f"t_{valid_from_col.name}")
1307        if check_columns:
1308            row_check_conditions = []
1309            for col in check_columns:
1310                t_col = col.copy()
1311                t_col.this.set("this", f"t_{col.name}")
1312                row_check_conditions.extend(
1313                    [
1314                        col.neq(t_col),
1315                        exp.and_(t_col.is_(exp.Null()), col.is_(exp.Null()).not_()),
1316                        exp.and_(t_col.is_(exp.Null()).not_(), col.is_(exp.Null())),
1317                    ]
1318                )
1319            row_value_check = exp.or_(*row_check_conditions)
1320            unique_key_conditions = []
1321            for col in unique_key:
1322                t_col = col.copy()
1323                t_col.this.set("this", f"t_{col.name}")
1324                unique_key_conditions.extend(
1325                    [t_col.is_(exp.Null()).not_(), col.is_(exp.Null()).not_()]
1326                )
1327            unique_key_check = exp.and_(*unique_key_conditions)
1328            # unique_key_check is saying "if the row is updated"
1329            # row_value_check is saying "if the row has changed"
1330            updated_row_filter = exp.and_(unique_key_check, row_value_check)
1331            valid_to_case_stmt = (
1332                exp.Case()
1333                .when(
1334                    exp.and_(
1335                        exp.or_(
1336                            delete_check,
1337                            updated_row_filter,
1338                        )
1339                    ),
1340                    execution_ts,
1341                )
1342                .else_(prefixed_valid_to_col)
1343                .as_(valid_to_col.this)
1344            )
1345            valid_from_case_stmt = exp.func(
1346                "COALESCE",
1347                prefixed_valid_from_col,
1348                update_valid_from_start,
1349            ).as_(valid_from_col.this)
1350        else:
1351            assert updated_at_col is not None
1352            prefixed_updated_at_col = updated_at_col.copy()
1353            prefixed_updated_at_col.this.set("this", f"t_{updated_at_col.name}")
1354            updated_row_filter = updated_at_col > prefixed_updated_at_col
1355
1356            valid_to_case_stmt_builder = exp.Case().when(updated_row_filter, updated_at_col)
1357            if delete_check:
1358                valid_to_case_stmt_builder = valid_to_case_stmt_builder.when(
1359                    delete_check, execution_ts
1360                )
1361            valid_to_case_stmt = valid_to_case_stmt_builder.else_(prefixed_valid_to_col).as_(
1362                valid_to_col.this
1363            )
1364
1365            valid_from_case_stmt = (
1366                exp.Case()
1367                .when(
1368                    exp.and_(
1369                        prefixed_valid_from_col.is_(exp.Null()),
1370                        exp.column("_exists", "latest_deleted").is_(exp.Null()).not_(),
1371                    ),
1372                    exp.Case()
1373                    .when(
1374                        exp.column(valid_to_col.this, "latest_deleted") > updated_at_col,
1375                        exp.column(valid_to_col.this, "latest_deleted"),
1376                    )
1377                    .else_(updated_at_col),
1378                )
1379                .when(prefixed_valid_from_col.is_(exp.Null()), update_valid_from_start)
1380                .else_(prefixed_valid_from_col)
1381            ).as_(valid_from_col.this)
1382
1383        existing_rows_query = exp.select(*table_columns).from_(target_table)
1384        if truncate:
1385            existing_rows_query = existing_rows_query.limit(0)
1386
1387        with source_queries[0] as source_query:
1388            prefixed_columns_to_types = []
1389            for column in columns_to_types:
1390                prefixed_col = exp.column(column).copy()
1391                prefixed_col.this.set("this", f"t_{prefixed_col.name}")
1392                prefixed_columns_to_types.append(prefixed_col)
1393            prefixed_unmanaged_columns = []
1394            for column in unmanaged_columns:
1395                prefixed_col = exp.column(column).copy()
1396                prefixed_col.this.set("this", f"t_{prefixed_col.name}")
1397                prefixed_unmanaged_columns.append(prefixed_col)
1398            query = (
1399                exp.Select()  # type: ignore
1400                .with_(
1401                    "source",
1402                    exp.select(exp.true().as_("_exists"), *select_source_columns)
1403                    .distinct(*unique_key)
1404                    .from_(source_query.subquery("raw_source")),  # type: ignore
1405                )
1406                # Historical Records that Do Not Change
1407                .with_(
1408                    "static",
1409                    existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()),
1410                )
1411                # Latest Records that can be updated
1412                .with_(
1413                    "latest",
1414                    existing_rows_query.where(valid_to_col.is_(exp.Null())),
1415                )
1416                # Deleted records which can be used to determine `valid_from` for undeleted source records
1417                .with_(
1418                    "deleted",
1419                    exp.select(*[exp.column(col, "static") for col in columns_to_types])
1420                    .from_("static")
1421                    .join(
1422                        "latest",
1423                        on=exp.and_(
1424                            *[
1425                                add_table(key, "static").eq(add_table(key, "latest"))
1426                                for key in unique_key
1427                            ]
1428                        ),
1429                        join_type="left",
1430                    )
1431                    .where(exp.column(valid_to_col.this, "latest").is_(exp.Null())),
1432                )
1433                # Get the latest `valid_to` deleted record for each unique key
1434                .with_(
1435                    "latest_deleted",
1436                    exp.select(
1437                        exp.true().as_("_exists"),
1438                        *(part.as_(f"_key{i}") for i, part in enumerate(unique_key)),
1439                        exp.Max(this=valid_to_col).as_(valid_to_col.this),
1440                    )
1441                    .from_("deleted")
1442                    .group_by(*unique_key),
1443                )
1444                # Do a full join between latest records and source table in order to combine them together
1445                # MySQL doesn't support FULL JOIN, so we do a left join followed by a right join and remove duplicates with a union
1446                .with_(
1447                    "joined",
1448                    exp.select(
1449                        exp.column("_exists", table="source"),
1450                        *(
1451                            exp.column(col, table="latest").as_(prefixed_columns_to_types[i].this)
1452                            for i, col in enumerate(columns_to_types)
1453                        ),
1454                        *(exp.column(col, table="source").as_(col) for col in unmanaged_columns),
1455                    )
1456                    .from_("latest")
1457                    .join(
1458                        "source",
1459                        on=exp.and_(
1460                            *[
1461                                add_table(key, "latest").eq(add_table(key, "source"))
1462                                for key in unique_key
1463                            ]
1464                        ),
1465                        join_type="left",
1466                    )
1467                    .union(
1468                        exp.select(
1469                            exp.column("_exists", table="source"),
1470                            *(
1471                                exp.column(col, table="latest").as_(
1472                                    prefixed_columns_to_types[i].this
1473                                )
1474                                for i, col in enumerate(columns_to_types)
1475                            ),
1476                            *(
1477                                exp.column(col, table="source").as_(col)
1478                                for col in unmanaged_columns
1479                            ),
1480                        )
1481                        .from_("latest")
1482                        .join(
1483                            "source",
1484                            on=exp.and_(
1485                                *[
1486                                    add_table(key, "latest").eq(add_table(key, "source"))
1487                                    for key in unique_key
1488                                ]
1489                            ),
1490                            join_type="right",
1491                        )
1492                    ),
1493                )
1494                # Get deleted, new, no longer current, or unchanged records
1495                .with_(
1496                    "updated_rows",
1497                    exp.select(
1498                        *(
1499                            exp.func(
1500                                "COALESCE",
1501                                exp.column(prefixed_unmanaged_columns[i].this, table="joined"),
1502                                exp.column(col, table="joined"),
1503                            ).as_(col)
1504                            for i, col in enumerate(unmanaged_columns)
1505                        ),
1506                        valid_from_case_stmt,
1507                        valid_to_case_stmt,
1508                    )
1509                    .from_("joined")
1510                    .join(
1511                        "latest_deleted",
1512                        on=exp.and_(
1513                            *[
1514                                add_table(part, "joined").eq(
1515                                    exp.column(f"_key{i}", "latest_deleted")
1516                                )
1517                                for i, part in enumerate(unique_key)
1518                            ]
1519                        ),
1520                        join_type="left",
1521                    ),
1522                )
1523                # Get records that have been "updated" which means inserting a new record with previous `valid_from`
1524                .with_(
1525                    "inserted_rows",
1526                    exp.select(
1527                        *unmanaged_columns,
1528                        insert_valid_from_start.as_(valid_from_col.this),  # type: ignore
1529                        to_time_column(exp.null(), time_data_type).as_(valid_to_col.this),
1530                    )
1531                    .from_("joined")
1532                    .where(updated_row_filter),
1533                )
1534                .select(*table_columns)
1535                .from_("static")
1536                .union(
1537                    exp.select(*table_columns).from_("updated_rows"),
1538                    distinct=False,
1539                )
1540                .union(
1541                    exp.select(*table_columns).from_("inserted_rows"),
1542                    distinct=False,
1543                )
1544            )
1545
1546            self.replace_query(
1547                target_table,
1548                query,
1549                columns_to_types=columns_to_types,
1550                table_description=table_description,
1551                column_descriptions=column_descriptions,
1552            )
1553
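# Illustrative sketch (not part of the adapter source): the per-column change
# predicate `_scd_type_2` builds for `check_columns`, using hypothetical
# columns and the `t_` prefix convention from above:
from sqlglot import exp

row_check_conditions = []
for col in [exp.column("name"), exp.column("status")]:
    t_col = exp.column(f"t_{col.name}")
    row_check_conditions.extend(
        [
            col.neq(t_col),
            exp.and_(t_col.is_(exp.Null()), col.is_(exp.Null()).not_()),
            exp.and_(t_col.is_(exp.Null()).not_(), col.is_(exp.Null())),
        ]
    )
print(exp.or_(*row_check_conditions).sql())
# -> a disjunction of "value changed", "became NULL", and "was NULL" checks per column
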
1554    def merge(
1555        self,
1556        target_table: TableName,
1557        source_table: QueryOrDF,
1558        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
1559        unique_key: t.Sequence[exp.Expression],
1560        when_matched: t.Optional[exp.When] = None,
1561    ) -> None:
1562        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1563            source_table, columns_to_types, target_table=target_table
1564        )
1565        columns_to_types = columns_to_types or self.columns(target_table)
1566        on = exp.and_(
1567            *(
1568                add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS))
1569                for part in unique_key
1570            )
1571        )
1572        if not when_matched:
1573            when_matched = exp.When(
1574                matched=True,
1575                source=False,
1576                then=exp.Update(
1577                    expressions=[
1578                        exp.column(col, MERGE_TARGET_ALIAS).eq(exp.column(col, MERGE_SOURCE_ALIAS))
1579                        for col in columns_to_types
1580                    ],
1581                ),
1582            )
1583        when_not_matched = exp.When(
1584            matched=False,
1585            source=False,
1586            then=exp.Insert(
1587                this=exp.Tuple(expressions=[exp.column(col) for col in columns_to_types]),
1588                expression=exp.Tuple(
1589                    expressions=[exp.column(col, MERGE_SOURCE_ALIAS) for col in columns_to_types]
1590                ),
1591            ),
1592        )
1593        for source_query in source_queries:
1594            with source_query as query:
1595                self._merge(
1596                    target_table=target_table,
1597                    query=query,
1598                    on=on,
1599                    match_expressions=[when_matched, when_not_matched],
1600                )
1601
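# Illustrative sketch (not part of the adapter source): the MERGE assembled
# from the default WHEN clauses above, for a hypothetical table with columns
# `id` (the unique key) and `v`:
from sqlglot import exp

target = exp.alias_(exp.to_table("db.example"), alias="__MERGE_TARGET__", table=True)
source = exp.alias_(
    exp.Subquery(this=exp.select("id", "v").from_("staging.example")),
    alias="__MERGE_SOURCE__",
    table=True,
)
on = exp.column("id", "__MERGE_TARGET__").eq(exp.column("id", "__MERGE_SOURCE__"))
when_matched = exp.When(
    matched=True,
    source=False,
    then=exp.Update(
        expressions=[exp.column("v", "__MERGE_TARGET__").eq(exp.column("v", "__MERGE_SOURCE__"))]
    ),
)
when_not_matched = exp.When(
    matched=False,
    source=False,
    then=exp.Insert(
        this=exp.Tuple(expressions=[exp.column("id"), exp.column("v")]),
        expression=exp.Tuple(
            expressions=[
                exp.column("id", "__MERGE_SOURCE__"),
                exp.column("v", "__MERGE_SOURCE__"),
            ]
        ),
    ),
)
merge = exp.Merge(this=target, using=source, on=on, expressions=[when_matched, when_not_matched])
print(merge.sql(dialect="snowflake"))
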
1602    def rename_table(
1603        self,
1604        old_table_name: TableName,
1605        new_table_name: TableName,
1606    ) -> None:
1607        new_table = exp.to_table(new_table_name)
1608        if new_table.catalog:
1609            old_table = exp.to_table(old_table_name)
1610            catalog = old_table.catalog or self.get_current_catalog()
1611            if catalog != new_table.catalog:
1612                raise UnsupportedCatalogOperationError(
1613                    "Tried to rename a table across catalogs, which is not supported"
1614                )
1615        self._rename_table(old_table_name, new_table_name)
1616
1617    def get_data_objects(
1618        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
1619    ) -> t.List[DataObject]:
1620        """Lists all data objects in the target schema.
1621
1622        Args:
1623            schema_name: The name of the schema to list data objects from.
1624            object_names: If provided, only return data objects with these names.
1625
1626        Returns:
1627            A list of data objects in the target schema.
1628        """
1629        if object_names is not None:
1630            if not object_names:
1631                return []
1632            object_names_list = list(object_names)
1633            batches = [
1634                object_names_list[i : i + self.DATA_OBJECT_FILTER_BATCH_SIZE]
1635                for i in range(0, len(object_names_list), self.DATA_OBJECT_FILTER_BATCH_SIZE)
1636            ]
1637            return [
1638                obj for batch in batches for obj in self._get_data_objects(schema_name, set(batch))
1639            ]
1640        return self._get_data_objects(schema_name)
1641
1642    def fetchone(
1643        self,
1644        query: t.Union[exp.Expression, str],
1645        ignore_unsupported_errors: bool = False,
1646        quote_identifiers: bool = False,
1647    ) -> t.Tuple:
1648        with self.transaction():
1649            self.execute(
1650                query,
1651                ignore_unsupported_errors=ignore_unsupported_errors,
1652                quote_identifiers=quote_identifiers,
1653            )
1654            return self.cursor.fetchone()
1655
1656    def fetchall(
1657        self,
1658        query: t.Union[exp.Expression, str],
1659        ignore_unsupported_errors: bool = False,
1660        quote_identifiers: bool = False,
1661    ) -> t.List[t.Tuple]:
1662        with self.transaction():
1663            self.execute(
1664                query,
1665                ignore_unsupported_errors=ignore_unsupported_errors,
1666                quote_identifiers=quote_identifiers,
1667            )
1668            return self.cursor.fetchall()
1669
1670    def _fetch_native_df(
1671        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
1672    ) -> DF:
1673        """Fetches a DataFrame that can be either Pandas or PySpark from the cursor"""
1674        with self.transaction():
1675            self.execute(query, quote_identifiers=quote_identifiers)
1676            return self.cursor.fetchdf()
1677
1678    def fetchdf(
1679        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
1680    ) -> pd.DataFrame:
1681        """Fetches a Pandas DataFrame from the cursor"""
1682        df = self._fetch_native_df(query, quote_identifiers=quote_identifiers)
1683        if not isinstance(df, pd.DataFrame):
1684            raise NotImplementedError(
1685                "The adapter's `_fetch_native_df` method did not return a pandas DataFrame. `fetchdf` must be updated so that a pandas DataFrame is returned"
1686            )
1687        return df
1688
1689    def fetch_pyspark_df(
1690        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
1691    ) -> PySparkDataFrame:
1692        """Fetches a PySpark DataFrame from the cursor"""
1693        raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}")
1694
1695    def wap_supported(self, table_name: TableName) -> bool:
1696        """Returns whether WAP for the target table is supported."""
1697        return False
1698
1699    def wap_table_name(self, table_name: TableName, wap_id: str) -> str:
1700        """Returns the updated table name for the given WAP ID.
1701
1702        Args:
1703            table_name: The name of the target table.
1704            wap_id: The WAP ID to get the table name for.
1705
1706        Returns:
1707            The updated table name that should be used for writing.
1708        """
1709        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")
1710
1711    def wap_prepare(self, table_name: TableName, wap_id: str) -> str:
1712        """Prepares the target table for WAP and returns the updated table name.
1713
1714        Args:
1715            table_name: The name of the target table.
1716            wap_id: The WAP ID to prepare.
1717
1718        Returns:
1719            The updated table name that should be used for writing.
1720        """
1721        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")
1722
1723    def wap_publish(self, table_name: TableName, wap_id: str) -> None:
1724        """Publishes changes with the given WAP ID to the target table.
1725
1726        Args:
1727            table_name: The name of the target table.
1728            wap_id: The WAP ID to publish.
1729        """
1730        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")
1731
1732    @contextlib.contextmanager
1733    def transaction(
1734        self,
1735        condition: t.Optional[bool] = None,
1736    ) -> t.Iterator[None]:
1737        """A transaction context manager."""
1738        if (
1739            self._connection_pool.is_transaction_active
1740            or not self.SUPPORTS_TRANSACTIONS
1741            or (condition is not None and not condition)
1742        ):
1743            yield
1744            return
1745        self._connection_pool.begin()
1746        try:
1747            yield
1748        except Exception as e:
1749            self._connection_pool.rollback()
1750            raise e
1751        else:
1752            self._connection_pool.commit()
1753
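# Illustrative usage sketch (not part of the adapter source): statements
# issued inside the block commit together or roll back on error. The adapter
# instance and table names are hypothetical:
from sqlmesh.core.engine_adapter import EngineAdapter

def overwrite_partition(adapter: EngineAdapter) -> None:
    with adapter.transaction():
        adapter.execute("DELETE FROM db.example WHERE ds = '2023-01-01'")
        adapter.execute("INSERT INTO db.example SELECT * FROM staging.example")
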
1754    @contextlib.contextmanager
1755    def session(self, properties: SessionProperties) -> t.Iterator[None]:
1756        """A session context manager."""
1757        if self._is_session_active():
1758            yield
1759            return
1760
1761        self._begin_session(properties)
1762        try:
1763            yield
1764        finally:
1765            self._end_session()
1766
1767    def _begin_session(self, properties: SessionProperties) -> t.Any:
1768        """Begin a new session."""
1769
1770    def _end_session(self) -> None:
1771        """End the existing session."""
1772
1773    def _is_session_active(self) -> bool:
1774        """Indicates whether or not a session is active."""
1775        return False
1776
1777    def execute(
1778        self,
1779        expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
1780        ignore_unsupported_errors: bool = False,
1781        quote_identifiers: bool = True,
1782        **kwargs: t.Any,
1783    ) -> None:
1784        """Execute a SQL query."""
1785        to_sql_kwargs = (
1786            {"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {}
1787        )
1788
1789        with self.transaction():
1790            for e in ensure_list(expressions):
1791                sql = t.cast(
1792                    str,
1793                    (
1794                        self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs)
1795                        if isinstance(e, exp.Expression)
1796                        else e
1797                    ),
1798                )
1799                self._log_sql(sql)
1800                self._execute(sql, **kwargs)
1801
1802    def _log_sql(self, sql: str) -> None:
1803        logger.log(self._execute_log_level, "Executing SQL: %s", sql)
1804
1805    def _execute(self, sql: str, **kwargs: t.Any) -> None:
1806        self.cursor.execute(sql, **kwargs)
1807
1808    @contextlib.contextmanager
1809    def temp_table(
1810        self,
1811        query_or_df: QueryOrDF,
1812        name: TableName = "diff",
1813        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1814        **kwargs: t.Any,
1815    ) -> t.Iterator[exp.Table]:
1816        """A context manager for working with a temp table.
1817
1818        The table will be created with a random guid and cleaned up after the block.
1819
1820        Args:
1821            query_or_df: The query or df to create a temp table for.
1822            name: The base name of the temp table.
1823            columns_to_types: A mapping between the column name and its data type.
1824
1825        Yields:
1826            The table expression
1827        """
1828        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1829            query_or_df, columns_to_types=columns_to_types, target_table=name
1830        )
1831
1832        with self.transaction():
1833            table = self._get_temp_table(name)
1834            if table.db:
1835                self.create_schema(schema_(table.args["db"], table.args.get("catalog")))
1836            self._create_table_from_source_queries(
1837                table,
1838                source_queries,
1839                columns_to_types,
1840                exists=True,
1841                table_description=None,
1842                column_descriptions=None,
1843                **kwargs,
1844            )
1845
1846            try:
1847                yield table
1848            finally:
1849                self.drop_table(table)
1850
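# Illustrative usage sketch (not part of the adapter source): the temp table
# is created from the dataframe and dropped when the block exits. The adapter
# instance and the base name are hypothetical:
import pandas as pd

from sqlmesh.core.engine_adapter import EngineAdapter

def preview(adapter: EngineAdapter, df: pd.DataFrame) -> pd.DataFrame:
    with adapter.temp_table(df, name="db.diff") as table:
        return adapter.fetchdf(f"SELECT * FROM {table.sql(dialect=adapter.dialect)}")
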
1851    def _table_or_view_properties_to_expressions(
1852        self, table_or_view_properties: t.Optional[t.Dict[str, exp.Expression]] = None
1853    ) -> t.List[exp.Property]:
1854        """Converts model properties (either physical or virtual) to a list of property expressions."""
1855        if not table_or_view_properties:
1856            return []
1857        return [
1858            exp.Property(this=key, value=value.copy())
1859            for key, value in table_or_view_properties.items()
1860        ]
1861
1862    def _build_table_properties_exp(
1863        self,
1864        catalog_name: t.Optional[str] = None,
1865        storage_format: t.Optional[str] = None,
1866        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
1867        partition_interval_unit: t.Optional[IntervalUnit] = None,
1868        clustered_by: t.Optional[t.List[str]] = None,
1869        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
1870        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1871        table_description: t.Optional[str] = None,
1872    ) -> t.Optional[exp.Properties]:
1873        """Creates a SQLGlot table properties expression for DDL."""
1874        properties: t.List[exp.Expression] = []
1875
1876        if table_description:
1877            properties.append(
1878                exp.SchemaCommentProperty(
1879                    this=exp.Literal.string(self._truncate_table_comment(table_description))
1880                )
1881            )
1882
1883        if properties:
1884            return exp.Properties(expressions=properties)
1885        return None
1886
1887    def _build_view_properties_exp(
1888        self,
1889        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
1890        table_description: t.Optional[str] = None,
1891    ) -> t.Optional[exp.Properties]:
1892        """Creates a SQLGlot properties expression for a view."""
1893        properties: t.List[exp.Expression] = []
1894
1895        if table_description:
1896            properties.append(
1897                exp.SchemaCommentProperty(
1898                    this=exp.Literal.string(self._truncate_table_comment(table_description))
1899                )
1900            )
1901
1902        if properties:
1903            return exp.Properties(expressions=properties)
1904        return None
1905
1906    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
1907        return comment[:length] if length else comment
1908
1909    def _truncate_table_comment(self, comment: str) -> str:
1910        return self._truncate_comment(comment, self.MAX_TABLE_COMMENT_LENGTH)
1911
1912    def _truncate_column_comment(self, comment: str) -> str:
1913        return self._truncate_comment(comment, self.MAX_COLUMN_COMMENT_LENGTH)
1914
1915    def _to_sql(self, expression: exp.Expression, quote: bool = True, **kwargs: t.Any) -> str:
1916        """
1917        Converts an expression to a SQL string. Generation kwargs are layered in order of increasing
1918        precedence: the adapter's defaults, then defaults for the given dialect, then kwargs provided
1919        when the engine adapter was defined, and finally kwargs passed to this method call.
1920        """
1921        sql_gen_kwargs = {
1922            "dialect": self.dialect,
1923            "pretty": False,
1924            "comments": False,
1925            **self.sql_gen_kwargs,
1926            **kwargs,
1927        }
1928
1929        expression = expression.copy()
1930
1931        if quote:
1932            quote_identifiers(expression)
1933
1934        return expression.sql(**sql_gen_kwargs, copy=False)  # type: ignore
1935
1936    def _get_data_objects(
1937        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
1938    ) -> t.List[DataObject]:
1939        """
1940        Returns all the data objects that exist in the given schema and optionally catalog.
1941        """
1942        raise NotImplementedError()
1943
1944    def _get_temp_table(self, table: TableName, table_only: bool = False) -> exp.Table:
1945        """
1946        Returns the name of the temp table that should be used for the given table name.
1947        """
1948        table = t.cast(exp.Table, exp.to_table(table).copy())
1949        table.set(
1950            "this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}", quoted=True)
1951        )
1952
1953        if table_only:
1954            table.set("db", None)
1955            table.set("catalog", None)
1956
1957        return table
1958
1959    def _order_projections_and_filter(
1960        self,
1961        query: Query,
1962        columns_to_types: t.Dict[str, exp.DataType],
1963        where: t.Optional[exp.Expression] = None,
1964    ) -> Query:
1965        if not isinstance(query, exp.Query) or (
1966            not where and query.named_selects == list(columns_to_types)
1967        ):
1968            return query
1969
1970        query = t.cast(exp.Query, query.copy())
1971        with_ = query.args.pop("with", None)
1972        query = self._select_columns(columns_to_types).from_(
1973            query.subquery("_subquery", copy=False), copy=False
1974        )
1975        if where:
1976            query = query.where(where, copy=False)
1977
1978        if with_:
1979            query.set("with", with_)
1980
1981        return query
1982
1983    def _truncate_table(self, table_name: TableName) -> None:
1984        table = exp.to_table(table_name)
1985        self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}")
1986
1987    def _build_create_comment_table_exp(
1988        self, table: exp.Table, table_comment: str, table_kind: str
1989    ) -> exp.Comment | str:
1990        return exp.Comment(
1991            this=table,
1992            kind=table_kind,
1993            expression=exp.Literal.string(self._truncate_table_comment(table_comment)),
1994        )
1995
1996    def _create_table_comment(
1997        self, table_name: TableName, table_comment: str, table_kind: str = "TABLE"
1998    ) -> None:
1999        table = exp.to_table(table_name)
2000
2001        try:
2002            self.execute(self._build_create_comment_table_exp(table, table_comment, table_kind))
2003        except Exception:
2004            logger.warning(
2005                f"Table comment for '{table.alias_or_name}' not registered - this may be due to limited permissions.",
2006                exc_info=True,
2007            )
2008
2009    def _build_create_comment_column_exp(
2010        self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE"
2011    ) -> exp.Comment | str:
2012        return exp.Comment(
2013            this=exp.column(column_name, *reversed(table.parts)),  # type: ignore
2014            kind="COLUMN",
2015            expression=exp.Literal.string(self._truncate_column_comment(column_comment)),
2016        )
2017
2018    def _create_column_comments(
2019        self,
2020        table_name: TableName,
2021        column_comments: t.Dict[str, str],
2022        table_kind: str = "TABLE",
2023    ) -> None:
2024        table = exp.to_table(table_name)
2025
2026        for col, comment in column_comments.items():
2027            try:
2028                self.execute(self._build_create_comment_column_exp(table, col, comment, table_kind))
2029            except Exception:
2030                logger.warning(
2031                    f"Column comments for table '{table.alias_or_name}' not registered - this may be due to limited permissions.",
2032                    exc_info=True,
2033                )
2034
2035    def _rename_table(
2036        self,
2037        old_table_name: TableName,
2038        new_table_name: TableName,
2039    ) -> None:
2040        self.execute(exp.rename_table(old_table_name, new_table_name))
2041
2042    @classmethod
2043    def _select_columns(cls, columns: t.Iterable[str]) -> exp.Select:
2044        return exp.select(*(exp.column(c, quoted=True) for c in columns))

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
EngineAdapter( connection_factory: Callable[[], Any], dialect: str = '', sql_gen_kwargs: 't.Optional[t.Dict[str, Dialect | bool | str]]' = None, multithreaded: bool = False, cursor_kwargs: Union[Dict[str, Any], NoneType] = None, cursor_init: Union[Callable[[Any], NoneType], NoneType] = None, default_catalog: Union[str, NoneType] = None, execute_log_level: int = 10, register_comments: bool = True, **kwargs: Any)
102    def __init__(
103        self,
104        connection_factory: t.Callable[[], t.Any],
105        dialect: str = "",
106        sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
107        multithreaded: bool = False,
108        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
109        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
110        default_catalog: t.Optional[str] = None,
111        execute_log_level: int = logging.DEBUG,
112        register_comments: bool = True,
113        **kwargs: t.Any,
114    ):
115        self.dialect = dialect.lower() or self.DIALECT
116        self._connection_pool = create_connection_pool(
117            connection_factory, multithreaded, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init
118        )
119        self.sql_gen_kwargs = sql_gen_kwargs or {}
120        self._default_catalog = default_catalog
121        self._execute_log_level = execute_log_level
122        self._extra_config = kwargs
123        self.register_comments = register_comments
COMMENT_CREATION_TABLE = <CommentCreationTable.IN_SCHEMA_DEF_CTAS: 2>
COMMENT_CREATION_VIEW = <CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS: 2>
INSERT_OVERWRITE_STRATEGY = <InsertOverwriteStrategy.DELETE_INSERT: 1>
CATALOG_SUPPORT = <CatalogSupport.UNSUPPORTED: 1>
def with_log_level(self, level: int) -> sqlmesh.core.engine_adapter.base.EngineAdapter:
125    def with_log_level(self, level: int) -> EngineAdapter:
126        adapter = self.__class__(
127            lambda: None,
128            dialect=self.dialect,
129            sql_gen_kwargs=self.sql_gen_kwargs,
130            default_catalog=self._default_catalog,
131            execute_log_level=level,
132            register_comments=self.register_comments,
133            **self._extra_config,
134        )
135
136        adapter._connection_pool = self._connection_pool
137
138        return adapter
@classmethod
def is_pandas_df(cls, value: Any) -> bool:
152    @classmethod
153    def is_pandas_df(cls, value: t.Any) -> bool:
154        return isinstance(value, pd.DataFrame)
def recycle(self) -> None:
250    def recycle(self) -> None:
251        """Closes all open connections and releases all allocated resources associated with any thread
252        except the calling one."""
253        self._connection_pool.close_all(exclude_calling_thread=True)

Closes all open connections and releases all allocated resources associated with any thread except the calling one.

def close(self) -> Any:
255    def close(self) -> t.Any:
256        """Closes all open connections and releases all allocated resources."""
257        self._connection_pool.close_all()

Closes all open connections and releases all allocated resources.

def get_current_catalog(self) -> Union[str, NoneType]:
259    def get_current_catalog(self) -> t.Optional[str]:
260        """Returns the catalog name of the current connection."""
261        raise NotImplementedError()

Returns the catalog name of the current connection.

def set_current_catalog(self, catalog: str) -> None:
263    def set_current_catalog(self, catalog: str) -> None:
264        """Sets the catalog name of the current connection."""
265        raise NotImplementedError()

Sets the catalog name of the current connection.

def get_catalog_type(self, catalog: Union[str, NoneType]) -> str:
267    def get_catalog_type(self, catalog: t.Optional[str]) -> str:
268        """Intended to be overridden for data virtualization systems like Trino that,
269        depending on the target catalog, require slightly different properties to be set when creating / updating tables.
270        """
271        if self.CATALOG_SUPPORT.is_unsupported:
272            raise UnsupportedCatalogOperationError(
273                f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}"
274            )
275        return self.DEFAULT_CATALOG_TYPE

Intended to be overridden for data virtualization systems like Trino that, depending on the target catalog, require slightly different properties to be set when creating / updating tables.

def replace_query( self, table_name: TableName, query_or_df: QueryOrDF, columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], NoneType] = None, table_description: Union[str, NoneType] = None, column_descriptions: Union[Dict[str, str], NoneType] = None, **kwargs: Any) -> None:
281    def replace_query(
282        self,
283        table_name: TableName,
284        query_or_df: QueryOrDF,
285        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
286        table_description: t.Optional[str] = None,
287        column_descriptions: t.Optional[t.Dict[str, str]] = None,
288        **kwargs: t.Any,
289    ) -> None:
290        """Replaces an existing table with a query.
291
292        For partition-based engines (hive, spark), insert overwrite is used. For other systems, create or replace is used.
293
294        Args:
295            table_name: The name of the table (e.g. prod.table)
296            query_or_df: The SQL query to run or a dataframe.
297            columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type.
298                Expected to be ordered to match the order of values in the dataframe.
299            kwargs: Optional create table properties.
300        """
301        target_table = exp.to_table(table_name)
302        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
303            query_or_df, columns_to_types, target_table=target_table
304        )
305        columns_to_types = columns_to_types or self.columns(target_table)
306        query = source_queries[0].query_factory()
307        self_referencing = any(
308            quote_identifiers(table) == quote_identifiers(target_table)
309            for table in query.find_all(exp.Table)
310        )
311        # If a query references itself then it must have a table created regardless of approach used.
312        if self_referencing:
313            self._create_table_from_columns(
314                target_table,
315                columns_to_types,
316                exists=True,
317                table_description=table_description,
318                column_descriptions=column_descriptions,
319            )
320        # All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we
321        # use `CREATE OR REPLACE TABLE AS` if the engine supports it
322        if self.SUPPORTS_REPLACE_TABLE or not self.table_exists(target_table):
323            return self._create_table_from_source_queries(
324                target_table,
325                source_queries,
326                columns_to_types,
327                replace=self.SUPPORTS_REPLACE_TABLE,
328                table_description=table_description,
329                column_descriptions=column_descriptions,
330                **kwargs,
331            )
332        else:
333            if self_referencing:
334                with self.temp_table(
335                    self._select_columns(columns_to_types).from_(target_table),
336                    name=target_table,
337                    columns_to_types=columns_to_types,
338                    **kwargs,
339                ) as temp_table:
340                    for source_query in source_queries:
341                        source_query.add_transform(
342                            lambda node: (  # type: ignore
343                                temp_table  # type: ignore
344                                if isinstance(node, exp.Table)
345                                and quote_identifiers(node) == quote_identifiers(target_table)
346                                else node
347                            )
348                        )
349                    return self._insert_overwrite_by_condition(
350                        target_table,
351                        source_queries,
352                        columns_to_types,
353                    )
354            return self._insert_overwrite_by_condition(
355                target_table,
356                source_queries,
357                columns_to_types,
358            )

Replaces an existing table with a query.

For partition-based engines (hive, spark), insert overwrite is used. For other systems, create or replace is used.

Arguments:
  • table_name: The name of the table (e.g. prod.table)
  • query_or_df: The SQL query to run or a dataframe.
  • columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type. Expected to be ordered to match the order of values in the dataframe.
  • kwargs: Optional create table properties.
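A minimal sketch of calling replace_query with a pandas DataFrame, assuming an in-memory DuckDB connection and made-up table names for illustration (real projects would typically use the engine-specific adapter subclass):

    import duckdb
    import pandas as pd
    from sqlglot import exp
    from sqlmesh.core.engine_adapter.base import EngineAdapter

    # Illustrative only: wrap an in-memory DuckDB connection with the base adapter.
    adapter = EngineAdapter(lambda: duckdb.connect(), dialect="duckdb")
    adapter.create_schema("prod")

    df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
    adapter.replace_query(
        "prod.table",
        df,
        columns_to_types={
            "id": exp.DataType.build("int"),
            "name": exp.DataType.build("text"),
        },
    )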
def create_index( self, table_name: TableName, index_name: str, columns: Tuple[str, ...], exists: bool = True) -> None:
360    def create_index(
361        self,
362        table_name: TableName,
363        index_name: str,
364        columns: t.Tuple[str, ...],
365        exists: bool = True,
366    ) -> None:
367        """Creates a new index for the given table if supported
368
369        Args:
370            table_name: The name of the target table.
371            index_name: The name of the index.
372            columns: The list of columns that constitute the index.
373            exists: Indicates whether to include the IF NOT EXISTS check.
374        """
375        if not self.SUPPORTS_INDEXES:
376            return
377
378        expression = exp.Create(
379            this=exp.Index(
380                this=exp.to_identifier(index_name),
381                table=exp.to_table(table_name),
382                params=exp.IndexParameters(columns=[exp.to_column(c) for c in columns]),
383            ),
384            kind="INDEX",
385            exists=exists,
386        )
387        self.execute(expression)

Creates a new index for the given table if supported

Arguments:
  • table_name: The name of the target table.
  • index_name: The name of the index.
  • columns: The list of columns that constitute the index.
  • exists: Indicates whether to include the IF NOT EXISTS check.
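Because the base class sets SUPPORTS_INDEXES = False, create_index is a silent no-op unless the adapter enables indexes; EngineAdapterWithIndexSupport (defined at the bottom of this module) flips the flag. A hedged sketch with assumed table and index names:

    import duckdb
    from sqlglot import exp
    from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport

    adapter = EngineAdapterWithIndexSupport(lambda: duckdb.connect(), dialect="duckdb")
    adapter.create_schema("prod")
    adapter.create_table(
        "prod.events",
        {"id": exp.DataType.build("int"), "ts": exp.DataType.build("timestamp")},
    )
    # Renders roughly: CREATE INDEX IF NOT EXISTS "ix_events_id" ON "prod"."events" ("id")
    adapter.create_index("prod.events", "ix_events_id", ("id",))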
def create_table( self, table_name: TableName, columns_to_types: Dict[str, sqlglot.expressions.DataType], primary_key: Union[Tuple[str, ...], NoneType] = None, exists: bool = True, table_description: Union[str, NoneType] = None, column_descriptions: Union[Dict[str, str], NoneType] = None, **kwargs: Any) -> None:
389    def create_table(
390        self,
391        table_name: TableName,
392        columns_to_types: t.Dict[str, exp.DataType],
393        primary_key: t.Optional[t.Tuple[str, ...]] = None,
394        exists: bool = True,
395        table_description: t.Optional[str] = None,
396        column_descriptions: t.Optional[t.Dict[str, str]] = None,
397        **kwargs: t.Any,
398    ) -> None:
399        """Create a table using a DDL statement
400
401        Args:
402            table_name: The name of the table to create. Can be fully qualified or just table name.
403            columns_to_types: A mapping between the column name and its data type.
404            primary_key: Determines the table primary key.
405            exists: Indicates whether to include the IF NOT EXISTS check.
406            table_description: Optional table description from MODEL DDL.
407            column_descriptions: Optional column descriptions from model query.
408            kwargs: Optional create table properties.
409        """
410        self._create_table_from_columns(
411            table_name,
412            columns_to_types,
413            primary_key,
414            exists,
415            table_description,
416            column_descriptions,
417            **kwargs,
418        )

Create a table using a DDL statement

Arguments:
  • table_name: The name of the table to create. Can be fully qualified or just table name.
  • columns_to_types: A mapping between the column name and its data type.
  • primary_key: Determines the table primary key.
  • exists: Indicates whether to include the IF NOT EXISTS check.
  • table_description: Optional table description from MODEL DDL.
  • column_descriptions: Optional column descriptions from model query.
  • kwargs: Optional create table properties.
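A minimal sketch: column types are sqlglot DataType instances, typically built with exp.DataType.build (the schema and table names here are assumptions):

    import duckdb
    from sqlglot import exp
    from sqlmesh.core.engine_adapter.base import EngineAdapter

    adapter = EngineAdapter(lambda: duckdb.connect(), dialect="duckdb")
    adapter.create_schema("prod")
    adapter.create_table(
        "prod.events",
        {"id": exp.DataType.build("int"), "ts": exp.DataType.build("timestamp")},
        # The description is registered only where the engine/adapter supports comments.
        table_description="Raw event stream",
    )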
def ctas( self, table_name: TableName, query_or_df: QueryOrDF, columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], NoneType] = None, exists: bool = True, table_description: Union[str, NoneType] = None, column_descriptions: Union[Dict[str, str], NoneType] = None, **kwargs: Any) -> None:
420    def ctas(
421        self,
422        table_name: TableName,
423        query_or_df: QueryOrDF,
424        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
425        exists: bool = True,
426        table_description: t.Optional[str] = None,
427        column_descriptions: t.Optional[t.Dict[str, str]] = None,
428        **kwargs: t.Any,
429    ) -> None:
430        """Create a table using a CTAS statement
431
432        Args:
433            table_name: The name of the table to create. Can be fully qualified or just table name.
434            query_or_df: The SQL query to run or a dataframe for the CTAS.
435            columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame.
436            exists: Indicates whether to include the IF NOT EXISTS check.
437            table_description: Optional table description from MODEL DDL.
438            column_descriptions: Optional column descriptions from model query.
439            kwargs: Optional create table properties.
440        """
441        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
442            query_or_df, columns_to_types, target_table=table_name
443        )
444        return self._create_table_from_source_queries(
445            table_name,
446            source_queries,
447            columns_to_types,
448            exists,
449            table_description=table_description,
450            column_descriptions=column_descriptions,
451            **kwargs,
452        )

Create a table using a CTAS statement

Arguments:
  • table_name: The name of the table to create. Can be fully qualified or just table name.
  • query_or_df: The SQL query to run or a dataframe for the CTAS.
  • columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame.
  • exists: Indicates whether to include the IF NOT EXISTS check.
  • table_description: Optional table description from MODEL DDL.
  • column_descriptions: Optional column descriptions from model query.
  • kwargs: Optional create table properties.
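A sketch of a CTAS call; query_or_df accepts a sqlglot query, so a parsed SELECT works directly (the table names are assumptions):

    import duckdb
    from sqlglot import parse_one
    from sqlmesh.core.engine_adapter.base import EngineAdapter

    adapter = EngineAdapter(lambda: duckdb.connect(), dialect="duckdb")
    # Assumes "prod.events" exists, e.g. created as in the earlier sketches.
    adapter.ctas(
        "prod.event_counts",
        parse_one("SELECT id, COUNT(*) AS cnt FROM prod.events GROUP BY id"),
    )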
def create_state_table( self, table_name: str, columns_to_types: Dict[str, sqlglot.expressions.DataType], primary_key: Union[Tuple[str, ...], NoneType] = None) -> None:
454    def create_state_table(
455        self,
456        table_name: str,
457        columns_to_types: t.Dict[str, exp.DataType],
458        primary_key: t.Optional[t.Tuple[str, ...]] = None,
459    ) -> None:
460        """Create a table to store SQLMesh internal state.
461
462        Args:
463            table_name: The name of the table to create. Can be fully qualified or just table name.
464            columns_to_types: A mapping between the column name and its data type.
465            primary_key: Determines the table primary key.
466        """
467        self.create_table(
468            table_name,
469            columns_to_types,
470            primary_key=primary_key,
471        )

Create a table to store SQLMesh internal state.

Arguments:
  • table_name: The name of the table to create. Can be fully qualified or just table name.
  • columns_to_types: A mapping between the column name and its data type.
  • primary_key: Determines the table primary key.
def create_table_like( self, target_table_name: TableName, source_table_name: TableName, exists: bool = True) -> None:
719    def create_table_like(
720        self,
721        target_table_name: TableName,
722        source_table_name: TableName,
723        exists: bool = True,
724    ) -> None:
725        """
726        Create a table like another table or view.
727        """
728        target_table = exp.to_table(target_table_name)
729        source_table = exp.to_table(source_table_name)
730        create_expression = exp.Create(
731            this=target_table,
732            kind="TABLE",
733            exists=exists,
734            properties=exp.Properties(
735                expressions=[
736                    exp.LikeProperty(this=source_table),
737                ]
738            ),
739        )
740        self.execute(create_expression)

Create a table like another table or view.

def clone_table( self, target_table_name: TableName, source_table_name: TableName, replace: bool = False, clone_kwargs: Union[Dict[str, Any], NoneType] = None, **kwargs: Any) -> None:
742    def clone_table(
743        self,
744        target_table_name: TableName,
745        source_table_name: TableName,
746        replace: bool = False,
747        clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
748        **kwargs: t.Any,
749    ) -> None:
750        """Creates a table with the target name by cloning the source table.
751
752        Args:
753            target_table_name: The name of the table that should be created.
754            source_table_name: The name of the source table that should be cloned.
755            replace: Whether or not to replace an existing table.
756        """
757        if not self.SUPPORTS_CLONING:
758            raise NotImplementedError(f"Engine does not support cloning: {type(self)}")
759        self.execute(
760            exp.Create(
761                this=exp.to_table(target_table_name),
762                kind="TABLE",
763                replace=replace,
764                clone=exp.Clone(
765                    this=exp.to_table(source_table_name),
766                    **(clone_kwargs or {}),
767                ),
768                **kwargs,
769            )
770        )

Creates a table with the target name by cloning the source table.

Arguments:
  • target_table_name: The name of the table that should be created.
  • source_table_name: The name of the source table that should be cloned.
  • replace: Whether or not to replace an existing table.
def drop_table( self, table_name: TableName, exists: bool = True) -> None:
772    def drop_table(self, table_name: TableName, exists: bool = True) -> None:
773        """Drops a table.
774
775        Args:
776            table_name: The name of the table to drop.
777            exists: Indicates whether to include the IF EXISTS check. Defaults to True.
778        """
779        drop_expression = exp.Drop(this=exp.to_table(table_name), kind="TABLE", exists=exists)
780        self.execute(drop_expression)

Drops a table.

Arguments:
  • table_name: The name of the table to drop.
  • exists: Indicates whether to include the IF EXISTS check. Defaults to True.
def alter_table( self, current_table_name: TableName, target_table_name: TableName) -> None:
782    def alter_table(
783        self,
784        current_table_name: TableName,
785        target_table_name: TableName,
786    ) -> None:
787        """
788        Performs the required alter statements to change the current table into the structure of the target table.
789        """
790        with self.transaction():
791            for alter_expression in self.SCHEMA_DIFFER.compare_columns(
792                current_table_name,
793                self.columns(current_table_name),
794                self.columns(target_table_name),
795            ):
796                self.execute(alter_expression)

Performs the required alter statements to change the current table into the structure of the target table.

def create_view( self, view_name: TableName, query_or_df: QueryOrDF, columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], NoneType] = None, replace: bool = True, materialized: bool = False, table_description: Union[str, NoneType] = None, column_descriptions: Union[Dict[str, str], NoneType] = None, view_properties: Union[Dict[str, sqlglot.expressions.Expression], NoneType] = None, **create_kwargs: Any) -> None:
798    def create_view(
799        self,
800        view_name: TableName,
801        query_or_df: QueryOrDF,
802        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
803        replace: bool = True,
804        materialized: bool = False,
805        table_description: t.Optional[str] = None,
806        column_descriptions: t.Optional[t.Dict[str, str]] = None,
807        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
808        **create_kwargs: t.Any,
809    ) -> None:
810        """Create a view with a query or dataframe.
811
812        If a dataframe is passed in, it will be converted into a literal values statement.
813        This should only be done if the dataframe is very small!
814
815        Args:
816            view_name: The view name.
817            query_or_df: A query or dataframe.
818            columns_to_types: Columns to use in the view statement.
819            replace: Whether or not to replace an existing view. Defaults to True.
820            materialized: Whether to create a materialized view. Only used for engines that support this feature.
821            table_description: Optional table description from MODEL DDL.
822            column_descriptions: Optional column descriptions from model query.
823            view_properties: Optional view properties to add to the view.
824            create_kwargs: Additional kwargs to pass into the Create expression
825        """
826        if self.is_pandas_df(query_or_df):
827            values = list(t.cast(pd.DataFrame, query_or_df).itertuples(index=False, name=None))
828            columns_to_types = columns_to_types or self._columns_to_types(query_or_df)
829            if not columns_to_types:
830                raise SQLMeshError("columns_to_types must be provided for dataframes")
831            query_or_df = self._values_to_sql(
832                values,
833                columns_to_types,
834                batch_start=0,
835                batch_end=len(values),
836            )
837
838        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
839            query_or_df, columns_to_types, batch_size=0, target_table=view_name
840        )
841        if len(source_queries) != 1:
842            raise SQLMeshError("Only one source query is supported for creating views")
843
844        schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name)
845        if columns_to_types:
846            schema = self._build_schema_exp(
847                exp.to_table(view_name), columns_to_types, column_descriptions, is_view=True
848            )
849
850        properties = create_kwargs.pop("properties", None)
851        if not properties:
852            properties = exp.Properties(expressions=[])
853
854        if materialized and self.SUPPORTS_MATERIALIZED_VIEWS:
855            properties.append("expressions", exp.MaterializedProperty())
856
857            if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema):
858                schema = schema.this
859
860        create_view_properties = self._build_view_properties_exp(
861            view_properties,
862            (
863                table_description
864                if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled
865                else None
866            ),
867        )
868        if create_view_properties:
869            for view_property in create_view_properties.expressions:
870                properties.append("expressions", view_property)
871
872        if properties.expressions:
873            create_kwargs["properties"] = properties
874
875        with source_queries[0] as query:
876            self.execute(
877                exp.Create(
878                    this=schema,
879                    kind="VIEW",
880                    replace=replace,
881                    expression=query,
882                    **create_kwargs,
883                ),
884                quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
885            )
886
887        # Register table comment with commands if the engine doesn't support doing it in CREATE
888        if (
889            table_description
890            and self.COMMENT_CREATION_VIEW.is_comment_command_only
891            and self.comments_enabled
892        ):
893            self._create_table_comment(view_name, table_description, "VIEW")
894        # Register column comments with commands if the engine doesn't support doing it in
895        # CREATE or we couldn't do it in the CREATE schema definition because we don't have
896        # columns_to_types
897        if (
898            column_descriptions
899            and (
900                self.COMMENT_CREATION_VIEW.is_comment_command_only
901                or (
902                    self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands
903                    and not columns_to_types
904                )
905            )
906            and self.comments_enabled
907        ):
908            self._create_column_comments(view_name, column_descriptions, "VIEW")

Create a view with a query or dataframe.

If a dataframe is passed in, it will be converted into a literal values statement. This should only be done if the dataframe is very small!

Arguments:
  • view_name: The view name.
  • query_or_df: A query or dataframe.
  • columns_to_types: Columns to use in the view statement.
  • replace: Whether or not to replace an existing view. Defaults to True.
  • materialized: Whether to create a materialized view. Only used for engines that support this feature.
  • table_description: Optional table description from MODEL DDL.
  • column_descriptions: Optional column descriptions from model query.
  • view_properties: Optional view properties to add to the view.
  • create_kwargs: Additional kwargs to pass into the Create expression
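A sketch of creating a view from a tiny dataframe (the view name is an assumption); per the warning above, the dataframe is inlined as a literal VALUES clause, so keep it small:

    import duckdb
    import pandas as pd
    from sqlglot import exp
    from sqlmesh.core.engine_adapter.base import EngineAdapter

    adapter = EngineAdapter(lambda: duckdb.connect(), dialect="duckdb")
    adapter.create_schema("prod")

    df = pd.DataFrame({"code": ["US", "CA"]})
    adapter.create_view(
        "prod.country_codes",
        df,
        columns_to_types={"code": exp.DataType.build("text")},
    )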
@set_catalog()
def create_schema( self, schema_name: SchemaName, ignore_if_exists: bool = True, warn_on_error: bool = True) -> None:
910    @set_catalog()
911    def create_schema(
912        self,
913        schema_name: SchemaName,
914        ignore_if_exists: bool = True,
915        warn_on_error: bool = True,
916    ) -> None:
917        """Create a schema from a name or qualified table name."""
918        try:
919            self.execute(
920                exp.Create(
921                    this=to_schema(schema_name),
922                    kind="SCHEMA",
923                    exists=ignore_if_exists,
924                )
925            )
926        except Exception as e:
927            if not warn_on_error:
928                raise
929            logger.warning("Failed to create schema '%s': %s", schema_name, e)

Create a schema from a name or qualified table name.

def drop_schema( self, schema_name: SchemaName, ignore_if_not_exists: bool = True, cascade: bool = False) -> None:
931    def drop_schema(
932        self,
933        schema_name: SchemaName,
934        ignore_if_not_exists: bool = True,
935        cascade: bool = False,
936    ) -> None:
937        self.execute(
938            exp.Drop(
939                this=to_schema(schema_name),
940                kind="SCHEMA",
941                exists=ignore_if_not_exists,
942                cascade=cascade,
943            )
944        )
def drop_view( self, view_name: TableName, ignore_if_not_exists: bool = True, materialized: bool = False, **kwargs: Any) -> None:
946    def drop_view(
947        self,
948        view_name: TableName,
949        ignore_if_not_exists: bool = True,
950        materialized: bool = False,
951        **kwargs: t.Any,
952    ) -> None:
953        """Drop a view."""
954        self.execute(
955            exp.Drop(
956                this=exp.to_table(view_name),
957                exists=ignore_if_not_exists,
958                materialized=materialized and self.SUPPORTS_MATERIALIZED_VIEWS,
959                kind="VIEW",
960                **kwargs,
961            )
962        )

Drop a view.

def columns( self, table_name: TableName, include_pseudo_columns: bool = False) -> Dict[str, sqlglot.expressions.DataType]:
964    def columns(
965        self, table_name: TableName, include_pseudo_columns: bool = False
966    ) -> t.Dict[str, exp.DataType]:
967        """Fetches column names and types for the target table."""
968        self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
969        describe_output = self.cursor.fetchall()
970        return {
971            # Note: MySQL returns the column type as bytes.
972            column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect)
973            for column_name, column_type, *_ in itertools.takewhile(
974                lambda t: not t[0].startswith("#"),
975                describe_output,
976            )
977            if column_name and column_name.strip() and column_type and column_type.strip()
978        }

Fetches column names and types for the target table.
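For example, a sketch that prints the parsed types (assuming "prod.events" was created as in the earlier snippets):

    import duckdb
    from sqlmesh.core.engine_adapter.base import EngineAdapter

    adapter = EngineAdapter(lambda: duckdb.connect(), dialect="duckdb")
    # DESCRIBE output is parsed into sqlglot DataType instances.
    for column_name, data_type in adapter.columns("prod.events").items():
        print(column_name, data_type.sql(dialect=adapter.dialect))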

def table_exists(self, table_name: TableName) -> bool:
980    def table_exists(self, table_name: TableName) -> bool:
981        try:
982            self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
983            return True
984        except Exception:
985            return False
def delete_from( self, table_name: TableName, where: Union[str, sqlglot.expressions.Expression]) -> None:
987    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
988        self.execute(exp.delete(table_name, where))
def insert_append( self, table_name: TableName, query_or_df: QueryOrDF, columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], NoneType] = None) -> None:
990    def insert_append(
991        self,
992        table_name: TableName,
993        query_or_df: QueryOrDF,
994        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
995    ) -> None:
996        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
997            query_or_df, columns_to_types, target_table=table_name
998        )
999        self._insert_append_source_queries(table_name, source_queries, columns_to_types)
def insert_overwrite_by_partition( self, table_name: TableName, query_or_df: QueryOrDF, partitioned_by: List[sqlglot.expressions.Expression], columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], NoneType] = None) -> None:
1024    def insert_overwrite_by_partition(
1025        self,
1026        table_name: TableName,
1027        query_or_df: QueryOrDF,
1028        partitioned_by: t.List[exp.Expression],
1029        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1030    ) -> None:
1031        raise NotImplementedError(
1032            "Insert Overwrite by Partition (not time) is not supported by this engine"
1033        )
def insert_overwrite_by_time_partition( self, table_name: TableName, query_or_df: QueryOrDF, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], time_formatter: Callable[[Union[datetime.date, datetime.datetime, str, int, float], Union[Dict[str, sqlglot.expressions.DataType], NoneType]], sqlglot.expressions.Expression], time_column: 'TimeColumn | exp.Expression | str', columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], NoneType] = None, **kwargs: Any) -> None:
1035    def insert_overwrite_by_time_partition(
1036        self,
1037        table_name: TableName,
1038        query_or_df: QueryOrDF,
1039        start: TimeLike,
1040        end: TimeLike,
1041        time_formatter: t.Callable[
1042            [TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expression
1043        ],
1044        time_column: TimeColumn | exp.Expression | str,
1045        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1046        **kwargs: t.Any,
1047    ) -> None:
1048        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1049            query_or_df, columns_to_types, target_table=table_name
1050        )
1051        columns_to_types = columns_to_types or self.columns(table_name)
1052        low, high = [time_formatter(dt, columns_to_types) for dt in make_inclusive(start, end)]
1053        if isinstance(time_column, TimeColumn):
1054            time_column = time_column.column
1055        where = exp.Between(
1056            this=exp.to_column(time_column) if isinstance(time_column, str) else time_column,
1057            low=low,
1058            high=high,
1059        )
1060        self._insert_overwrite_by_condition(table_name, source_queries, columns_to_types, where)
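A hedged sketch of a call; the time_formatter and table names here are hypothetical. The formatter must turn each inclusive boundary produced by make_inclusive into a SQL expression comparable with the time column, which then feeds the BETWEEN condition above:

    from sqlglot import exp, parse_one

    def date_formatter(value, columns_to_types=None):
        # Hypothetical formatter: render the boundary as a DATE literal.
        return exp.cast(exp.Literal.string(str(value.date())), "date")

    # `adapter` wired as in the earlier sketches.
    adapter.insert_overwrite_by_time_partition(
        "prod.events",
        parse_one("SELECT id, ts, ds FROM staging.events"),
        start="2023-01-01",
        end="2023-01-07",
        time_formatter=date_formatter,
        time_column="ds",
    )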
def update_table( self, table_name: TableName, properties: Dict[str, Any], where: 't.Optional[str | exp.Condition]' = None) -> None:
1121    def update_table(
1122        self,
1123        table_name: TableName,
1124        properties: t.Dict[str, t.Any],
1125        where: t.Optional[str | exp.Condition] = None,
1126    ) -> None:
1127        self.execute(exp.update(table_name, properties, where=where))
def scd_type_2_by_time( self, target_table: TableName, source_table: QueryOrDF, unique_key: Sequence[sqlglot.expressions.Expression], valid_from_col: sqlglot.expressions.Column, valid_to_col: sqlglot.expressions.Column, execution_time: Union[datetime.date, datetime.datetime, str, int, float], updated_at_col: sqlglot.expressions.Column, invalidate_hard_deletes: bool = True, updated_at_as_valid_from: bool = False, columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], NoneType] = None, table_description: Union[str, NoneType] = None, column_descriptions: Union[Dict[str, str], NoneType] = None, truncate: bool = False, **kwargs: Any) -> None:
1149    def scd_type_2_by_time(
1150        self,
1151        target_table: TableName,
1152        source_table: QueryOrDF,
1153        unique_key: t.Sequence[exp.Expression],
1154        valid_from_col: exp.Column,
1155        valid_to_col: exp.Column,
1156        execution_time: TimeLike,
1157        updated_at_col: exp.Column,
1158        invalidate_hard_deletes: bool = True,
1159        updated_at_as_valid_from: bool = False,
1160        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1161        table_description: t.Optional[str] = None,
1162        column_descriptions: t.Optional[t.Dict[str, str]] = None,
1163        truncate: bool = False,
1164        **kwargs: t.Any,
1165    ) -> None:
1166        self._scd_type_2(
1167            target_table=target_table,
1168            source_table=source_table,
1169            unique_key=unique_key,
1170            valid_from_col=valid_from_col,
1171            valid_to_col=valid_to_col,
1172            execution_time=execution_time,
1173            updated_at_col=updated_at_col,
1174            invalidate_hard_deletes=invalidate_hard_deletes,
1175            updated_at_as_valid_from=updated_at_as_valid_from,
1176            columns_to_types=columns_to_types,
1177            table_description=table_description,
1178            column_descriptions=column_descriptions,
1179            truncate=truncate,
1180        )
def scd_type_2_by_column( self, target_table: TableName, source_table: QueryOrDF, unique_key: Sequence[sqlglot.expressions.Expression], valid_from_col: sqlglot.expressions.Column, valid_to_col: sqlglot.expressions.Column, execution_time: Union[datetime.date, datetime.datetime, str, int, float], check_columns: Union[sqlglot.expressions.Star, Sequence[sqlglot.expressions.Column]], invalidate_hard_deletes: bool = True, execution_time_as_valid_from: bool = False, columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], NoneType] = None, table_description: Union[str, NoneType] = None, column_descriptions: Union[Dict[str, str], NoneType] = None, truncate: bool = False, **kwargs: Any) -> None:
1182    def scd_type_2_by_column(
1183        self,
1184        target_table: TableName,
1185        source_table: QueryOrDF,
1186        unique_key: t.Sequence[exp.Expression],
1187        valid_from_col: exp.Column,
1188        valid_to_col: exp.Column,
1189        execution_time: TimeLike,
1190        check_columns: t.Union[exp.Star, t.Sequence[exp.Column]],
1191        invalidate_hard_deletes: bool = True,
1192        execution_time_as_valid_from: bool = False,
1193        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1194        table_description: t.Optional[str] = None,
1195        column_descriptions: t.Optional[t.Dict[str, str]] = None,
1196        truncate: bool = False,
1197        **kwargs: t.Any,
1198    ) -> None:
1199        self._scd_type_2(
1200            target_table=target_table,
1201            source_table=source_table,
1202            unique_key=unique_key,
1203            valid_from_col=valid_from_col,
1204            valid_to_col=valid_to_col,
1205            execution_time=execution_time,
1206            check_columns=check_columns,
1207            columns_to_types=columns_to_types,
1208            invalidate_hard_deletes=invalidate_hard_deletes,
1209            execution_time_as_valid_from=execution_time_as_valid_from,
1210            table_description=table_description,
1211            column_descriptions=column_descriptions,
1212            truncate=truncate,
1213        )
def merge( self, target_table: TableName, source_table: QueryOrDF, columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], NoneType], unique_key: Sequence[sqlglot.expressions.Expression], when_matched: Union[sqlglot.expressions.When, NoneType] = None) -> None:
1554    def merge(
1555        self,
1556        target_table: TableName,
1557        source_table: QueryOrDF,
1558        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
1559        unique_key: t.Sequence[exp.Expression],
1560        when_matched: t.Optional[exp.When] = None,
1561    ) -> None:
1562        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1563            source_table, columns_to_types, target_table=target_table
1564        )
1565        columns_to_types = columns_to_types or self.columns(target_table)
1566        on = exp.and_(
1567            *(
1568                add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS))
1569                for part in unique_key
1570            )
1571        )
1572        if not when_matched:
1573            when_matched = exp.When(
1574                matched=True,
1575                source=False,
1576                then=exp.Update(
1577                    expressions=[
1578                        exp.column(col, MERGE_TARGET_ALIAS).eq(exp.column(col, MERGE_SOURCE_ALIAS))
1579                        for col in columns_to_types
1580                    ],
1581                ),
1582            )
1583        when_not_matched = exp.When(
1584            matched=False,
1585            source=False,
1586            then=exp.Insert(
1587                this=exp.Tuple(expressions=[exp.column(col) for col in columns_to_types]),
1588                expression=exp.Tuple(
1589                    expressions=[exp.column(col, MERGE_SOURCE_ALIAS) for col in columns_to_types]
1590                ),
1591            ),
1592        )
1593        for source_query in source_queries:
1594            with source_query as query:
1595                self._merge(
1596                    target_table=target_table,
1597                    query=query,
1598                    on=on,
1599                    match_expressions=[when_matched, when_not_matched],
1600                )
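A hedged sketch of merge; per the source above, the default WHEN clauses update all columns on a match and insert otherwise. Here `make_connection` is a hypothetical placeholder for a DB-API connection factory to an engine whose dialect supports MERGE, and the table names are assumptions:

    from sqlglot import exp, parse_one
    from sqlmesh.core.engine_adapter.base import EngineAdapter

    # make_connection is a hypothetical DB-API connection factory.
    adapter = EngineAdapter(make_connection, dialect="snowflake")
    adapter.merge(
        target_table="prod.users",
        source_table=parse_one("SELECT id, name FROM staging.users"),
        columns_to_types={
            "id": exp.DataType.build("int"),
            "name": exp.DataType.build("text"),
        },
        unique_key=[exp.column("id")],
    )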
def rename_table( self, old_table_name: TableName, new_table_name: TableName) -> None:
1602    def rename_table(
1603        self,
1604        old_table_name: TableName,
1605        new_table_name: TableName,
1606    ) -> None:
1607        new_table = exp.to_table(new_table_name)
1608        if new_table.catalog:
1609            old_table = exp.to_table(old_table_name)
1610            catalog = old_table.catalog or self.get_current_catalog()
1611            if catalog != new_table.catalog:
1612                raise UnsupportedCatalogOperationError(
1613                    "Tried to rename table across catalogs which is not supported"
1614                )
1615        self._rename_table(old_table_name, new_table_name)
def get_data_objects( self, schema_name: SchemaName, object_names: Union[Set[str], NoneType] = None) -> List[sqlmesh.core.engine_adapter.shared.DataObject]:
1617    def get_data_objects(
1618        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
1619    ) -> t.List[DataObject]:
1620        """Lists all data objects in the target schema.
1621
1622        Args:
1623            schema_name: The name of the schema to list data objects from.
1624            object_names: If provided, only return data objects with these names.
1625
1626        Returns:
1627            A list of data objects in the target schema.
1628        """
1629        if object_names is not None:
1630            if not object_names:
1631                return []
1632            object_names_list = list(object_names)
1633            batches = [
1634                object_names_list[i : i + self.DATA_OBJECT_FILTER_BATCH_SIZE]
1635                for i in range(0, len(object_names_list), self.DATA_OBJECT_FILTER_BATCH_SIZE)
1636            ]
1637            return [
1638                obj for batch in batches for obj in self._get_data_objects(schema_name, set(batch))
1639            ]
1640        return self._get_data_objects(schema_name)

Lists all data objects in the target schema.

Arguments:
  • schema_name: The name of the schema to list data objects from.
  • object_names: If provided, only return data objects with these names.
Returns:

A list of data objects in the target schema.
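Note that the base class leaves _get_data_objects unimplemented, so this only works on a concrete adapter subclass. A sketch (the attribute names assume the DataObject model in sqlmesh.core.engine_adapter.shared):

    # `adapter` stands in for a concrete engine adapter subclass.
    # Passing object_names filters server-side, batched by DATA_OBJECT_FILTER_BATCH_SIZE.
    for obj in adapter.get_data_objects("prod", object_names={"events", "users"}):
        print(obj.name, obj.type)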

def fetchone( self, query: Union[sqlglot.expressions.Expression, str], ignore_unsupported_errors: bool = False, quote_identifiers: bool = False) -> Tuple:
1642    def fetchone(
1643        self,
1644        query: t.Union[exp.Expression, str],
1645        ignore_unsupported_errors: bool = False,
1646        quote_identifiers: bool = False,
1647    ) -> t.Tuple:
1648        with self.transaction():
1649            self.execute(
1650                query,
1651                ignore_unsupported_errors=ignore_unsupported_errors,
1652                quote_identifiers=quote_identifiers,
1653            )
1654            return self.cursor.fetchone()
def fetchall( self, query: Union[sqlglot.expressions.Expression, str], ignore_unsupported_errors: bool = False, quote_identifiers: bool = False) -> List[Tuple]:
1656    def fetchall(
1657        self,
1658        query: t.Union[exp.Expression, str],
1659        ignore_unsupported_errors: bool = False,
1660        quote_identifiers: bool = False,
1661    ) -> t.List[t.Tuple]:
1662        with self.transaction():
1663            self.execute(
1664                query,
1665                ignore_unsupported_errors=ignore_unsupported_errors,
1666                quote_identifiers=quote_identifiers,
1667            )
1668            return self.cursor.fetchall()
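Both helpers run inside a transaction and accept either a raw SQL string or a sqlglot expression; a quick sketch:

    # `adapter` wired as in the earlier sketches.
    row = adapter.fetchone("SELECT 1 AS one")               # e.g. (1,)
    rows = adapter.fetchall("SELECT 1 UNION ALL SELECT 2")  # e.g. [(1,), (2,)]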
def fetchdf( self, query: Union[sqlglot.expressions.Expression, str], quote_identifiers: bool = False) -> pandas.core.frame.DataFrame:
1678    def fetchdf(
1679        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
1680    ) -> pd.DataFrame:
1681        """Fetches a Pandas DataFrame from the cursor"""
1682        df = self._fetch_native_df(query, quote_identifiers=quote_identifiers)
1683        if not isinstance(df, pd.DataFrame):
1684            raise NotImplementedError(
1685                "The cursor's `fetch_native_df` method is not returning a pandas DataFrame. Need to update `fetchdf` so a Pandas DataFrame is returned"
1686            )
1687        return df

Fetches a Pandas DataFrame from the cursor

def fetch_pyspark_df( self, query: Union[sqlglot.expressions.Expression, str], quote_identifiers: bool = False) -> PySparkDataFrame:
1689    def fetch_pyspark_df(
1690        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
1691    ) -> PySparkDataFrame:
1692        """Fetches a PySpark DataFrame from the cursor"""
1693        raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}")

Fetches a PySpark DataFrame from the cursor

def wap_supported(self, table_name: TableName) -> bool:
1695    def wap_supported(self, table_name: TableName) -> bool:
1696        """Returns whether WAP for the target table is supported."""
1697        return False

Returns whether WAP for the target table is supported.

def wap_table_name(self, table_name: TableName, wap_id: str) -> str:
1699    def wap_table_name(self, table_name: TableName, wap_id: str) -> str:
1700        """Returns the updated table name for the given WAP ID.
1701
1702        Args:
1703            table_name: The name of the target table.
1704            wap_id: The WAP ID to prepare.
1705
1706        Returns:
1707            The updated table name that should be used for writing.
1708        """
1709        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")

Returns the updated table name for the given WAP ID.

Arguments:
  • table_name: The name of the target table.
  • wap_id: The WAP ID to prepare.
Returns:

The updated table name that should be used for writing.

def wap_prepare(self, table_name: TableName, wap_id: str) -> str:
1711    def wap_prepare(self, table_name: TableName, wap_id: str) -> str:
1712        """Prepares the target table for WAP and returns the updated table name.
1713
1714        Args:
1715            table_name: The name of the target table.
1716            wap_id: The WAP ID to prepare.
1717
1718        Returns:
1719            The updated table name that should be used for writing.
1720        """
1721        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")

Prepares the target table for WAP and returns the updated table name.

Arguments:
  • table_name: The name of the target table.
  • wap_id: The WAP ID to prepare.
Returns:

The updated table name that should be used for writing.

def wap_publish(self, table_name: TableName, wap_id: str) -> None:
1723    def wap_publish(self, table_name: TableName, wap_id: str) -> None:
1724        """Publishes changes with the given WAP ID to the target table.
1725
1726        Args:
1727            table_name: The name of the target table.
1728            wap_id: The WAP ID to publish.
1729        """
1730        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")

Publishes changes with the given WAP ID to the target table.

Arguments:
  • table_name: The name of the target table.
  • wap_id: The WAP ID to publish.
@contextlib.contextmanager
def transaction(self, condition: Union[bool, NoneType] = None) -> Iterator[NoneType]:
1732    @contextlib.contextmanager
1733    def transaction(
1734        self,
1735        condition: t.Optional[bool] = None,
1736    ) -> t.Iterator[None]:
1737        """A transaction context manager."""
1738        if (
1739            self._connection_pool.is_transaction_active
1740            or not self.SUPPORTS_TRANSACTIONS
1741            or (condition is not None and not condition)
1742        ):
1743            yield
1744            return
1745        self._connection_pool.begin()
1746        try:
1747            yield
1748        except Exception as e:
1749            self._connection_pool.rollback()
1750            raise e
1751        else:
1752            self._connection_pool.commit()

A transaction context manager.
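Statements issued inside the block are committed together on a clean exit and rolled back if the block raises; nested calls reuse the already-active transaction. A sketch with assumed table names:

    from sqlglot import parse_one

    # `adapter` wired as in the earlier sketches.
    with adapter.transaction():
        adapter.delete_from("prod.events", where="ds < '2023-01-01'")
        adapter.insert_append("prod.events", parse_one("SELECT id, ts, ds FROM staging.events"))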

@contextlib.contextmanager
def session(self, properties: SessionProperties) -> Iterator[NoneType]:
1754    @contextlib.contextmanager
1755    def session(self, properties: SessionProperties) -> t.Iterator[None]:
1756        """A session context manager."""
1757        if self._is_session_active():
1758            yield
1759            return
1760
1761        self._begin_session(properties)
1762        try:
1763            yield
1764        finally:
1765            self._end_session()

A session context manager.

def execute( self, expressions: Union[str, sqlglot.expressions.Expression, Sequence[sqlglot.expressions.Expression]], ignore_unsupported_errors: bool = False, quote_identifiers: bool = True, **kwargs: Any) -> None:
1777    def execute(
1778        self,
1779        expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
1780        ignore_unsupported_errors: bool = False,
1781        quote_identifiers: bool = True,
1782        **kwargs: t.Any,
1783    ) -> None:
1784        """Execute a SQL query."""
1785        to_sql_kwargs = (
1786            {"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {}
1787        )
1788
1789        with self.transaction():
1790            for e in ensure_list(expressions):
1791                sql = t.cast(
1792                    str,
1793                    (
1794                        self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs)
1795                        if isinstance(e, exp.Expression)
1796                        else e
1797                    ),
1798                )
1799                self._log_sql(sql)
1800                self._execute(sql, **kwargs)

Execute a SQL query.
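execute accepts a raw string, a single sqlglot expression, or a sequence of expressions; expressions are rendered through _to_sql with identifiers quoted by default, while strings are passed through unchanged. A sketch:

    from sqlglot import exp

    # `adapter` wired as in the earlier sketches.
    adapter.execute("CREATE SCHEMA IF NOT EXISTS prod")  # raw string, passed through
    adapter.execute(exp.select("1").limit(1))            # rendered via _to_sql
    adapter.execute([exp.select("1"), exp.select("2")])  # run within one transaction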

@contextlib.contextmanager
def temp_table( self, query_or_df: QueryOrDF, name: TableName = 'diff', columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], NoneType] = None, **kwargs: Any) -> Iterator[sqlglot.expressions.Table]:
1808    @contextlib.contextmanager
1809    def temp_table(
1810        self,
1811        query_or_df: QueryOrDF,
1812        name: TableName = "diff",
1813        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1814        **kwargs: t.Any,
1815    ) -> t.Iterator[exp.Table]:
1816        """A context manager for working with a temp table.
1817
1818        The table will be created with a random guid and cleaned up after the block.
1819
1820        Args:
1821            query_or_df: The query or df to create a temp table for.
1822            name: The base name of the temp table.
1823            columns_to_types: A mapping between the column name and its data type.
1824
1825        Yields:
1826            The table expression
1827        """
1828        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
1829            query_or_df, columns_to_types=columns_to_types, target_table=name
1830        )
1831
1832        with self.transaction():
1833            table = self._get_temp_table(name)
1834            if table.db:
1835                self.create_schema(schema_(table.args["db"], table.args.get("catalog")))
1836            self._create_table_from_source_queries(
1837                table,
1838                source_queries,
1839                columns_to_types,
1840                exists=True,
1841                table_description=None,
1842                column_descriptions=None,
1843                **kwargs,
1844            )
1845
1846            try:
1847                yield table
1848            finally:
1849                self.drop_table(table)

A context manager for working with a temp table.

The table will be created with a random guid and cleaned up after the block.

Arguments:
  • query_or_df: The query or df to create a temp table for.
  • name: The base name of the temp table.
  • columns_to_types: A mapping between the column name and its data type.
Yields:

The table expression
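A sketch: the temp table name receives a random suffix (via _get_temp_table) and the table is dropped when the block exits, even on error. The base name here is an assumption:

    import duckdb
    from sqlglot import exp, parse_one
    from sqlmesh.core.engine_adapter.base import EngineAdapter

    adapter = EngineAdapter(lambda: duckdb.connect(), dialect="duckdb")
    with adapter.temp_table(parse_one("SELECT 1 AS id"), name="prod.scratch") as table:
        # `table` is an exp.Table like "prod"."__temp_scratch_<random>"
        print(adapter.fetchall(exp.select("id").from_(table)))
    # The temp table has been dropped by this point.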

class EngineAdapterWithIndexSupport(EngineAdapter):
2047class EngineAdapterWithIndexSupport(EngineAdapter):
2048    SUPPORTS_INDEXES = True

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.