sqlmesh.core.engine_adapter.clickhouse
from __future__ import annotations

import typing as t
import logging
import re
from sqlglot import exp, maybe_parse
from sqlmesh.core.dialect import to_schema
from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin
from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport
from sqlmesh.core.engine_adapter.shared import (
    DataObject,
    DataObjectType,
    EngineRunMode,
    SourceQuery,
    CommentCreationView,
    InsertOverwriteStrategy,
)
from sqlmesh.core.schema_diff import TableAlterOperation
from sqlmesh.utils import get_source_columns_to_types

if t.TYPE_CHECKING:
    import pandas as pd

    from sqlmesh.core._typing import SchemaName, TableName
    from sqlmesh.core.engine_adapter._typing import DF, Query, QueryOrDF

    from sqlmesh.core.node import IntervalUnit


logger = logging.getLogger(__name__)


class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin):
    """Engine adapter that builds and executes Clickhouse-dialect SQL.

    The run mode (cloud / cluster / standalone) is derived from the connection's extra
    config; in cluster mode the DDL/DML built here carries an `ON CLUSTER` clause.
    """

    DIALECT = "clickhouse"
    SUPPORTS_TRANSACTIONS = False
    SUPPORTS_VIEW_SCHEMA = False
    SUPPORTS_REPLACE_TABLE = False
    COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY

    SCHEMA_DIFFER_KWARGS = {}

    # engine used when no `storage_format` is passed to `_build_table_properties_exp`
    DEFAULT_TABLE_ENGINE = "MergeTree"
    # engines matching this pattern get ORDER BY / PRIMARY KEY properties
    ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$"

    @property
    def engine_run_mode(self) -> EngineRunMode:
        """Run mode derived from the `cloud_mode` and `cluster` connection config entries."""
        if self._extra_config.get("cloud_mode"):
            return EngineRunMode.CLOUD
        # we use the user's specification of a cluster in the connection config to determine if
        # the engine is in cluster mode
        if self._extra_config.get("cluster"):
            return EngineRunMode.CLUSTER
        return EngineRunMode.STANDALONE

    @property
    def cluster(self) -> t.Optional[str]:
        """Cluster name from the connection config, or None if not configured."""
        return self._extra_config.get("cluster")

    # Workaround for clickhouse-connect cursor bug
    # - cursor does not reset row index correctly on `close()`, so `fetchone()` and `fetchmany()`
    #     return the wrong (or no) rows after the very first cursor query that returns rows
    #     in the connection
    # - cursor does reset the data rows correctly on `close()`, so `fetchall()` works because it
    #     doesn't use the row index at all
    def fetchone(
        self,
        query: t.Union[exp.Expression, str],
        ignore_unsupported_errors: bool = False,
        quote_identifiers: bool = False,
    ) -> t.Tuple:
        """Execute `query` and return the first row of `cursor.fetchall()`.

        Raises:
            IndexError: if the query returns no rows.
        """
        with self.transaction():
            self.execute(
                query,
                ignore_unsupported_errors=ignore_unsupported_errors,
                quote_identifiers=quote_identifiers,
            )
            return self.cursor.fetchall()[0]

    def _fetch_native_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> pd.DataFrame:
        """Fetches a Pandas DataFrame from the cursor"""
        return self.cursor.client.query_df(
            self._to_sql(query, quote=quote_identifiers)
            if isinstance(query, exp.Expression)
            else query,
            use_extended_dtypes=True,
        )

    def _df_to_source_queries(
        self,
        df: DF,
        target_columns_to_types: t.Dict[str, exp.DataType],
        batch_size: int,
        target_table: TableName,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> t.List[SourceQuery]:
        """Stage a DataFrame in a temp MergeTree table and return a query selecting from it.

        The temp table is created and loaded lazily on the first call of the query factory;
        the returned SourceQuery's cleanup function drops it. `batch_size` is not used here.
        """
        temp_table = self._get_temp_table(target_table, **kwargs)
        source_columns_to_types = get_source_columns_to_types(
            target_columns_to_types, source_columns
        )

        def query_factory() -> Query:
            # It is possible for the factory to be called multiple times and if so then the temp table will already
            # be created so we skip creating again. This means we are assuming the first call is the same result
            # as later calls.
            if not self.table_exists(temp_table):
                self.create_table(
                    temp_table,
                    source_columns_to_types,
                    storage_format=exp.var("MergeTree"),
                    **kwargs,
                )
                ordered_df = df[list(source_columns_to_types)]

                self.cursor.client.insert_df(temp_table.sql(dialect=self.dialect), df=ordered_df)

            return exp.select(*self._casted_columns(target_columns_to_types, source_columns)).from_(
                temp_table
            )

        return [
            SourceQuery(
                query_factory=query_factory,
                cleanup_func=lambda: self.drop_table(temp_table, **kwargs),
            )
        ]

    def _get_data_objects(
        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
    ) -> t.List[DataObject]:
        """
        Returns all the data objects that exist in the given database.

        Objects are read from `system.tables`; rows whose engine is 'View' are reported as
        views and everything else as tables. `object_names`, if given, filters by name.
        """
        query = (
            exp.select(
                exp.column("database").as_("schema_name"),
                exp.column("name"),
                exp.case(exp.column("engine"))
                .when(
                    exp.Literal.string("View"),
                    exp.Literal.string("view"),
                )
                .else_(
                    exp.Literal.string("table"),
                )
                .as_("type"),
            )
            .from_("system.tables")
            .where(exp.column("database").eq(to_schema(schema_name).db))
        )
        if object_names:
            query = query.where(exp.column("name").isin(*object_names))
        df = self.fetchdf(query)
        return [
            DataObject(
                catalog=None,
                schema=row.schema_name,
                name=row.name,
                type=DataObjectType.from_str(row.type),  # type: ignore
            )
            for row in df.itertuples()
        ]

    def create_schema(
        self,
        schema_name: SchemaName,
        ignore_if_exists: bool = True,
        warn_on_error: bool = True,
        properties: t.List[exp.Expression] = [],
    ) -> None:
        """Create a Clickhouse database from a name or qualified table name.

        Clickhouse has a two-level naming scheme [database].[table].
        """
        # copy so the (shared, mutable) default list and the caller's list are never mutated
        properties_copy = properties.copy()
        if self.engine_run_mode.is_cluster:
            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        # can't call super() because it will try to set a catalog
        return self._create_schema(
            schema_name=schema_name,
            ignore_if_exists=ignore_if_exists,
            warn_on_error=warn_on_error,
            properties=properties_copy,
            # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
            kind="DATABASE",
        )

    def _insert_overwrite_by_condition(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        where: t.Optional[exp.Condition] = None,
        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
        **kwargs: t.Any,
    ) -> None:
        """
        Implements the table or partition swap approach to insert-overwriting records.

        Because this method executes multiple variants (full table replace, replace by time
        range, replace by key, replace by partition), some upstream caller info is needed and
        passed via kwargs.

        Args:
            table_name: Name of target table
            source_queries: Source queries returning records to insert
            target_columns_to_types: Column names and data types of target table
            where: SQLGlot expression determining which target table rows should be overwritten
            insert_overwrite_strategy_override: Not used by Clickhouse
            kwargs:
                dynamic_key: Key columns (replace by key only)
                dynamic_key_exp: Expression to build key (replace by key only)
                dynamic_key_unique: Whether more than one record can exist per key value (replace by key only)

                keep_existing_partition_rows: Whether to overwrite partitions with only new records (incremental by partition only)

        Returns:
            Side effects only: execution of insert-overwrite operation.
        """
        target_table = exp.to_table(table_name)
        target_columns_to_types = target_columns_to_types or self.columns(target_table)

        temp_table = self._get_temp_table(target_table)
        self.create_table_like(temp_table, target_table)

        # REPLACE BY KEY: extract kwargs if present
        dynamic_key = kwargs.get("dynamic_key")
        if dynamic_key:
            dynamic_key_exp = t.cast(exp.Expression, kwargs.get("dynamic_key_exp"))
            dynamic_key_unique = t.cast(bool, kwargs.get("dynamic_key_unique"))

        try:
            # insert new records into temp table
            for source_query in source_queries:
                with source_query as query:
                    # REPLACE BY KEY: if unique key, DISTINCTify by key columns so only one row is present per key
                    if dynamic_key and dynamic_key_unique:
                        query = query.distinct(*dynamic_key)  # type: ignore

                    query = self._order_projections_and_filter(
                        query, target_columns_to_types, where=where
                    )
                    self._insert_append_query(
                        temp_table,
                        query,
                        target_columns_to_types=target_columns_to_types,
                        order_projections=False,
                    )

            # REPLACE BY KEY: build `where` expression as "key IN (new rows' key values)"
            if dynamic_key:
                key_query = exp.select(dynamic_key_exp).from_(temp_table)
                if not dynamic_key_unique:
                    key_query = key_query.distinct()
                where = dynamic_key_exp.isin(query=key_query)

            # get target table partition key to confirm it's actually partitioned
            # NOTE(review): `fetchone` returns the row (`fetchall()[0]`), so the truthiness checks
            # below test the row object rather than its `partition_key` value - TODO confirm an
            # unpartitioned table's row evaluates falsy here
            table_partition_exp = self.fetchone(
                exp.select("partition_key")
                .from_("system.tables")
                .where(
                    exp.column("database").eq(target_table.db),
                    exp.column("name").eq(target_table.name),
                )
            )

            all_affected_partitions: t.Set[str] = set()

            if where:
                # identify existing records to keep by inverting the delete `where` clause
                existing_records_insert_exp = exp.insert(
                    self._select_columns(target_columns_to_types)
                    .from_(target_table)
                    .where(exp.paren(expression=where).not_()),
                    temp_table,
                )

                # if target table is partitioned, modify insert expression to only insert
                # existing records that are in one of the affected partitions
                if table_partition_exp:
                    partitions_temp_table_name = self._get_temp_table(
                        exp.to_table(f"{target_table.db}._affected_partitions")
                    )
                    all_affected_partitions, existing_records_insert_exp = (
                        self._get_affected_partitions_and_insert_exp(
                            target_table,
                            temp_table,
                            where,
                            existing_records_insert_exp,
                            partitions_temp_table_name,
                        )
                    )

                try:
                    self.execute(existing_records_insert_exp, track_rows_processed=True)
                finally:
                    if table_partition_exp:
                        self.drop_table(partitions_temp_table_name)

            # process by partition if:
            # 1. The table is partitioned AND
            # (2a. There are existing records to keep (`where`) OR
            #  2b. We're overwriting existing partition rows (incremental by partition model))
            if table_partition_exp and (
                where or kwargs.get("keep_existing_partition_rows") is False
            ):
                # only replace partitions that have records in temp_table
                partitions_to_replace = self._get_partition_ids(temp_table)

                # drop affected partitions that have no records in temp_table
                # - NOTE: `all_affected_partitions` will be empty when keep_existing_partition_rows=False
                #         because previous code block is skipped
                partitions_to_drop = all_affected_partitions - partitions_to_replace

                if partitions_to_replace or partitions_to_drop:
                    self.alter_table(
                        [
                            self._build_alter_partition_exp(
                                target_table, temp_table, partitions_to_replace, partitions_to_drop
                            )
                        ]
                    )
            else:
                self._exchange_tables(target_table, temp_table)
        finally:
            self.drop_table(temp_table)

    def _get_affected_partitions_and_insert_exp(
        self,
        target_table: exp.Table,
        temp_table: exp.Table,
        where: exp.Condition,
        existing_records_insert_exp: exp.Insert,
        partitions_temp_table_name: exp.Table,
    ) -> tuple[t.Set[str], exp.Insert]:
        """Materialize the affected partition IDs and restrict the existing-records insert to them.

        Creates `partitions_temp_table_name` via CTAS (the caller is responsible for dropping
        it) and modifies `existing_records_insert_exp` in place.

        Returns:
            The set of affected partition IDs and the modified insert expression.
        """
        # identify all affected partition IDs
        # - store in temp table so we can reuse results
        self.ctas(
            partitions_temp_table_name,
            exp.select("partition_id")
            .distinct()
            .from_(
                exp.union(
                    # target table partitions with records in `where`
                    exp.select(exp.column("_partition_id").as_("partition_id"))
                    .from_(target_table)
                    .where(where),
                    # temp table partitions with new records to insert
                    exp.select(
                        exp.column("_partition_id").as_("partition_id"),
                    ).from_(temp_table),
                ).subquery("_affected_partitions")
            ),
        )

        # read all affected partition IDs into memory
        all_affected_partitions = self._get_partition_ids(
            partitions_temp_table_name, "partition_id"
        )

        # limit existing records insert expression WHERE to affected target table partitions
        # by adding `AND _partition_id IN (SELECT partition_id FROM partitions_temp_table)`
        existing_records_insert_exp.set(
            "expression",
            existing_records_insert_exp.expression.where(
                exp.column("_partition_id").isin(
                    exp.select("partition_id").from_(partitions_temp_table_name)
                )
            ),
        )

        return all_affected_partitions, existing_records_insert_exp

    def _build_alter_partition_exp(
        self,
        target_table: exp.Table,
        temp_table: exp.Table,
        partitions_to_replace: t.Set[str],
        partitions_to_drop: t.Set[str],
    ) -> exp.Alter:
        """Build one ALTER TABLE on `target_table` with a REPLACE PARTITION action (sourced from
        `temp_table`) per ID in `partitions_to_replace` and a DROP PARTITION action per ID in
        `partitions_to_drop`."""
        alter_expr = exp.Alter(this=target_table, kind="TABLE")

        for partition in partitions_to_replace:
            alter_expr.append(
                "actions",
                exp.ReplacePartition(
                    expression=exp.Partition(
                        expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
                    ),
                    source=temp_table,
                ),
            )

        for partition in partitions_to_drop:
            alter_expr.append(
                "actions",
                exp.DropPartition(
                    expressions=[
                        exp.Partition(
                            expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
                        )
                    ],
                    source=temp_table,
                ),
            )

        return alter_expr

    def _replace_by_key(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        key: t.Sequence[exp.Expression],
        is_unique_key: bool,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """Replace target rows whose key matches a source row, via `_insert_overwrite_by_condition`.

        A multi-column key is combined into a single expression with CONCAT_WS.
        """
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            source_table,
            target_columns_to_types,
            target_table=target_table,
            source_columns=source_columns,
        )

        key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0]

        self._insert_overwrite_by_condition(
            target_table,
            source_queries,
            target_columns_to_types,
            dynamic_key=key,
            dynamic_key_exp=key_exp,
            dynamic_key_unique=is_unique_key,
        )

    def insert_overwrite_by_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        partitioned_by: t.List[exp.Expression],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """Overwrite target partitions with the source records by delegating to
        `_insert_overwrite_by_condition` with `keep_existing_partition_rows=False`.

        `partitioned_by` is not read by this implementation.
        """
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
            target_table=table_name,
            source_columns=source_columns,
        )

        self._insert_overwrite_by_condition(
            table_name, source_queries, target_columns_to_types, keep_existing_partition_rows=False
        )

    def _create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        exists: bool,
        **kwargs: t.Any,
    ) -> None:
        """Create table with identical structure as source table

        Args:
            target_table_name: Table to create.
            source_table_name: Table whose structure is copied (`CREATE TABLE ... AS ...`).
            exists: If True, add `IF NOT EXISTS` so an already-existing target is not an error.
        """
        # honor `exists` rather than silently ignoring it
        if_not_exists_sql = " IF NOT EXISTS" if exists else ""
        self.execute(
            f"CREATE TABLE{if_not_exists_sql} {target_table_name}{self._on_cluster_sql()} AS {source_table_name}"
        )

    def _get_partition_ids(
        self,
        table: exp.Table,
        partition_col_name: str = "_partition_id",
        where: t.Optional[exp.Condition] = None,
        limit: t.Optional[int] = None,
    ) -> t.Set[t.Any]:
        """List partition IDs present in table

        Selects the distinct values of `partition_col_name`, optionally filtered by `where`
        and capped by `limit`. Returns an empty set when the query yields no rows.
        """
        partitions_query = exp.select(partition_col_name).distinct().from_(table)
        if where:
            partitions_query = partitions_query.where(where)
        if limit:
            partitions_query = partitions_query.limit(limit)
        partitions = self.fetchall(partitions_query)

        return {part[0] for part in partitions or []}

    def _create_table(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expression],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        table_kind: t.Optional[str] = None,
        track_rows_processed: bool = True,
        **kwargs: t.Any,
    ) -> None:
        """Creates a table in the database.

        Clickhouse Cloud requires doing CTAS in two steps.

        First, we add the `EMPTY` property to the CTAS call to create a table with the proper
        schema, then insert the data with the CTAS query.
        """
        # ensure columns used for partitioning are non-Nullable
        # - normally user's responsibility, but we automatically partition by time column in
        #     incremental by time models
        if kwargs.get("partitioned_by"):
            partition_cols = [
                col.name
                for part_expr in kwargs["partitioned_by"]
                for col in part_expr.find_all(exp.Column)
            ]
            if isinstance(table_name_or_schema, exp.Schema):
                for coldef in table_name_or_schema.expressions:
                    if coldef.name in partition_cols:
                        coldef.kind.set("nullable", False)
            if target_columns_to_types:
                for col in partition_cols:
                    target_columns_to_types[col].set("nullable", False)

        super()._create_table(
            table_name_or_schema,
            expression,
            exists,
            replace,
            target_columns_to_types,
            table_description,
            column_descriptions,
            table_kind,
            empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
            track_rows_processed=track_rows_processed,
            **kwargs,
        )

        # execute the second INSERT step if on cloud and creating a table
        # - Additional clause is to avoid clickhouse-connect HTTP client bug where CTAS LIMIT 0
        #     returns a success code but malformed response
        if (
            self.engine_run_mode.is_cloud
            and table_kind != "VIEW"
            and expression
            and not (
                expression.args.get("limit") is not None
                and expression.args["limit"].expression.this == "0"
            )
        ):
            table_name = (
                table_name_or_schema.this
                if isinstance(table_name_or_schema, exp.Schema)
                else table_name_or_schema
            )
            self._insert_append_query(
                table_name,
                expression,  # type: ignore
                target_columns_to_types or self.columns(table_name),
            )

    def _exchange_tables(
        self,
        old_table_name: TableName,
        new_table_name: TableName,
    ) -> None:
        """Swap two tables with `EXCHANGE TABLES`, falling back to renames if unsupported.

        Raises:
            DatabaseError: if the exchange fails for any reason other than NOT_IMPLEMENTED.
        """
        from clickhouse_connect.driver.exceptions import DatabaseError  # type: ignore

        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)

        try:
            self.execute(
                f"EXCHANGE TABLES {old_table_sql} AND {new_table_sql}{self._on_cluster_sql()}"
            )
        except DatabaseError as e:
            if "NOT_IMPLEMENTED" not in str(e):
                # any other failure means the swap did not happen; surface it instead of
                # silently continuing as if the tables had been exchanged
                raise

            # If someone is using an old Clickhouse version, an OS that doesn't support atomic exchanges,
            # or a database engine that doesn't support atomic exchanges, we do a non-atomic rename instead.
            #
            # Executing multiple renames in one call like `RENAME TABLE a to b, c to a` is supported
            # but not an atomic operation. Because it is not atomic, doing it in two calls is equivalent
            # and does not require defining an additional method.
            throwaway_table_name = self._get_temp_table(old_table_name)
            self._rename_table(old_table_name, throwaway_table_name)
            self._rename_table(new_table_name, old_table_name)
            self.drop_table(throwaway_table_name)

    def _rename_table(
        self,
        old_table_name: TableName,
        new_table_name: TableName,
    ) -> None:
        """Execute `RENAME TABLE old TO new` with both names quoted."""
        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)

        self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}")

    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
        """Execute a DELETE on `table_name` filtered by `where` (ON CLUSTER in cluster mode)."""
        delete_expr = exp.delete(table_name, where)
        if self.engine_run_mode.is_cluster:
            delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
        self.execute(delete_expr)

    def alter_table(
        self,
        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
    ) -> None:
        """
        Performs the alter statements to change the current table into the structure of the target table.

        `TableAlterOperation` items are unwrapped to their `.expression`; in cluster mode each
        expression gets an ON CLUSTER clause before execution.
        """
        with self.transaction():
            for alter_expression in [
                x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
            ]:
                if self.engine_run_mode.is_cluster:
                    alter_expression.set(
                        "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))
                    )
                self.execute(alter_expression)

    def _drop_object(
        self,
        name: TableName | SchemaName,
        exists: bool = True,
        kind: str = "TABLE",
        cascade: bool = False,
        **drop_args: t.Any,
    ) -> None:
        """Drops an object.

        An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind.

        Args:
            name: The name of the table to drop.
            exists: If exists, defaults to True.
            kind: What kind of object to drop. Defaults to TABLE
            cascade: Forwarded unchanged to the parent implementation.
            **drop_args: Any extra arguments to set on the Drop expression
        """
        super()._drop_object(
            name=name,
            exists=exists,
            kind=kind,
            cascade=cascade,
            cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
            if self.engine_run_mode.is_cluster
            else None,
            **drop_args,
        )

    def _build_partitioned_by_exp(
        self,
        partitioned_by: t.List[exp.Expression],
        **kwargs: t.Any,
    ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]:
        """Wrap the partition expressions in a PartitionedByProperty."""
        return exp.PartitionedByProperty(
            this=exp.Schema(expressions=partitioned_by),
        )

    def ensure_nulls_for_unmatched_after_join(
        self,
        query: Query,
    ) -> Query:
        """Append `join_use_nulls = 1` to the query's settings (mutates and returns `query`)."""
        # Set `join_use_nulls = 1` in a query's SETTINGS clause
        query.append("settings", exp.var("join_use_nulls").eq(exp.Literal.number("1")))
        return query

    def use_server_nulls_for_unmatched_after_join(
        self,
        query: Query,
    ) -> Query:
        """Pin the server's `join_use_nulls` value into `query` unless it is already set or is 1.

        Mutates and returns `query`.
        """
        # Set the `join_use_nulls` server value in a query's SETTINGS clause
        #
        # Use in SCD models:
        #  - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join
        #      are filled with NULL instead of the default data type value. The default join_use_nulls value is `0`.
        #  - The SCD embeds the user's original query in the `source` CTE
        #  - Settings are dynamically scoped, so our setting may override the server's default setting the user expects
        #      for their query.
        #  - To prevent this, we:
        #     - If the user query sets `join_use_nulls`, we do nothing
        #     - If the user query does not set `join_use_nulls`, we query the server for the current setting
        #       - If the server value is 1, we do nothing
        #       - If the server values is not 1, we inject its `join_use_nulls` value into the user query
        #     - We do not need to check user subqueries because our injected setting operates at the same scope the
        #         server value would normally operate at
        setting_name = "join_use_nulls"
        setting_value = "1"

        user_settings = query.args.get("settings")
        # if user has not already set it explicitly
        if not (
            user_settings
            and any(
                isinstance(setting, exp.EQ) and setting.name == setting_name
                for setting in user_settings
            )
        ):
            server_value = self.fetchone(
                exp.select("value")
                .from_("system.settings")
                .where(exp.column("name").eq(exp.Literal.string(setting_name)))
            )[0]
            # only inject the setting if the server value isn't 1
            inject_setting = setting_value != server_value
            setting_value = server_value if inject_setting else setting_value

            if inject_setting:
                query.append(
                    "settings", exp.var(setting_name).eq(exp.Literal.number(setting_value))
                )

        return query

    def _build_settings_property(
        self, key: str, value: exp.Expression | str | int | float
    ) -> exp.SettingsProperty:
        """Build a single `key = value` SettingsProperty; the key is lower-cased and a
        non-Expression value is wrapped in a Literal."""
        return exp.SettingsProperty(
            expressions=[
                exp.EQ(
                    this=exp.var(key.lower()),
                    expression=value
                    if isinstance(value, exp.Expression)
                    else exp.Literal(this=value, is_string=isinstance(value, str)),
                )
            ]
        )

    def _build_table_properties_exp(
        self,
        catalog_name: t.Optional[str] = None,
        table_format: t.Optional[str] = None,
        storage_format: t.Optional[str] = None,
        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
        partition_interval_unit: t.Optional[IntervalUnit] = None,
        clustered_by: t.Optional[t.List[exp.Expression]] = None,
        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        empty_ctas: bool = False,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Build the CREATE TABLE properties: engine, ORDER BY, PRIMARY KEY, TTL, partitioning,
        ON CLUSTER, EMPTY, settings and comment.

        `storage_format` selects the table engine (default `DEFAULT_TABLE_ENGINE`). The
        `ORDER_BY`, `PRIMARY_KEY` and `TTL` entries of `table_properties` (case-insensitive keys)
        are consumed here; every remaining entry becomes a SETTINGS property.
        """
        properties: t.List[exp.Expression] = []

        table_engine = self.DEFAULT_TABLE_ENGINE
        if storage_format:
            table_engine = (
                storage_format.this if isinstance(storage_format, exp.Var) else storage_format  # type: ignore
            )
        properties.append(exp.EngineProperty(this=table_engine))

        # copy of table_properties so we can pop items off below then consume the rest later
        table_properties_copy = {
            k.upper(): v for k, v in (table_properties.copy() if table_properties else {}).items()
        }

        mergetree_engine = bool(re.search(self.ORDER_BY_TABLE_ENGINE_REGEX, table_engine))
        ordered_by_raw = table_properties_copy.pop("ORDER_BY", None)
        if mergetree_engine:
            ordered_by_exprs = []
            if ordered_by_raw:
                ordered_by_vals = []

                if isinstance(ordered_by_raw, (exp.Tuple, exp.Array)):
                    ordered_by_vals = ordered_by_raw.expressions
                if isinstance(ordered_by_raw, exp.Paren):
                    ordered_by_vals = [ordered_by_raw.this]

                if not ordered_by_vals:
                    ordered_by_vals = (
                        ordered_by_raw if isinstance(ordered_by_raw, list) else [ordered_by_raw]
                    )

                for col in ordered_by_vals:
                    ordered_by_exprs.append(
                        col
                        if isinstance(col, exp.Column)
                        else maybe_parse(
                            col.name if isinstance(col, exp.Literal) else col,
                            dialect=self.dialect,
                            into=exp.Ordered,
                        )
                    )

            # emitted for every MergeTree-family engine, with an empty tuple if no ORDER_BY given
            properties.append(exp.Order(expressions=[exp.Tuple(expressions=ordered_by_exprs)]))

        primary_key = table_properties_copy.pop("PRIMARY_KEY", None)
        if mergetree_engine and primary_key:
            primary_key_vals = []
            if isinstance(primary_key, (exp.Tuple, exp.Array)):
                primary_key_vals = primary_key.expressions
            # unwrap a parenthesized single key; the check must look at `primary_key` itself,
            # not at the ORDER_BY value
            if isinstance(primary_key, exp.Paren):
                primary_key_vals = [primary_key.this]

            if not primary_key_vals:
                primary_key_vals = primary_key if isinstance(primary_key, list) else [primary_key]

            properties.append(
                exp.PrimaryKey(
                    expressions=[
                        exp.to_column(k.name if isinstance(k, exp.Literal) else k)
                        for k in primary_key_vals
                    ]
                )
            )

        ttl = table_properties_copy.pop("TTL", None)
        if ttl:
            properties.append(
                exp.MergeTreeTTL(
                    expressions=[ttl if isinstance(ttl, exp.Expression) else exp.var(ttl)]
                )
            )

        if (
            partitioned_by
            and (partitioned_by_prop := self._build_partitioned_by_exp(partitioned_by)) is not None
        ):
            properties.append(partitioned_by_prop)

        if self.engine_run_mode.is_cluster:
            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        if empty_ctas:
            properties.append(exp.EmptyProperty())

        if table_properties_copy:
            properties.extend(
                [self._build_settings_property(k, v) for k, v in table_properties_copy.items()]
            )

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        if properties:
            return exp.Properties(expressions=properties)

        return None

    def _build_view_properties_exp(
        self,
        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        table_description: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Creates a SQLGlot table properties expression for view"""
        properties: t.List[exp.Expression] = []

        view_properties_copy = view_properties.copy() if view_properties else {}

        if self.engine_run_mode.is_cluster:
            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        if view_properties_copy:
            properties.extend(
                [self._build_settings_property(k, v) for k, v in view_properties_copy.items()]
            )

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _build_create_comment_table_exp(
        self, table: exp.Table, table_comment: str, table_kind: str, **kwargs: t.Any
    ) -> exp.Comment | str:
        """Return an `ALTER TABLE ... MODIFY COMMENT` statement string with the truncated comment."""
        table_sql = table.sql(dialect=self.dialect, identify=True)

        truncated_comment = self._truncate_table_comment(table_comment)
        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)

        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} MODIFY COMMENT {comment_sql}"

    def _build_create_comment_column_exp(
        self,
        table: exp.Table,
        column_name: str,
        column_comment: str,
        table_kind: str = "TABLE",
        **kwargs: t.Any,
    ) -> exp.Comment | str:
        """Return an `ALTER TABLE ... COMMENT COLUMN` statement string.

        The comment is truncated with `_truncate_table_comment`.
        """
        table_sql = table.sql(dialect=self.dialect, identify=True)
        column_sql = exp.to_column(column_name).sql(dialect=self.dialect, identify=True)

        truncated_comment = self._truncate_table_comment(column_comment)
        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)

        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} COMMENT COLUMN {column_sql} {comment_sql}"

    def _on_cluster_sql(self) -> str:
        """Return ` ON CLUSTER <quoted cluster> ` in cluster mode, otherwise an empty string."""
        if self.engine_run_mode.is_cluster:
            cluster_name = exp.to_identifier(self.cluster, quoted=True).sql(dialect=self.dialect)  # type: ignore
            return f" ON CLUSTER {cluster_name} "
        return ""
