Edit on GitHub

sqlmesh.core.engine_adapter.clickhouse

  1from __future__ import annotations
  2
  3import typing as t
  4import logging
  5import re
  6from sqlglot import exp, maybe_parse
  7from sqlmesh.core.dialect import to_schema
  8from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin
  9from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport
 10from sqlmesh.core.engine_adapter.shared import (
 11    DataObject,
 12    DataObjectType,
 13    EngineRunMode,
 14    SourceQuery,
 15    CommentCreationView,
 16    InsertOverwriteStrategy,
 17)
 18from sqlmesh.core.schema_diff import TableAlterOperation
 19from sqlmesh.utils import get_source_columns_to_types
 20
 21if t.TYPE_CHECKING:
 22    import pandas as pd
 23
 24    from sqlmesh.core._typing import SchemaName, TableName
 25    from sqlmesh.core.engine_adapter._typing import DF, Query, QueryOrDF
 26
 27    from sqlmesh.core.node import IntervalUnit
 28
 29
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
 31
 32
 33class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin):
    # SQLGlot dialect name used when parsing/generating SQL for this adapter.
    DIALECT = "clickhouse"
    # Capability flags consumed by the shared engine-adapter base classes (defined outside
    #   this file).
    SUPPORTS_TRANSACTIONS = False
    SUPPORTS_VIEW_SCHEMA = False
    SUPPORTS_REPLACE_TABLE = False
    # Presumably: comments are applied only through separate COMMENT commands rather than
    #   inline in CREATE statements -- confirm against CommentCreationView.
    COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY

    # No engine-specific options are supplied to the schema differ.
    SCHEMA_DIFFER_KWARGS = {}

    # Table engine used by `_build_table_properties_exp` when no `storage_format` is given.
    DEFAULT_TABLE_ENGINE = "MergeTree"
    # Engines whose name matches this pattern (the *MergeTree family) get an ORDER BY clause
    #   in `_build_table_properties_exp`.
    ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$"
 44
 45    @property
 46    def engine_run_mode(self) -> EngineRunMode:
 47        if self._extra_config.get("cloud_mode"):
 48            return EngineRunMode.CLOUD
 49        # we use the user's specification of a cluster in the connection config to determine if
 50        #   the engine is in cluster mode
 51        if self._extra_config.get("cluster"):
 52            return EngineRunMode.CLUSTER
 53        return EngineRunMode.STANDALONE
 54
 55    @property
 56    def cluster(self) -> t.Optional[str]:
 57        return self._extra_config.get("cluster")
 58
 59    # Workaround for clickhouse-connect cursor bug
 60    # - cursor does not reset row index correctly on `close()`, so `fetchone()` and `fetchmany()`
 61    #     return the wrong (or no) rows after the very first cursor query that returns rows
 62    #     in the connection
 63    # - cursor does reset the data rows correctly on `close()`, so `fetchall()` works because it
 64    #     doesn't use the row index at all
 65    def fetchone(
 66        self,
 67        query: t.Union[exp.Expr, str],
 68        ignore_unsupported_errors: bool = False,
 69        quote_identifiers: bool = False,
 70    ) -> t.Tuple:
 71        with self.transaction():
 72            self.execute(
 73                query,
 74                ignore_unsupported_errors=ignore_unsupported_errors,
 75                quote_identifiers=quote_identifiers,
 76            )
 77            return self.cursor.fetchall()[0]
 78
 79    def _fetch_native_df(
 80        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
 81    ) -> pd.DataFrame:
 82        """Fetches a Pandas DataFrame from the cursor"""
 83        return self.cursor.client.query_df(
 84            self._to_sql(query, quote=quote_identifiers) if isinstance(query, exp.Expr) else query,
 85            use_extended_dtypes=True,
 86        )
 87
 88    def _df_to_source_queries(
 89        self,
 90        df: DF,
 91        target_columns_to_types: t.Dict[str, exp.DataType],
 92        batch_size: int,
 93        target_table: TableName,
 94        source_columns: t.Optional[t.List[str]] = None,
 95        **kwargs: t.Any,
 96    ) -> t.List[SourceQuery]:
 97        temp_table = self._get_temp_table(target_table, **kwargs)
 98        source_columns_to_types = get_source_columns_to_types(
 99            target_columns_to_types, source_columns
100        )
101
102        def query_factory() -> Query:
103            # It is possible for the factory to be called multiple times and if so then the temp table will already
104            # be created so we skip creating again. This means we are assuming the first call is the same result
105            # as later calls.
106            if not self.table_exists(temp_table):
107                self.create_table(
108                    temp_table,
109                    source_columns_to_types,
110                    storage_format=exp.var("MergeTree"),
111                    **kwargs,
112                )
113                ordered_df = df[list(source_columns_to_types)]
114
115                self.cursor.client.insert_df(temp_table.sql(dialect=self.dialect), df=ordered_df)
116
117            return exp.select(*self._casted_columns(target_columns_to_types, source_columns)).from_(
118                temp_table
119            )
120
121        return [
122            SourceQuery(
123                query_factory=query_factory,
124                cleanup_func=lambda: self.drop_table(temp_table, **kwargs),
125            )
126        ]
127
128    def _get_data_objects(
129        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
130    ) -> t.List[DataObject]:
131        """
132        Returns all the data objects that exist in the given database.
133        """
134        query = (
135            exp.select(
136                exp.column("database").as_("schema_name"),
137                exp.column("name"),
138                exp.case(exp.column("engine"))
139                .when(
140                    exp.Literal.string("View"),
141                    exp.Literal.string("view"),
142                )
143                .else_(
144                    exp.Literal.string("table"),
145                )
146                .as_("type"),
147            )
148            .from_("system.tables")
149            .where(exp.column("database").eq(to_schema(schema_name).db))
150        )
151        if object_names:
152            query = query.where(exp.column("name").isin(*object_names))
153        df = self.fetchdf(query)
154        return [
155            DataObject(
156                catalog=None,
157                schema=row.schema_name,
158                name=row.name,
159                type=DataObjectType.from_str(row.type),  # type: ignore
160            )
161            for row in df.itertuples()
162        ]
163
164    def create_schema(
165        self,
166        schema_name: SchemaName,
167        ignore_if_exists: bool = True,
168        warn_on_error: bool = True,
169        properties: t.List[exp.Expr] = [],
170    ) -> None:
171        """Create a Clickhouse database from a name or qualified table name.
172
173        Clickhouse has a two-level naming scheme [database].[table].
174        """
175        properties_copy = properties.copy()
176        if self.engine_run_mode.is_cluster:
177            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
178
179        # can't call super() because it will try to set a catalog
180        return self._create_schema(
181            schema_name=schema_name,
182            ignore_if_exists=ignore_if_exists,
183            warn_on_error=warn_on_error,
184            properties=properties_copy,
185            # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
186            kind="DATABASE",
187        )
188
189    def _insert_overwrite_by_condition(
190        self,
191        table_name: TableName,
192        source_queries: t.List[SourceQuery],
193        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
194        where: t.Optional[exp.Condition] = None,
195        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
196        **kwargs: t.Any,
197    ) -> None:
198        """
199        Implements the table or partition swap approach to insert-overwriting records.
200
201        Because this method executes multiple variants (full table replace, replace by time
202        range, replace by key, replace by partition), some upstream caller info is needed and
203        passed via kwargs.
204
205        Args:
206            table_name: Name of target table
207            source_queries: Source queries returning records to insert
208            target_columns_to_types: Column names and data types of target table
209            where: SQLGlot expression determining which target table rows should be overwritten
210            insert_overwrite_strategy_override: Not used by Clickhouse
211            kwargs:
212                dynamic_key: Key columns (replace by key only)
213                dynamic_key_exp: Expression to build key (replace by key only)
214                dynamic_key_unique: Whether more than one record can exist per key value (replace by key only)
215
216                keep_existing_partition_rows: Whether to overwrite partitions with only new records (incremental by partition only)
217
218        Returns:
219            Side effects only: execution of insert-overwrite operation.
220        """
221        target_table = exp.to_table(table_name)
222        target_columns_to_types = target_columns_to_types or self.columns(target_table)
223
224        temp_table = self._get_temp_table(target_table)
225        self.create_table_like(temp_table, target_table)
226
227        # REPLACE BY KEY: extract kwargs if present
228        dynamic_key = kwargs.get("dynamic_key")
229        if dynamic_key:
230            dynamic_key_exp = t.cast(exp.Expr, kwargs.get("dynamic_key_exp"))
231            dynamic_key_unique = t.cast(bool, kwargs.get("dynamic_key_unique"))
232
233        try:
234            # insert new records into temp table
235            for source_query in source_queries:
236                with source_query as query:
237                    # REPLACE BY KEY: if unique key, DISTINCTify by key columns so only one row is present per key
238                    if dynamic_key and dynamic_key_unique:
239                        query = query.distinct(*dynamic_key)  # type: ignore
240
241                    query = self._order_projections_and_filter(
242                        query, target_columns_to_types, where=where
243                    )
244                    self._insert_append_query(
245                        temp_table,
246                        query,
247                        target_columns_to_types=target_columns_to_types,
248                        order_projections=False,
249                    )
250
251            # REPLACE BY KEY: build `where` expression as "key IN (new rows' key values)"
252            if dynamic_key:
253                key_query = exp.select(dynamic_key_exp).from_(temp_table)
254                if not dynamic_key_unique:
255                    key_query = key_query.distinct()
256                where = dynamic_key_exp.isin(query=key_query)
257
258            # get target table partition key to confirm it's actually partitioned
259            table_partition_exp = self.fetchone(
260                exp.select("partition_key")
261                .from_("system.tables")
262                .where(
263                    exp.column("database").eq(target_table.db),
264                    exp.column("name").eq(target_table.name),
265                )
266            )
267
268            all_affected_partitions: t.Set[str] = set()
269
270            if where:
271                # identify existing records to keep by inverting the delete `where` clause
272                existing_records_insert_exp = exp.insert(
273                    self._select_columns(target_columns_to_types)
274                    .from_(target_table)
275                    .where(exp.paren(expression=where).not_()),
276                    temp_table,
277                )
278
279                # if target table is partitioned, modify insert expression to only insert
280                #   existing records that are in one of the affected partitions
281                if table_partition_exp:
282                    partitions_temp_table_name = self._get_temp_table(
283                        exp.to_table(f"{target_table.db}._affected_partitions")
284                    )
285                    all_affected_partitions, existing_records_insert_exp = (
286                        self._get_affected_partitions_and_insert_exp(
287                            target_table,
288                            temp_table,
289                            where,
290                            existing_records_insert_exp,
291                            partitions_temp_table_name,
292                        )
293                    )
294
295                try:
296                    self.execute(existing_records_insert_exp, track_rows_processed=True)
297                finally:
298                    if table_partition_exp:
299                        self.drop_table(partitions_temp_table_name)
300
301            # process by partition if:
302            #   1. The table is partitioned AND
303            #   (2a. There are existing records to keep (`where`) OR
304            #    2b. We're overwriting existing partition rows (incremental by partition model))
305            if table_partition_exp and (
306                where or kwargs.get("keep_existing_partition_rows") is False
307            ):
308                # only replace partitions that have records in temp_table
309                partitions_to_replace = self._get_partition_ids(temp_table)
310
311                # drop affected partitions that have no records in temp_table
312                #   - NOTE: `all_affected_partitions` will be empty when keep_existing_partition_rows=False
313                #      because previous code block is skipped
314                partitions_to_drop = all_affected_partitions - partitions_to_replace
315
316                if partitions_to_replace or partitions_to_drop:
317                    self.alter_table(
318                        [
319                            self._build_alter_partition_exp(
320                                target_table, temp_table, partitions_to_replace, partitions_to_drop
321                            )
322                        ]
323                    )
324            else:
325                self._exchange_tables(target_table, temp_table)
326        finally:
327            self.drop_table(temp_table)
328
329    def _get_affected_partitions_and_insert_exp(
330        self,
331        target_table: exp.Table,
332        temp_table: exp.Table,
333        where: exp.Condition,
334        existing_records_insert_exp: exp.Insert,
335        partitions_temp_table_name: exp.Table,
336    ) -> tuple[t.Set[str], exp.Insert]:
337        # identify all affected partition IDs
338        #   - store in temp table so we can reuse results
339        self.ctas(
340            partitions_temp_table_name,
341            exp.select("partition_id")
342            .distinct()
343            .from_(
344                exp.union(
345                    # target table partitions with records in `where`
346                    exp.select(exp.column("_partition_id").as_("partition_id"))
347                    .from_(target_table)
348                    .where(where),
349                    # temp table partitions with new records to insert
350                    exp.select(
351                        exp.column("_partition_id").as_("partition_id"),
352                    ).from_(temp_table),
353                ).subquery("_affected_partitions")
354            ),
355        )
356
357        # read all affected partition IDs into memory
358        all_affected_partitions = self._get_partition_ids(
359            partitions_temp_table_name, "partition_id"
360        )
361
362        # limit existing records insert expression WHERE to affected target table partitions
363        #   by adding `AND _partition_id IN (SELECT partition_id FROM partitions_temp_table)`
364        existing_records_insert_exp.set(
365            "expression",
366            existing_records_insert_exp.expression.where(
367                exp.column("_partition_id").isin(
368                    exp.select("partition_id").from_(partitions_temp_table_name)
369                )
370            ),
371        )
372
373        return all_affected_partitions, existing_records_insert_exp
374
375    def _build_alter_partition_exp(
376        self,
377        target_table: exp.Table,
378        temp_table: exp.Table,
379        partitions_to_replace: t.Set[str],
380        partitions_to_drop: t.Set[str],
381    ) -> exp.Alter:
382        alter_expr = exp.Alter(this=target_table, kind="TABLE")
383
384        for partition in partitions_to_replace:
385            alter_expr.append(
386                "actions",
387                exp.ReplacePartition(
388                    expression=exp.Partition(
389                        expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
390                    ),
391                    source=temp_table,
392                ),
393            )
394
395        for partition in partitions_to_drop:
396            alter_expr.append(
397                "actions",
398                exp.DropPartition(
399                    expressions=[
400                        exp.Partition(
401                            expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
402                        )
403                    ],
404                    source=temp_table,
405                ),
406            )
407
408        return alter_expr
409
410    def _replace_by_key(
411        self,
412        target_table: TableName,
413        source_table: QueryOrDF,
414        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
415        key: t.Sequence[exp.Expr],
416        is_unique_key: bool,
417        source_columns: t.Optional[t.List[str]] = None,
418    ) -> None:
419        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
420            source_table,
421            target_columns_to_types,
422            target_table=target_table,
423            source_columns=source_columns,
424        )
425
426        key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0]
427
428        self._insert_overwrite_by_condition(
429            target_table,
430            source_queries,
431            target_columns_to_types,
432            dynamic_key=key,
433            dynamic_key_exp=key_exp,
434            dynamic_key_unique=is_unique_key,
435        )
436
437    def insert_overwrite_by_partition(
438        self,
439        table_name: TableName,
440        query_or_df: QueryOrDF,
441        partitioned_by: t.List[exp.Expr],
442        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
443        source_columns: t.Optional[t.List[str]] = None,
444    ) -> None:
445        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
446            query_or_df,
447            target_columns_to_types,
448            target_table=table_name,
449            source_columns=source_columns,
450        )
451
452        self._insert_overwrite_by_condition(
453            table_name, source_queries, target_columns_to_types, keep_existing_partition_rows=False
454        )
455
456    def _create_table_like(
457        self,
458        target_table_name: TableName,
459        source_table_name: TableName,
460        exists: bool,
461        **kwargs: t.Any,
462    ) -> None:
463        """Create table with identical structure as source table"""
464        self.execute(
465            f"CREATE TABLE {target_table_name}{self._on_cluster_sql()} AS {source_table_name}"
466        )
467
468    def _get_partition_ids(
469        self,
470        table: exp.Table,
471        partition_col_name: str = "_partition_id",
472        where: t.Optional[exp.Condition] = None,
473        limit: t.Optional[int] = None,
474    ) -> t.Set[t.Any]:
475        """List partition IDs present in table"""
476        partitions_query = exp.select(partition_col_name).distinct().from_(table)
477        if where:
478            partitions_query = partitions_query.where(where)
479        if limit:
480            partitions_query = partitions_query.limit(limit)
481        partitions = self.fetchall(partitions_query)
482
483        return set([part[0] for part in partitions] if partitions else [])
484
485    def _create_table(
486        self,
487        table_name_or_schema: t.Union[exp.Schema, TableName],
488        expression: t.Optional[exp.Expr],
489        exists: bool = True,
490        replace: bool = False,
491        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
492        table_description: t.Optional[str] = None,
493        column_descriptions: t.Optional[t.Dict[str, str]] = None,
494        table_kind: t.Optional[str] = None,
495        track_rows_processed: bool = True,
496        **kwargs: t.Any,
497    ) -> None:
498        """Creates a table in the database.
499
500        Clickhouse Cloud requires doing CTAS in two steps.
501
502        First, we add the `EMPTY` property to the CTAS call to create a table with the proper
503        schema, then insert the data with the CTAS query.
504        """
505        # ensure columns used for partitioning are non-Nullable
506        #   - normally user's responsibility, but we automatically partition by time column in
507        #       incremental by time models
508        if kwargs.get("partitioned_by"):
509            partition_cols = [
510                col.name
511                for part_expr in kwargs["partitioned_by"]
512                for col in part_expr.find_all(exp.Column)
513            ]
514            if isinstance(table_name_or_schema, exp.Schema):
515                for coldef in table_name_or_schema.expressions:
516                    if coldef.name in partition_cols:
517                        coldef.kind.set("nullable", False)
518            if target_columns_to_types:
519                for col in partition_cols:
520                    target_columns_to_types[col].set("nullable", False)
521
522        super()._create_table(
523            table_name_or_schema,
524            expression,
525            exists,
526            replace,
527            target_columns_to_types,
528            table_description,
529            column_descriptions,
530            table_kind,
531            empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
532            track_rows_processed=track_rows_processed,
533            **kwargs,
534        )
535
536        # execute the second INSERT step if on cloud and creating a table
537        # - Additional clause is to avoid clickhouse-connect HTTP client bug where CTAS LIMIT 0
538        #     returns a success code but malformed response
539        if (
540            self.engine_run_mode.is_cloud
541            and table_kind != "VIEW"
542            and expression
543            and not (
544                expression.args.get("limit") is not None
545                and expression.args["limit"].expression.this == "0"
546            )
547        ):
548            table_name = (
549                table_name_or_schema.this
550                if isinstance(table_name_or_schema, exp.Schema)
551                else table_name_or_schema
552            )
553            self._insert_append_query(
554                table_name,
555                expression,  # type: ignore
556                target_columns_to_types or self.columns(table_name),
557            )
558
559    def _exchange_tables(
560        self,
561        old_table_name: TableName,
562        new_table_name: TableName,
563    ) -> None:
564        from clickhouse_connect.driver.exceptions import DatabaseError  # type: ignore
565
566        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
567        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)
568
569        try:
570            self.execute(
571                f"EXCHANGE TABLES {old_table_sql} AND {new_table_sql}{self._on_cluster_sql()}"
572            )
573        except DatabaseError as e:
574            if "NOT_IMPLEMENTED" in str(e):
575                # If someone is using an old Clickhouse version, an OS that doesn't support atomic exchanges,
576                # or a database engine that doesn't support atomic exchanges, we do a non-atomic rename instead.
577                #
578                # Executing multiple renames in one call like `RENAME TABLE a to b, c to a` is supported
579                # but not an atomic operation. Because it is not atomic, doing it in two calls is equivalent
580                # and does not require defining an additional method.
581                throwaway_table_name = self._get_temp_table(old_table_name)
582                self._rename_table(old_table_name, throwaway_table_name)
583                self._rename_table(new_table_name, old_table_name)
584                self.drop_table(throwaway_table_name)
585
586    def _rename_table(
587        self,
588        old_table_name: TableName,
589        new_table_name: TableName,
590    ) -> None:
591        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
592        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)
593
594        self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}")
595
596    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None:
597        delete_expr = exp.delete(table_name, where)
598        if self.engine_run_mode.is_cluster:
599            delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
600        self.execute(delete_expr)
601
602    def alter_table(
603        self,
604        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
605    ) -> None:
606        """
607        Performs the alter statements to change the current table into the structure of the target table.
608        """
609        with self.transaction():
610            for alter_expression in [
611                x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
612            ]:
613                if self.engine_run_mode.is_cluster:
614                    alter_expression.set(
615                        "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))
616                    )
617                self.execute(alter_expression)
618
619    def _drop_object(
620        self,
621        name: TableName | SchemaName,
622        exists: bool = True,
623        kind: str = "TABLE",
624        cascade: bool = False,
625        **drop_args: t.Any,
626    ) -> None:
627        """Drops an object.
628
629        An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind.
630
631        Args:
632            name: The name of the table to drop.
633            exists: If exists, defaults to True.
634            kind: What kind of object to drop. Defaults to TABLE
635            **drop_args: Any extra arguments to set on the Drop expression
636        """
637        super()._drop_object(
638            name=name,
639            exists=exists,
640            kind=kind,
641            cascade=cascade,
642            cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
643            if self.engine_run_mode.is_cluster
644            else None,
645            **drop_args,
646        )
647
648    def _build_partitioned_by_exp(
649        self,
650        partitioned_by: t.List[exp.Expr],
651        **kwargs: t.Any,
652    ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]:
653        return exp.PartitionedByProperty(
654            this=exp.Schema(expressions=partitioned_by),
655        )
656
657    def ensure_nulls_for_unmatched_after_join(
658        self,
659        query: Query,
660    ) -> Query:
661        # Set `join_use_nulls = 1` in a query's SETTINGS clause
662        query.append("settings", exp.var("join_use_nulls").eq(exp.Literal.number("1")))
663        return query
664
665    def use_server_nulls_for_unmatched_after_join(
666        self,
667        query: Query,
668    ) -> Query:
669        # Set the `join_use_nulls` server value in a query's SETTINGS clause
670        #
671        # Use in SCD models:
672        #  - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join
673        #      are filled with NULL instead of the default data type value. The default join_use_nulls value is `0`.
674        #  - The SCD embeds the user's original query in the `source` CTE
675        #  - Settings are dynamically scoped, so our setting may override the server's default setting the user expects
676        #      for their query.
677        #  - To prevent this, we:
678        #     - If the user query sets `join_use_nulls`, we do nothing
679        #     - If the user query does not set `join_use_nulls`, we query the server for the current setting
680        #       - If the server value is 1, we do nothing
681        #       - If the server values is not 1, we inject its `join_use_nulls` value into the user query
682        #     - We do not need to check user subqueries because our injected setting operates at the same scope the
683        #         server value would normally operate at
684        setting_name = "join_use_nulls"
685        setting_value = "1"
686
687        user_settings = query.args.get("settings")
688        # if user has not already set it explicitly
689        if not (
690            user_settings
691            and any(
692                [
693                    isinstance(setting, exp.EQ) and setting.name == setting_name
694                    for setting in user_settings
695                ]
696            )
697        ):
698            server_value = self.fetchone(
699                exp.select("value")
700                .from_("system.settings")
701                .where(exp.column("name").eq(exp.Literal.string(setting_name)))
702            )[0]
703            # only inject the setting if the server value isn't 1
704            inject_setting = setting_value != server_value
705            setting_value = server_value if inject_setting else setting_value
706
707            if inject_setting:
708                query.append(
709                    "settings", exp.var(setting_name).eq(exp.Literal.number(setting_value))
710                )
711
712        return query
713
714    def _build_settings_property(
715        self, key: str, value: exp.Expr | str | int | float
716    ) -> exp.SettingsProperty:
717        return exp.SettingsProperty(
718            expressions=[
719                exp.EQ(
720                    this=exp.var(key.lower()),
721                    expression=value
722                    if isinstance(value, exp.Expr)
723                    else exp.Literal(this=value, is_string=isinstance(value, str)),
724                )
725            ]
726        )
727
728    def _build_table_properties_exp(
729        self,
730        catalog_name: t.Optional[str] = None,
731        table_format: t.Optional[str] = None,
732        storage_format: t.Optional[str] = None,
733        partitioned_by: t.Optional[t.List[exp.Expr]] = None,
734        partition_interval_unit: t.Optional[IntervalUnit] = None,
735        clustered_by: t.Optional[t.List[exp.Expr]] = None,
736        table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
737        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
738        table_description: t.Optional[str] = None,
739        table_kind: t.Optional[str] = None,
740        empty_ctas: bool = False,
741        **kwargs: t.Any,
742    ) -> t.Optional[exp.Properties]:
743        properties: t.List[exp.Expr] = []
744
745        table_engine = self.DEFAULT_TABLE_ENGINE
746        if storage_format:
747            table_engine = (
748                storage_format.this if isinstance(storage_format, exp.Var) else storage_format  # type: ignore
749            )
750        properties.append(exp.EngineProperty(this=table_engine))
751
752        # copy of table_properties so we can pop items off below then consume the rest later
753        table_properties_copy = {
754            k.upper(): v for k, v in (table_properties.copy() if table_properties else {}).items()
755        }
756
757        mergetree_engine = bool(re.search(self.ORDER_BY_TABLE_ENGINE_REGEX, table_engine))
758        ordered_by_raw = table_properties_copy.pop("ORDER_BY", None)
759        if mergetree_engine:
760            ordered_by_exprs = []
761            if ordered_by_raw:
762                ordered_by_vals = []
763
764                if isinstance(ordered_by_raw, (exp.Tuple, exp.Array)):
765                    ordered_by_vals = ordered_by_raw.expressions
766                if isinstance(ordered_by_raw, exp.Paren):
767                    ordered_by_vals = [ordered_by_raw.this]
768
769                if not ordered_by_vals:
770                    ordered_by_vals = (
771                        ordered_by_raw if isinstance(ordered_by_raw, list) else [ordered_by_raw]
772                    )
773
774                for col in ordered_by_vals:
775                    ordered_by_exprs.append(
776                        col
777                        if isinstance(col, exp.Column)
778                        else maybe_parse(
779                            col.name if isinstance(col, exp.Literal) else col,
780                            dialect=self.dialect,
781                            into=exp.Ordered,
782                        )
783                    )
784
785            properties.append(exp.Order(expressions=[exp.Tuple(expressions=ordered_by_exprs)]))
786
787        primary_key = table_properties_copy.pop("PRIMARY_KEY", None)
788        if mergetree_engine and primary_key:
789            primary_key_vals = []
790            if isinstance(primary_key, (exp.Tuple, exp.Array)):
791                primary_key_vals = primary_key.expressions
792            if isinstance(ordered_by_raw, exp.Paren):
793                primary_key_vals = [primary_key.this]
794
795            if not primary_key_vals:
796                primary_key_vals = primary_key if isinstance(primary_key, list) else [primary_key]
797
798            properties.append(
799                exp.PrimaryKey(
800                    expressions=[
801                        exp.to_column(k.name if isinstance(k, exp.Literal) else k)
802                        for k in primary_key_vals
803                    ]
804                )
805            )
806
807        ttl = table_properties_copy.pop("TTL", None)
808        if ttl:
809            properties.append(
810                exp.MergeTreeTTL(expressions=[ttl if isinstance(ttl, exp.Expr) else exp.var(ttl)])
811            )
812
813        if (
814            partitioned_by
815            and (partitioned_by_prop := self._build_partitioned_by_exp(partitioned_by)) is not None
816        ):
817            properties.append(partitioned_by_prop)
818
819        if self.engine_run_mode.is_cluster:
820            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
821
822        if empty_ctas:
823            properties.append(exp.EmptyProperty())
824
825        if table_properties_copy:
826            properties.extend(
827                [self._build_settings_property(k, v) for k, v in table_properties_copy.items()]
828            )
829
830        if table_description:
831            properties.append(
832                exp.SchemaCommentProperty(
833                    this=exp.Literal.string(self._truncate_table_comment(table_description))
834                )
835            )
836
837        if properties:
838            return exp.Properties(expressions=properties)
839
840        return None
841
842    def _build_view_properties_exp(
843        self,
844        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
845        table_description: t.Optional[str] = None,
846        **kwargs: t.Any,
847    ) -> t.Optional[exp.Properties]:
848        """Creates a SQLGlot table properties expression for view"""
849        properties: t.List[exp.Expr] = []
850
851        view_properties_copy = view_properties.copy() if view_properties else {}
852
853        if self.engine_run_mode.is_cluster:
854            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
855
856        if view_properties_copy:
857            properties.extend(
858                [self._build_settings_property(k, v) for k, v in view_properties_copy.items()]
859            )
860
861        if table_description:
862            properties.append(
863                exp.SchemaCommentProperty(
864                    this=exp.Literal.string(self._truncate_table_comment(table_description))
865                )
866            )
867
868        if properties:
869            return exp.Properties(expressions=properties)
870        return None
871
872    def _build_create_comment_table_exp(
873        self, table: exp.Table, table_comment: str, table_kind: str, **kwargs: t.Any
874    ) -> exp.Comment | str:
875        table_sql = table.sql(dialect=self.dialect, identify=True)
876
877        truncated_comment = self._truncate_table_comment(table_comment)
878        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
879
880        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} MODIFY COMMENT {comment_sql}"
881
882    def _build_create_comment_column_exp(
883        self,
884        table: exp.Table,
885        column_name: str,
886        column_comment: str,
887        table_kind: str = "TABLE",
888        **kwargs: t.Any,
889    ) -> exp.Comment | str:
890        table_sql = table.sql(dialect=self.dialect, identify=True)
891        column_sql = exp.to_column(column_name).sql(dialect=self.dialect, identify=True)
892
893        truncated_comment = self._truncate_table_comment(column_comment)
894        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
895
896        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} COMMENT COLUMN {column_sql} {comment_sql}"
897
898    def _on_cluster_sql(self) -> str:
899        if self.engine_run_mode.is_cluster:
900            cluster_name = exp.to_identifier(self.cluster, quoted=True).sql(dialect=self.dialect)  #  type: ignore
901            return f" ON CLUSTER {cluster_name} "
902        return ""
logger = <Logger sqlmesh.core.engine_adapter.clickhouse (WARNING)>
 34class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin):
 35    DIALECT = "clickhouse"
 36    SUPPORTS_TRANSACTIONS = False
 37    SUPPORTS_VIEW_SCHEMA = False
 38    SUPPORTS_REPLACE_TABLE = False
 39    COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
 40
 41    SCHEMA_DIFFER_KWARGS = {}
 42
 43    DEFAULT_TABLE_ENGINE = "MergeTree"
 44    ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$"
 45
 46    @property
 47    def engine_run_mode(self) -> EngineRunMode:
 48        if self._extra_config.get("cloud_mode"):
 49            return EngineRunMode.CLOUD
 50        # we use the user's specification of a cluster in the connection config to determine if
 51        #   the engine is in cluster mode
 52        if self._extra_config.get("cluster"):
 53            return EngineRunMode.CLUSTER
 54        return EngineRunMode.STANDALONE
 55
 56    @property
 57    def cluster(self) -> t.Optional[str]:
 58        return self._extra_config.get("cluster")
 59
 60    # Workaround for clickhouse-connect cursor bug
 61    # - cursor does not reset row index correctly on `close()`, so `fetchone()` and `fetchmany()`
 62    #     return the wrong (or no) rows after the very first cursor query that returns rows
 63    #     in the connection
 64    # - cursor does reset the data rows correctly on `close()`, so `fetchall()` works because it
 65    #     doesn't use the row index at all
 66    def fetchone(
 67        self,
 68        query: t.Union[exp.Expr, str],
 69        ignore_unsupported_errors: bool = False,
 70        quote_identifiers: bool = False,
 71    ) -> t.Tuple:
 72        with self.transaction():
 73            self.execute(
 74                query,
 75                ignore_unsupported_errors=ignore_unsupported_errors,
 76                quote_identifiers=quote_identifiers,
 77            )
 78            return self.cursor.fetchall()[0]
 79
 80    def _fetch_native_df(
 81        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
 82    ) -> pd.DataFrame:
 83        """Fetches a Pandas DataFrame from the cursor"""
 84        return self.cursor.client.query_df(
 85            self._to_sql(query, quote=quote_identifiers) if isinstance(query, exp.Expr) else query,
 86            use_extended_dtypes=True,
 87        )
 88
 89    def _df_to_source_queries(
 90        self,
 91        df: DF,
 92        target_columns_to_types: t.Dict[str, exp.DataType],
 93        batch_size: int,
 94        target_table: TableName,
 95        source_columns: t.Optional[t.List[str]] = None,
 96        **kwargs: t.Any,
 97    ) -> t.List[SourceQuery]:
 98        temp_table = self._get_temp_table(target_table, **kwargs)
 99        source_columns_to_types = get_source_columns_to_types(
100            target_columns_to_types, source_columns
101        )
102
103        def query_factory() -> Query:
104            # It is possible for the factory to be called multiple times and if so then the temp table will already
105            # be created so we skip creating again. This means we are assuming the first call is the same result
106            # as later calls.
107            if not self.table_exists(temp_table):
108                self.create_table(
109                    temp_table,
110                    source_columns_to_types,
111                    storage_format=exp.var("MergeTree"),
112                    **kwargs,
113                )
114                ordered_df = df[list(source_columns_to_types)]
115
116                self.cursor.client.insert_df(temp_table.sql(dialect=self.dialect), df=ordered_df)
117
118            return exp.select(*self._casted_columns(target_columns_to_types, source_columns)).from_(
119                temp_table
120            )
121
122        return [
123            SourceQuery(
124                query_factory=query_factory,
125                cleanup_func=lambda: self.drop_table(temp_table, **kwargs),
126            )
127        ]
128
129    def _get_data_objects(
130        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
131    ) -> t.List[DataObject]:
132        """
133        Returns all the data objects that exist in the given database.
134        """
135        query = (
136            exp.select(
137                exp.column("database").as_("schema_name"),
138                exp.column("name"),
139                exp.case(exp.column("engine"))
140                .when(
141                    exp.Literal.string("View"),
142                    exp.Literal.string("view"),
143                )
144                .else_(
145                    exp.Literal.string("table"),
146                )
147                .as_("type"),
148            )
149            .from_("system.tables")
150            .where(exp.column("database").eq(to_schema(schema_name).db))
151        )
152        if object_names:
153            query = query.where(exp.column("name").isin(*object_names))
154        df = self.fetchdf(query)
155        return [
156            DataObject(
157                catalog=None,
158                schema=row.schema_name,
159                name=row.name,
160                type=DataObjectType.from_str(row.type),  # type: ignore
161            )
162            for row in df.itertuples()
163        ]
164
165    def create_schema(
166        self,
167        schema_name: SchemaName,
168        ignore_if_exists: bool = True,
169        warn_on_error: bool = True,
170        properties: t.List[exp.Expr] = [],
171    ) -> None:
172        """Create a Clickhouse database from a name or qualified table name.
173
174        Clickhouse has a two-level naming scheme [database].[table].
175        """
176        properties_copy = properties.copy()
177        if self.engine_run_mode.is_cluster:
178            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
179
180        # can't call super() because it will try to set a catalog
181        return self._create_schema(
182            schema_name=schema_name,
183            ignore_if_exists=ignore_if_exists,
184            warn_on_error=warn_on_error,
185            properties=properties_copy,
186            # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
187            kind="DATABASE",
188        )
189
190    def _insert_overwrite_by_condition(
191        self,
192        table_name: TableName,
193        source_queries: t.List[SourceQuery],
194        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
195        where: t.Optional[exp.Condition] = None,
196        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
197        **kwargs: t.Any,
198    ) -> None:
199        """
200        Implements the table or partition swap approach to insert-overwriting records.
201
202        Because this method executes multiple variants (full table replace, replace by time
203        range, replace by key, replace by partition), some upstream caller info is needed and
204        passed via kwargs.
205
206        Args:
207            table_name: Name of target table
208            source_queries: Source queries returning records to insert
209            target_columns_to_types: Column names and data types of target table
210            where: SQLGlot expression determining which target table rows should be overwritten
211            insert_overwrite_strategy_override: Not used by Clickhouse
212            kwargs:
213                dynamic_key: Key columns (replace by key only)
214                dynamic_key_exp: Expression to build key (replace by key only)
215                dynamic_key_unique: Whether more than one record can exist per key value (replace by key only)
216
217                keep_existing_partition_rows: Whether to overwrite partitions with only new records (incremental by partition only)
218
219        Returns:
220            Side effects only: execution of insert-overwrite operation.
221        """
222        target_table = exp.to_table(table_name)
223        target_columns_to_types = target_columns_to_types or self.columns(target_table)
224
225        temp_table = self._get_temp_table(target_table)
226        self.create_table_like(temp_table, target_table)
227
228        # REPLACE BY KEY: extract kwargs if present
229        dynamic_key = kwargs.get("dynamic_key")
230        if dynamic_key:
231            dynamic_key_exp = t.cast(exp.Expr, kwargs.get("dynamic_key_exp"))
232            dynamic_key_unique = t.cast(bool, kwargs.get("dynamic_key_unique"))
233
234        try:
235            # insert new records into temp table
236            for source_query in source_queries:
237                with source_query as query:
238                    # REPLACE BY KEY: if unique key, DISTINCTify by key columns so only one row is present per key
239                    if dynamic_key and dynamic_key_unique:
240                        query = query.distinct(*dynamic_key)  # type: ignore
241
242                    query = self._order_projections_and_filter(
243                        query, target_columns_to_types, where=where
244                    )
245                    self._insert_append_query(
246                        temp_table,
247                        query,
248                        target_columns_to_types=target_columns_to_types,
249                        order_projections=False,
250                    )
251
252            # REPLACE BY KEY: build `where` expression as "key IN (new rows' key values)"
253            if dynamic_key:
254                key_query = exp.select(dynamic_key_exp).from_(temp_table)
255                if not dynamic_key_unique:
256                    key_query = key_query.distinct()
257                where = dynamic_key_exp.isin(query=key_query)
258
259            # get target table partition key to confirm it's actually partitioned
260            table_partition_exp = self.fetchone(
261                exp.select("partition_key")
262                .from_("system.tables")
263                .where(
264                    exp.column("database").eq(target_table.db),
265                    exp.column("name").eq(target_table.name),
266                )
267            )
268
269            all_affected_partitions: t.Set[str] = set()
270
271            if where:
272                # identify existing records to keep by inverting the delete `where` clause
273                existing_records_insert_exp = exp.insert(
274                    self._select_columns(target_columns_to_types)
275                    .from_(target_table)
276                    .where(exp.paren(expression=where).not_()),
277                    temp_table,
278                )
279
280                # if target table is partitioned, modify insert expression to only insert
281                #   existing records that are in one of the affected partitions
282                if table_partition_exp:
283                    partitions_temp_table_name = self._get_temp_table(
284                        exp.to_table(f"{target_table.db}._affected_partitions")
285                    )
286                    all_affected_partitions, existing_records_insert_exp = (
287                        self._get_affected_partitions_and_insert_exp(
288                            target_table,
289                            temp_table,
290                            where,
291                            existing_records_insert_exp,
292                            partitions_temp_table_name,
293                        )
294                    )
295
296                try:
297                    self.execute(existing_records_insert_exp, track_rows_processed=True)
298                finally:
299                    if table_partition_exp:
300                        self.drop_table(partitions_temp_table_name)
301
302            # process by partition if:
303            #   1. The table is partitioned AND
304            #   (2a. There are existing records to keep (`where`) OR
305            #    2b. We're overwriting existing partition rows (incremental by partition model))
306            if table_partition_exp and (
307                where or kwargs.get("keep_existing_partition_rows") is False
308            ):
309                # only replace partitions that have records in temp_table
310                partitions_to_replace = self._get_partition_ids(temp_table)
311
312                # drop affected partitions that have no records in temp_table
313                #   - NOTE: `all_affected_partitions` will be empty when keep_existing_partition_rows=False
314                #      because previous code block is skipped
315                partitions_to_drop = all_affected_partitions - partitions_to_replace
316
317                if partitions_to_replace or partitions_to_drop:
318                    self.alter_table(
319                        [
320                            self._build_alter_partition_exp(
321                                target_table, temp_table, partitions_to_replace, partitions_to_drop
322                            )
323                        ]
324                    )
325            else:
326                self._exchange_tables(target_table, temp_table)
327        finally:
328            self.drop_table(temp_table)
329
330    def _get_affected_partitions_and_insert_exp(
331        self,
332        target_table: exp.Table,
333        temp_table: exp.Table,
334        where: exp.Condition,
335        existing_records_insert_exp: exp.Insert,
336        partitions_temp_table_name: exp.Table,
337    ) -> tuple[t.Set[str], exp.Insert]:
338        # identify all affected partition IDs
339        #   - store in temp table so we can reuse results
340        self.ctas(
341            partitions_temp_table_name,
342            exp.select("partition_id")
343            .distinct()
344            .from_(
345                exp.union(
346                    # target table partitions with records in `where`
347                    exp.select(exp.column("_partition_id").as_("partition_id"))
348                    .from_(target_table)
349                    .where(where),
350                    # temp table partitions with new records to insert
351                    exp.select(
352                        exp.column("_partition_id").as_("partition_id"),
353                    ).from_(temp_table),
354                ).subquery("_affected_partitions")
355            ),
356        )
357
358        # read all affected partition IDs into memory
359        all_affected_partitions = self._get_partition_ids(
360            partitions_temp_table_name, "partition_id"
361        )
362
363        # limit existing records insert expression WHERE to affected target table partitions
364        #   by adding `AND _partition_id IN (SELECT partition_id FROM partitions_temp_table)`
365        existing_records_insert_exp.set(
366            "expression",
367            existing_records_insert_exp.expression.where(
368                exp.column("_partition_id").isin(
369                    exp.select("partition_id").from_(partitions_temp_table_name)
370                )
371            ),
372        )
373
374        return all_affected_partitions, existing_records_insert_exp
375
376    def _build_alter_partition_exp(
377        self,
378        target_table: exp.Table,
379        temp_table: exp.Table,
380        partitions_to_replace: t.Set[str],
381        partitions_to_drop: t.Set[str],
382    ) -> exp.Alter:
383        alter_expr = exp.Alter(this=target_table, kind="TABLE")
384
385        for partition in partitions_to_replace:
386            alter_expr.append(
387                "actions",
388                exp.ReplacePartition(
389                    expression=exp.Partition(
390                        expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
391                    ),
392                    source=temp_table,
393                ),
394            )
395
396        for partition in partitions_to_drop:
397            alter_expr.append(
398                "actions",
399                exp.DropPartition(
400                    expressions=[
401                        exp.Partition(
402                            expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
403                        )
404                    ],
405                    source=temp_table,
406                ),
407            )
408
409        return alter_expr
410
411    def _replace_by_key(
412        self,
413        target_table: TableName,
414        source_table: QueryOrDF,
415        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
416        key: t.Sequence[exp.Expr],
417        is_unique_key: bool,
418        source_columns: t.Optional[t.List[str]] = None,
419    ) -> None:
420        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
421            source_table,
422            target_columns_to_types,
423            target_table=target_table,
424            source_columns=source_columns,
425        )
426
427        key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0]
428
429        self._insert_overwrite_by_condition(
430            target_table,
431            source_queries,
432            target_columns_to_types,
433            dynamic_key=key,
434            dynamic_key_exp=key_exp,
435            dynamic_key_unique=is_unique_key,
436        )
437
438    def insert_overwrite_by_partition(
439        self,
440        table_name: TableName,
441        query_or_df: QueryOrDF,
442        partitioned_by: t.List[exp.Expr],
443        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
444        source_columns: t.Optional[t.List[str]] = None,
445    ) -> None:
446        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
447            query_or_df,
448            target_columns_to_types,
449            target_table=table_name,
450            source_columns=source_columns,
451        )
452
453        self._insert_overwrite_by_condition(
454            table_name, source_queries, target_columns_to_types, keep_existing_partition_rows=False
455        )
456
457    def _create_table_like(
458        self,
459        target_table_name: TableName,
460        source_table_name: TableName,
461        exists: bool,
462        **kwargs: t.Any,
463    ) -> None:
464        """Create table with identical structure as source table"""
465        self.execute(
466            f"CREATE TABLE {target_table_name}{self._on_cluster_sql()} AS {source_table_name}"
467        )
468
469    def _get_partition_ids(
470        self,
471        table: exp.Table,
472        partition_col_name: str = "_partition_id",
473        where: t.Optional[exp.Condition] = None,
474        limit: t.Optional[int] = None,
475    ) -> t.Set[t.Any]:
476        """List partition IDs present in table"""
477        partitions_query = exp.select(partition_col_name).distinct().from_(table)
478        if where:
479            partitions_query = partitions_query.where(where)
480        if limit:
481            partitions_query = partitions_query.limit(limit)
482        partitions = self.fetchall(partitions_query)
483
484        return set([part[0] for part in partitions] if partitions else [])
485
486    def _create_table(
487        self,
488        table_name_or_schema: t.Union[exp.Schema, TableName],
489        expression: t.Optional[exp.Expr],
490        exists: bool = True,
491        replace: bool = False,
492        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
493        table_description: t.Optional[str] = None,
494        column_descriptions: t.Optional[t.Dict[str, str]] = None,
495        table_kind: t.Optional[str] = None,
496        track_rows_processed: bool = True,
497        **kwargs: t.Any,
498    ) -> None:
499        """Creates a table in the database.
500
501        Clickhouse Cloud requires doing CTAS in two steps.
502
503        First, we add the `EMPTY` property to the CTAS call to create a table with the proper
504        schema, then insert the data with the CTAS query.
505        """
506        # ensure columns used for partitioning are non-Nullable
507        #   - normally user's responsibility, but we automatically partition by time column in
508        #       incremental by time models
509        if kwargs.get("partitioned_by"):
510            partition_cols = [
511                col.name
512                for part_expr in kwargs["partitioned_by"]
513                for col in part_expr.find_all(exp.Column)
514            ]
515            if isinstance(table_name_or_schema, exp.Schema):
516                for coldef in table_name_or_schema.expressions:
517                    if coldef.name in partition_cols:
518                        coldef.kind.set("nullable", False)
519            if target_columns_to_types:
520                for col in partition_cols:
521                    target_columns_to_types[col].set("nullable", False)
522
523        super()._create_table(
524            table_name_or_schema,
525            expression,
526            exists,
527            replace,
528            target_columns_to_types,
529            table_description,
530            column_descriptions,
531            table_kind,
532            empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
533            track_rows_processed=track_rows_processed,
534            **kwargs,
535        )
536
537        # execute the second INSERT step if on cloud and creating a table
538        # - Additional clause is to avoid clickhouse-connect HTTP client bug where CTAS LIMIT 0
539        #     returns a success code but malformed response
540        if (
541            self.engine_run_mode.is_cloud
542            and table_kind != "VIEW"
543            and expression
544            and not (
545                expression.args.get("limit") is not None
546                and expression.args["limit"].expression.this == "0"
547            )
548        ):
549            table_name = (
550                table_name_or_schema.this
551                if isinstance(table_name_or_schema, exp.Schema)
552                else table_name_or_schema
553            )
554            self._insert_append_query(
555                table_name,
556                expression,  # type: ignore
557                target_columns_to_types or self.columns(table_name),
558            )
559
560    def _exchange_tables(
561        self,
562        old_table_name: TableName,
563        new_table_name: TableName,
564    ) -> None:
565        from clickhouse_connect.driver.exceptions import DatabaseError  # type: ignore
566
567        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
568        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)
569
570        try:
571            self.execute(
572                f"EXCHANGE TABLES {old_table_sql} AND {new_table_sql}{self._on_cluster_sql()}"
573            )
574        except DatabaseError as e:
575            if "NOT_IMPLEMENTED" in str(e):
576                # If someone is using an old Clickhouse version, an OS that doesn't support atomic exchanges,
577                # or a database engine that doesn't support atomic exchanges, we do a non-atomic rename instead.
578                #
579                # Executing multiple renames in one call like `RENAME TABLE a to b, c to a` is supported
580                # but not an atomic operation. Because it is not atomic, doing it in two calls is equivalent
581                # and does not require defining an additional method.
582                throwaway_table_name = self._get_temp_table(old_table_name)
583                self._rename_table(old_table_name, throwaway_table_name)
584                self._rename_table(new_table_name, old_table_name)
585                self.drop_table(throwaway_table_name)
586
587    def _rename_table(
588        self,
589        old_table_name: TableName,
590        new_table_name: TableName,
591    ) -> None:
592        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
593        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)
594
595        self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}")
596
597    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None:
598        delete_expr = exp.delete(table_name, where)
599        if self.engine_run_mode.is_cluster:
600            delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
601        self.execute(delete_expr)
602
603    def alter_table(
604        self,
605        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
606    ) -> None:
607        """
608        Performs the alter statements to change the current table into the structure of the target table.
609        """
610        with self.transaction():
611            for alter_expression in [
612                x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
613            ]:
614                if self.engine_run_mode.is_cluster:
615                    alter_expression.set(
616                        "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))
617                    )
618                self.execute(alter_expression)
619
620    def _drop_object(
621        self,
622        name: TableName | SchemaName,
623        exists: bool = True,
624        kind: str = "TABLE",
625        cascade: bool = False,
626        **drop_args: t.Any,
627    ) -> None:
628        """Drops an object.
629
630        An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind.
631
632        Args:
633            name: The name of the table to drop.
634            exists: If exists, defaults to True.
635            kind: What kind of object to drop. Defaults to TABLE
636            **drop_args: Any extra arguments to set on the Drop expression
637        """
638        super()._drop_object(
639            name=name,
640            exists=exists,
641            kind=kind,
642            cascade=cascade,
643            cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
644            if self.engine_run_mode.is_cluster
645            else None,
646            **drop_args,
647        )
648
649    def _build_partitioned_by_exp(
650        self,
651        partitioned_by: t.List[exp.Expr],
652        **kwargs: t.Any,
653    ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]:
654        return exp.PartitionedByProperty(
655            this=exp.Schema(expressions=partitioned_by),
656        )
657
658    def ensure_nulls_for_unmatched_after_join(
659        self,
660        query: Query,
661    ) -> Query:
662        # Set `join_use_nulls = 1` in a query's SETTINGS clause
663        query.append("settings", exp.var("join_use_nulls").eq(exp.Literal.number("1")))
664        return query
665
666    def use_server_nulls_for_unmatched_after_join(
667        self,
668        query: Query,
669    ) -> Query:
670        # Set the `join_use_nulls` server value in a query's SETTINGS clause
671        #
672        # Use in SCD models:
673        #  - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join
674        #      are filled with NULL instead of the default data type value. The default join_use_nulls value is `0`.
675        #  - The SCD embeds the user's original query in the `source` CTE
676        #  - Settings are dynamically scoped, so our setting may override the server's default setting the user expects
677        #      for their query.
678        #  - To prevent this, we:
679        #     - If the user query sets `join_use_nulls`, we do nothing
680        #     - If the user query does not set `join_use_nulls`, we query the server for the current setting
681        #       - If the server value is 1, we do nothing
682        #       - If the server values is not 1, we inject its `join_use_nulls` value into the user query
683        #     - We do not need to check user subqueries because our injected setting operates at the same scope the
684        #         server value would normally operate at
685        setting_name = "join_use_nulls"
686        setting_value = "1"
687
688        user_settings = query.args.get("settings")
689        # if user has not already set it explicitly
690        if not (
691            user_settings
692            and any(
693                [
694                    isinstance(setting, exp.EQ) and setting.name == setting_name
695                    for setting in user_settings
696                ]
697            )
698        ):
699            server_value = self.fetchone(
700                exp.select("value")
701                .from_("system.settings")
702                .where(exp.column("name").eq(exp.Literal.string(setting_name)))
703            )[0]
704            # only inject the setting if the server value isn't 1
705            inject_setting = setting_value != server_value
706            setting_value = server_value if inject_setting else setting_value
707
708            if inject_setting:
709                query.append(
710                    "settings", exp.var(setting_name).eq(exp.Literal.number(setting_value))
711                )
712
713        return query
714
715    def _build_settings_property(
716        self, key: str, value: exp.Expr | str | int | float
717    ) -> exp.SettingsProperty:
718        return exp.SettingsProperty(
719            expressions=[
720                exp.EQ(
721                    this=exp.var(key.lower()),
722                    expression=value
723                    if isinstance(value, exp.Expr)
724                    else exp.Literal(this=value, is_string=isinstance(value, str)),
725                )
726            ]
727        )
728
729    def _build_table_properties_exp(
730        self,
731        catalog_name: t.Optional[str] = None,
732        table_format: t.Optional[str] = None,
733        storage_format: t.Optional[str] = None,
734        partitioned_by: t.Optional[t.List[exp.Expr]] = None,
735        partition_interval_unit: t.Optional[IntervalUnit] = None,
736        clustered_by: t.Optional[t.List[exp.Expr]] = None,
737        table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
738        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
739        table_description: t.Optional[str] = None,
740        table_kind: t.Optional[str] = None,
741        empty_ctas: bool = False,
742        **kwargs: t.Any,
743    ) -> t.Optional[exp.Properties]:
744        properties: t.List[exp.Expr] = []
745
746        table_engine = self.DEFAULT_TABLE_ENGINE
747        if storage_format:
748            table_engine = (
749                storage_format.this if isinstance(storage_format, exp.Var) else storage_format  # type: ignore
750            )
751        properties.append(exp.EngineProperty(this=table_engine))
752
753        # copy of table_properties so we can pop items off below then consume the rest later
754        table_properties_copy = {
755            k.upper(): v for k, v in (table_properties.copy() if table_properties else {}).items()
756        }
757
758        mergetree_engine = bool(re.search(self.ORDER_BY_TABLE_ENGINE_REGEX, table_engine))
759        ordered_by_raw = table_properties_copy.pop("ORDER_BY", None)
760        if mergetree_engine:
761            ordered_by_exprs = []
762            if ordered_by_raw:
763                ordered_by_vals = []
764
765                if isinstance(ordered_by_raw, (exp.Tuple, exp.Array)):
766                    ordered_by_vals = ordered_by_raw.expressions
767                if isinstance(ordered_by_raw, exp.Paren):
768                    ordered_by_vals = [ordered_by_raw.this]
769
770                if not ordered_by_vals:
771                    ordered_by_vals = (
772                        ordered_by_raw if isinstance(ordered_by_raw, list) else [ordered_by_raw]
773                    )
774
775                for col in ordered_by_vals:
776                    ordered_by_exprs.append(
777                        col
778                        if isinstance(col, exp.Column)
779                        else maybe_parse(
780                            col.name if isinstance(col, exp.Literal) else col,
781                            dialect=self.dialect,
782                            into=exp.Ordered,
783                        )
784                    )
785
786            properties.append(exp.Order(expressions=[exp.Tuple(expressions=ordered_by_exprs)]))
787
788        primary_key = table_properties_copy.pop("PRIMARY_KEY", None)
789        if mergetree_engine and primary_key:
790            primary_key_vals = []
791            if isinstance(primary_key, (exp.Tuple, exp.Array)):
792                primary_key_vals = primary_key.expressions
793            if isinstance(ordered_by_raw, exp.Paren):
794                primary_key_vals = [primary_key.this]
795
796            if not primary_key_vals:
797                primary_key_vals = primary_key if isinstance(primary_key, list) else [primary_key]
798
799            properties.append(
800                exp.PrimaryKey(
801                    expressions=[
802                        exp.to_column(k.name if isinstance(k, exp.Literal) else k)
803                        for k in primary_key_vals
804                    ]
805                )
806            )
807
808        ttl = table_properties_copy.pop("TTL", None)
809        if ttl:
810            properties.append(
811                exp.MergeTreeTTL(expressions=[ttl if isinstance(ttl, exp.Expr) else exp.var(ttl)])
812            )
813
814        if (
815            partitioned_by
816            and (partitioned_by_prop := self._build_partitioned_by_exp(partitioned_by)) is not None
817        ):
818            properties.append(partitioned_by_prop)
819
820        if self.engine_run_mode.is_cluster:
821            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
822
823        if empty_ctas:
824            properties.append(exp.EmptyProperty())
825
826        if table_properties_copy:
827            properties.extend(
828                [self._build_settings_property(k, v) for k, v in table_properties_copy.items()]
829            )
830
831        if table_description:
832            properties.append(
833                exp.SchemaCommentProperty(
834                    this=exp.Literal.string(self._truncate_table_comment(table_description))
835                )
836            )
837
838        if properties:
839            return exp.Properties(expressions=properties)
840
841        return None
842
843    def _build_view_properties_exp(
844        self,
845        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
846        table_description: t.Optional[str] = None,
847        **kwargs: t.Any,
848    ) -> t.Optional[exp.Properties]:
849        """Creates a SQLGlot table properties expression for view"""
850        properties: t.List[exp.Expr] = []
851
852        view_properties_copy = view_properties.copy() if view_properties else {}
853
854        if self.engine_run_mode.is_cluster:
855            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
856
857        if view_properties_copy:
858            properties.extend(
859                [self._build_settings_property(k, v) for k, v in view_properties_copy.items()]
860            )
861
862        if table_description:
863            properties.append(
864                exp.SchemaCommentProperty(
865                    this=exp.Literal.string(self._truncate_table_comment(table_description))
866                )
867            )
868
869        if properties:
870            return exp.Properties(expressions=properties)
871        return None
872
873    def _build_create_comment_table_exp(
874        self, table: exp.Table, table_comment: str, table_kind: str, **kwargs: t.Any
875    ) -> exp.Comment | str:
876        table_sql = table.sql(dialect=self.dialect, identify=True)
877
878        truncated_comment = self._truncate_table_comment(table_comment)
879        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
880
881        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} MODIFY COMMENT {comment_sql}"
882
883    def _build_create_comment_column_exp(
884        self,
885        table: exp.Table,
886        column_name: str,
887        column_comment: str,
888        table_kind: str = "TABLE",
889        **kwargs: t.Any,
890    ) -> exp.Comment | str:
891        table_sql = table.sql(dialect=self.dialect, identify=True)
892        column_sql = exp.to_column(column_name).sql(dialect=self.dialect, identify=True)
893
894        truncated_comment = self._truncate_table_comment(column_comment)
895        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
896
897        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} COMMENT COLUMN {column_sql} {comment_sql}"
898
899    def _on_cluster_sql(self) -> str:
900        if self.engine_run_mode.is_cluster:
901            cluster_name = exp.to_identifier(self.cluster, quoted=True).sql(dialect=self.dialect)  #  type: ignore
902            return f" ON CLUSTER {cluster_name} "
903        return ""

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
DIALECT = 'clickhouse'
SUPPORTS_TRANSACTIONS = False
SUPPORTS_VIEW_SCHEMA = False
SUPPORTS_REPLACE_TABLE = False
COMMENT_CREATION_VIEW = <CommentCreationView.COMMENT_COMMAND_ONLY: 4>
SCHEMA_DIFFER_KWARGS = {}
DEFAULT_TABLE_ENGINE = 'MergeTree'
ORDER_BY_TABLE_ENGINE_REGEX = '^.*?MergeTree.*$'
46    @property
47    def engine_run_mode(self) -> EngineRunMode:
48        if self._extra_config.get("cloud_mode"):
49            return EngineRunMode.CLOUD
50        # we use the user's specification of a cluster in the connection config to determine if
51        #   the engine is in cluster mode
52        if self._extra_config.get("cluster"):
53            return EngineRunMode.CLUSTER
54        return EngineRunMode.STANDALONE
cluster: Optional[str]
56    @property
57    def cluster(self) -> t.Optional[str]:
58        return self._extra_config.get("cluster")
def fetchone( self, query: Union[sqlglot.expressions.core.Expr, str], ignore_unsupported_errors: bool = False, quote_identifiers: bool = False) -> Tuple:
66    def fetchone(
67        self,
68        query: t.Union[exp.Expr, str],
69        ignore_unsupported_errors: bool = False,
70        quote_identifiers: bool = False,
71    ) -> t.Tuple:
72        with self.transaction():
73            self.execute(
74                query,
75                ignore_unsupported_errors=ignore_unsupported_errors,
76                quote_identifiers=quote_identifiers,
77            )
78            return self.cursor.fetchall()[0]
def create_schema( self, schema_name: Union[str, sqlglot.expressions.query.Table], ignore_if_exists: bool = True, warn_on_error: bool = True, properties: List[sqlglot.expressions.core.Expr] = []) -> None:
165    def create_schema(
166        self,
167        schema_name: SchemaName,
168        ignore_if_exists: bool = True,
169        warn_on_error: bool = True,
170        properties: t.List[exp.Expr] = [],
171    ) -> None:
172        """Create a Clickhouse database from a name or qualified table name.
173
174        Clickhouse has a two-level naming scheme [database].[table].
175        """
176        properties_copy = properties.copy()
177        if self.engine_run_mode.is_cluster:
178            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
179
180        # can't call super() because it will try to set a catalog
181        return self._create_schema(
182            schema_name=schema_name,
183            ignore_if_exists=ignore_if_exists,
184            warn_on_error=warn_on_error,
185            properties=properties_copy,
186            # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
187            kind="DATABASE",
188        )

