sqlmesh.core.engine_adapter.clickhouse
1from __future__ import annotations 2 3import typing as t 4import logging 5import re 6from sqlglot import exp, maybe_parse 7from sqlmesh.core.dialect import to_schema 8from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin 9from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport 10from sqlmesh.core.engine_adapter.shared import ( 11 DataObject, 12 DataObjectType, 13 EngineRunMode, 14 SourceQuery, 15 CommentCreationView, 16 InsertOverwriteStrategy, 17) 18from sqlmesh.core.schema_diff import TableAlterOperation 19from sqlmesh.utils import get_source_columns_to_types 20 21if t.TYPE_CHECKING: 22 import pandas as pd 23 24 from sqlmesh.core._typing import SchemaName, TableName 25 from sqlmesh.core.engine_adapter._typing import DF, Query, QueryOrDF 26 27 from sqlmesh.core.node import IntervalUnit 28 29 30logger = logging.getLogger(__name__) 31 32 33class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin): 34 DIALECT = "clickhouse" 35 SUPPORTS_TRANSACTIONS = False 36 SUPPORTS_VIEW_SCHEMA = False 37 SUPPORTS_REPLACE_TABLE = False 38 COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY 39 40 SCHEMA_DIFFER_KWARGS = {} 41 42 DEFAULT_TABLE_ENGINE = "MergeTree" 43 ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$" 44 45 @property 46 def engine_run_mode(self) -> EngineRunMode: 47 if self._extra_config.get("cloud_mode"): 48 return EngineRunMode.CLOUD 49 # we use the user's specification of a cluster in the connection config to determine if 50 # the engine is in cluster mode 51 if self._extra_config.get("cluster"): 52 return EngineRunMode.CLUSTER 53 return EngineRunMode.STANDALONE 54 55 @property 56 def cluster(self) -> t.Optional[str]: 57 return self._extra_config.get("cluster") 58 59 # Workaround for clickhouse-connect cursor bug 60 # - cursor does not reset row index correctly on `close()`, so `fetchone()` and `fetchmany()` 61 # return the wrong (or no) rows after the very first cursor query that returns rows 62 # in 
the connection 63 # - cursor does reset the data rows correctly on `close()`, so `fetchall()` works because it 64 # doesn't use the row index at all 65 def fetchone( 66 self, 67 query: t.Union[exp.Expr, str], 68 ignore_unsupported_errors: bool = False, 69 quote_identifiers: bool = False, 70 ) -> t.Tuple: 71 with self.transaction(): 72 self.execute( 73 query, 74 ignore_unsupported_errors=ignore_unsupported_errors, 75 quote_identifiers=quote_identifiers, 76 ) 77 return self.cursor.fetchall()[0] 78 79 def _fetch_native_df( 80 self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False 81 ) -> pd.DataFrame: 82 """Fetches a Pandas DataFrame from the cursor""" 83 return self.cursor.client.query_df( 84 self._to_sql(query, quote=quote_identifiers) if isinstance(query, exp.Expr) else query, 85 use_extended_dtypes=True, 86 ) 87 88 def _df_to_source_queries( 89 self, 90 df: DF, 91 target_columns_to_types: t.Dict[str, exp.DataType], 92 batch_size: int, 93 target_table: TableName, 94 source_columns: t.Optional[t.List[str]] = None, 95 **kwargs: t.Any, 96 ) -> t.List[SourceQuery]: 97 temp_table = self._get_temp_table(target_table, **kwargs) 98 source_columns_to_types = get_source_columns_to_types( 99 target_columns_to_types, source_columns 100 ) 101 102 def query_factory() -> Query: 103 # It is possible for the factory to be called multiple times and if so then the temp table will already 104 # be created so we skip creating again. This means we are assuming the first call is the same result 105 # as later calls. 
106 if not self.table_exists(temp_table): 107 self.create_table( 108 temp_table, 109 source_columns_to_types, 110 storage_format=exp.var("MergeTree"), 111 **kwargs, 112 ) 113 ordered_df = df[list(source_columns_to_types)] 114 115 self.cursor.client.insert_df(temp_table.sql(dialect=self.dialect), df=ordered_df) 116 117 return exp.select(*self._casted_columns(target_columns_to_types, source_columns)).from_( 118 temp_table 119 ) 120 121 return [ 122 SourceQuery( 123 query_factory=query_factory, 124 cleanup_func=lambda: self.drop_table(temp_table, **kwargs), 125 ) 126 ] 127 128 def _get_data_objects( 129 self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None 130 ) -> t.List[DataObject]: 131 """ 132 Returns all the data objects that exist in the given database. 133 """ 134 query = ( 135 exp.select( 136 exp.column("database").as_("schema_name"), 137 exp.column("name"), 138 exp.case(exp.column("engine")) 139 .when( 140 exp.Literal.string("View"), 141 exp.Literal.string("view"), 142 ) 143 .else_( 144 exp.Literal.string("table"), 145 ) 146 .as_("type"), 147 ) 148 .from_("system.tables") 149 .where(exp.column("database").eq(to_schema(schema_name).db)) 150 ) 151 if object_names: 152 query = query.where(exp.column("name").isin(*object_names)) 153 df = self.fetchdf(query) 154 return [ 155 DataObject( 156 catalog=None, 157 schema=row.schema_name, 158 name=row.name, 159 type=DataObjectType.from_str(row.type), # type: ignore 160 ) 161 for row in df.itertuples() 162 ] 163 164 def create_schema( 165 self, 166 schema_name: SchemaName, 167 ignore_if_exists: bool = True, 168 warn_on_error: bool = True, 169 properties: t.List[exp.Expr] = [], 170 ) -> None: 171 """Create a Clickhouse database from a name or qualified table name. 172 173 Clickhouse has a two-level naming scheme [database].[table]. 
174 """ 175 properties_copy = properties.copy() 176 if self.engine_run_mode.is_cluster: 177 properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster))) 178 179 # can't call super() because it will try to set a catalog 180 return self._create_schema( 181 schema_name=schema_name, 182 ignore_if_exists=ignore_if_exists, 183 warn_on_error=warn_on_error, 184 properties=properties_copy, 185 # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message 186 kind="DATABASE", 187 ) 188 189 def _insert_overwrite_by_condition( 190 self, 191 table_name: TableName, 192 source_queries: t.List[SourceQuery], 193 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 194 where: t.Optional[exp.Condition] = None, 195 insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None, 196 **kwargs: t.Any, 197 ) -> None: 198 """ 199 Implements the table or partition swap approach to insert-overwriting records. 200 201 Because this method executes multiple variants (full table replace, replace by time 202 range, replace by key, replace by partition), some upstream caller info is needed and 203 passed via kwargs. 204 205 Args: 206 table_name: Name of target table 207 source_queries: Source queries returning records to insert 208 target_columns_to_types: Column names and data types of target table 209 where: SQLGlot expression determining which target table rows should be overwritten 210 insert_overwrite_strategy_override: Not used by Clickhouse 211 kwargs: 212 dynamic_key: Key columns (replace by key only) 213 dynamic_key_exp: Expression to build key (replace by key only) 214 dynamic_key_unique: Whether more than one record can exist per key value (replace by key only) 215 216 keep_existing_partition_rows: Whether to overwrite partitions with only new records (incremental by partition only) 217 218 Returns: 219 Side effects only: execution of insert-overwrite operation. 
220 """ 221 target_table = exp.to_table(table_name) 222 target_columns_to_types = target_columns_to_types or self.columns(target_table) 223 224 temp_table = self._get_temp_table(target_table) 225 self.create_table_like(temp_table, target_table) 226 227 # REPLACE BY KEY: extract kwargs if present 228 dynamic_key = kwargs.get("dynamic_key") 229 if dynamic_key: 230 dynamic_key_exp = t.cast(exp.Expr, kwargs.get("dynamic_key_exp")) 231 dynamic_key_unique = t.cast(bool, kwargs.get("dynamic_key_unique")) 232 233 try: 234 # insert new records into temp table 235 for source_query in source_queries: 236 with source_query as query: 237 # REPLACE BY KEY: if unique key, DISTINCTify by key columns so only one row is present per key 238 if dynamic_key and dynamic_key_unique: 239 query = query.distinct(*dynamic_key) # type: ignore 240 241 query = self._order_projections_and_filter( 242 query, target_columns_to_types, where=where 243 ) 244 self._insert_append_query( 245 temp_table, 246 query, 247 target_columns_to_types=target_columns_to_types, 248 order_projections=False, 249 ) 250 251 # REPLACE BY KEY: build `where` expression as "key IN (new rows' key values)" 252 if dynamic_key: 253 key_query = exp.select(dynamic_key_exp).from_(temp_table) 254 if not dynamic_key_unique: 255 key_query = key_query.distinct() 256 where = dynamic_key_exp.isin(query=key_query) 257 258 # get target table partition key to confirm it's actually partitioned 259 table_partition_exp = self.fetchone( 260 exp.select("partition_key") 261 .from_("system.tables") 262 .where( 263 exp.column("database").eq(target_table.db), 264 exp.column("name").eq(target_table.name), 265 ) 266 ) 267 268 all_affected_partitions: t.Set[str] = set() 269 270 if where: 271 # identify existing records to keep by inverting the delete `where` clause 272 existing_records_insert_exp = exp.insert( 273 self._select_columns(target_columns_to_types) 274 .from_(target_table) 275 .where(exp.paren(expression=where).not_()), 276 temp_table, 277 
) 278 279 # if target table is partitioned, modify insert expression to only insert 280 # existing records that are in one of the affected partitions 281 if table_partition_exp: 282 partitions_temp_table_name = self._get_temp_table( 283 exp.to_table(f"{target_table.db}._affected_partitions") 284 ) 285 all_affected_partitions, existing_records_insert_exp = ( 286 self._get_affected_partitions_and_insert_exp( 287 target_table, 288 temp_table, 289 where, 290 existing_records_insert_exp, 291 partitions_temp_table_name, 292 ) 293 ) 294 295 try: 296 self.execute(existing_records_insert_exp, track_rows_processed=True) 297 finally: 298 if table_partition_exp: 299 self.drop_table(partitions_temp_table_name) 300 301 # process by partition if: 302 # 1. The table is partitioned AND 303 # (2a. There are existing records to keep (`where`) OR 304 # 2b. We're overwriting existing partition rows (incremental by partition model)) 305 if table_partition_exp and ( 306 where or kwargs.get("keep_existing_partition_rows") is False 307 ): 308 # only replace partitions that have records in temp_table 309 partitions_to_replace = self._get_partition_ids(temp_table) 310 311 # drop affected partitions that have no records in temp_table 312 # - NOTE: `all_affected_partitions` will be empty when keep_existing_partition_rows=False 313 # because previous code block is skipped 314 partitions_to_drop = all_affected_partitions - partitions_to_replace 315 316 if partitions_to_replace or partitions_to_drop: 317 self.alter_table( 318 [ 319 self._build_alter_partition_exp( 320 target_table, temp_table, partitions_to_replace, partitions_to_drop 321 ) 322 ] 323 ) 324 else: 325 self._exchange_tables(target_table, temp_table) 326 finally: 327 self.drop_table(temp_table) 328 329 def _get_affected_partitions_and_insert_exp( 330 self, 331 target_table: exp.Table, 332 temp_table: exp.Table, 333 where: exp.Condition, 334 existing_records_insert_exp: exp.Insert, 335 partitions_temp_table_name: exp.Table, 336 ) -> 
tuple[t.Set[str], exp.Insert]: 337 # identify all affected partition IDs 338 # - store in temp table so we can reuse results 339 self.ctas( 340 partitions_temp_table_name, 341 exp.select("partition_id") 342 .distinct() 343 .from_( 344 exp.union( 345 # target table partitions with records in `where` 346 exp.select(exp.column("_partition_id").as_("partition_id")) 347 .from_(target_table) 348 .where(where), 349 # temp table partitions with new records to insert 350 exp.select( 351 exp.column("_partition_id").as_("partition_id"), 352 ).from_(temp_table), 353 ).subquery("_affected_partitions") 354 ), 355 ) 356 357 # read all affected partition IDs into memory 358 all_affected_partitions = self._get_partition_ids( 359 partitions_temp_table_name, "partition_id" 360 ) 361 362 # limit existing records insert expression WHERE to affected target table partitions 363 # by adding `AND _partition_id IN (SELECT partition_id FROM partitions_temp_table)` 364 existing_records_insert_exp.set( 365 "expression", 366 existing_records_insert_exp.expression.where( 367 exp.column("_partition_id").isin( 368 exp.select("partition_id").from_(partitions_temp_table_name) 369 ) 370 ), 371 ) 372 373 return all_affected_partitions, existing_records_insert_exp 374 375 def _build_alter_partition_exp( 376 self, 377 target_table: exp.Table, 378 temp_table: exp.Table, 379 partitions_to_replace: t.Set[str], 380 partitions_to_drop: t.Set[str], 381 ) -> exp.Alter: 382 alter_expr = exp.Alter(this=target_table, kind="TABLE") 383 384 for partition in partitions_to_replace: 385 alter_expr.append( 386 "actions", 387 exp.ReplacePartition( 388 expression=exp.Partition( 389 expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))] 390 ), 391 source=temp_table, 392 ), 393 ) 394 395 for partition in partitions_to_drop: 396 alter_expr.append( 397 "actions", 398 exp.DropPartition( 399 expressions=[ 400 exp.Partition( 401 expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))] 402 ) 403 ], 
404 source=temp_table, 405 ), 406 ) 407 408 return alter_expr 409 410 def _replace_by_key( 411 self, 412 target_table: TableName, 413 source_table: QueryOrDF, 414 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 415 key: t.Sequence[exp.Expr], 416 is_unique_key: bool, 417 source_columns: t.Optional[t.List[str]] = None, 418 ) -> None: 419 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 420 source_table, 421 target_columns_to_types, 422 target_table=target_table, 423 source_columns=source_columns, 424 ) 425 426 key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0] 427 428 self._insert_overwrite_by_condition( 429 target_table, 430 source_queries, 431 target_columns_to_types, 432 dynamic_key=key, 433 dynamic_key_exp=key_exp, 434 dynamic_key_unique=is_unique_key, 435 ) 436 437 def insert_overwrite_by_partition( 438 self, 439 table_name: TableName, 440 query_or_df: QueryOrDF, 441 partitioned_by: t.List[exp.Expr], 442 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 443 source_columns: t.Optional[t.List[str]] = None, 444 ) -> None: 445 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 446 query_or_df, 447 target_columns_to_types, 448 target_table=table_name, 449 source_columns=source_columns, 450 ) 451 452 self._insert_overwrite_by_condition( 453 table_name, source_queries, target_columns_to_types, keep_existing_partition_rows=False 454 ) 455 456 def _create_table_like( 457 self, 458 target_table_name: TableName, 459 source_table_name: TableName, 460 exists: bool, 461 **kwargs: t.Any, 462 ) -> None: 463 """Create table with identical structure as source table""" 464 self.execute( 465 f"CREATE TABLE {target_table_name}{self._on_cluster_sql()} AS {source_table_name}" 466 ) 467 468 def _get_partition_ids( 469 self, 470 table: exp.Table, 471 partition_col_name: str = "_partition_id", 472 where: t.Optional[exp.Condition] = None, 
473 limit: t.Optional[int] = None, 474 ) -> t.Set[t.Any]: 475 """List partition IDs present in table""" 476 partitions_query = exp.select(partition_col_name).distinct().from_(table) 477 if where: 478 partitions_query = partitions_query.where(where) 479 if limit: 480 partitions_query = partitions_query.limit(limit) 481 partitions = self.fetchall(partitions_query) 482 483 return set([part[0] for part in partitions] if partitions else []) 484 485 def _create_table( 486 self, 487 table_name_or_schema: t.Union[exp.Schema, TableName], 488 expression: t.Optional[exp.Expr], 489 exists: bool = True, 490 replace: bool = False, 491 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 492 table_description: t.Optional[str] = None, 493 column_descriptions: t.Optional[t.Dict[str, str]] = None, 494 table_kind: t.Optional[str] = None, 495 track_rows_processed: bool = True, 496 **kwargs: t.Any, 497 ) -> None: 498 """Creates a table in the database. 499 500 Clickhouse Cloud requires doing CTAS in two steps. 501 502 First, we add the `EMPTY` property to the CTAS call to create a table with the proper 503 schema, then insert the data with the CTAS query. 
504 """ 505 # ensure columns used for partitioning are non-Nullable 506 # - normally user's responsibility, but we automatically partition by time column in 507 # incremental by time models 508 if kwargs.get("partitioned_by"): 509 partition_cols = [ 510 col.name 511 for part_expr in kwargs["partitioned_by"] 512 for col in part_expr.find_all(exp.Column) 513 ] 514 if isinstance(table_name_or_schema, exp.Schema): 515 for coldef in table_name_or_schema.expressions: 516 if coldef.name in partition_cols: 517 coldef.kind.set("nullable", False) 518 if target_columns_to_types: 519 for col in partition_cols: 520 target_columns_to_types[col].set("nullable", False) 521 522 super()._create_table( 523 table_name_or_schema, 524 expression, 525 exists, 526 replace, 527 target_columns_to_types, 528 table_description, 529 column_descriptions, 530 table_kind, 531 empty_ctas=(self.engine_run_mode.is_cloud and expression is not None), 532 track_rows_processed=track_rows_processed, 533 **kwargs, 534 ) 535 536 # execute the second INSERT step if on cloud and creating a table 537 # - Additional clause is to avoid clickhouse-connect HTTP client bug where CTAS LIMIT 0 538 # returns a success code but malformed response 539 if ( 540 self.engine_run_mode.is_cloud 541 and table_kind != "VIEW" 542 and expression 543 and not ( 544 expression.args.get("limit") is not None 545 and expression.args["limit"].expression.this == "0" 546 ) 547 ): 548 table_name = ( 549 table_name_or_schema.this 550 if isinstance(table_name_or_schema, exp.Schema) 551 else table_name_or_schema 552 ) 553 self._insert_append_query( 554 table_name, 555 expression, # type: ignore 556 target_columns_to_types or self.columns(table_name), 557 ) 558 559 def _exchange_tables( 560 self, 561 old_table_name: TableName, 562 new_table_name: TableName, 563 ) -> None: 564 from clickhouse_connect.driver.exceptions import DatabaseError # type: ignore 565 566 old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, 
identify=True) 567 new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True) 568 569 try: 570 self.execute( 571 f"EXCHANGE TABLES {old_table_sql} AND {new_table_sql}{self._on_cluster_sql()}" 572 ) 573 except DatabaseError as e: 574 if "NOT_IMPLEMENTED" in str(e): 575 # If someone is using an old Clickhouse version, an OS that doesn't support atomic exchanges, 576 # or a database engine that doesn't support atomic exchanges, we do a non-atomic rename instead. 577 # 578 # Executing multiple renames in one call like `RENAME TABLE a to b, c to a` is supported 579 # but not an atomic operation. Because it is not atomic, doing it in two calls is equivalent 580 # and does not require defining an additional method. 581 throwaway_table_name = self._get_temp_table(old_table_name) 582 self._rename_table(old_table_name, throwaway_table_name) 583 self._rename_table(new_table_name, old_table_name) 584 self.drop_table(throwaway_table_name) 585 586 def _rename_table( 587 self, 588 old_table_name: TableName, 589 new_table_name: TableName, 590 ) -> None: 591 old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True) 592 new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True) 593 594 self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}") 595 596 def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None: 597 delete_expr = exp.delete(table_name, where) 598 if self.engine_run_mode.is_cluster: 599 delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))) 600 self.execute(delete_expr) 601 602 def alter_table( 603 self, 604 alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]], 605 ) -> None: 606 """ 607 Performs the alter statements to change the current table into the structure of the target table. 
608 """ 609 with self.transaction(): 610 for alter_expression in [ 611 x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions 612 ]: 613 if self.engine_run_mode.is_cluster: 614 alter_expression.set( 615 "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)) 616 ) 617 self.execute(alter_expression) 618 619 def _drop_object( 620 self, 621 name: TableName | SchemaName, 622 exists: bool = True, 623 kind: str = "TABLE", 624 cascade: bool = False, 625 **drop_args: t.Any, 626 ) -> None: 627 """Drops an object. 628 629 An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind. 630 631 Args: 632 name: The name of the table to drop. 633 exists: If exists, defaults to True. 634 kind: What kind of object to drop. Defaults to TABLE 635 **drop_args: Any extra arguments to set on the Drop expression 636 """ 637 super()._drop_object( 638 name=name, 639 exists=exists, 640 kind=kind, 641 cascade=cascade, 642 cluster=exp.OnCluster(this=exp.to_identifier(self.cluster)) 643 if self.engine_run_mode.is_cluster 644 else None, 645 **drop_args, 646 ) 647 648 def _build_partitioned_by_exp( 649 self, 650 partitioned_by: t.List[exp.Expr], 651 **kwargs: t.Any, 652 ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]: 653 return exp.PartitionedByProperty( 654 this=exp.Schema(expressions=partitioned_by), 655 ) 656 657 def ensure_nulls_for_unmatched_after_join( 658 self, 659 query: Query, 660 ) -> Query: 661 # Set `join_use_nulls = 1` in a query's SETTINGS clause 662 query.append("settings", exp.var("join_use_nulls").eq(exp.Literal.number("1"))) 663 return query 664 665 def use_server_nulls_for_unmatched_after_join( 666 self, 667 query: Query, 668 ) -> Query: 669 # Set the `join_use_nulls` server value in a query's SETTINGS clause 670 # 671 # Use in SCD models: 672 # - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join 673 # are filled with 
NULL instead of the default data type value. The default join_use_nulls value is `0`. 674 # - The SCD embeds the user's original query in the `source` CTE 675 # - Settings are dynamically scoped, so our setting may override the server's default setting the user expects 676 # for their query. 677 # - To prevent this, we: 678 # - If the user query sets `join_use_nulls`, we do nothing 679 # - If the user query does not set `join_use_nulls`, we query the server for the current setting 680 # - If the server value is 1, we do nothing 681 # - If the server values is not 1, we inject its `join_use_nulls` value into the user query 682 # - We do not need to check user subqueries because our injected setting operates at the same scope the 683 # server value would normally operate at 684 setting_name = "join_use_nulls" 685 setting_value = "1" 686 687 user_settings = query.args.get("settings") 688 # if user has not already set it explicitly 689 if not ( 690 user_settings 691 and any( 692 [ 693 isinstance(setting, exp.EQ) and setting.name == setting_name 694 for setting in user_settings 695 ] 696 ) 697 ): 698 server_value = self.fetchone( 699 exp.select("value") 700 .from_("system.settings") 701 .where(exp.column("name").eq(exp.Literal.string(setting_name))) 702 )[0] 703 # only inject the setting if the server value isn't 1 704 inject_setting = setting_value != server_value 705 setting_value = server_value if inject_setting else setting_value 706 707 if inject_setting: 708 query.append( 709 "settings", exp.var(setting_name).eq(exp.Literal.number(setting_value)) 710 ) 711 712 return query 713 714 def _build_settings_property( 715 self, key: str, value: exp.Expr | str | int | float 716 ) -> exp.SettingsProperty: 717 return exp.SettingsProperty( 718 expressions=[ 719 exp.EQ( 720 this=exp.var(key.lower()), 721 expression=value 722 if isinstance(value, exp.Expr) 723 else exp.Literal(this=value, is_string=isinstance(value, str)), 724 ) 725 ] 726 ) 727 728 def 
_build_table_properties_exp( 729 self, 730 catalog_name: t.Optional[str] = None, 731 table_format: t.Optional[str] = None, 732 storage_format: t.Optional[str] = None, 733 partitioned_by: t.Optional[t.List[exp.Expr]] = None, 734 partition_interval_unit: t.Optional[IntervalUnit] = None, 735 clustered_by: t.Optional[t.List[exp.Expr]] = None, 736 table_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 737 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 738 table_description: t.Optional[str] = None, 739 table_kind: t.Optional[str] = None, 740 empty_ctas: bool = False, 741 **kwargs: t.Any, 742 ) -> t.Optional[exp.Properties]: 743 properties: t.List[exp.Expr] = [] 744 745 table_engine = self.DEFAULT_TABLE_ENGINE 746 if storage_format: 747 table_engine = ( 748 storage_format.this if isinstance(storage_format, exp.Var) else storage_format # type: ignore 749 ) 750 properties.append(exp.EngineProperty(this=table_engine)) 751 752 # copy of table_properties so we can pop items off below then consume the rest later 753 table_properties_copy = { 754 k.upper(): v for k, v in (table_properties.copy() if table_properties else {}).items() 755 } 756 757 mergetree_engine = bool(re.search(self.ORDER_BY_TABLE_ENGINE_REGEX, table_engine)) 758 ordered_by_raw = table_properties_copy.pop("ORDER_BY", None) 759 if mergetree_engine: 760 ordered_by_exprs = [] 761 if ordered_by_raw: 762 ordered_by_vals = [] 763 764 if isinstance(ordered_by_raw, (exp.Tuple, exp.Array)): 765 ordered_by_vals = ordered_by_raw.expressions 766 if isinstance(ordered_by_raw, exp.Paren): 767 ordered_by_vals = [ordered_by_raw.this] 768 769 if not ordered_by_vals: 770 ordered_by_vals = ( 771 ordered_by_raw if isinstance(ordered_by_raw, list) else [ordered_by_raw] 772 ) 773 774 for col in ordered_by_vals: 775 ordered_by_exprs.append( 776 col 777 if isinstance(col, exp.Column) 778 else maybe_parse( 779 col.name if isinstance(col, exp.Literal) else col, 780 dialect=self.dialect, 781 into=exp.Ordered, 
782 ) 783 ) 784 785 properties.append(exp.Order(expressions=[exp.Tuple(expressions=ordered_by_exprs)])) 786 787 primary_key = table_properties_copy.pop("PRIMARY_KEY", None) 788 if mergetree_engine and primary_key: 789 primary_key_vals = [] 790 if isinstance(primary_key, (exp.Tuple, exp.Array)): 791 primary_key_vals = primary_key.expressions 792 if isinstance(ordered_by_raw, exp.Paren): 793 primary_key_vals = [primary_key.this] 794 795 if not primary_key_vals: 796 primary_key_vals = primary_key if isinstance(primary_key, list) else [primary_key] 797 798 properties.append( 799 exp.PrimaryKey( 800 expressions=[ 801 exp.to_column(k.name if isinstance(k, exp.Literal) else k) 802 for k in primary_key_vals 803 ] 804 ) 805 ) 806 807 ttl = table_properties_copy.pop("TTL", None) 808 if ttl: 809 properties.append( 810 exp.MergeTreeTTL(expressions=[ttl if isinstance(ttl, exp.Expr) else exp.var(ttl)]) 811 ) 812 813 if ( 814 partitioned_by 815 and (partitioned_by_prop := self._build_partitioned_by_exp(partitioned_by)) is not None 816 ): 817 properties.append(partitioned_by_prop) 818 819 if self.engine_run_mode.is_cluster: 820 properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster))) 821 822 if empty_ctas: 823 properties.append(exp.EmptyProperty()) 824 825 if table_properties_copy: 826 properties.extend( 827 [self._build_settings_property(k, v) for k, v in table_properties_copy.items()] 828 ) 829 830 if table_description: 831 properties.append( 832 exp.SchemaCommentProperty( 833 this=exp.Literal.string(self._truncate_table_comment(table_description)) 834 ) 835 ) 836 837 if properties: 838 return exp.Properties(expressions=properties) 839 840 return None 841 842 def _build_view_properties_exp( 843 self, 844 view_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 845 table_description: t.Optional[str] = None, 846 **kwargs: t.Any, 847 ) -> t.Optional[exp.Properties]: 848 """Creates a SQLGlot table properties expression for view""" 849 properties: t.List[exp.Expr] = 
[] 850 851 view_properties_copy = view_properties.copy() if view_properties else {} 852 853 if self.engine_run_mode.is_cluster: 854 properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster))) 855 856 if view_properties_copy: 857 properties.extend( 858 [self._build_settings_property(k, v) for k, v in view_properties_copy.items()] 859 ) 860 861 if table_description: 862 properties.append( 863 exp.SchemaCommentProperty( 864 this=exp.Literal.string(self._truncate_table_comment(table_description)) 865 ) 866 ) 867 868 if properties: 869 return exp.Properties(expressions=properties) 870 return None 871 872 def _build_create_comment_table_exp( 873 self, table: exp.Table, table_comment: str, table_kind: str, **kwargs: t.Any 874 ) -> exp.Comment | str: 875 table_sql = table.sql(dialect=self.dialect, identify=True) 876 877 truncated_comment = self._truncate_table_comment(table_comment) 878 comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect) 879 880 return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} MODIFY COMMENT {comment_sql}" 881 882 def _build_create_comment_column_exp( 883 self, 884 table: exp.Table, 885 column_name: str, 886 column_comment: str, 887 table_kind: str = "TABLE", 888 **kwargs: t.Any, 889 ) -> exp.Comment | str: 890 table_sql = table.sql(dialect=self.dialect, identify=True) 891 column_sql = exp.to_column(column_name).sql(dialect=self.dialect, identify=True) 892 893 truncated_comment = self._truncate_table_comment(column_comment) 894 comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect) 895 896 return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} COMMENT COLUMN {column_sql} {comment_sql}" 897 898 def _on_cluster_sql(self) -> str: 899 if self.engine_run_mode.is_cluster: 900 cluster_name = exp.to_identifier(self.cluster, quoted=True).sql(dialect=self.dialect) # type: ignore 901 return f" ON CLUSTER {cluster_name} " 902 return ""
logger =
<Logger sqlmesh.core.engine_adapter.clickhouse (WARNING)>
class
ClickhouseEngineAdapter(sqlmesh.core.engine_adapter.base.EngineAdapterWithIndexSupport, sqlmesh.core.engine_adapter.mixins.LogicalMergeMixin):
class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin):
    """Engine adapter for ClickHouse.

    The adapter runs in one of three modes (see `engine_run_mode`), chosen from the
    `cloud_mode` and `cluster` entries of the connection's extra config. In cluster mode
    an `ON CLUSTER` clause is added to the DDL/DML statements this class builds.
    """

    DIALECT = "clickhouse"
    SUPPORTS_TRANSACTIONS = False
    SUPPORTS_VIEW_SCHEMA = False
    SUPPORTS_REPLACE_TABLE = False
    COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY

    SCHEMA_DIFFER_KWARGS = {}

    # engine used when a table is created without an explicit storage format
    DEFAULT_TABLE_ENGINE = "MergeTree"
    # engines matching this pattern get an ORDER BY clause (and optional PRIMARY KEY)
    ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$"

    @property
    def engine_run_mode(self) -> EngineRunMode:
        """Return the run mode derived from the connection's extra config."""
        if self._extra_config.get("cloud_mode"):
            return EngineRunMode.CLOUD
        # we use the user's specification of a cluster in the connection config to determine if
        # the engine is in cluster mode
        if self._extra_config.get("cluster"):
            return EngineRunMode.CLUSTER
        return EngineRunMode.STANDALONE

    @property
    def cluster(self) -> t.Optional[str]:
        """Return the cluster name from the connection's extra config, if any."""
        return self._extra_config.get("cluster")

    # Workaround for clickhouse-connect cursor bug
    # - cursor does not reset row index correctly on `close()`, so `fetchone()` and `fetchmany()`
    #     return the wrong (or no) rows after the very first cursor query that returns rows
    #     in the connection
    # - cursor does reset the data rows correctly on `close()`, so `fetchall()` works because it
    #     doesn't use the row index at all
    def fetchone(
        self,
        query: t.Union[exp.Expr, str],
        ignore_unsupported_errors: bool = False,
        quote_identifiers: bool = False,
    ) -> t.Tuple:
        """Execute `query` and return the first row of the result.

        Implemented with `fetchall()` because of the cursor bug described above.

        Raises:
            IndexError: if the query returns no rows.
        """
        with self.transaction():
            self.execute(
                query,
                ignore_unsupported_errors=ignore_unsupported_errors,
                quote_identifiers=quote_identifiers,
            )
            return self.cursor.fetchall()[0]

    def _fetch_native_df(
        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
    ) -> pd.DataFrame:
        """Fetches a Pandas DataFrame from the cursor"""
        return self.cursor.client.query_df(
            self._to_sql(query, quote=quote_identifiers) if isinstance(query, exp.Expr) else query,
            use_extended_dtypes=True,
        )

    def _df_to_source_queries(
        self,
        df: DF,
        target_columns_to_types: t.Dict[str, exp.DataType],
        batch_size: int,
        target_table: TableName,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> t.List[SourceQuery]:
        """Wrap a DataFrame in a single SourceQuery backed by a temp table.

        The DataFrame is loaded into a temp MergeTree table the first time the query
        factory runs; the source query selects the casted columns from that table and
        its cleanup function drops the table. `batch_size` is not used here.
        """
        temp_table = self._get_temp_table(target_table, **kwargs)
        source_columns_to_types = get_source_columns_to_types(
            target_columns_to_types, source_columns
        )

        def query_factory() -> Query:
            # It is possible for the factory to be called multiple times and if so then the temp table will already
            # be created so we skip creating again. This means we are assuming the first call is the same result
            # as later calls.
            if not self.table_exists(temp_table):
                self.create_table(
                    temp_table,
                    source_columns_to_types,
                    storage_format=exp.var("MergeTree"),
                    **kwargs,
                )
                ordered_df = df[list(source_columns_to_types)]

                self.cursor.client.insert_df(temp_table.sql(dialect=self.dialect), df=ordered_df)

            return exp.select(*self._casted_columns(target_columns_to_types, source_columns)).from_(
                temp_table
            )

        return [
            SourceQuery(
                query_factory=query_factory,
                cleanup_func=lambda: self.drop_table(temp_table, **kwargs),
            )
        ]

    def _get_data_objects(
        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
    ) -> t.List[DataObject]:
        """
        Returns all the data objects that exist in the given database.

        Objects are read from `system.tables`; rows whose engine is `View` are reported
        as views and everything else as tables. `object_names`, when given, restricts
        the result to those names.
        """
        query = (
            exp.select(
                exp.column("database").as_("schema_name"),
                exp.column("name"),
                exp.case(exp.column("engine"))
                .when(
                    exp.Literal.string("View"),
                    exp.Literal.string("view"),
                )
                .else_(
                    exp.Literal.string("table"),
                )
                .as_("type"),
            )
            .from_("system.tables")
            .where(exp.column("database").eq(to_schema(schema_name).db))
        )
        if object_names:
            query = query.where(exp.column("name").isin(*object_names))
        df = self.fetchdf(query)
        return [
            DataObject(
                catalog=None,
                schema=row.schema_name,
                name=row.name,
                type=DataObjectType.from_str(row.type),  # type: ignore
            )
            for row in df.itertuples()
        ]

    def create_schema(
        self,
        schema_name: SchemaName,
        ignore_if_exists: bool = True,
        warn_on_error: bool = True,
        properties: t.List[exp.Expr] = [],
    ) -> None:
        """Create a Clickhouse database from a name or qualified table name.

        Clickhouse has a two-level naming scheme [database].[table].
        """
        # copy so neither the caller's list nor the shared default is mutated
        properties_copy = properties.copy()
        if self.engine_run_mode.is_cluster:
            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        # can't call super() because it will try to set a catalog
        return self._create_schema(
            schema_name=schema_name,
            ignore_if_exists=ignore_if_exists,
            warn_on_error=warn_on_error,
            properties=properties_copy,
            # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
            kind="DATABASE",
        )

    def _insert_overwrite_by_condition(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        where: t.Optional[exp.Condition] = None,
        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
        **kwargs: t.Any,
    ) -> None:
        """
        Implements the table or partition swap approach to insert-overwriting records.

        Because this method executes multiple variants (full table replace, replace by time
        range, replace by key, replace by partition), some upstream caller info is needed and
        passed via kwargs.

        Args:
            table_name: Name of target table
            source_queries: Source queries returning records to insert
            target_columns_to_types: Column names and data types of target table
            where: SQLGlot expression determining which target table rows should be overwritten
            insert_overwrite_strategy_override: Not used by Clickhouse
            kwargs:
                dynamic_key: Key columns (replace by key only)
                dynamic_key_exp: Expression to build key (replace by key only)
                dynamic_key_unique: Whether more than one record can exist per key value (replace by key only)

                keep_existing_partition_rows: Whether to overwrite partitions with only new records (incremental by partition only)

        Returns:
            Side effects only: execution of insert-overwrite operation.
        """
        target_table = exp.to_table(table_name)
        target_columns_to_types = target_columns_to_types or self.columns(target_table)

        temp_table = self._get_temp_table(target_table)
        self.create_table_like(temp_table, target_table)

        # REPLACE BY KEY: extract kwargs if present
        dynamic_key = kwargs.get("dynamic_key")
        if dynamic_key:
            dynamic_key_exp = t.cast(exp.Expr, kwargs.get("dynamic_key_exp"))
            dynamic_key_unique = t.cast(bool, kwargs.get("dynamic_key_unique"))

        try:
            # insert new records into temp table
            for source_query in source_queries:
                with source_query as query:
                    # REPLACE BY KEY: if unique key, DISTINCTify by key columns so only one row is present per key
                    if dynamic_key and dynamic_key_unique:
                        query = query.distinct(*dynamic_key)  # type: ignore

                    query = self._order_projections_and_filter(
                        query, target_columns_to_types, where=where
                    )
                    self._insert_append_query(
                        temp_table,
                        query,
                        target_columns_to_types=target_columns_to_types,
                        order_projections=False,
                    )

            # REPLACE BY KEY: build `where` expression as "key IN (new rows' key values)"
            if dynamic_key:
                key_query = exp.select(dynamic_key_exp).from_(temp_table)
                if not dynamic_key_unique:
                    key_query = key_query.distinct()
                where = dynamic_key_exp.isin(query=key_query)

            # get target table partition key to confirm it's actually partitioned
            # NOTE(review): `fetchone` returns the row tuple, so the truthiness checks below
            #   pass whenever a row exists, even if `partition_key` is an empty string -
            #   confirm this is intended
            table_partition_exp = self.fetchone(
                exp.select("partition_key")
                .from_("system.tables")
                .where(
                    exp.column("database").eq(target_table.db),
                    exp.column("name").eq(target_table.name),
                )
            )

            all_affected_partitions: t.Set[str] = set()

            if where:
                # identify existing records to keep by inverting the delete `where` clause
                existing_records_insert_exp = exp.insert(
                    self._select_columns(target_columns_to_types)
                    .from_(target_table)
                    .where(exp.paren(expression=where).not_()),
                    temp_table,
                )

                # if target table is partitioned, modify insert expression to only insert
                #   existing records that are in one of the affected partitions
                if table_partition_exp:
                    partitions_temp_table_name = self._get_temp_table(
                        exp.to_table(f"{target_table.db}._affected_partitions")
                    )
                    all_affected_partitions, existing_records_insert_exp = (
                        self._get_affected_partitions_and_insert_exp(
                            target_table,
                            temp_table,
                            where,
                            existing_records_insert_exp,
                            partitions_temp_table_name,
                        )
                    )

                try:
                    self.execute(existing_records_insert_exp, track_rows_processed=True)
                finally:
                    if table_partition_exp:
                        self.drop_table(partitions_temp_table_name)

            # process by partition if:
            # 1. The table is partitioned AND
            #   (2a. There are existing records to keep (`where`) OR
            #    2b. We're overwriting existing partition rows (incremental by partition model))
            if table_partition_exp and (
                where or kwargs.get("keep_existing_partition_rows") is False
            ):
                # only replace partitions that have records in temp_table
                partitions_to_replace = self._get_partition_ids(temp_table)

                # drop affected partitions that have no records in temp_table
                # - NOTE: `all_affected_partitions` will be empty when keep_existing_partition_rows=False
                #         because previous code block is skipped
                partitions_to_drop = all_affected_partitions - partitions_to_replace

                if partitions_to_replace or partitions_to_drop:
                    self.alter_table(
                        [
                            self._build_alter_partition_exp(
                                target_table, temp_table, partitions_to_replace, partitions_to_drop
                            )
                        ]
                    )
            else:
                self._exchange_tables(target_table, temp_table)
        finally:
            self.drop_table(temp_table)

    def _get_affected_partitions_and_insert_exp(
        self,
        target_table: exp.Table,
        temp_table: exp.Table,
        where: exp.Condition,
        existing_records_insert_exp: exp.Insert,
        partitions_temp_table_name: exp.Table,
    ) -> tuple[t.Set[str], exp.Insert]:
        """Materialize the affected partition IDs and restrict the existing-records insert to them.

        Creates `partitions_temp_table_name` (the caller is responsible for dropping it)
        and mutates `existing_records_insert_exp` in place.

        Returns:
            The set of affected partition IDs and the modified insert expression.
        """
        # identify all affected partition IDs
        # - store in temp table so we can reuse results
        self.ctas(
            partitions_temp_table_name,
            exp.select("partition_id")
            .distinct()
            .from_(
                exp.union(
                    # target table partitions with records in `where`
                    exp.select(exp.column("_partition_id").as_("partition_id"))
                    .from_(target_table)
                    .where(where),
                    # temp table partitions with new records to insert
                    exp.select(
                        exp.column("_partition_id").as_("partition_id"),
                    ).from_(temp_table),
                ).subquery("_affected_partitions")
            ),
        )

        # read all affected partition IDs into memory
        all_affected_partitions = self._get_partition_ids(
            partitions_temp_table_name, "partition_id"
        )

        # limit existing records insert expression WHERE to affected target table partitions
        #   by adding `AND _partition_id IN (SELECT partition_id FROM partitions_temp_table)`
        existing_records_insert_exp.set(
            "expression",
            existing_records_insert_exp.expression.where(
                exp.column("_partition_id").isin(
                    exp.select("partition_id").from_(partitions_temp_table_name)
                )
            ),
        )

        return all_affected_partitions, existing_records_insert_exp

    def _build_alter_partition_exp(
        self,
        target_table: exp.Table,
        temp_table: exp.Table,
        partitions_to_replace: t.Set[str],
        partitions_to_drop: t.Set[str],
    ) -> exp.Alter:
        """Build one ALTER TABLE expression that replaces and drops partitions by ID.

        Each ID in `partitions_to_replace` becomes a REPLACE PARTITION action sourced
        from `temp_table`; each ID in `partitions_to_drop` becomes a DROP PARTITION action.
        """
        alter_expr = exp.Alter(this=target_table, kind="TABLE")

        for partition in partitions_to_replace:
            alter_expr.append(
                "actions",
                exp.ReplacePartition(
                    expression=exp.Partition(
                        expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
                    ),
                    source=temp_table,
                ),
            )

        for partition in partitions_to_drop:
            alter_expr.append(
                "actions",
                exp.DropPartition(
                    expressions=[
                        exp.Partition(
                            expressions=[exp.PartitionId(this=exp.Literal.string(str(partition)))]
                        )
                    ],
                    source=temp_table,
                ),
            )

        return alter_expr

    def _replace_by_key(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        key: t.Sequence[exp.Expr],
        is_unique_key: bool,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """Replace target rows whose key matches a key in the source.

        Delegates to `_insert_overwrite_by_condition`, passing the key information via
        the `dynamic_key*` kwargs. Multi-column keys are combined into one expression
        with `CONCAT_WS`.
        """
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            source_table,
            target_columns_to_types,
            target_table=target_table,
            source_columns=source_columns,
        )

        key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0]

        self._insert_overwrite_by_condition(
            target_table,
            source_queries,
            target_columns_to_types,
            dynamic_key=key,
            dynamic_key_exp=key_exp,
            dynamic_key_unique=is_unique_key,
        )

    def insert_overwrite_by_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        partitioned_by: t.List[exp.Expr],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """Overwrite the partitions of `table_name` that receive rows from `query_or_df`.

        Delegates to `_insert_overwrite_by_condition` with
        `keep_existing_partition_rows=False`. `partitioned_by` is not used here.
        """
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
            target_table=table_name,
            source_columns=source_columns,
        )

        self._insert_overwrite_by_condition(
            table_name, source_queries, target_columns_to_types, keep_existing_partition_rows=False
        )

    def _create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        exists: bool,
        **kwargs: t.Any,
    ) -> None:
        """Create table with identical structure as source table

        `exists` and `kwargs` are accepted but not used by this implementation.
        """
        self.execute(
            f"CREATE TABLE {target_table_name}{self._on_cluster_sql()} AS {source_table_name}"
        )

    def _get_partition_ids(
        self,
        table: exp.Table,
        partition_col_name: str = "_partition_id",
        where: t.Optional[exp.Condition] = None,
        limit: t.Optional[int] = None,
    ) -> t.Set[t.Any]:
        """List partition IDs present in table

        Args:
            table: Table to read partition IDs from
            partition_col_name: Column holding the partition ID
            where: Optional filter applied before collecting IDs
            limit: Optional maximum number of rows to read

        Returns:
            The distinct values of `partition_col_name`; empty when no rows are returned.
        """
        partitions_query = exp.select(partition_col_name).distinct().from_(table)
        if where:
            partitions_query = partitions_query.where(where)
        if limit:
            partitions_query = partitions_query.limit(limit)
        partitions = self.fetchall(partitions_query)

        # `or ()` keeps the original tolerance for a falsy (e.g. None) result
        return {part[0] for part in partitions or ()}

    def _create_table(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expr],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        table_kind: t.Optional[str] = None,
        track_rows_processed: bool = True,
        **kwargs: t.Any,
    ) -> None:
        """Creates a table in the database.

        Clickhouse Cloud requires doing CTAS in two steps.

        First, we add the `EMPTY` property to the CTAS call to create a table with the proper
        schema, then insert the data with the CTAS query.
        """
        # ensure columns used for partitioning are non-Nullable
        # - normally user's responsibility, but we automatically partition by time column in
        #     incremental by time models
        if kwargs.get("partitioned_by"):
            partition_cols = [
                col.name
                for part_expr in kwargs["partitioned_by"]
                for col in part_expr.find_all(exp.Column)
            ]
            if isinstance(table_name_or_schema, exp.Schema):
                for coldef in table_name_or_schema.expressions:
                    if coldef.name in partition_cols:
                        coldef.kind.set("nullable", False)
            if target_columns_to_types:
                for col in partition_cols:
                    target_columns_to_types[col].set("nullable", False)

        super()._create_table(
            table_name_or_schema,
            expression,
            exists,
            replace,
            target_columns_to_types,
            table_description,
            column_descriptions,
            table_kind,
            empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
            track_rows_processed=track_rows_processed,
            **kwargs,
        )

        # execute the second INSERT step if on cloud and creating a table
        # - Additional clause is to avoid clickhouse-connect HTTP client bug where CTAS LIMIT 0
        #     returns a success code but malformed response
        if (
            self.engine_run_mode.is_cloud
            and table_kind != "VIEW"
            and expression
            and not (
                expression.args.get("limit") is not None
                and expression.args["limit"].expression.this == "0"
            )
        ):
            table_name = (
                table_name_or_schema.this
                if isinstance(table_name_or_schema, exp.Schema)
                else table_name_or_schema
            )
            self._insert_append_query(
                table_name,
                expression,  # type: ignore
                target_columns_to_types or self.columns(table_name),
            )

    def _exchange_tables(
        self,
        old_table_name: TableName,
        new_table_name: TableName,
    ) -> None:
        """Swap two tables with EXCHANGE TABLES, falling back to renames.

        Raises:
            DatabaseError: if the exchange fails for any reason other than the
                operation being reported as NOT_IMPLEMENTED.
        """
        from clickhouse_connect.driver.exceptions import DatabaseError  # type: ignore

        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)

        try:
            self.execute(
                f"EXCHANGE TABLES {old_table_sql} AND {new_table_sql}{self._on_cluster_sql()}"
            )
        except DatabaseError as e:
            if "NOT_IMPLEMENTED" not in str(e):
                # any other failure must surface: callers drop the new table afterwards, so
                # swallowing the error would make a failed swap look like a success
                raise

            # If someone is using an old Clickhouse version, an OS that doesn't support atomic exchanges,
            # or a database engine that doesn't support atomic exchanges, we do a non-atomic rename instead.
            #
            # Executing multiple renames in one call like `RENAME TABLE a to b, c to a` is supported
            # but not an atomic operation. Because it is not atomic, doing it in two calls is equivalent
            # and does not require defining an additional method.
            throwaway_table_name = self._get_temp_table(old_table_name)
            self._rename_table(old_table_name, throwaway_table_name)
            self._rename_table(new_table_name, old_table_name)
            self.drop_table(throwaway_table_name)

    def _rename_table(
        self,
        old_table_name: TableName,
        new_table_name: TableName,
    ) -> None:
        """Rename a table with RENAME TABLE, quoting both identifiers."""
        old_table_sql = exp.to_table(old_table_name).sql(dialect=self.dialect, identify=True)
        new_table_sql = exp.to_table(new_table_name).sql(dialect=self.dialect, identify=True)

        self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}")

    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None:
        """Delete the rows of `table_name` matching `where` (ON CLUSTER in cluster mode)."""
        delete_expr = exp.delete(table_name, where)
        if self.engine_run_mode.is_cluster:
            delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
        self.execute(delete_expr)

    def alter_table(
        self,
        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
    ) -> None:
        """
        Performs the alter statements to change the current table into the structure of the target table.

        In cluster mode each expression is mutated in place to carry an ON CLUSTER clause.
        """
        with self.transaction():
            for alter_expression in [
                x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
            ]:
                if self.engine_run_mode.is_cluster:
                    alter_expression.set(
                        "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))
                    )
                self.execute(alter_expression)

    def _drop_object(
        self,
        name: TableName | SchemaName,
        exists: bool = True,
        kind: str = "TABLE",
        cascade: bool = False,
        **drop_args: t.Any,
    ) -> None:
        """Drops an object.

        An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind.

        Args:
            name: The name of the table to drop.
            exists: If exists, defaults to True.
            kind: What kind of object to drop. Defaults to TABLE
            cascade: Passed through to the parent implementation.
            **drop_args: Any extra arguments to set on the Drop expression
        """
        super()._drop_object(
            name=name,
            exists=exists,
            kind=kind,
            cascade=cascade,
            cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
            if self.engine_run_mode.is_cluster
            else None,
            **drop_args,
        )

    def _build_partitioned_by_exp(
        self,
        partitioned_by: t.List[exp.Expr],
        **kwargs: t.Any,
    ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]:
        """Wrap the partition expressions in a PartitionedByProperty."""
        return exp.PartitionedByProperty(
            this=exp.Schema(expressions=partitioned_by),
        )

    def ensure_nulls_for_unmatched_after_join(
        self,
        query: Query,
    ) -> Query:
        """Append `join_use_nulls = 1` to the query's SETTINGS and return the query."""
        # Set `join_use_nulls = 1` in a query's SETTINGS clause
        query.append("settings", exp.var("join_use_nulls").eq(exp.Literal.number("1")))
        return query

    def use_server_nulls_for_unmatched_after_join(
        self,
        query: Query,
    ) -> Query:
        """Pin the server's `join_use_nulls` value on `query` when it is not 1.

        The query is modified in place and returned. Nothing is changed when the query
        already sets `join_use_nulls` or when the server value is 1.
        """
        # Set the `join_use_nulls` server value in a query's SETTINGS clause
        #
        # Use in SCD models:
        #   - The SCD query we build must include the setting `join_use_nulls = 1` to ensure that empty cells in a join
        #       are filled with NULL instead of the default data type value. The default join_use_nulls value is `0`.
        #   - The SCD embeds the user's original query in the `source` CTE
        #   - Settings are dynamically scoped, so our setting may override the server's default setting the user expects
        #       for their query.
        #   - To prevent this, we:
        #     - If the user query sets `join_use_nulls`, we do nothing
        #     - If the user query does not set `join_use_nulls`, we query the server for the current setting
        #       - If the server value is 1, we do nothing
        #       - If the server value is not 1, we inject its `join_use_nulls` value into the user query
        #     - We do not need to check user subqueries because our injected setting operates at the same scope the
        #         server value would normally operate at
        setting_name = "join_use_nulls"
        setting_value = "1"

        user_settings = query.args.get("settings")
        # if user has not already set it explicitly
        if not (
            user_settings
            and any(
                isinstance(setting, exp.EQ) and setting.name == setting_name
                for setting in user_settings
            )
        ):
            server_value = self.fetchone(
                exp.select("value")
                .from_("system.settings")
                .where(exp.column("name").eq(exp.Literal.string(setting_name)))
            )[0]
            # only inject the setting if the server value isn't 1
            inject_setting = setting_value != server_value
            setting_value = server_value if inject_setting else setting_value

            if inject_setting:
                query.append(
                    "settings", exp.var(setting_name).eq(exp.Literal.number(setting_value))
                )

        return query

    def _build_settings_property(
        self, key: str, value: exp.Expr | str | int | float
    ) -> exp.SettingsProperty:
        """Build a SETTINGS property `key = value`; the key is lower-cased."""
        return exp.SettingsProperty(
            expressions=[
                exp.EQ(
                    this=exp.var(key.lower()),
                    expression=value
                    if isinstance(value, exp.Expr)
                    else exp.Literal(this=value, is_string=isinstance(value, str)),
                )
            ]
        )

    def _build_table_properties_exp(
        self,
        catalog_name: t.Optional[str] = None,
        table_format: t.Optional[str] = None,
        storage_format: t.Optional[str] = None,
        partitioned_by: t.Optional[t.List[exp.Expr]] = None,
        partition_interval_unit: t.Optional[IntervalUnit] = None,
        clustered_by: t.Optional[t.List[exp.Expr]] = None,
        table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        empty_ctas: bool = False,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Build the properties of a CREATE TABLE statement.

        Emits, in order: ENGINE, ORDER BY and PRIMARY KEY (MergeTree-family engines only),
        TTL, PARTITION BY, ON CLUSTER (cluster mode), EMPTY (`empty_ctas`), any remaining
        `table_properties` as SETTINGS, and the table comment. The `ORDER_BY`,
        `PRIMARY_KEY` and `TTL` keys of `table_properties` are matched case-insensitively.
        """
        properties: t.List[exp.Expr] = []

        table_engine = self.DEFAULT_TABLE_ENGINE
        if storage_format:
            table_engine = (
                storage_format.this if isinstance(storage_format, exp.Var) else storage_format  # type: ignore
            )
        properties.append(exp.EngineProperty(this=table_engine))

        # copy of table_properties so we can pop items off below then consume the rest later
        table_properties_copy = {
            k.upper(): v for k, v in (table_properties.copy() if table_properties else {}).items()
        }

        mergetree_engine = bool(re.search(self.ORDER_BY_TABLE_ENGINE_REGEX, table_engine))
        ordered_by_raw = table_properties_copy.pop("ORDER_BY", None)
        if mergetree_engine:
            ordered_by_exprs = []
            if ordered_by_raw:
                ordered_by_vals = []

                if isinstance(ordered_by_raw, (exp.Tuple, exp.Array)):
                    ordered_by_vals = ordered_by_raw.expressions
                if isinstance(ordered_by_raw, exp.Paren):
                    ordered_by_vals = [ordered_by_raw.this]

                if not ordered_by_vals:
                    ordered_by_vals = (
                        ordered_by_raw if isinstance(ordered_by_raw, list) else [ordered_by_raw]
                    )

                for col in ordered_by_vals:
                    ordered_by_exprs.append(
                        col
                        if isinstance(col, exp.Column)
                        else maybe_parse(
                            col.name if isinstance(col, exp.Literal) else col,
                            dialect=self.dialect,
                            into=exp.Ordered,
                        )
                    )

            # always emitted for these engines, even when empty: `ORDER BY ()`
            properties.append(exp.Order(expressions=[exp.Tuple(expressions=ordered_by_exprs)]))

        primary_key = table_properties_copy.pop("PRIMARY_KEY", None)
        if mergetree_engine and primary_key:
            primary_key_vals = []
            if isinstance(primary_key, (exp.Tuple, exp.Array)):
                primary_key_vals = primary_key.expressions
            # unwrap a parenthesized single column; the check must look at `primary_key`
            # itself, not at the ORDER_BY value
            if isinstance(primary_key, exp.Paren):
                primary_key_vals = [primary_key.this]

            if not primary_key_vals:
                primary_key_vals = primary_key if isinstance(primary_key, list) else [primary_key]

            properties.append(
                exp.PrimaryKey(
                    expressions=[
                        exp.to_column(k.name if isinstance(k, exp.Literal) else k)
                        for k in primary_key_vals
                    ]
                )
            )

        ttl = table_properties_copy.pop("TTL", None)
        if ttl:
            properties.append(
                exp.MergeTreeTTL(expressions=[ttl if isinstance(ttl, exp.Expr) else exp.var(ttl)])
            )

        if (
            partitioned_by
            and (partitioned_by_prop := self._build_partitioned_by_exp(partitioned_by)) is not None
        ):
            properties.append(partitioned_by_prop)

        if self.engine_run_mode.is_cluster:
            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        if empty_ctas:
            properties.append(exp.EmptyProperty())

        if table_properties_copy:
            properties.extend(
                [self._build_settings_property(k, v) for k, v in table_properties_copy.items()]
            )

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        if properties:
            return exp.Properties(expressions=properties)

        return None

    def _build_view_properties_exp(
        self,
        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
        table_description: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Creates a SQLGlot table properties expression for view"""
        properties: t.List[exp.Expr] = []

        view_properties_copy = view_properties.copy() if view_properties else {}

        if self.engine_run_mode.is_cluster:
            properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        if view_properties_copy:
            properties.extend(
                [self._build_settings_property(k, v) for k, v in view_properties_copy.items()]
            )

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _build_create_comment_table_exp(
        self, table: exp.Table, table_comment: str, table_kind: str, **kwargs: t.Any
    ) -> exp.Comment | str:
        """Return the `ALTER TABLE ... MODIFY COMMENT` SQL for a (truncated) table comment."""
        table_sql = table.sql(dialect=self.dialect, identify=True)

        truncated_comment = self._truncate_table_comment(table_comment)
        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)

        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} MODIFY COMMENT {comment_sql}"

    def _build_create_comment_column_exp(
        self,
        table: exp.Table,
        column_name: str,
        column_comment: str,
        table_kind: str = "TABLE",
        **kwargs: t.Any,
    ) -> exp.Comment | str:
        """Return the `ALTER TABLE ... COMMENT COLUMN` SQL for a (truncated) column comment."""
        table_sql = table.sql(dialect=self.dialect, identify=True)
        column_sql = exp.to_column(column_name).sql(dialect=self.dialect, identify=True)

        truncated_comment = self._truncate_table_comment(column_comment)
        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)

        return f"ALTER TABLE {table_sql}{self._on_cluster_sql()} COMMENT COLUMN {column_sql} {comment_sql}"

    def _on_cluster_sql(self) -> str:
        """Return ` ON CLUSTER <name> ` in cluster mode, otherwise an empty string."""
        if self.engine_run_mode.is_cluster:
            cluster_name = exp.to_identifier(self.cluster, quoted=True).sql(dialect=self.dialect)  # type: ignore
            return f" ON CLUSTER {cluster_name} "
        return ""
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
engine_run_mode: sqlmesh.core.engine_adapter.shared.EngineRunMode
@property
def engine_run_mode(self) -> EngineRunMode:
    """Return the run mode derived from the connection's extra config.

    `cloud_mode` takes precedence over `cluster`; with neither set the engine is
    treated as standalone.
    """
    if self._extra_config.get("cloud_mode"):
        return EngineRunMode.CLOUD
    # we use the user's specification of a cluster in the connection config to determine if
    # the engine is in cluster mode
    clustered = self._extra_config.get("cluster")
    return EngineRunMode.CLUSTER if clustered else EngineRunMode.STANDALONE