logger =
<Logger sqlmesh.core.engine_adapter.clickhouse (WARNING)>
class
ClickhouseEngineAdapter(sqlmesh.core.engine_adapter.base.EngineAdapterWithIndexSupport, sqlmesh.core.engine_adapter.mixins.LogicalMergeMixin):
class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin):
    """Engine adapter for ClickHouse.

    The adapter distinguishes three run modes (cloud, cluster, standalone) based on the
    connection's extra config, and adds `ON CLUSTER` clauses to DDL when in cluster mode.
    """

    DIALECT = "clickhouse"
    SUPPORTS_TRANSACTIONS = False
    SUPPORTS_VIEW_SCHEMA = False
    SUPPORTS_REPLACE_TABLE = False
    COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY

    SCHEMA_DIFFER_KWARGS = {}

    DEFAULT_TABLE_ENGINE = "MergeTree"
    # engines matching this pattern get an ORDER BY (and optionally PRIMARY KEY) property
    ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$"

    @property
    def engine_run_mode(self) -> EngineRunMode:
        """Run mode derived from the `cloud_mode` and `cluster` connection config entries."""
        if self._extra_config.get("cloud_mode"):
            return EngineRunMode.CLOUD
        # we use the user's specification of a cluster in the connection config to determine if
        # the engine is in cluster mode
        if self._extra_config.get("cluster"):
            return EngineRunMode.CLUSTER
        return EngineRunMode.STANDALONE

    @property
    def cluster(self) -> t.Optional[str]:
        """Cluster name from the connection config, if one was specified."""
        return self._extra_config.get("cluster")

    # Workaround for clickhouse-connect cursor bug
    # - cursor does not reset row index correctly on `close()`, so `fetchone()` and `fetchmany()`
    #     return the wrong (or no) rows after the very first cursor query that returns rows
    #     in the connection
    # - cursor does reset the data rows correctly on `close()`, so `fetchall()` works because it
    #     doesn't use the row index at all
    def fetchone(
        self,
        query: t.Union[exp.Expression, str],
        ignore_unsupported_errors: bool = False,
        quote_identifiers: bool = False,
    ) -> t.Tuple:
        """Execute `query` and return the first row of the result.

        Implemented with `fetchall()[0]`, so an empty result raises `IndexError`.
        """
        with self.transaction():
            self.execute(
                query,
                ignore_unsupported_errors=ignore_unsupported_errors,
                quote_identifiers=quote_identifiers,
            )
            return self.cursor.fetchall()[0]

    def _fetch_native_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> pd.DataFrame:
        """Fetches a Pandas DataFrame from the cursor"""
        return self.cursor.client.query_df(
            self._to_sql(query, quote=quote_identifiers)
            if isinstance(query, exp.Expression)
            else query,
            use_extended_dtypes=True,
        )

    def _df_to_source_queries(
        self,
        df: DF,
        target_columns_to_types: t.Dict[str, exp.DataType],
        batch_size: int,
        target_table: TableName,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> t.List[SourceQuery]:
        """Build a single source query that loads `df` into a temp table and selects from it.

        The temp table is created and populated lazily, the first time the query factory
        runs. It is dropped by the source query's cleanup function.
        """
        temp_table = self._get_temp_table(target_table, **kwargs)
        source_columns_to_types = get_source_columns_to_types(
            target_columns_to_types, source_columns
        )

        def query_factory() -> Query:
            # It is possible for the factory to be called multiple times and if so then the temp table will already
            # be created so we skip creating again. This means we are assuming the first call is the same result
            # as later calls.
            if not self.table_exists(temp_table):
                self.create_table(
                    temp_table,
                    source_columns_to_types,
                    storage_format=exp.var("MergeTree"),
                    **kwargs,
                )
                ordered_df = df[list(source_columns_to_types)]

                self.cursor.client.insert_df(temp_table.sql(dialect=self.dialect), df=ordered_df)

            return exp.select(*self._casted_columns(target_columns_to_types, source_columns)).from_(
                temp_table
            )

        return [
            SourceQuery(
                query_factory=query_factory,
                cleanup_func=lambda: self.drop_table(temp_table, **kwargs),
            )
        ]

    def _get_data_objects(
        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
    ) -> t.List[DataObject]:
        """
        Returns all the data objects that exist in the given database.

        Objects are read from `system.tables`; rows whose engine is `View` are reported
        as views and everything else as tables.
        """
        query = (
            exp.select(
                exp.column("database").as_("schema_name"),
                exp.column("name"),
                exp.case(exp.column("engine"))
                .when(
                    exp.Literal.string("View"),
                    exp.Literal.string("view"),
                )
                .else_(
                    exp.Literal.string("table"),
                )
                .as_("type"),
            )
            .from_("system.tables")
            .where(exp.column("database").eq(to_schema(schema_name).db))
        )
        if object_names:
            query = query.where(exp.column("name").isin(*object_names))
        df = self.fetchdf(query)
        return [
            DataObject(
                catalog=None,
                schema=row.schema_name,
                name=row.name,
                type=DataObjectType.from_str(row.type),  # type: ignore
            )
            for row in df.itertuples()
        ]

    def create_schema(
        self,
        schema_name: SchemaName,
        ignore_if_exists: bool = True,
        warn_on_error: bool = True,
        properties: t.Optional[t.List[exp.Expression]] = None,
    ) -> None:
        """Create a Clickhouse database from a name or qualified table name.

        Clickhouse has a two-level naming scheme [database].[table].

        Args:
            schema_name: Name of the database to create.
            ignore_if_exists: Forwarded to `_create_schema`.
            warn_on_error: Forwarded to `_create_schema`.
            properties: Extra properties for the CREATE statement. The list is copied, never
                mutated; an `ON CLUSTER` property is appended to the copy in cluster mode.
        """
        # a `None` default avoids sharing one list object across calls
        properties_copy = list(properties) if properties else []
        if self.engine_run_mode.is_cluster:
            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        # can't call super() because it will try to set a catalog
        return self._create_schema(
            schema_name=schema_name,
            ignore_if_exists=ignore_if_exists,
            warn_on_error=warn_on_error,
            properties=properties_copy,
            # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
            kind="DATABASE",
        )

    def _insert_overwrite_by_condition(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        where: t.Optional[exp.Condition] = None,
        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
        **kwargs: t.Any,
    ) -> None:
        """
        Implements the table or partition swap approach to insert-overwriting records.

        Because this method executes multiple variants (full table replace, replace by time
        range, replace by key, replace by partition), some upstream caller info is needed and
        passed via kwargs.

        Args:
            table_name: Name of target table
            source_queries: Source queries returning records to insert
            target_columns_to_types: Column names and data types of target table
            where: SQLGlot expression determining which target table rows should be overwritten
            insert_overwrite_strategy_override: Not used by Clickhouse
            kwargs:
                dynamic_key: Key columns (replace by key only)
                dynamic_key_exp: Expression to build key (replace by key only)
                dynamic_key_unique: Whether more than one record can exist per key value (replace by key only)

                keep_existing_partition_rows: Whether to overwrite partitions with only new records (incremental by partition only)

        Returns:
            Side effects only: execution of insert-overwrite operation.
        """
        target_table = exp.to_table(table_name)
        target_columns_to_types = target_columns_to_types or self.columns(target_table)

        temp_table = self._get_temp_table(target_table)
        self.create_table_like(temp_table, target_table)

        # REPLACE BY KEY: extract kwargs if present
        dynamic_key = kwargs.get("dynamic_key")
        if dynamic_key:
            dynamic_key_exp = t.cast(exp.Expression, kwargs.get("dynamic_key_exp"))
            dynamic_key_unique = t.cast(bool, kwargs.get("dynamic_key_unique"))

        try:
            # insert new records into temp table
            for source_query in source_queries:
                with source_query as query:
                    # REPLACE BY KEY: if unique key, DISTINCTify by key columns so only one row is present per key
                    if dynamic_key and dynamic_key_unique:
                        query = query.distinct(*dynamic_key)  # type: ignore

                    query = self._order_projections_and_filter(
                        query, target_columns_to_types, where=where
                    )
                    self._insert_append_query(
                        temp_table,
                        query,
                        target_columns_to_types=target_columns_to_types,
                        order_projections=False,
                    )

            # REPLACE BY KEY: build `where` expression as "key IN (new rows' key values)"
            if dynamic_key:
                key_query = exp.select(dynamic_key_exp).from_(temp_table)
                if not dynamic_key_unique:
                    key_query = key_query.distinct()
                where = dynamic_key_exp.isin(query=key_query)

            # get target table partition key to confirm it's actually partitioned
            # NOTE(review): `fetchone` returns a row tuple, which is truthy whenever a row comes
            #   back, even if its `partition_key` value is empty. Confirm whether the first
            #   element should be tested instead.
            table_partition_exp = self.fetchone(
                exp.select("partition_key")
                .from_("system.tables")
                .where(
                    exp.column("database").eq(target_table.db),
                    exp.column("name").eq(target_table.name),
                )
            )

            all_affected_partitions: t.Set[str] = set()

            if where:
                # identify existing records to keep by inverting the delete `where` clause
                existing_records_insert_exp = exp.insert(
                    self._select_columns(target_columns_to_types)
                    .from_(target_table)
                    .where(exp.paren(expression=where).not_()),
                    temp_table,
                )

                # if target table is partitioned, modify insert expression to only insert
                # existing records that are in one of the affected partitions
                if table_partition_exp:
                    partitions_temp_table_name = self._get_temp_table(
                        exp.to_table(f"{target_table.db}._affected_partitions")
                    )
                    all_affected_partitions, existing_records_insert_exp = (
                        self._get_affected_partitions_and_insert_exp(
                            target_table,
                            temp_table,
                            where,
                            existing_records_insert_exp,
                            partitions_temp_table_name,
                        )
                    )

                try:
                    self.execute(existing_records_insert_exp, track_rows_processed=True)
                finally:
                    if table_partition_exp:
                        self.drop_table(partitions_temp_table_name)

            # process by partition if:
            # 1. The table is partitioned AND
            #   (2a. There are existing records to keep (`where`) OR
            #    2b. We're overwriting existing partition rows (incremental by partition model))
            if table_partition_exp and (
                where or kwargs.get("keep_existing_partition_rows") is False
            ):
                # only replace partitions that have records in temp_table
                partitions_to_replace = self._get_partition_ids(temp_table)

                # drop affected partitions that have no records in temp_table
                # - NOTE: `all_affected_partitions` will be empty when keep_existing_partition_rows=False
                #         because previous code block is skipped
                partitions_to_drop = all_affected_partitions - partitions_to_replace

                if partitions_to_replace or partitions_to_drop:
                    self.alter_table(
                        [
                            self._build_alter_partition_exp(
                                target_table, temp_table, partitions_to_replace, partitions_to_drop
                            )
                        ]
                    )
            else:
                self._exchange_tables(target_table, temp_table)
        finally:
            self.drop_table(temp_table)

    def _get_affected_partitions_and_insert_exp(
        self,
        target_table: exp.Table,
        temp_table: exp.Table,
        where: exp.Condition,
        existing_records_insert_exp: exp.Insert,
        partitions_temp_table_name: exp.Table,
    ) -> tuple[t.Set[str], exp.Insert]:
        """Materialize the affected partition IDs and restrict the insert to those partitions.

        Creates `partitions_temp_table_name` via CTAS; the caller is responsible for dropping it.

        Returns:
            The set of affected partition IDs and the (mutated) insert expression.
        """
        # identify all affected partition IDs
        # - store in temp table so we can reuse results
        self.ctas(
            partitions_temp_table_name,
            exp.select("partition_id")
            .distinct()
            .from_(
                exp.union(
                    # target table partitions with records in `where`
                    exp.select(exp.column("_partition_id").as_("partition_id"))
                    .from_(target_table)
                    .where(where),
                    # temp table partitions with new records to insert
                    exp.select(
                        exp.column("_partition_id").as_("partition_id"),
                    ).from_(temp_table),
                ).subquery("_affected_partitions")
            ),
        )

        # read all affected partition IDs into memory
        all_affected_partitions = self._get_partition_ids(
            partitions_temp_table_name, "partition_id"
        )

        # limit existing records insert expression WHERE to affected target table partitions
        # by adding `AND _partition_id IN (SELECT partition_id FROM partitions_temp_table)`
        existing_records_insert_exp.set(
            "expression",
            existing_records_insert_exp.expression.where(
                exp.column("_partition_id").isin(
                    exp.select("partition_id").from_(partitions_temp_table_name)
                )
            ),
        )

        return all_affected_partitions, existing_records_insert_exp

    def _build_alter_partition_exp(
        self,
        target_table: exp.Table,
        temp_table: exp.Table,
        partitions_to_replace: t.Set[str],
        partitions_to_drop: t.Set[str],
    ) -> exp.Alter:
        """Build one ALTER TABLE expression with a REPLACE/DROP PARTITION action per partition ID."""
        alter_expr = exp.Alter(this=target_table, kind="TABLE")

        for partition in partitions_to_replace:
            alter_expr.append(
                "actions",
                exp.ReplacePartition(
                    expression=exp.Partition(
                        expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
                    ),
                    source=temp_table,
                ),
            )

        for partition in partitions_to_drop:
            alter_expr.append(
                "actions",
                exp.DropPartition(
                    expressions=[
                        exp.Partition(
                            expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
                        )
                    ],
                    source=temp_table,
                ),
            )

        return alter_expr

    def _replace_by_key(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        key: t.Sequence[exp.Expression],
        is_unique_key: bool,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """Replace target rows whose key matches a source row, via `_insert_overwrite_by_condition`."""
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            source_table,
            target_columns_to_types,
            target_table=target_table,
            source_columns=source_columns,
        )

        # multi-column keys are collapsed into one delimited string expression
        key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0]

        self._insert_overwrite_by_condition(
            target_table,
            source_queries,
            target_columns_to_types,
            dynamic_key=key,
            dynamic_key_exp=key_exp,
            dynamic_key_unique=is_unique_key,
        )

    def insert_overwrite_by_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        partitioned_by: t.List[exp.Expression],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """Overwrite whole partitions of `table_name` with the rows from `query_or_df`.

        `partitioned_by` is accepted but not read by this implementation.
        """
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
            target_table=table_name,
            source_columns=source_columns,
        )

        self._insert_overwrite_by_condition(
            table_name, source_queries, target_columns_to_types, keep_existing_partition_rows=False
        )

    def _create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        exists: bool,
        **kwargs: t.Any,
    ) -> None:
        """Create table with identical structure as source table

        `exists` and `kwargs` are accepted but not used when building the statement.
        """
        self.execute(
            f"CREATE TABLE {target_table_name}{self._on_cluster_sql()} AS {source_table_name}"
        )

    def _get_partition_ids(
        self,
        table: exp.Table,
        partition_col_name: str = "_partition_id",
        where: t.Optional[exp.Condition] = None,
        limit: t.Optional[int] = None,
    ) -> t.Set[t.Any]:
        """List partition IDs present in table"""
        partitions_query = exp.select(partition_col_name).distinct().from_(table)
        if where:
            partitions_query = partitions_query.where(where)
        if limit:
            partitions_query = partitions_query.limit(limit)
        partitions = self.fetchall(partitions_query)

        return {part[0] for part in partitions} if partitions else set()

    def _create_table(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expression],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        table_kind: t.Optional[str] = None,
        track_rows_processed: bool = True,
        **kwargs: t.Any,
    ) -> None:
        """Creates a table in the database.

        Clickhouse Cloud requires doing CTAS in two steps.

        First, we add the `EMPTY` property to the CTAS call to create a table with the proper
        schema, then insert the data with the CTAS query.
        """
        # ensure columns used for partitioning are non-Nullable
        # - normally user's responsibility, but we automatically partition by time column in
        #     incremental by time models
        if kwargs.get("partitioned_by"):
            partition_cols = [
                col.name
                for part_expr in kwargs["partitioned_by"]
                for col in part_expr.find_all(exp.Column)
            ]
            if isinstance(table_name_or_schema, exp.Schema):
                for coldef in table_name_or_schema.expressions:
                    if coldef.name in partition_cols:
                        coldef.kind.set("nullable", False)
            if target_columns_to_types:
                for col in partition_cols:
                    target_columns_to_types[col].set("nullable", False)

        super()._create_table(
            table_name_or_schema,
            expression,
            exists,
            replace,
            target_columns_to_types,
            table_description,
            column_descriptions,
            table_kind,
            empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
            track_rows_processed=track_rows_processed,
            **kwargs,
        )

        # execute the second INSERT step if on cloud and creating a table
        # - Additional clause is to avoid clickhouse-connect HTTP client bug where CTAS LIMIT 0
        #     returns a success code but malformed response
        if (
            self.engine_run_mode.is_cloud
            and table_kind != "VIEW"
            and expression
            and not (
                expression.args.get("limit") is not None
                and expression.args["limit"].expression.this == "0"
            )
        ):
            table_name = (
                table_name_or_schema.this
                if isinstance(table_name_or_schema, exp.Schema)
                else table_name_or_schema
            )
            self._insert_append_query(
                table_name,
                expression,  # type: ignore
                target_columns_to_types or self.columns(table_name),
            )

    def _exchange_tables(
        self,
        old_table_name: TableName,
        new_table_name: TableName,
    ) -> None:
        """Swap two tables with `EXCHANGE TABLES`, falling back to non-atomic renames.

        Raises:
            DatabaseError: If the exchange fails for any reason other than `NOT_IMPLEMENTED`.
        """
        from clickhouse_connect.driver.exceptions import DatabaseError  # type: ignore

        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)

        try:
            self.execute(
                f"EXCHANGE TABLES {old_table_sql} AND {new_table_sql}{self._on_cluster_sql()}"
            )
        except DatabaseError as e:
            if "NOT_IMPLEMENTED" not in str(e):
                # Any other failure means the swap did not happen. Swallowing it would let the
                # caller continue (and drop the new table) as though the exchange succeeded.
                raise

            # If someone is using an old Clickhouse version, an OS that doesn't support atomic exchanges,
            # or a database engine that doesn't support atomic exchanges, we do a non-atomic rename instead.
            #
            # Executing multiple renames in one call like `RENAME TABLE a to b, c to a` is supported
            # but not an atomic operation. Because it is not atomic, doing it in two calls is equivalent
            # and does not require defining an additional method.
            throwaway_table_name = self._get_temp_table(old_table_name)
            self._rename_table(old_table_name, throwaway_table_name)
            self._rename_table(new_table_name, old_table_name)
            self.drop_table(throwaway_table_name)

    def _rename_table(
        self,
        old_table_name: TableName,
        new_table_name: TableName,
    ) -> None:
        """Rename a table with `RENAME TABLE`, adding `ON CLUSTER` in cluster mode."""
        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)

        self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}")

    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
        """Delete rows matching `where` from `table_name`, adding `ON CLUSTER` in cluster mode."""
        delete_expr = exp.delete(table_name, where)
        if self.engine_run_mode.is_cluster:
            delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
        self.execute(delete_expr)

    def alter_table(
        self,
        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
    ) -> None:
        """
        Performs the alter statements to change the current table into the structure of the target table.

        In cluster mode each expression has its `cluster` arg set in place before execution.
        """
        with self.transaction():
            for alter_expression in [
                x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
            ]:
                if self.engine_run_mode.is_cluster:
                    alter_expression.set(
                        "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))
                    )
                self.execute(alter_expression)

    def _drop_object(
        self,
        name: TableName | SchemaName,
        exists: bool = True,
        kind: str = "TABLE",
        cascade: bool = False,
        **drop_args: t.Any,
    ) -> None:
        """Drops an object.

        An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind.

        Args:
            name: The name of the table to drop.
            exists: If exists, defaults to True.
            kind: What kind of object to drop. Defaults to TABLE
            cascade: Forwarded unchanged to the parent implementation.
            **drop_args: Any extra arguments to set on the Drop expression
        """
        super()._drop_object(
            name=name,
            exists=exists,
            kind=kind,
            cascade=cascade,
            cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
            if self.engine_run_mode.is_cluster
            else None,
            **drop_args,
        )

    def _build_partitioned_by_exp(
        self,
        partitioned_by: t.List[exp.Expression],
        **kwargs: t.Any,
    ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]:
        """Wrap the partition expressions in a `PartitionedByProperty`."""
        return exp.PartitionedByProperty(
            this=exp.Schema(expressions=partitioned_by),
        )

    def ensure_nulls_for_unmatched_after_join(
        self,
        query: Query,
    ) -> Query:
        """Append `join_use_nulls = 1` to the query's settings and return the same query object."""
        # Set `join_use_nulls = 1` in a query's SETTINGS clause
        query.append("settings", exp.var("join_use_nulls").eq(exp.Literal.number("1")))
        return query

    def use_server_nulls_for_unmatched_after_join(
        self,
        query: Query,
    ) -> Query:
        """Pin the server's `join_use_nulls` value on `query` when it differs from `1`.

        Nothing is changed when the query already sets `join_use_nulls` or when the server
        value is `1`. The query is modified in place and returned.
        """
        # Set the `join_use_nulls` server value in a query's SETTINGS clause
        #
        # Use in SCD models:
        #   - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join
        #       are filled with NULL instead of the default data type value. The default join_use_nulls value is `0`.
        #   - The SCD embeds the user's original query in the `source` CTE
        #   - Settings are dynamically scoped, so our setting may override the server's default setting the user expects
        #       for their query.
        #   - To prevent this, we:
        #      - If the user query sets `join_use_nulls`, we do nothing
        #      - If the user query does not set `join_use_nulls`, we query the server for the current setting
        #        - If the server value is 1, we do nothing
        #        - If the server values is not 1, we inject its `join_use_nulls` value into the user query
        #      - We do not need to check user subqueries because our injected setting operates at the same scope the
        #          server value would normally operate at
        setting_name = "join_use_nulls"
        setting_value = "1"

        user_settings = query.args.get("settings")
        # if the user has already set it explicitly, leave the query untouched
        if user_settings and any(
            isinstance(setting, exp.EQ) and setting.name == setting_name
            for setting in user_settings
        ):
            return query

        server_value = self.fetchone(
            exp.select("value")
            .from_("system.settings")
            .where(exp.column("name").eq(exp.Literal.string(setting_name)))
        )[0]

        # only inject the setting if the server value isn't 1
        if setting_value != server_value:
            query.append("settings", exp.var(setting_name).eq(exp.Literal.number(server_value)))

        return query

    def _build_settings_property(
        self, key: str, value: exp.Expression | str | int | float
    ) -> exp.SettingsProperty:
        """Build a single `key = value` settings property; the key is lower-cased."""
        return exp.SettingsProperty(
            expressions=[
                exp.EQ(
                    this=exp.var(key.lower()),
                    expression=value
                    if isinstance(value, exp.Expression)
                    else exp.Literal(this=value, is_string=isinstance(value, str)),
                )
            ]
        )

    def _build_table_properties_exp(
        self,
        catalog_name: t.Optional[str] = None,
        table_format: t.Optional[str] = None,
        storage_format: t.Optional[str] = None,
        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
        partition_interval_unit: t.Optional[IntervalUnit] = None,
        clustered_by: t.Optional[t.List[exp.Expression]] = None,
        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        empty_ctas: bool = False,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Build the properties for a CREATE TABLE statement.

        `storage_format` selects the table engine (default `DEFAULT_TABLE_ENGINE`). The
        `ORDER_BY`, `PRIMARY_KEY` and `TTL` entries of `table_properties` (matched
        case-insensitively) become dedicated properties; all remaining entries are emitted
        as settings.
        """
        properties: t.List[exp.Expression] = []

        table_engine = self.DEFAULT_TABLE_ENGINE
        if storage_format:
            table_engine = (
                storage_format.this if isinstance(storage_format, exp.Var) else storage_format  # type: ignore
            )
        properties.append(exp.EngineProperty(this=table_engine))

        # copy of table_properties so we can pop items off below then consume the rest later
        table_properties_copy = {
            k.upper(): v for k, v in (table_properties.copy() if table_properties else {}).items()
        }

        mergetree_engine = bool(re.search(self.ORDER_BY_TABLE_ENGINE_REGEX, table_engine))
        ordered_by_raw = table_properties_copy.pop("ORDER_BY", None)
        if mergetree_engine:
            ordered_by_exprs = []
            if ordered_by_raw:
                ordered_by_vals = []

                if isinstance(ordered_by_raw, (exp.Tuple, exp.Array)):
                    ordered_by_vals = ordered_by_raw.expressions
                if isinstance(ordered_by_raw, exp.Paren):
                    ordered_by_vals = [ordered_by_raw.this]

                if not ordered_by_vals:
                    ordered_by_vals = (
                        ordered_by_raw if isinstance(ordered_by_raw, list) else [ordered_by_raw]
                    )

                for col in ordered_by_vals:
                    ordered_by_exprs.append(
                        col
                        if isinstance(col, exp.Column)
                        else maybe_parse(
                            col.name if isinstance(col, exp.Literal) else col,
                            dialect=self.dialect,
                            into=exp.Ordered,
                        )
                    )

            # emitted even when no ordering was given, in which case the tuple is empty
            properties.append(exp.Order(expressions=[exp.Tuple(expressions=ordered_by_exprs)]))

        primary_key = table_properties_copy.pop("PRIMARY_KEY", None)
        if mergetree_engine and primary_key:
            primary_key_vals = []
            if isinstance(primary_key, (exp.Tuple, exp.Array)):
                primary_key_vals = primary_key.expressions
            # unwrap a parenthesized single key; this must test `primary_key` itself, not the
            # ORDER BY value
            if isinstance(primary_key, exp.Paren):
                primary_key_vals = [primary_key.this]

            if not primary_key_vals:
                primary_key_vals = primary_key if isinstance(primary_key, list) else [primary_key]

            properties.append(
                exp.PrimaryKey(
                    expressions=[
                        exp.to_column(k.name if isinstance(k, exp.Literal) else k)
                        for k in primary_key_vals
                    ]
                )
            )

        ttl = table_properties_copy.pop("TTL", None)
        if ttl:
            properties.append(
                exp.MergeTreeTTL(
                    expressions=[ttl if isinstance(ttl, exp.Expression) else exp.var(ttl)]
                )
            )

        if (
            partitioned_by
            and (partitioned_by_prop := self._build_partitioned_by_exp(partitioned_by)) is not None
        ):
            properties.append(partitioned_by_prop)

        if self.engine_run_mode.is_cluster:
            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        if empty_ctas:
            properties.append(exp.EmptyProperty())

        if table_properties_copy:
            properties.extend(
                [self._build_settings_property(k, v) for k, v in table_properties_copy.items()]
            )

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        if properties:
            return exp.Properties(expressions=properties)

        return None

    def _build_view_properties_exp(
        self,
        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        table_description: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Creates a SQLGlot table properties expression for view"""
        properties: t.List[exp.Expression] = []

        view_properties_copy = view_properties.copy() if view_properties else {}

        if self.engine_run_mode.is_cluster:
            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        if view_properties_copy:
            properties.extend(
                [self._build_settings_property(k, v) for k, v in view_properties_copy.items()]
            )

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _build_create_comment_table_exp(
        self, table: exp.Table, table_comment: str, table_kind: str, **kwargs: t.Any
    ) -> exp.Comment | str:
        """Build the `ALTER TABLE ... MODIFY COMMENT` statement (as a string) for a table comment."""
        table_sql = table.sql(dialect=self.dialect, identify=True)

        truncated_comment = self._truncate_table_comment(table_comment)
        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)

        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} MODIFY COMMENT {comment_sql}"

    def _build_create_comment_column_exp(
        self,
        table: exp.Table,
        column_name: str,
        column_comment: str,
        table_kind: str = "TABLE",
        **kwargs: t.Any,
    ) -> exp.Comment | str:
        """Build the `ALTER TABLE ... COMMENT COLUMN` statement (as a string) for a column comment."""
        table_sql = table.sql(dialect=self.dialect, identify=True)
        column_sql = exp.to_column(column_name).sql(dialect=self.dialect, identify=True)

        # the column comment is truncated with the table-comment helper
        truncated_comment = self._truncate_table_comment(column_comment)
        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)

        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} COMMENT COLUMN {column_sql} {comment_sql}"

    def _on_cluster_sql(self) -> str:
        """Return ` ON CLUSTER <name> ` (with surrounding spaces) in cluster mode, else an empty string."""
        if self.engine_run_mode.is_cluster:
            cluster_name = exp.to_identifier(self.cluster, quoted=True).sql(dialect=self.dialect)  # type: ignore
            return f" ON CLUSTER {cluster_name} "
        return ""
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
engine_run_mode: sqlmesh.core.engine_adapter.shared.EngineRunMode
@property
def engine_run_mode(self) -> EngineRunMode:
    """Resolve the run mode from the connection's extra config.

    `cloud_mode` takes precedence over `cluster`; with neither set the engine is
    treated as standalone.
    """
    extra_config = self._extra_config
    if extra_config.get("cloud_mode"):
        return EngineRunMode.CLOUD
    # the user specifying a cluster in the connection config is what marks the engine
    # as running in cluster mode
    return EngineRunMode.CLUSTER if extra_config.get("cluster") else EngineRunMode.STANDALONE