Create a Clickhouse database from a name or qualified table name.

Clickhouse has a two-level naming scheme [database].[table].

def insert_overwrite_by_partition( self, table_name: Union[str, sqlglot.expressions.query.Table], query_or_df: QueryOrDF, partitioned_by: List[sqlglot.expressions.core.Expr], target_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]] = None, source_columns: Optional[List[str]] = None) -> None:
438    def insert_overwrite_by_partition(
439        self,
440        table_name: TableName,
441        query_or_df: QueryOrDF,
442        partitioned_by: t.List[exp.Expr],
443        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
444        source_columns: t.Optional[t.List[str]] = None,
445    ) -> None:
446        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
447            query_or_df,
448            target_columns_to_types,
449            target_table=table_name,
450            source_columns=source_columns,
451        )
452
453        self._insert_overwrite_by_condition(
454            table_name, source_queries, target_columns_to_types, keep_existing_partition_rows=False
455        )
def delete_from( self, table_name: Union[str, sqlglot.expressions.query.Table], where: Union[str, sqlglot.expressions.core.Expr]) -> None:
597    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None:
598        delete_expr = exp.delete(table_name, where)
599        if self.engine_run_mode.is_cluster:
600            delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
601        self.execute(delete_expr)
def alter_table( self, alter_expressions: Union[List[sqlglot.expressions.ddl.Alter], List[sqlmesh.core.schema_diff.TableAlterOperation]]) -> None:
603    def alter_table(
604        self,
605        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
606    ) -> None:
607        """
608        Performs the alter statements to change the current table into the structure of the target table.
609        """
610        with self.transaction():
611            for alter_expression in [
612                x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
613            ]:
614                if self.engine_run_mode.is_cluster:
615                    alter_expression.set(
616                        "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))
617                    )
618                self.execute(alter_expression)