def
fetchone( self, query: Union[sqlglot.expressions.core.Expr, str], ignore_unsupported_errors: bool = False, quote_identifiers: bool = False) -> Tuple:
def fetchone(
    self,
    query: t.Union[exp.Expr, str],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = False,
) -> t.Tuple:
    """Execute `query` and return the first row of the result.

    The row is taken from `cursor.fetchall()` rather than the cursor's own
    `fetchone()`. An `IndexError` is raised when the query returns no rows.
    """
    execute_options = {
        "ignore_unsupported_errors": ignore_unsupported_errors,
        "quote_identifiers": quote_identifiers,
    }
    with self.transaction():
        self.execute(query, **execute_options)
        all_rows = self.cursor.fetchall()
        return all_rows[0]
def
create_schema( self, schema_name: Union[str, sqlglot.expressions.query.Table], ignore_if_exists: bool = True, warn_on_error: bool = True, properties: List[sqlglot.expressions.core.Expr] = []) -> None:
def create_schema(
    self,
    schema_name: SchemaName,
    ignore_if_exists: bool = True,
    warn_on_error: bool = True,
    properties: t.List[exp.Expr] = [],
) -> None:
    """Create a Clickhouse database from a name or qualified table name.

    Clickhouse has a two-level naming scheme [database].[table].
    """
    # work on a copy so the caller's list (and the shared default) is never mutated
    schema_properties = properties.copy()
    if self.engine_run_mode.is_cluster:
        on_cluster = exp.OnCluster(this=exp.to_identifier(self.cluster))
        schema_properties.append(on_cluster)

    # can't call super() because it will try to set a catalog
    create_kwargs = dict(
        schema_name=schema_name,
        ignore_if_exists=ignore_if_exists,
        warn_on_error=warn_on_error,
        properties=schema_properties,
        # sqlglot transpiles CREATE SCHEMA to CREATE DATABASE, but this text is used in an error message
        kind="DATABASE",
    )
    return self._create_schema(**create_kwargs)