def
fetchone( self, query: Union[sqlglot.expressions.Expression, str], ignore_unsupported_errors: bool = False, quote_identifiers: bool = False) -> Tuple:
def fetchone(
    self,
    query: t.Union[exp.Expression, str],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = False,
) -> t.Tuple:
    """Execute `query` and return the first row of its result.

    The row is taken from `fetchall()` rather than the cursor's own `fetchone()`,
    so an empty result raises `IndexError`.
    """
    execute_options = {
        "ignore_unsupported_errors": ignore_unsupported_errors,
        "quote_identifiers": quote_identifiers,
    }
    with self.transaction():
        self.execute(query, **execute_options)
        all_rows = self.cursor.fetchall()
        return all_rows[0]
def
create_schema( self, schema_name: Union[str, sqlglot.expressions.Table], ignore_if_exists: bool = True, warn_on_error: bool = True, properties: List[sqlglot.expressions.Expression] = []) -> None:
def create_schema(
    self,
    schema_name: SchemaName,
    ignore_if_exists: bool = True,
    warn_on_error: bool = True,
    properties: t.Optional[t.List[exp.Expression]] = None,
) -> None:
    """Create a Clickhouse database from a name or qualified table name.

    Clickhouse has a two-level naming scheme [database].[table].

    Args:
        schema_name: Name of the database to create.
        ignore_if_exists: Forwarded to `_create_schema`.
        warn_on_error: Forwarded to `_create_schema`.
        properties: Extra properties for the CREATE statement. The list is copied, never
            mutated; an `ON CLUSTER` property is appended to the copy in cluster mode.
    """
    # a `None` default avoids sharing one list object across calls
    properties_copy = list(properties) if properties else []
    if self.engine_run_mode.is_cluster:
        properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

    # can't call super() because it will try to set a catalog
    return self._create_schema(
        schema_name=schema_name,
        ignore_if_exists=ignore_if_exists,
        warn_on_error=warn_on_error,
        properties=properties_copy,
        # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
        kind="DATABASE",
    )