Performs the alter statements to change the current table into the structure of the target table.

def ensure_nulls_for_unmatched_after_join( self, query: Query) -> Query:
658    def ensure_nulls_for_unmatched_after_join(
659        self,
660        query: Query,
661    ) -> Query:
662        # Set `join_use_nulls = 1` in a query's SETTINGS clause
663        query.append("settings", exp.var("join_use_nulls").eq(exp.Literal.number("1")))
664        return query
def use_server_nulls_for_unmatched_after_join( self, query: Query) -> Query:
666    def use_server_nulls_for_unmatched_after_join(
667        self,
668        query: Query,
669    ) -> Query:
670        # Set the `join_use_nulls` server value in a query's SETTINGS clause
671        #
672        # Use in SCD models:
673        #  - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join
674        #      are filled with NULL instead of the default data type value. The default join_use_nulls value is `0`.
675        #  - The SCD embeds the user's original query in the `source` CTE
676        #  - Settings are dynamically scoped, so our setting may override the server's default setting the user expects
677        #      for their query.
678        #  - To prevent this, we:
679        #     - If the user query sets `join_use_nulls`, we do nothing
680        #     - If the user query does not set `join_use_nulls`, we query the server for the current setting
681        #       - If the server value is 1, we do nothing
682        #       - If the server values is not 1, we inject its `join_use_nulls` value into the user query
683        #     - We do not need to check user subqueries because our injected setting operates at the same scope the
684        #         server value would normally operate at
685        setting_name = "join_use_nulls"
686        setting_value = "1"
687
688        user_settings = query.args.get("settings")
689        # if user has not already set it explicitly
690        if not (
691            user_settings
692            and any(
693                [
694                    isinstance(setting, exp.EQ) and setting.name == setting_name
695                    for setting in user_settings
696                ]
697            )
698        ):
699            server_value = self.fetchone(
700                exp.select("value")
701                .from_("system.settings")
702                .where(exp.column("name").eq(exp.Literal.string(setting_name)))
703            )[0]
704            # only inject the setting if the server value isn't 1
705            inject_setting = setting_value != server_value
706            setting_value = server_value if inject_setting else setting_value
707
708            if inject_setting:
709                query.append(
710                    "settings", exp.var(setting_name).eq(exp.Literal.number(setting_value))
711                )
712
713        return query
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
COMMENT_CREATION_TABLE
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
create_view
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
insert_append
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
rename_table
get_data_object
get_data_objects
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ping
get_table_last_modified_ts
sqlmesh.core.engine_adapter.base.EngineAdapterWithIndexSupport
SUPPORTS_INDEXES
sqlmesh.core.engine_adapter.mixins.LogicalMergeMixin
merge