Edit on GitHub

sqlmesh.core.engine_adapter.clickhouse

  1from __future__ import annotations
  2
  3import typing as t
  4import logging
  5import re
  6from sqlglot import exp, maybe_parse
  7from sqlmesh.core.dialect import to_schema
  8from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin
  9from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport
 10from sqlmesh.core.engine_adapter.shared import (
 11    DataObject,
 12    DataObjectType,
 13    EngineRunMode,
 14    SourceQuery,
 15    CommentCreationView,
 16    InsertOverwriteStrategy,
 17)
 18from sqlmesh.core.schema_diff import TableAlterOperation
 19from sqlmesh.utils import get_source_columns_to_types
 20
 21if t.TYPE_CHECKING:
 22    import pandas as pd
 23
 24    from sqlmesh.core._typing import SchemaName, TableName
 25    from sqlmesh.core.engine_adapter._typing import DF, Query, QueryOrDF
 26
 27    from sqlmesh.core.node import IntervalUnit
 28
 29
 30logger = logging.getLogger(__name__)
 31
 32
 33class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin):
 34    DIALECT = "clickhouse"
 35    SUPPORTS_TRANSACTIONS = False
 36    SUPPORTS_VIEW_SCHEMA = False
 37    SUPPORTS_REPLACE_TABLE = False
 38    COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
 39
 40    SCHEMA_DIFFER_KWARGS = {}
 41
 42    DEFAULT_TABLE_ENGINE = "MergeTree"
 43    ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$"
 44
 45    @property
 46    def engine_run_mode(self) -> EngineRunMode:
 47        if self._extra_config.get("cloud_mode"):
 48            return EngineRunMode.CLOUD
 49        # we use the user's specification of a cluster in the connection config to determine if
 50        #   the engine is in cluster mode
 51        if self._extra_config.get("cluster"):
 52            return EngineRunMode.CLUSTER
 53        return EngineRunMode.STANDALONE
 54
 55    @property
 56    def cluster(self) -> t.Optional[str]:
 57        return self._extra_config.get("cluster")
 58
 59    # Workaround for clickhouse-connect cursor bug
 60    # - cursor does not reset row index correctly on `close()`, so `fetchone()` and `fetchmany()`
 61    #     return the wrong (or no) rows after the very first cursor query that returns rows
 62    #     in the connection
 63    # - cursor does reset the data rows correctly on `close()`, so `fetchall()` works because it
 64    #     doesn't use the row index at all
 65    def fetchone(
 66        self,
 67        query: t.Union[exp.Expression, str],
 68        ignore_unsupported_errors: bool = False,
 69        quote_identifiers: bool = False,
 70    ) -> t.Tuple:
 71        with self.transaction():
 72            self.execute(
 73                query,
 74                ignore_unsupported_errors=ignore_unsupported_errors,
 75                quote_identifiers=quote_identifiers,
 76            )
 77            return self.cursor.fetchall()[0]
 78
 79    def _fetch_native_df(
 80        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
 81    ) -> pd.DataFrame:
 82        """Fetches a Pandas DataFrame from the cursor"""
 83        return self.cursor.client.query_df(
 84            self._to_sql(query, quote=quote_identifiers)
 85            if isinstance(query, exp.Expression)
 86            else query,
 87            use_extended_dtypes=True,
 88        )
 89
 90    def _df_to_source_queries(
 91        self,
 92        df: DF,
 93        target_columns_to_types: t.Dict[str, exp.DataType],
 94        batch_size: int,
 95        target_table: TableName,
 96        source_columns: t.Optional[t.List[str]] = None,
 97        **kwargs: t.Any,
 98    ) -> t.List[SourceQuery]:
 99        temp_table = self._get_temp_table(target_table, **kwargs)