Create a Clickhouse database from a name or qualified table name.
Clickhouse has a two-level naming scheme [database].[table].
def
insert_overwrite_by_partition( self, table_name: Union[str, sqlglot.expressions.Table], query_or_df: QueryOrDF, partitioned_by: List[sqlglot.expressions.Expression], target_columns_to_types: Optional[Dict[str, sqlglot.expressions.DataType]] = None, source_columns: Optional[List[str]] = None) -> None:
def insert_overwrite_by_partition(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    partitioned_by: t.List[exp.Expression],
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """Overwrite whole partitions of `table_name` with the rows from `query_or_df`.

    Delegates to `_insert_overwrite_by_condition` with
    `keep_existing_partition_rows=False`. `partitioned_by` is accepted but not read here.
    """
    resolved = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=table_name,
        source_columns=source_columns,
    )
    queries, columns_to_types = resolved

    self._insert_overwrite_by_condition(
        table_name,
        queries,
        columns_to_types,
        keep_existing_partition_rows=False,
    )
def
delete_from( self, table_name: Union[str, sqlglot.expressions.Table], where: Union[str, sqlglot.expressions.Expression]) -> None:
def
alter_table( self, alter_expressions: Union[List[sqlglot.expressions.Alter], List[sqlmesh.core.schema_diff.TableAlterOperation]]) -> None:
def alter_table(
    self,
    alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
) -> None:
    """
    Performs the alter statements to change the current table into the structure of the target table.

    `TableAlterOperation` items are unwrapped to their `expression`. In cluster mode each
    expression has its `cluster` arg set in place before it is executed.
    """
    # unwrap everything up front so all statements are resolved before any is executed
    statements = []
    for item in alter_expressions:
        if isinstance(item, TableAlterOperation):
            statements.append(item.expression)
        else:
            statements.append(item)

    with self.transaction():
        for statement in statements:
            if self.engine_run_mode.is_cluster:
                on_cluster = exp.OnCluster(this=exp.to_identifier(self.cluster))
                statement.set("cluster", on_cluster)
            self.execute(statement)