Create a Clickhouse database from a name or qualified table name.
Clickhouse has a two-level naming scheme [database].[table].
def
insert_overwrite_by_partition( self, table_name: Union[str, sqlglot.expressions.query.Table], query_or_df: QueryOrDF, partitioned_by: List[sqlglot.expressions.core.Expr], target_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]] = None, source_columns: Optional[List[str]] = None) -> None:
def insert_overwrite_by_partition(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    partitioned_by: t.List[exp.Expr],
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """Overwrite the partitions of `table_name` that receive rows from `query_or_df`.

    The source is resolved into source queries plus column types and handed to
    `_insert_overwrite_by_condition` with `keep_existing_partition_rows=False`.
    `partitioned_by` is accepted but not used here.
    """
    resolved = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=table_name,
        source_columns=source_columns,
    )
    queries, columns_to_types = resolved

    self._insert_overwrite_by_condition(
        table_name,
        queries,
        columns_to_types,
        keep_existing_partition_rows=False,
    )
def
delete_from( self, table_name: Union[str, sqlglot.expressions.query.Table], where: Union[str, sqlglot.expressions.core.Expr]) -> None:
def
alter_table( self, alter_expressions: Union[List[sqlglot.expressions.ddl.Alter], List[sqlmesh.core.schema_diff.TableAlterOperation]]) -> None:
def alter_table(
    self,
    alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
) -> None:
    """
    Performs the alter statements to change the current table into the structure of the target table.

    `TableAlterOperation` items are unwrapped to their `expression`. In cluster mode
    each expression is mutated in place to carry an ON CLUSTER clause before it runs.
    """
    # unwrap everything up front, before any statement is executed
    statements = [
        item.expression if isinstance(item, TableAlterOperation) else item
        for item in alter_expressions
    ]
    with self.transaction():
        for statement in statements:
            if self.engine_run_mode.is_cluster:
                on_cluster = exp.OnCluster(this=exp.to_identifier(self.cluster))
                statement.set("cluster", on_cluster)
            self.execute(statement)