100        source_columns_to_types = get_source_columns_to_types(
101            target_columns_to_types, source_columns
102        )
103
104        def query_factory() -> Query:
105            # It is possible for the factory to be called multiple times and if so then the temp table will already
106            # be created so we skip creating again. This means we are assuming the first call is the same result
107            # as later calls.
108            if not self.table_exists(temp_table):
109                self.create_table(
110                    temp_table,
111                    source_columns_to_types,
112                    storage_format=exp.var("MergeTree"),
113                    **kwargs,
114                )
115                ordered_df = df[list(source_columns_to_types)]
116
117                self.cursor.client.insert_df(temp_table.sql(dialect=self.dialect), df=ordered_df)
118
119            return exp.select(*self._casted_columns(target_columns_to_types, source_columns)).from_(
120                temp_table
121            )
122
123        return [
124            SourceQuery(
125                query_factory=query_factory,
126                cleanup_func=lambda: self.drop_table(temp_table, **kwargs),
127            )
128        ]
129
130    def _get_data_objects(
131        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
132    ) -> t.List[DataObject]:
133        """
134        Returns all the data objects that exist in the given database.
135        """
136        query = (
137            exp.select(
138                exp.column("database").as_("schema_name"),
139                exp.column("name"),
140                exp.case(exp.column("engine"))
141                .when(
142                    exp.Literal.string("View"),
143                    exp.Literal.string("view"),
144                )
145                .else_(
146                    exp.Literal.string("table"),
147                )
148                .as_("type"),
149            )
150            .from_("system.tables")
151            .where(exp.column("database").eq(to_schema(schema_name).db))
152        )
153        if object_names:
154            query = query.where(exp.column("name").isin(*object_names))
155        df = self.fetchdf(query)
156        return [
157            DataObject(
158                catalog=None,
159                schema=row.schema_name,
160                name=row.name,
161                type=DataObjectType.from_str(row.type),  # type: ignore
162            )
163            for row in df.itertuples()
164        ]
165
166    def create_schema(
167        self,
168        schema_name: SchemaName,
169        ignore_if_exists: bool = True,
170        warn_on_error: bool = True,
171        properties: t.List[exp.Expression] = [],
172    ) -> None:
173        """Create a Clickhouse database from a name or qualified table name.
174
175        Clickhouse has a two-level naming scheme [database].[table].
176        """
177        properties_copy = properties.copy()
178        if self.engine_run_mode.is_cluster:
179            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
180
181        # can't call super() because it will try to set a catalog
182        return self._create_schema(
183            schema_name=schema_name,
184            ignore_if_exists=ignore_if_exists,
185            warn_on_error=warn_on_error,
186            properties=properties_copy,
187            # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
188            kind="DATABASE",
189        )
190
191    def _insert_overwrite_by_condition(
192        self,
193        table_name: TableName,
194        source_queries: t.List[SourceQuery],
195        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
196        where: t.Optional[exp.Condition] = None,
197        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
198        **kwargs: t.Any,
199    ) -> None:
200        """
201        Implements the table or partition swap approach to insert-overwriting records.
202
203        Because this method executes multiple variants (full table replace, replace by time
204        range, replace by key, replace by partition), some upstream caller info is needed and
205        passed via kwargs.
206
207        Args:
208            table_name: Name of target table
209            source_queries: Source queries returning records to insert
210            target_columns_to_types: Column names and data types of target table
211            where: SQLGlot expression determining which target table rows should be overwritten
212            insert_overwrite_strategy_override: Not used by Clickhouse
213            kwargs:
214                dynamic_key: Key columns (replace by key only)
215                dynamic_key_exp: Expression to build key (replace by key only)
216                dynamic_key_unique: Whether more than one record can exist per key value (replace by key only)
217
218                keep_existing_partition_rows: Whether to overwrite partitions with only new records (incremental by partition only)
219
220        Returns:
221            Side effects only: execution of insert-overwrite operation.
222        """
223        target_table = exp.to_table(table_name)
224        target_columns_to_types = target_columns_to_types or self.columns(target_table)
225
226        temp_table = self._get_temp_table(target_table)
227        self.create_table_like(temp_table, target_table)
228
229        # REPLACE BY KEY: extract kwargs if present
230        dynamic_key = kwargs.get("dynamic_key")
231        if dynamic_key:
232            dynamic_key_exp = t.cast(exp.Expression, kwargs.get("dynamic_key_exp"))
233            dynamic_key_unique = t.cast(bool, kwargs.get("dynamic_key_unique"))
234
235        try:
236            # insert new records into temp table
237            for source_query in source_queries:
238                with source_query as query:
239                    # REPLACE BY KEY: if unique key, DISTINCTify by key columns so only one row is present per key
240                    if dynamic_key and dynamic_key_unique:
241                        query = query.distinct(*dynamic_key)  # type: ignore
242
243                    query = self._order_projections_and_filter(
244                        query, target_columns_to_types, where=where
245                    )
246                    self._insert_append_query(
247                        temp_table,
248                        query,
249                        target_columns_to_types=target_columns_to_types,
250                        order_projections=False,
251                    )
252
253            # REPLACE BY KEY: build `where` expression as "key IN (new rows' key values)"
254            if dynamic_key:
255                key_query = exp.select(dynamic_key_exp).from_(temp_table)
256                if not dynamic_key_unique:
257                    key_query = key_query.distinct()
258                where = dynamic_key_exp.isin(query=key_query)
259
260            # get target table partition key to confirm it's actually partitioned
261            table_partition_exp = self.fetchone(
262                exp.select("partition_key")
263                .from_("system.tables")
264                .where(
265                    exp.column("database").eq(target_table.db),
266                    exp.column("name").eq(target_table.name),
267                )
268            )
269
270            all_affected_partitions: t.Set[str] = set()
271
272            if where:
273                # identify existing records to keep by inverting the delete `where` clause
274                existing_records_insert_exp = exp.insert(
275                    self._select_columns(target_columns_to_types)
276                    .from_(target_table)
277                    .where(exp.paren(expression=where).not_()),
278                    temp_table,
279                )
280
281                # if target table is partitioned, modify insert expression to only insert
282                #   existing records that are in one of the affected partitions
283                if table_partition_exp:
284                    partitions_temp_table_name = self._get_temp_table(
285                        exp.to_table(f"{target_table.db}._affected_partitions")
286                    )
287                    all_affected_partitions, existing_records_insert_exp = (
288                        self._get_affected_partitions_and_insert_exp(
289                            target_table,
290                            temp_table,
291                            where,
292                            existing_records_insert_exp,
293                            partitions_temp_table_name,
294                        )
295                    )
296
297                try:
298                    self.execute(existing_records_insert_exp, track_rows_processed=True)
299                finally:
300                    if table_partition_exp:
301                        self.drop_table(partitions_temp_table_name)
302
303            # process by partition if:
304            #   1. The table is partitioned AND
305            #   (2a. There are existing records to keep (`where`) OR
306            #    2b. We're overwriting existing partition rows (incremental by partition model))
307            if table_partition_exp and (
308                where or kwargs.get("keep_existing_partition_rows") is False
309            ):
310                # only replace partitions that have records in temp_table
311                partitions_to_replace = self._get_partition_ids(temp_table)
312
313                # drop affected partitions that have no records in temp_table
314                #   - NOTE: `all_affected_partitions` will be empty when keep_existing_partition_rows=False
315                #      because previous code block is skipped
316                partitions_to_drop = all_affected_partitions - partitions_to_replace
317
318                if partitions_to_replace or partitions_to_drop:
319                    self.alter_table(
320                        [
321                            self._build_alter_partition_exp(
322                                target_table, temp_table, partitions_to_replace, partitions_to_drop
323                            )
324                        ]
325                    )
326            else:
327                self._exchange_tables(target_table, temp_table)
328        finally:
329            self.drop_table(temp_table)
330
331    def _get_affected_partitions_and_insert_exp(
332        self,
333        target_table: exp.Table,
334        temp_table: exp.Table,
335        where: exp.Condition,
336        existing_records_insert_exp: exp.Insert,
337        partitions_temp_table_name: exp.Table,
338    ) -> tuple[t.Set[str], exp.Insert]:
339        # identify all affected partition IDs
340        #   - store in temp table so we can reuse results
341        self.ctas(
342            partitions_temp_table_name,
343            exp.select("partition_id")
344            .distinct()
345            .from_(
346                exp.union(
347                    # target table partitions with records in `where`
348                    exp.select(exp.column("_partition_id").as_("partition_id"))
349                    .from_(target_table)
350                    .where(where),
351                    # temp table partitions with new records to insert
352                    exp.select(
353                        exp.column("_partition_id").as_("partition_id"),
354                    ).from_(temp_table),
355                ).subquery("_affected_partitions")
356            ),
357        )
358
359        # read all affected partition IDs into memory
360        all_affected_partitions = self._get_partition_ids(
361            partitions_temp_table_name, "partition_id"
362        )
363
364        # limit existing records insert expression WHERE to affected target table partitions
365        #   by adding `AND _partition_id IN (SELECT partition_id FROM partitions_temp_table)`
366        existing_records_insert_exp.set(
367            "expression",
368            existing_records_insert_exp.expression.where(
369                exp.column("_partition_id").isin(
370                    exp.select("partition_id").from_(partitions_temp_table_name)
371                )
372            ),
373        )
374
375        return all_affected_partitions, existing_records_insert_exp
376
377    def _build_alter_partition_exp(
378        self,
379        target_table: exp.Table,
380        temp_table: exp.Table,
381        partitions_to_replace: t.Set[str],
382        partitions_to_drop: t.Set[str],
383    ) -> exp.Alter:
384        alter_expr = exp.Alter(this=target_table, kind="TABLE")
385
386        for partition in partitions_to_replace:
387            alter_expr.append(
388                "actions",
389                exp.ReplacePartition(
390                    expression=exp.Partition(
391                        expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
392                    ),
393                    source=temp_table,
394                ),
395            )
396
397        for partition in partitions_to_drop:
398            alter_expr.append(
399                "actions",
400                exp.DropPartition(
401                    expressions=[
402                        exp.Partition(
403                            expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
404                        )
405                    ],
406                    source=temp_table,
407                ),
408            )
409
410        return alter_expr
411
412    def _replace_by_key(
413        self,
414        target_table: TableName,
415        source_table: QueryOrDF,
416        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
417        key: t.Sequence[exp.Expression],
418        is_unique_key: bool,
419        source_columns: t.Optional[t.List[str]] = None,
420    ) -> None:
421        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
422            source_table,
423            target_columns_to_types,
424            target_table=target_table,
425            source_columns=source_columns,
426        )
427
428        key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0]
429
430        self._insert_overwrite_by_condition(
431            target_table,
432            source_queries,
433            target_columns_to_types,
434            dynamic_key=key,
435            dynamic_key_exp=key_exp,
436            dynamic_key_unique=is_unique_key,
437        )
438
439    def insert_overwrite_by_partition(
440        self,
441        table_name: TableName,
442        query_or_df: QueryOrDF,
443        partitioned_by: t.List[exp.Expression],
444        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
445        source_columns: t.Optional[t.List[str]] = None,
446    ) -> None:
447        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
448            query_or_df,
449            target_columns_to_types,
450            target_table=table_name,
451            source_columns=source_columns,
452        )
453
454        self._insert_overwrite_by_condition(
455            table_name, source_queries, target_columns_to_types, keep_existing_partition_rows=False
456        )
457
458    def _create_table_like(
459        self,
460        target_table_name: TableName,
461        source_table_name: TableName,
462        exists: bool,
463        **kwargs: t.Any,
464    ) -> None:
465        """Create table with identical structure as source table"""
466        self.execute(
467            f"CREATE TABLE {target_table_name}{self._on_cluster_sql()} AS {source_table_name}"
468        )
469
470    def _get_partition_ids(
471        self,
472        table: exp.Table,
473        partition_col_name: str = "_partition_id",
474        where: t.Optional[exp.Condition] = None,
475        limit: t.Optional[int] = None,
476    ) -> t.Set[t.Any]:
477        """List partition IDs present in table"""
478        partitions_query = exp.select(partition_col_name).distinct().from_(table)
479        if where:
480            partitions_query = partitions_query.where(where)
481        if limit:
482            partitions_query = partitions_query.limit(limit)
483        partitions = self.fetchall(partitions_query)
484
485        return set([part[0] for part in partitions] if partitions else [])
486
487    def _create_table(
488        self,
489        table_name_or_schema: t.Union[exp.Schema, TableName],
490        expression: t.Optional[exp.Expression],
491        exists: bool = True,
492        replace: bool = False,
493        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
494        table_description: t.Optional[str] = None,
495        column_descriptions: t.Optional[t.Dict[str, str]] = None,
496        table_kind: t.Optional[str] = None,
497        track_rows_processed: bool = True,
498        **kwargs: t.Any,
499    ) -> None:
500        """Creates a table in the database.
501
502        Clickhouse Cloud requires doing CTAS in two steps.
503
504        First, we add the `EMPTY` property to the CTAS call to create a table with the proper
505        schema, then insert the data with the CTAS query.
506        """
507        # ensure columns used for partitioning are non-Nullable
508        #   - normally user's responsibility, but we automatically partition by time column in
509        #       incremental by time models
510        if kwargs.get("partitioned_by"):
511            partition_cols = [
512                col.name
513                for part_expr in kwargs["partitioned_by"]
514                for col in part_expr.find_all(exp.Column)
515            ]
516            if isinstance(table_name_or_schema, exp.Schema):
517                for coldef in table_name_or_schema.expressions:
518                    if coldef.name in partition_cols:
519                        coldef.kind.set("nullable", False)
520            if target_columns_to_types:
521                for col in partition_cols:
522                    target_columns_to_types[col].set("nullable", False)
523
524        super()._create_table(
525            table_name_or_schema,
526            expression,
527            exists,
528            replace,
529            target_columns_to_types,
530            table_description,
531            column_descriptions,
532            table_kind,
533            empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
534            track_rows_processed=track_rows_processed,
535            **kwargs,
536        )
537
538        # execute the second INSERT step if on cloud and creating a table
539        # - Additional clause is to avoid clickhouse-connect HTTP client bug where CTAS LIMIT 0
540        #     returns a success code but malformed response
541        if (
542            self.engine_run_mode.is_cloud
543            and table_kind != "VIEW"
544            and expression
545            and not (
546                expression.args.get("limit") is not None
547                and expression.args["limit"].expression.this == "0"
548            )
549        ):
550            table_name = (
551                table_name_or_schema.this
552                if isinstance(table_name_or_schema, exp.Schema)
553                else table_name_or_schema
554            )
555            self._insert_append_query(
556                table_name,
557                expression,  # type: ignore
558                target_columns_to_types or self.columns(table_name),
559            )
560
561    def _exchange_tables(
562        self,
563        old_table_name: TableName,
564        new_table_name: TableName,
565    ) -> None:
566        from clickhouse_connect.driver.exceptions import DatabaseError  # type: ignore
567
568        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
569        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)
570
571        try:
572            self.execute(
573                f"EXCHANGE TABLES {old_table_sql} AND {new_table_sql}{self._on_cluster_sql()}"
574            )
575        except DatabaseError as e:
576            if "NOT_IMPLEMENTED" in str(e):
577                # If someone is using an old Clickhouse version, an OS that doesn't support atomic exchanges,
578                # or a database engine that doesn't support atomic exchanges, we do a non-atomic rename instead.
579                #
580                # Executing multiple renames in one call like `RENAME TABLE a to b, c to a` is supported
581                # but not an atomic operation. Because it is not atomic, doing it in two calls is equivalent
582                # and does not require defining an additional method.
583                throwaway_table_name = self._get_temp_table(old_table_name)
584                self._rename_table(old_table_name, throwaway_table_name)
585                self._rename_table(new_table_name, old_table_name)
586                self.drop_table(throwaway_table_name)
587
588    def _rename_table(
589        self,
590        old_table_name: TableName,
591        new_table_name: TableName,
592    ) -> None:
593        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
594        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)
595
596        self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}")
597
598    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
599        delete_expr = exp.delete(table_name, where)
600        if self.engine_run_mode.is_cluster:
601            delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
602        self.execute(delete_expr)
603
604    def alter_table(
605        self,
606        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
607    ) -> None:
608        """
609        Performs the alter statements to change the current table into the structure of the target table.
610        """
611        with self.transaction():
612            for alter_expression in [
613                x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
614            ]:
615                if self.engine_run_mode.is_cluster:
616                    alter_expression.set(
617                        "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))
618                    )
619                self.execute(alter_expression)
620
621    def _drop_object(
622        self,
623        name: TableName | SchemaName,
624        exists: bool = True,
625        kind: str = "TABLE",
626        cascade: bool = False,
627        **drop_args: t.Any,
628    ) -> None:
629        """Drops an object.
630
631        An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind.
632
633        Args:
634            name: The name of the table to drop.
635            exists: If exists, defaults to True.
636            kind: What kind of object to drop. Defaults to TABLE
637            **drop_args: Any extra arguments to set on the Drop expression
638        """
639        super()._drop_object(
640            name=name,
641            exists=exists,
642            kind=kind,
643            cascade=cascade,
644            cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
645            if self.engine_run_mode.is_cluster
646            else None,
647            **drop_args,
648        )
649
650    def _build_partitioned_by_exp(
651        self,
652        partitioned_by: t.List[exp.Expression],
653        **kwargs: t.Any,
654    ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]:
655        return exp.PartitionedByProperty(
656            this=exp.Schema(expressions=partitioned_by),
657        )
658
659    def ensure_nulls_for_unmatched_after_join(
660        self,
661        query: Query,
662    ) -> Query:
663        # Set `join_use_nulls = 1` in a query's SETTINGS clause
664        query.append("settings", exp.var("join_use_nulls").eq(exp.Literal.number("1")))
665        return query
666
667    def use_server_nulls_for_unmatched_after_join(
668        self,
669        query: Query,
670    ) -> Query:
671        # Set the `join_use_nulls` server value in a query's SETTINGS clause
672        #
673        # Use in SCD models:
674        #  - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join
675        #      are filled with NULL instead of the default data type value. The default join_use_nulls value is `0`.
676        #  - The SCD embeds the user's original query in the `source` CTE
677        #  - Settings are dynamically scoped, so our setting may override the server's default setting the user expects
678        #      for their query.
679        #  - To prevent this, we:
680        #     - If the user query sets `join_use_nulls`, we do nothing
681        #     - If the user query does not set `join_use_nulls`, we query the server for the current setting
682        #       - If the server value is 1, we do nothing
683        #       - If the server values is not 1, we inject its `join_use_nulls` value into the user query
684        #     - We do not need to check user subqueries because our injected setting operates at the same scope the
685        #         server value would normally operate at
686        setting_name = "join_use_nulls"
687        setting_value = "1"
688
689        user_settings = query.args.get("settings")
690        # if user has not already set it explicitly
691        if not (
692            user_settings
693            and any(
694                [
695                    isinstance(setting, exp.EQ) and setting.name == setting_name
696                    for setting in user_settings
697                ]
698            )
699        ):
700            server_value = self.fetchone(
701                exp.select("value")
702                .from_("system.settings")
703                .where(exp.column("name").eq(exp.Literal.string(setting_name)))
704            )[0]
705            # only inject the setting if the server value isn't 1
706            inject_setting = setting_value != server_value
707            setting_value = server_value if inject_setting else setting_value
708
709            if inject_setting:
710                query.append(
711                    "settings", exp.var(setting_name).eq(exp.Literal.number(setting_value))
712                )
713
714        return query
715
716    def _build_settings_property(
717        self, key: str, value: exp.Expression | str | int | float
718    ) -> exp.SettingsProperty:
719        return exp.SettingsProperty(
720            expressions=[
721                exp.EQ(
722                    this=exp.var(key.lower()),
723                    expression=value
724                    if isinstance(value, exp.Expression)
725                    else exp.Literal(this=value, is_string=isinstance(value, str)),
726                )
727            ]
728        )
729
730    def _build_table_properties_exp(
731        self,
732        catalog_name: t.Optional[str] = None,
733        table_format: t.Optional[str] = None,
734        storage_format: t.Optional[str] = None,
735        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
736        partition_interval_unit: t.Optional[IntervalUnit] = None,
737        clustered_by: t.Optional[t.List[exp.Expression]] = None,
738        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
739        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
740        table_description: t.Optional[str] = None,
741        table_kind: t.Optional[str] = None,
742        empty_ctas: bool = False,
743        **kwargs: t.Any,
744    ) -> t.Optional[exp.Properties]:
745        properties: t.List[exp.Expression] = []
746
747        table_engine = self.DEFAULT_TABLE_ENGINE
748        if storage_format:
749            table_engine = (
750                storage_format.this if isinstance(storage_format, exp.Var) else storage_format  # type: ignore
751            )
752        properties.append(exp.EngineProperty(this=table_engine))
753
754        # copy of table_properties so we can pop items off below then consume the rest later
755        table_properties_copy = {
756            k.upper(): v for k, v in (table_properties.copy() if table_properties else {}).items()
757        }
758
759        mergetree_engine = bool(re.search(self.ORDER_BY_TABLE_ENGINE_REGEX, table_engine))
760        ordered_by_raw = table_properties_copy.pop("ORDER_BY", None)
761        if mergetree_engine:
762            ordered_by_exprs = []
763            if ordered_by_raw:
764                ordered_by_vals = []
765
766                if isinstance(ordered_by_raw, (exp.Tuple, exp.Array)):
767                    ordered_by_vals = ordered_by_raw.expressions
768                if isinstance(ordered_by_raw, exp.Paren):
769                    ordered_by_vals = [ordered_by_raw.this]
770
771                if not ordered_by_vals:
772                    ordered_by_vals = (
773                        ordered_by_raw if isinstance(ordered_by_raw, list) else [ordered_by_raw]
774                    )
775
776                for col in ordered_by_vals:
777                    ordered_by_exprs.append(
778                        col
779                        if isinstance(col, exp.Column)
780                        else maybe_parse(
781                            col.name if isinstance(col, exp.Literal) else col,
782                            dialect=self.dialect,
783                            into=exp.Ordered,
784                        )
785                    )
786
787            properties.append(exp.Order(expressions=[exp.Tuple(expressions=ordered_by_exprs)]))
788
789        primary_key = table_properties_copy.pop("PRIMARY_KEY", None)
790        if mergetree_engine and primary_key:
791            primary_key_vals = []
792            if isinstance(primary_key, (exp.Tuple, exp.Array)):
793                primary_key_vals = primary_key.expressions
794            if isinstance(ordered_by_raw, exp.Paren):
795                primary_key_vals = [primary_key.this]
796
797            if not primary_key_vals:
798                primary_key_vals = primary_key if isinstance(primary_key, list) else [primary_key]
799
800            properties.append(
801                exp.PrimaryKey(
802                    expressions=[
803                        exp.to_column(k.name if isinstance(k, exp.Literal) else k)
804                        for k in primary_key_vals
805                    ]
806                )
807            )
808
809        ttl = table_properties_copy.pop("TTL", None)
810        if ttl:
811            properties.append(
812                exp.MergeTreeTTL(
813                    expressions=[ttl if isinstance(ttl, exp.Expression) else exp.var(ttl)]
814                )
815            )
816
817        if (
818            partitioned_by
819            and (partitioned_by_prop := self._build_partitioned_by_exp(partitioned_by)) is not None
820        ):
821            properties.append(partitioned_by_prop)
822
823        if self.engine_run_mode.is_cluster:
824            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
825
826        if empty_ctas:
827            properties.append(exp.EmptyProperty())
828
829        if table_properties_copy:
830            properties.extend(
831                [self._build_settings_property(k, v) for k, v in table_properties_copy.items()]
832            )
833
834        if table_description:
835            properties.append(
836                exp.SchemaCommentProperty(
837                    this=exp.Literal.string(self._truncate_table_comment(table_description))
838                )
839            )
840
841        if properties:
842            return exp.Properties(expressions=properties)
843
844        return None
845
846    def _build_view_properties_exp(
847        self,
848        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
849        table_description: t.Optional[str] = None,
850        **kwargs: t.Any,
851    ) -> t.Optional[exp.Properties]:
852        """Creates a SQLGlot table properties expression for view"""
853        properties: t.List[exp.Expression] = []
854
855        view_properties_copy = view_properties.copy() if view_properties else {}
856
857        if self.engine_run_mode.is_cluster:
858            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
859
860        if view_properties_copy:
861            properties.extend(
862                [self._build_settings_property(k, v) for k, v in view_properties_copy.items()]
863            )
864
865        if table_description:
866            properties.append(
867                exp.SchemaCommentProperty(
868                    this=exp.Literal.string(self._truncate_table_comment(table_description))
869                )
870            )
871
872        if properties:
873            return exp.Properties(expressions=properties)
874        return None
875
876    def _build_create_comment_table_exp(
877        self, table: exp.Table, table_comment: str, table_kind: str, **kwargs: t.Any
878    ) -> exp.Comment | str:
879        table_sql = table.sql(dialect=self.dialect, identify=True)
880
881        truncated_comment = self._truncate_table_comment(table_comment)
882        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
883
884        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} MODIFY COMMENT {comment_sql}"
885
886    def _build_create_comment_column_exp(
887        self,
888        table: exp.Table,
889        column_name: str,
890        column_comment: str,
891        table_kind: str = "TABLE",
892        **kwargs: t.Any,
893    ) -> exp.Comment | str:
894        table_sql = table.sql(dialect=self.dialect, identify=True)
895        column_sql = exp.to_column(column_name).sql(dialect=self.dialect, identify=True)
896
897        truncated_comment = self._truncate_table_comment(column_comment)
898        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
899
900        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} COMMENT COLUMN {column_sql} {comment_sql}"
901
902    def _on_cluster_sql(self) -> str:
903        if self.engine_run_mode.is_cluster:
904            cluster_name = exp.to_identifier(self.cluster, quoted=True).sql(dialect=self.dialect)  #  type: ignore
905            return f" ON CLUSTER {cluster_name} "
906        return ""
logger = <Logger sqlmesh.core.engine_adapter.clickhouse (WARNING)>
 34class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin):
 35    DIALECT = "clickhouse"
 36    SUPPORTS_TRANSACTIONS = False
 37    SUPPORTS_VIEW_SCHEMA = False
 38    SUPPORTS_REPLACE_TABLE = False
 39    COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
 40
 41    SCHEMA_DIFFER_KWARGS = {}
 42
 43    DEFAULT_TABLE_ENGINE = "MergeTree"
 44    ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$"
 45
 46    @property
 47    def engine_run_mode(self) -> EngineRunMode:
 48        if self._extra_config.get("cloud_mode"):
 49            return EngineRunMode.CLOUD
 50        # we use the user's specification of a cluster in the connection config to determine if
 51        #   the engine is in cluster mode
 52        if self._extra_config.get("cluster"):
 53            return EngineRunMode.CLUSTER
 54        return EngineRunMode.STANDALONE
 55
 56    @property
 57    def cluster(self) -> t.Optional[str]:
 58        return self._extra_config.get("cluster")
 59
 60    # Workaround for clickhouse-connect cursor bug
 61    # - cursor does not reset row index correctly on `close()`, so `fetchone()` and `fetchmany()`
 62    #     return the wrong (or no) rows after the very first cursor query that returns rows
 63    #     in the connection
 64    # - cursor does reset the data rows correctly on `close()`, so `fetchall()` works because it
 65    #     doesn't use the row index at all
 66    def fetchone(
 67        self,
 68        query: t.Union[exp.Expression, str],
 69        ignore_unsupported_errors: bool = False,
 70        quote_identifiers: bool = False,
 71    ) -> t.Tuple:
 72        with self.transaction():
 73            self.execute(
 74                query,
 75                ignore_unsupported_errors=ignore_unsupported_errors,
 76                quote_identifiers=quote_identifiers,
 77            )
 78            return self.cursor.fetchall()[0]
 79
 80    def _fetch_native_df(
 81        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
 82    ) -> pd.DataFrame:
 83        """Fetches a Pandas DataFrame from the cursor"""
 84        return self.cursor.client.query_df(
 85            self._to_sql(query, quote=quote_identifiers)
 86            if isinstance(query, exp.Expression)
 87            else query,
 88            use_extended_dtypes=True,
 89        )
 90
 91    def _df_to_source_queries(
 92        self,
 93        df: DF,
 94        target_columns_to_types: t.Dict[str, exp.DataType],
 95        batch_size: int,
 96        target_table: TableName,
 97        source_columns: t.Optional[t.List[str]] = None,
 98        **kwargs: t.Any,
 99    ) -> t.List[SourceQuery]:
100        temp_table = self._get_temp_table(target_table, **kwargs)
101        source_columns_to_types = get_source_columns_to_types(
102            target_columns_to_types, source_columns
103        )
104
105        def query_factory() -> Query:
106            # It is possible for the factory to be called multiple times and if so then the temp table will already
107            # be created so we skip creating again. This means we are assuming the first call is the same result
108            # as later calls.
109            if not self.table_exists(temp_table):
110                self.create_table(
111                    temp_table,
112                    source_columns_to_types,
113                    storage_format=exp.var("MergeTree"),
114                    **kwargs,
115                )
116                ordered_df = df[list(source_columns_to_types)]
117
118                self.cursor.client.insert_df(temp_table.sql(dialect=self.dialect), df=ordered_df)
119
120            return exp.select(*self._casted_columns(target_columns_to_types, source_columns)).from_(
121                temp_table
122            )
123
124        return [
125            SourceQuery(
126                query_factory=query_factory,
127                cleanup_func=lambda: self.drop_table(temp_table, **kwargs),
128            )
129        ]
130
131    def _get_data_objects(
132        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
133    ) -> t.List[DataObject]:
134        """
135        Returns all the data objects that exist in the given database.
136        """
137        query = (
138            exp.select(
139                exp.column("database").as_("schema_name"),
140                exp.column("name"),
141                exp.case(exp.column("engine"))
142                .when(
143                    exp.Literal.string("View"),
144                    exp.Literal.string("view"),
145                )
146                .else_(
147                    exp.Literal.string("table"),
148                )
149                .as_("type"),
150            )
151            .from_("system.tables")
152            .where(exp.column("database").eq(to_schema(schema_name).db))
153        )
154        if object_names:
155            query = query.where(exp.column("name").isin(*object_names))
156        df = self.fetchdf(query)
157        return [
158            DataObject(
159                catalog=None,
160                schema=row.schema_name,
161                name=row.name,
162                type=DataObjectType.from_str(row.type),  # type: ignore
163            )
164            for row in df.itertuples()
165        ]
166
167    def create_schema(
168        self,
169        schema_name: SchemaName,
170        ignore_if_exists: bool = True,
171        warn_on_error: bool = True,
172        properties: t.List[exp.Expression] = [],
173    ) -> None:
174        """Create a Clickhouse database from a name or qualified table name.
175
176        Clickhouse has a two-level naming scheme [database].[table].
177        """
178        properties_copy = properties.copy()
179        if self.engine_run_mode.is_cluster:
180            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
181
182        # can't call super() because it will try to set a catalog
183        return self._create_schema(
184            schema_name=schema_name,
185            ignore_if_exists=ignore_if_exists,
186            warn_on_error=warn_on_error,
187            properties=properties_copy,
188            # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
189            kind="DATABASE",
190        )
191
192    def _insert_overwrite_by_condition(
193        self,
194        table_name: TableName,
195        source_queries: t.List[SourceQuery],
196        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
197        where: t.Optional[exp.Condition] = None,
198        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
199        **kwargs: t.Any,
200    ) -> None:
201        """
202        Implements the table or partition swap approach to insert-overwriting records.
203
204        Because this method executes multiple variants (full table replace, replace by time
205        range, replace by key, replace by partition), some upstream caller info is needed and
206        passed via kwargs.
207
208        Args:
209            table_name: Name of target table
210            source_queries: Source queries returning records to insert
211            target_columns_to_types: Column names and data types of target table
212            where: SQLGlot expression determining which target table rows should be overwritten
213            insert_overwrite_strategy_override: Not used by Clickhouse
214            kwargs:
215                dynamic_key: Key columns (replace by key only)
216                dynamic_key_exp: Expression to build key (replace by key only)
217                dynamic_key_unique: Whether more than one record can exist per key value (replace by key only)
218
219                keep_existing_partition_rows: Whether to overwrite partitions with only new records (incremental by partition only)
220
221        Returns:
222            Side effects only: execution of insert-overwrite operation.
223        """
224        target_table = exp.to_table(table_name)
225        target_columns_to_types = target_columns_to_types or self.columns(target_table)
226
227        temp_table = self._get_temp_table(target_table)
228        self.create_table_like(temp_table, target_table)
229
230        # REPLACE BY KEY: extract kwargs if present
231        dynamic_key = kwargs.get("dynamic_key")
232        if dynamic_key:
233            dynamic_key_exp = t.cast(exp.Expression, kwargs.get("dynamic_key_exp"))
234            dynamic_key_unique = t.cast(bool, kwargs.get("dynamic_key_unique"))
235
236        try:
237            # insert new records into temp table
238            for source_query in source_queries:
239                with source_query as query:
240                    # REPLACE BY KEY: if unique key, DISTINCTify by key columns so only one row is present per key
241                    if dynamic_key and dynamic_key_unique:
242                        query = query.distinct(*dynamic_key)  # type: ignore
243
244                    query = self._order_projections_and_filter(
245                        query, target_columns_to_types, where=where
246                    )
247                    self._insert_append_query(
248                        temp_table,
249                        query,
250                        target_columns_to_types=target_columns_to_types,
251                        order_projections=False,
252                    )
253
254            # REPLACE BY KEY: build `where` expression as "key IN (new rows' key values)"
255            if dynamic_key:
256                key_query = exp.select(dynamic_key_exp).from_(temp_table)
257                if not dynamic_key_unique:
258                    key_query = key_query.distinct()
259                where = dynamic_key_exp.isin(query=key_query)
260
261            # get target table partition key to confirm it's actually partitioned
262            table_partition_exp = self.fetchone(
263                exp.select("partition_key")
264                .from_("system.tables")
265                .where(
266                    exp.column("database").eq(target_table.db),
267                    exp.column("name").eq(target_table.name),
268                )
269            )
270
271            all_affected_partitions: t.Set[str] = set()
272
273            if where:
274                # identify existing records to keep by inverting the delete `where` clause
275                existing_records_insert_exp = exp.insert(
276                    self._select_columns(target_columns_to_types)
277                    .from_(target_table)
278                    .where(exp.paren(expression=where).not_()),
279                    temp_table,
280                )
281
282                # if target table is partitioned, modify insert expression to only insert
283                #   existing records that are in one of the affected partitions
284                if table_partition_exp:
285                    partitions_temp_table_name = self._get_temp_table(
286                        exp.to_table(f"{target_table.db}._affected_partitions")
287                    )
288                    all_affected_partitions, existing_records_insert_exp = (
289                        self._get_affected_partitions_and_insert_exp(
290                            target_table,
291                            temp_table,
292                            where,
293                            existing_records_insert_exp,
294                            partitions_temp_table_name,
295                        )
296                    )
297
298                try:
299                    self.execute(existing_records_insert_exp, track_rows_processed=True)
300                finally:
301                    if table_partition_exp:
302                        self.drop_table(partitions_temp_table_name)
303
304            # process by partition if:
305            #   1. The table is partitioned AND
306            #   (2a. There are existing records to keep (`where`) OR
307            #    2b. We're overwriting existing partition rows (incremental by partition model))
308            if table_partition_exp and (
309                where or kwargs.get("keep_existing_partition_rows") is False
310            ):
311                # only replace partitions that have records in temp_table
312                partitions_to_replace = self._get_partition_ids(temp_table)
313
314                # drop affected partitions that have no records in temp_table
315                #   - NOTE: `all_affected_partitions` will be empty when keep_existing_partition_rows=False
316                #      because previous code block is skipped
317                partitions_to_drop = all_affected_partitions - partitions_to_replace
318
319                if partitions_to_replace or partitions_to_drop:
320                    self.alter_table(
321                        [
322                            self._build_alter_partition_exp(
323                                target_table, temp_table, partitions_to_replace, partitions_to_drop
324                            )
325                        ]
326                    )
327            else:
328                self._exchange_tables(target_table, temp_table)
329        finally:
330            self.drop_table(temp_table)
331
332    def _get_affected_partitions_and_insert_exp(
333        self,
334        target_table: exp.Table,
335        temp_table: exp.Table,
336        where: exp.Condition,
337        existing_records_insert_exp: exp.Insert,
338        partitions_temp_table_name: exp.Table,
339    ) -> tuple[t.Set[str], exp.Insert]:
340        # identify all affected partition IDs
341        #   - store in temp table so we can reuse results
342        self.ctas(
343            partitions_temp_table_name,
344            exp.select("partition_id")
345            .distinct()
346            .from_(
347                exp.union(
348                    # target table partitions with records in `where`
349                    exp.select(exp.column("_partition_id").as_("partition_id"))
350                    .from_(target_table)
351                    .where(where),
352                    # temp table partitions with new records to insert
353                    exp.select(
354                        exp.column("_partition_id").as_("partition_id"),
355                    ).from_(temp_table),
356                ).subquery("_affected_partitions")
357            ),
358        )
359
360        # read all affected partition IDs into memory
361        all_affected_partitions = self._get_partition_ids(
362            partitions_temp_table_name, "partition_id"
363        )
364
365        # limit existing records insert expression WHERE to affected target table partitions
366        #   by adding `AND _partition_id IN (SELECT partition_id FROM partitions_temp_table)`
367        existing_records_insert_exp.set(
368            "expression",
369            existing_records_insert_exp.expression.where(
370                exp.column("_partition_id").isin(
371                    exp.select("partition_id").from_(partitions_temp_table_name)
372                )
373            ),
374        )
375
376        return all_affected_partitions, existing_records_insert_exp
377
378    def _build_alter_partition_exp(
379        self,
380        target_table: exp.Table,
381        temp_table: exp.Table,
382        partitions_to_replace: t.Set[str],
383        partitions_to_drop: t.Set[str],
384    ) -> exp.Alter:
385        alter_expr = exp.Alter(this=target_table, kind="TABLE")
386
387        for partition in partitions_to_replace:
388            alter_expr.append(
389                "actions",
390                exp.ReplacePartition(
391                    expression=exp.Partition(
392                        expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
393                    ),
394                    source=temp_table,
395                ),
396            )
397
398        for partition in partitions_to_drop:
399            alter_expr.append(
400                "actions",
401                exp.DropPartition(
402                    expressions=[
403                        exp.Partition(
404                            expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
405                        )
406                    ],
407                    source=temp_table,
408                ),
409            )
410
411        return alter_expr
412
413    def _replace_by_key(
414        self,
415        target_table: TableName,
416        source_table: QueryOrDF,
417        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
418        key: t.Sequence[exp.Expression],
419        is_unique_key: bool,
420        source_columns: t.Optional[t.List[str]] = None,
421    ) -> None:
422        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
423            source_table,
424            target_columns_to_types,
425            target_table=target_table,
426            source_columns=source_columns,
427        )
428
429        key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0]
430
431        self._insert_overwrite_by_condition(
432            target_table,
433            source_queries,
434            target_columns_to_types,
435            dynamic_key=key,
436            dynamic_key_exp=key_exp,
437            dynamic_key_unique=is_unique_key,
438        )
439
440    def insert_overwrite_by_partition(
441        self,
442        table_name: TableName,
443        query_or_df: QueryOrDF,
444        partitioned_by: t.List[exp.Expression],
445        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
446        source_columns: t.Optional[t.List[str]] = None,
447    ) -> None:
448        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
449            query_or_df,
450            target_columns_to_types,
451            target_table=table_name,
452            source_columns=source_columns,
453        )
454
455        self._insert_overwrite_by_condition(
456            table_name, source_queries, target_columns_to_types, keep_existing_partition_rows=False
457        )
458
459    def _create_table_like(
460        self,
461        target_table_name: TableName,
462        source_table_name: TableName,
463        exists: bool,
464        **kwargs: t.Any,
465    ) -> None:
466        """Create table with identical structure as source table"""
467        self.execute(
468            f"CREATE TABLE {target_table_name}{self._on_cluster_sql()} AS {source_table_name}"
469        )
470
471    def _get_partition_ids(
472        self,
473        table: exp.Table,
474        partition_col_name: str = "_partition_id",
475        where: t.Optional[exp.Condition] = None,
476        limit: t.Optional[int] = None,
477    ) -> t.Set[t.Any]:
478        """List partition IDs present in table"""
479        partitions_query = exp.select(partition_col_name).distinct().from_(table)
480        if where:
481            partitions_query = partitions_query.where(where)
482        if limit:
483            partitions_query = partitions_query.limit(limit)
484        partitions = self.fetchall(partitions_query)
485
486        return set([part[0] for part in partitions] if partitions else [])
487
488    def _create_table(
489        self,
490        table_name_or_schema: t.Union[exp.Schema, TableName],
491        expression: t.Optional[exp.Expression],
492        exists: bool = True,
493        replace: bool = False,
494        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
495        table_description: t.Optional[str] = None,
496        column_descriptions: t.Optional[t.Dict[str, str]] = None,
497        table_kind: t.Optional[str] = None,
498        track_rows_processed: bool = True,
499        **kwargs: t.Any,
500    ) -> None:
501        """Creates a table in the database.
502
503        Clickhouse Cloud requires doing CTAS in two steps.
504
505        First, we add the `EMPTY` property to the CTAS call to create a table with the proper
506        schema, then insert the data with the CTAS query.
507        """
508        # ensure columns used for partitioning are non-Nullable
509        #   - normally user's responsibility, but we automatically partition by time column in
510        #       incremental by time models
511        if kwargs.get("partitioned_by"):
512            partition_cols = [
513                col.name
514                for part_expr in kwargs["partitioned_by"]
515                for col in part_expr.find_all(exp.Column)
516            ]
517            if isinstance(table_name_or_schema, exp.Schema):
518                for coldef in table_name_or_schema.expressions:
519                    if coldef.name in partition_cols:
520                        coldef.kind.set("nullable", False)
521            if target_columns_to_types:
522                for col in partition_cols:
523                    target_columns_to_types[col].set("nullable", False)
524
525        super()._create_table(
526            table_name_or_schema,
527            expression,
528            exists,
529            replace,
530            target_columns_to_types,
531            table_description,
532            column_descriptions,
533            table_kind,
534            empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
535            track_rows_processed=track_rows_processed,
536            **kwargs,
537        )
538
539        # execute the second INSERT step if on cloud and creating a table
540        # - Additional clause is to avoid clickhouse-connect HTTP client bug where CTAS LIMIT 0
541        #     returns a success code but malformed response
542        if (
543            self.engine_run_mode.is_cloud
544            and table_kind != "VIEW"
545            and expression
546            and not (
547                expression.args.get("limit") is not None
548                and expression.args["limit"].expression.this == "0"
549            )
550        ):
551            table_name = (
552                table_name_or_schema.this
553                if isinstance(table_name_or_schema, exp.Schema)
554                else table_name_or_schema
555            )
556            self._insert_append_query(
557                table_name,
558                expression,  # type: ignore
559                target_columns_to_types or self.columns(table_name),
560            )
561
562    def _exchange_tables(
563        self,
564        old_table_name: TableName,
565        new_table_name: TableName,
566    ) -> None:
567        from clickhouse_connect.driver.exceptions import DatabaseError  # type: ignore
568
569        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
570        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)
571
572        try:
573            self.execute(
574                f"EXCHANGE TABLES {old_table_sql} AND {new_table_sql}{self._on_cluster_sql()}"
575            )
576        except DatabaseError as e:
577            if "NOT_IMPLEMENTED" in str(e):
578                # If someone is using an old Clickhouse version, an OS that doesn't support atomic exchanges,
579                # or a database engine that doesn't support atomic exchanges, we do a non-atomic rename instead.
580                #
581                # Executing multiple renames in one call like `RENAME TABLE a to b, c to a` is supported
582                # but not an atomic operation. Because it is not atomic, doing it in two calls is equivalent
583                # and does not require defining an additional method.
584                throwaway_table_name = self._get_temp_table(old_table_name)
585                self._rename_table(old_table_name, throwaway_table_name)
586                self._rename_table(new_table_name, old_table_name)
587                self.drop_table(throwaway_table_name)
588
589    def _rename_table(
590        self,
591        old_table_name: TableName,
592        new_table_name: TableName,
593    ) -> None:
594        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
595        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)
596
597        self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}")
598
599    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
600        delete_expr = exp.delete(table_name, where)
601        if self.engine_run_mode.is_cluster:
602            delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
603        self.execute(delete_expr)
604
605    def alter_table(
606        self,
607        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
608    ) -> None:
609        """
610        Performs the alter statements to change the current table into the structure of the target table.
611        """
612        with self.transaction():
613            for alter_expression in [
614                x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
615            ]:
616                if self.engine_run_mode.is_cluster:
617                    alter_expression.set(
618                        "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))
619                    )
620                self.execute(alter_expression)
621
622    def _drop_object(
623        self,
624        name: TableName | SchemaName,
625        exists: bool = True,
626        kind: str = "TABLE",
627        cascade: bool = False,
628        **drop_args: t.Any,
629    ) -> None:
630        """Drops an object.
631
632        An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind.
633
634        Args:
635            name: The name of the table to drop.
636            exists: If exists, defaults to True.
637            kind: What kind of object to drop. Defaults to TABLE
638            **drop_args: Any extra arguments to set on the Drop expression
639        """
640        super()._drop_object(
641            name=name,
642            exists=exists,
643            kind=kind,
644            cascade=cascade,
645            cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
646            if self.engine_run_mode.is_cluster
647            else None,
648            **drop_args,
649        )
650
651    def _build_partitioned_by_exp(
652        self,
653        partitioned_by: t.List[exp.Expression],
654        **kwargs: t.Any,
655    ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]:
656        return exp.PartitionedByProperty(
657            this=exp.Schema(expressions=partitioned_by),
658        )
659
660    def ensure_nulls_for_unmatched_after_join(
661        self,
662        query: Query,
663    ) -> Query:
664        # Set `join_use_nulls = 1` in a query's SETTINGS clause
665        query.append("settings", exp.var("join_use_nulls").eq(exp.Literal.number("1")))
666        return query
667
668    def use_server_nulls_for_unmatched_after_join(
669        self,
670        query: Query,
671    ) -> Query:
672        # Set the `join_use_nulls` server value in a query's SETTINGS clause
673        #
674        # Use in SCD models:
675        #  - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join
676        #      are filled with NULL instead of the default data type value. The default join_use_nulls value is `0`.
677        #  - The SCD embeds the user's original query in the `source` CTE
678        #  - Settings are dynamically scoped, so our setting may override the server's default setting the user expects
679        #      for their query.
680        #  - To prevent this, we:
681        #     - If the user query sets `join_use_nulls`, we do nothing
682        #     - If the user query does not set `join_use_nulls`, we query the server for the current setting
683        #       - If the server value is 1, we do nothing
684        #       - If the server values is not 1, we inject its `join_use_nulls` value into the user query
685        #     - We do not need to check user subqueries because our injected setting operates at the same scope the
686        #         server value would normally operate at
687        setting_name = "join_use_nulls"
688        setting_value = "1"
689
690        user_settings = query.args.get("settings")
691        # if user has not already set it explicitly
692        if not (
693            user_settings
694            and any(
695                [
696                    isinstance(setting, exp.EQ) and setting.name == setting_name
697                    for setting in user_settings
698                ]
699            )
700        ):
701            server_value = self.fetchone(
702                exp.select("value")
703                .from_("system.settings")
704                .where(exp.column("name").eq(exp.Literal.string(setting_name)))
705            )[0]
706            # only inject the setting if the server value isn't 1
707            inject_setting = setting_value != server_value
708            setting_value = server_value if inject_setting else setting_value
709
710            if inject_setting:
711                query.append(
712                    "settings", exp.var(setting_name).eq(exp.Literal.number(setting_value))
713                )
714
715        return query
716
717    def _build_settings_property(
718        self, key: str, value: exp.Expression | str | int | float
719    ) -> exp.SettingsProperty:
720        return exp.SettingsProperty(
721            expressions=[
722                exp.EQ(
723                    this=exp.var(key.lower()),
724                    expression=value
725                    if isinstance(value, exp.Expression)
726                    else exp.Literal(this=value, is_string=isinstance(value, str)),
727                )
728            ]
729        )
730
731    def _build_table_properties_exp(
732        self,
733        catalog_name: t.Optional[str] = None,
734        table_format: t.Optional[str] = None,
735        storage_format: t.Optional[str] = None,
736        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
737        partition_interval_unit: t.Optional[IntervalUnit] = None,
738        clustered_by: t.Optional[t.List[exp.Expression]] = None,
739        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
740        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
741        table_description: t.Optional[str] = None,
742        table_kind: t.Optional[str] = None,
743        empty_ctas: bool = False,
744        **kwargs: t.Any,
745    ) -> t.Optional[exp.Properties]:
746        properties: t.List[exp.Expression] = []
747
748        table_engine = self.DEFAULT_TABLE_ENGINE
749        if storage_format:
750            table_engine = (
751                storage_format.this if isinstance(storage_format, exp.Var) else storage_format  # type: ignore
752            )
753        properties.append(exp.EngineProperty(this=table_engine))
754
755        # copy of table_properties so we can pop items off below then consume the rest later
756        table_properties_copy = {
757            k.upper(): v for k, v in (table_properties.copy() if table_properties else {}).items()
758        }
759
760        mergetree_engine = bool(re.search(self.ORDER_BY_TABLE_ENGINE_REGEX, table_engine))
761        ordered_by_raw = table_properties_copy.pop("ORDER_BY", None)
762        if mergetree_engine:
763            ordered_by_exprs = []
764            if ordered_by_raw:
765                ordered_by_vals = []
766
767                if isinstance(ordered_by_raw, (exp.Tuple, exp.Array)):
768                    ordered_by_vals = ordered_by_raw.expressions
769                if isinstance(ordered_by_raw, exp.Paren):
770                    ordered_by_vals = [ordered_by_raw.this]
771
772                if not ordered_by_vals:
773                    ordered_by_vals = (
774                        ordered_by_raw if isinstance(ordered_by_raw, list) else [ordered_by_raw]
775                    )
776
777                for col in ordered_by_vals:
778                    ordered_by_exprs.append(
779                        col
780                        if isinstance(col, exp.Column)
781                        else maybe_parse(
782                            col.name if isinstance(col, exp.Literal) else col,
783                            dialect=self.dialect,
784                            into=exp.Ordered,
785                        )
786                    )
787
788            properties.append(exp.Order(expressions=[exp.Tuple(expressions=ordered_by_exprs)]))
789
790        primary_key = table_properties_copy.pop("PRIMARY_KEY", None)
791        if mergetree_engine and primary_key:
792            primary_key_vals = []
793            if isinstance(primary_key, (exp.Tuple, exp.Array)):
794                primary_key_vals = primary_key.expressions
795            if isinstance(ordered_by_raw, exp.Paren):
796                primary_key_vals = [primary_key.this]
797
798            if not primary_key_vals:
799                primary_key_vals = primary_key if isinstance(primary_key, list) else [primary_key]
800
801            properties.append(
802                exp.PrimaryKey(
803                    expressions=[
804                        exp.to_column(k.name if isinstance(k, exp.Literal) else k)
805                        for k in primary_key_vals
806                    ]
807                )
808            )
809
810        ttl = table_properties_copy.pop("TTL", None)
811        if ttl:
812            properties.append(
813                exp.MergeTreeTTL(
814                    expressions=[ttl if isinstance(ttl, exp.Expression) else exp.var(ttl)]
815                )
816            )
817
818        if (
819            partitioned_by
820            and (partitioned_by_prop := self._build_partitioned_by_exp(partitioned_by)) is not None
821        ):
822            properties.append(partitioned_by_prop)
823
824        if self.engine_run_mode.is_cluster:
825            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
826
827        if empty_ctas:
828            properties.append(exp.EmptyProperty())
829
830        if table_properties_copy:
831            properties.extend(
832                [self._build_settings_property(k, v) for k, v in table_properties_copy.items()]
833            )
834
835        if table_description:
836            properties.append(
837                exp.SchemaCommentProperty(
838                    this=exp.Literal.string(self._truncate_table_comment(table_description))
839                )
840            )
841
842        if properties:
843            return exp.Properties(expressions=properties)
844
845        return None
846
847    def _build_view_properties_exp(
848        self,
849        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
850        table_description: t.Optional[str] = None,
851        **kwargs: t.Any,
852    ) -> t.Optional[exp.Properties]:
853        """Creates a SQLGlot table properties expression for view"""
854        properties: t.List[exp.Expression] = []
855
856        view_properties_copy = view_properties.copy() if view_properties else {}
857
858        if self.engine_run_mode.is_cluster:
859            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
860
861        if view_properties_copy:
862            properties.extend(
863                [self._build_settings_property(k, v) for k, v in view_properties_copy.items()]
864            )
865
866        if table_description:
867            properties.append(
868                exp.SchemaCommentProperty(
869                    this=exp.Literal.string(self._truncate_table_comment(table_description))
870                )
871            )
872
873        if properties:
874            return exp.Properties(expressions=properties)
875        return None
876
877    def _build_create_comment_table_exp(
878        self, table: exp.Table, table_comment: str, table_kind: str, **kwargs: t.Any
879    ) -> exp.Comment | str:
880        table_sql = table.sql(dialect=self.dialect, identify=True)
881
882        truncated_comment = self._truncate_table_comment(table_comment)
883        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
884
885        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} MODIFY COMMENT {comment_sql}"
886
887    def _build_create_comment_column_exp(
888        self,
889        table: exp.Table,
890        column_name: str,
891        column_comment: str,
892        table_kind: str = "TABLE",
893        **kwargs: t.Any,
894    ) -> exp.Comment | str:
895        table_sql = table.sql(dialect=self.dialect, identify=True)
896        column_sql = exp.to_column(column_name).sql(dialect=self.dialect, identify=True)
897
898        truncated_comment = self._truncate_table_comment(column_comment)
899        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
900
901        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} COMMENT COLUMN {column_sql} {comment_sql}"
902
903    def _on_cluster_sql(self) -> str:
904        if self.engine_run_mode.is_cluster:
905            cluster_name = exp.to_identifier(self.cluster, quoted=True).sql(dialect=self.dialect)  #  type: ignore
906            return f" ON CLUSTER {cluster_name} "
907        return ""