Performs the alter statements to change the current table into the structure of the target table.
def
ensure_nulls_for_unmatched_after_join( self, query: Query) -> Query:
def
use_server_nulls_for_unmatched_after_join( self, query: Query) -> Query:
def use_server_nulls_for_unmatched_after_join(
    self,
    query: Query,
) -> Query:
    """Pin the server's `join_use_nulls` value in the query's SETTINGS clause.

    Args:
        query: The user's query. It is modified in place when a setting is
            injected.

    Returns:
        The same `query` object, with `join_use_nulls` appended to its
        settings when the query did not set it and the server value is not 1.
    """
    # Set the `join_use_nulls` server value in a query's SETTINGS clause
    #
    # Use in SCD models:
    #   - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join
    #       are filled with NULL instead of the default data type value. The default join_use_nulls value is `0`.
    #   - The SCD embeds the user's original query in the `source` CTE
    #   - Settings are dynamically scoped, so our setting may override the server's default setting the user expects
    #       for their query.
    #   - To prevent this, we:
    #       - If the user query sets `join_use_nulls`, we do nothing
    #       - If the user query does not set `join_use_nulls`, we query the server for the current setting
    #           - If the server value is 1, we do nothing
    #           - If the server value is not 1, we inject its `join_use_nulls` value into the user query
    #       - We do not need to check user subqueries because our injected setting operates at the same scope the
    #           server value would normally operate at
    setting_name = "join_use_nulls"
    scd_value = "1"

    user_settings = query.args.get("settings")
    already_set_by_user = bool(user_settings) and any(
        isinstance(setting, exp.EQ) and setting.name == setting_name
        for setting in user_settings
    )
    if already_set_by_user:
        return query

    # NOTE(review): if no row comes back, indexing raises TypeError - confirm
    # that the setting is always readable from system.settings.
    server_value = self.fetchone(
        exp.select("value")
        .from_("system.settings")
        .where(exp.column("name").eq(exp.Literal.string(setting_name)))
    )[0]

    # only inject the setting if the server value isn't 1
    if server_value != scd_value:
        query.append(
            "settings", exp.var(setting_name).eq(exp.Literal.number(server_value))
        )

    return query
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- COMMENT_CREATION_TABLE
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- create_view
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- insert_append
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- rename_table
- get_data_object
- get_data_objects
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ping
- get_table_last_modified_ts