Performs the alter statements to change the current table into the structure of the target table.
def
ensure_nulls_for_unmatched_after_join( self, query: Query) -> Query:
def
use_server_nulls_for_unmatched_after_join( self, query: Query) -> Query:
def use_server_nulls_for_unmatched_after_join(
    self,
    query: Query,
) -> Query:
    """
    Sets the server's `join_use_nulls` value in a query's SETTINGS clause when needed.

    Use in SCD models:
    - The SCD query we build must include `join_use_nulls = 1` so that empty cells in a
      join are filled with NULL instead of the default data type value. The default
      `join_use_nulls` value is `0`.
    - The SCD query embeds the user's original query in the `source` CTE.
    - Settings are dynamically scoped, so our setting may override the server default
      the user expects for their own query.

    To prevent that:
    - If the user query already sets `join_use_nulls`, it is returned untouched.
    - Otherwise the current value is read from `system.settings`.
        - If the server value is 1, nothing is injected.
        - If it is not 1, the server's value is appended to the user query's settings.
    - User subqueries do not need checking because the injected setting operates at the
      same scope the server value would normally operate at.

    Args:
        query: The user query; it is modified in place when a setting is injected.

    Returns:
        The same `query` object that was passed in.
    """
    setting_name = "join_use_nulls"
    required_value = "1"

    existing_settings = query.args.get("settings")
    if existing_settings:
        for candidate in existing_settings:
            if isinstance(candidate, exp.EQ) and candidate.name == setting_name:
                # The user pinned the setting explicitly, so leave their query alone.
                return query

    lookup = (
        exp.select("value")
        .from_("system.settings")
        .where(exp.column("name").eq(exp.Literal.string(setting_name)))
    )
    server_value = self.fetchone(lookup)[0]

    # Only inject when the server value differs from the 1 the SCD query applies.
    if required_value != server_value:
        query.append(
            "settings", exp.var(setting_name).eq(exp.Literal.number(server_value))
        )

    return query
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- COMMENT_CREATION_TABLE
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- create_view
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- insert_append
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- rename_table
- get_data_object
- get_data_objects
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ping
- get_table_last_modified_ts