Base class wrapping a Database API-compliant connection.

The EngineAdapter is an easily subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
DIALECT = 'clickhouse'
SUPPORTS_TRANSACTIONS = False
SUPPORTS_VIEW_SCHEMA = False
SUPPORTS_REPLACE_TABLE = False
COMMENT_CREATION_VIEW = <CommentCreationView.COMMENT_COMMAND_ONLY: 4>
SCHEMA_DIFFER_KWARGS = {}
DEFAULT_TABLE_ENGINE = 'MergeTree'
ORDER_BY_TABLE_ENGINE_REGEX = '^.*?MergeTree.*$'
46    @property
47    def engine_run_mode(self) -> EngineRunMode:
48        if self._extra_config.get("cloud_mode"):
49            return EngineRunMode.CLOUD
50        # we use the user's specification of a cluster in the connection config to determine if
51        #   the engine is in cluster mode
52        if self._extra_config.get("cluster"):
53            return EngineRunMode.CLUSTER
54        return EngineRunMode.STANDALONE
cluster: Optional[str]
56    @property
57    def cluster(self) -> t.Optional[str]:
58        return self._extra_config.get("cluster")
def fetchone( self, query: Union[sqlglot.expressions.Expression, str], ignore_unsupported_errors: bool = False, quote_identifiers: bool = False) -> Tuple:
66    def fetchone(
67        self,
68        query: t.Union[exp.Expression, str],
69        ignore_unsupported_errors: bool = False,
70        quote_identifiers: bool = False,
71    ) -> t.Tuple:
72        with self.transaction():
73            self.execute(
74                query,
75                ignore_unsupported_errors=ignore_unsupported_errors,
76                quote_identifiers=quote_identifiers,
77            )
78            return self.cursor.fetchall()[0]
def create_schema( self, schema_name: Union[str, sqlglot.expressions.Table], ignore_if_exists: bool = True, warn_on_error: bool = True, properties: List[sqlglot.expressions.Expression] = []) -> None:
167    def create_schema(
168        self,
169        schema_name: SchemaName,
170        ignore_if_exists: bool = True,
171        warn_on_error: bool = True,
172        properties: t.List[exp.Expression] = [],
173    ) -> None:
174        """Create a Clickhouse database from a name or qualified table name.
175
176        Clickhouse has a two-level naming scheme [database].[table].
177        """
178        properties_copy = properties.copy()
179        if self.engine_run_mode.is_cluster:
180            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
181
182        # can't call super() because it will try to set a catalog
183        return self._create_schema(
184            schema_name=schema_name,
185            ignore_if_exists=ignore_if_exists,
186            warn_on_error=warn_on_error,
187            properties=properties_copy,
188            # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
189            kind="DATABASE",
190        )

Create a Clickhouse database from a name or qualified table name.

Clickhouse has a two-level naming scheme [database].[table].

def insert_overwrite_by_partition( self, table_name: Union[str, sqlglot.expressions.Table], query_or_df: QueryOrDF, partitioned_by: List[sqlglot.expressions.Expression], target_columns_to_types: Optional[Dict[str, sqlglot.expressions.DataType]] = None, source_columns: Optional[List[str]] = None) -> None:
440    def insert_overwrite_by_partition(
441        self,
442        table_name: TableName,
443        query_or_df: QueryOrDF,
444        partitioned_by: t.List[exp.Expression],
445        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
446        source_columns: t.Optional[t.List[str]] = None,
447    ) -> None:
448        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
449            query_or_df,
450            target_columns_to_types,
451            target_table=table_name,
452            source_columns=source_columns,
453        )
454
455        self._insert_overwrite_by_condition(
456            table_name, source_queries, target_columns_to_types, keep_existing_partition_rows=False
457        )
def delete_from( self, table_name: Union[str, sqlglot.expressions.Table], where: Union[str, sqlglot.expressions.Expression]) -> None:
599    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
600        delete_expr = exp.delete(table_name, where)
601        if self.engine_run_mode.is_cluster:
602            delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
603        self.execute(delete_expr)
def alter_table( self, alter_expressions: Union[List[sqlglot.expressions.Alter], List[sqlmesh.core.schema_diff.TableAlterOperation]]) -> None:
605    def alter_table(
606        self,
607        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
608    ) -> None:
609        """
610        Performs the alter statements to change the current table into the structure of the target table.
611        """
612        with self.transaction():
613            for alter_expression in [
614                x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
615            ]:
616                if self.engine_run_mode.is_cluster:
617                    alter_expression.set(
618                        "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))
619                    )
620                self.execute(alter_expression)

Performs the alter statements to change the current table into the structure of the target table.

def ensure_nulls_for_unmatched_after_join( self, query: Query) -> Query:
660    def ensure_nulls_for_unmatched_after_join(
661        self,
662        query: Query,
663    ) -> Query:
664        # Set `join_use_nulls = 1` in a query's SETTINGS clause
665        query.append("settings", exp.var("join_use_nulls").eq(exp.Literal.number("1")))
666        return query
def use_server_nulls_for_unmatched_after_join( self, query: Query) -> Query:
668    def use_server_nulls_for_unmatched_after_join(
669        self,
670        query: Query,
671    ) -> Query:
672        # Set the `join_use_nulls` server value in a query's SETTINGS clause
673        #
674        # Use in SCD models:
675        #  - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join
676        #      are filled with NULL instead of the default data type value. The default join_use_nulls value is `0`.
677        #  - The SCD embeds the user's original query in the `source` CTE
678        #  - Settings are dynamically scoped, so our setting may override the server's default setting the user expects
679        #      for their query.
680        #  - To prevent this, we:
681        #     - If the user query sets `join_use_nulls`, we do nothing
682        #     - If the user query does not set `join_use_nulls`, we query the server for the current setting
683        #       - If the server value is 1, we do nothing
684        #       - If the server values is not 1, we inject its `join_use_nulls` value into the user query
685        #     - We do not need to check user subqueries because our injected setting operates at the same scope the
686        #         server value would normally operate at
687        setting_name = "join_use_nulls"
688        setting_value = "1"
689
690        user_settings = query.args.get("settings")
691        # if user has not already set it explicitly
692        if not (
693            user_settings
694            and any(
695                [
696                    isinstance(setting, exp.EQ) and setting.name == setting_name
697                    for setting in user_settings
698                ]
699            )
700        ):
701            server_value = self.fetchone(
702                exp.select("value")
703                .from_("system.settings")
704                .where(exp.column("name").eq(exp.Literal.string(setting_name)))
705            )[0]
706            # only inject the setting if the server value isn't 1
707            inject_setting = setting_value != server_value
708            setting_value = server_value if inject_setting else setting_value
709
710            if inject_setting:
711                query.append(
712                    "settings", exp.var(setting_name).eq(exp.Literal.number(setting_value))
713                )
714
715        return query
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
COMMENT_CREATION_TABLE
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_QUERY_EXECUTION_TRACKING
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
create_view
drop_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
insert_append
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
rename_table
get_data_object
get_data_objects
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ping
get_table_last_modified_ts
sqlmesh.core.engine_adapter.base.EngineAdapterWithIndexSupport
SUPPORTS_INDEXES
sqlmesh.core.engine_adapter.mixins.LogicalMergeMixin
merge