EngineAdapter
Engine adapters are how SQLMesh connects and interacts with various data stores. They allow SQLMesh to generalize its functionality to different engines that have Python Database API 2.0-compliant connections. Rather than executing queries directly against your data stores, SQLMesh components such as the SnapshotEvaluator delegate them to engine adapters so these components can be engine-agnostic.
1""" 2# EngineAdapter 3 4Engine adapters are how SQLMesh connects and interacts with various data stores. They allow SQLMesh to 5generalize its functionality to different engines that have Python Database API 2.0-compliant 6connections. Rather than executing queries directly against your data stores, SQLMesh components such as 7the SnapshotEvaluator delegate them to engine adapters so these components can be engine-agnostic. 8""" 9 10from __future__ import annotations 11 12import contextlib 13import itertools 14import logging 15import sys 16import typing as t 17from functools import partial 18 19import pandas as pd 20from sqlglot import Dialect, exp 21from sqlglot.errors import ErrorLevel 22from sqlglot.helper import ensure_list 23from sqlglot.optimizer.qualify_columns import quote_identifiers 24 25from sqlmesh.core.dialect import ( 26 add_table, 27 schema_, 28 select_from_values_for_batch_range, 29 to_schema, 30) 31from sqlmesh.core.engine_adapter.shared import ( 32 CatalogSupport, 33 CommentCreationTable, 34 CommentCreationView, 35 DataObject, 36 InsertOverwriteStrategy, 37 SourceQuery, 38 set_catalog, 39) 40from sqlmesh.core.model.kind import TimeColumn 41from sqlmesh.core.schema_diff import SchemaDiffer 42from sqlmesh.utils import columns_to_types_all_known, random_id 43from sqlmesh.utils.connection_pool import create_connection_pool 44from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column 45from sqlmesh.utils.errors import SQLMeshError, UnsupportedCatalogOperationError 46from sqlmesh.utils.pandas import columns_to_types_from_df 47 48if t.TYPE_CHECKING: 49 from sqlmesh.core._typing import SchemaName, SessionProperties, TableName 50 from sqlmesh.core.engine_adapter._typing import ( 51 DF, 52 PySparkDataFrame, 53 PySparkSession, 54 Query, 55 QueryOrDF, 56 ) 57 from sqlmesh.core.node import IntervalUnit 58 59logger = logging.getLogger(__name__) 60 61MERGE_TARGET_ALIAS = "__MERGE_TARGET__" 62MERGE_SOURCE_ALIAS = "__MERGE_SOURCE__" 63 64 
@set_catalog()
class EngineAdapter:
    """Base class wrapping a Database API compliant connection.

    The EngineAdapter is an easily-subclassable interface that interacts
    with the underlying engine and data store.

    Args:
        connection_factory: a callable which produces a new Database API-compliant
            connection on every call.
        dialect: The dialect with which this adapter is associated.
        multithreaded: Indicates whether this adapter will be used by more than one thread.
    """

    # Engine capability flags. Subclasses override these to describe what the
    # target engine supports; the base implementations consult them to pick
    # strategies (e.g. CTAS vs. insert/overwrite, comment registration).
    DIALECT = ""
    DEFAULT_BATCH_SIZE = 10000
    DATA_OBJECT_FILTER_BATCH_SIZE = 4000
    SUPPORTS_TRANSACTIONS = True
    SUPPORTS_INDEXES = False
    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_CTAS
    COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS
    # None means "no engine-imposed limit" for comment truncation.
    MAX_TABLE_COMMENT_LENGTH: t.Optional[int] = None
    MAX_COLUMN_COMMENT_LENGTH: t.Optional[int] = None
    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
    SUPPORTS_MATERIALIZED_VIEWS = False
    SUPPORTS_MATERIALIZED_VIEW_SCHEMA = False
    SUPPORTS_CLONING = False
    SCHEMA_DIFFER = SchemaDiffer()
    SUPPORTS_TUPLE_IN = True
    CATALOG_SUPPORT = CatalogSupport.UNSUPPORTED
    SUPPORTS_ROW_LEVEL_OP = True
    HAS_VIEW_BINDING = False
    SUPPORTS_REPLACE_TABLE = True
    DEFAULT_CATALOG_TYPE = DIALECT
    QUOTE_IDENTIFIERS_IN_VIEWS = True

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        dialect: str = "",
        sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
        multithreaded: bool = False,
        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
        default_catalog: t.Optional[str] = None,
        execute_log_level: int = logging.DEBUG,
        register_comments: bool = True,
        **kwargs: t.Any,
    ):
        self.dialect = dialect.lower() or self.DIALECT
        self._connection_pool = create_connection_pool(
            connection_factory, multithreaded, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init
        )
        self.sql_gen_kwargs = sql_gen_kwargs or {}
        self._default_catalog = default_catalog
        self._execute_log_level = execute_log_level
        # Extra kwargs are retained so with_log_level() can reconstruct an
        # equivalent adapter with the same configuration.
        self._extra_config = kwargs
        self.register_comments = register_comments

    def with_log_level(self, level: int) -> EngineAdapter:
        """Return a copy of this adapter that logs executed SQL at `level`.

        The copy shares this adapter's connection pool; only the log level differs.
        """
        adapter = self.__class__(
            lambda: None,
            dialect=self.dialect,
            sql_gen_kwargs=self.sql_gen_kwargs,
            default_catalog=self._default_catalog,
            execute_log_level=level,
            register_comments=self.register_comments,
            **self._extra_config,
        )

        # Reuse the existing pool rather than the placeholder factory above.
        adapter._connection_pool = self._connection_pool

        return adapter

    @property
    def cursor(self) -> t.Any:
        return self._connection_pool.get_cursor()

    @property
    def spark(self) -> t.Optional[PySparkSession]:
        # Base adapters have no Spark session; Spark-backed subclasses override this.
        return None

    @property
    def comments_enabled(self) -> bool:
        return self.register_comments and self.COMMENT_CREATION_TABLE.is_supported

    @classmethod
    def is_pandas_df(cls, value: t.Any) -> bool:
        return isinstance(value, pd.DataFrame)

    @classmethod
    def _casted_columns(cls, columns_to_types: t.Dict[str, exp.DataType]) -> t.List[exp.Alias]:
        # Build `CAST(col AS type) AS col` projections for each column.
        return [
            exp.alias_(exp.cast(exp.column(column), to=kind), column, copy=False)
            for column, kind in columns_to_types.items()
        ]

    @property
    def default_catalog(self) -> t.Optional[str]:
        if self.CATALOG_SUPPORT.is_unsupported:
            return None
        default_catalog = self._default_catalog or self.get_current_catalog()
        if not default_catalog:
            raise SQLMeshError("Could not determine a default catalog despite it being supported.")
        return default_catalog

    def _get_source_queries(
        self,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        target_table: TableName,
        *,
        batch_size: t.Optional[int] = None,
    ) -> t.List[SourceQuery]:
        """Normalize a query or DataFrame into a list of SourceQuery objects.

        Queries pass through as a single SourceQuery; DataFrames are split into
        batched VALUES queries. `columns_to_types` is mandatory for DataFrames.
        """
        batch_size = self.DEFAULT_BATCH_SIZE if batch_size is None else batch_size
        if isinstance(query_or_df, (exp.Query, exp.DerivedTable)):
            return [SourceQuery(query_factory=lambda: query_or_df)]  # type: ignore
        if not columns_to_types:
            raise SQLMeshError(
                "It is expected that if a DF is passed in then columns_to_types is set"
            )
        return self._df_to_source_queries(
            query_or_df, columns_to_types, batch_size, target_table=target_table
        )

    def _df_to_source_queries(
        self,
        df: DF,
        columns_to_types: t.Dict[str, exp.DataType],
        batch_size: int,
        target_table: TableName,
    ) -> t.List[SourceQuery]:
        """Split a pandas DataFrame into batched VALUES-based SourceQuery objects.

        A `batch_size` of 0 means "no batching" (a single query over all rows).
        """
        assert isinstance(df, pd.DataFrame)
        num_rows = len(df.index)
        batch_size = sys.maxsize if batch_size == 0 else batch_size
        values = list(df.itertuples(index=False, name=None))
        return [
            SourceQuery(
                query_factory=partial(
                    self._values_to_sql,
                    values=values,
                    columns_to_types=columns_to_types,
                    batch_start=i,
                    batch_end=min(i + batch_size, num_rows),
                ),
            )
            for i in range(0, num_rows, batch_size)
        ]

    def _get_source_queries_and_columns_to_types(
        self,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        target_table: TableName,
        *,
        batch_size: t.Optional[int] = None,
    ) -> t.Tuple[t.List[SourceQuery], t.Optional[t.Dict[str, exp.DataType]]]:
        """Like _get_source_queries, but also returns the (possibly inferred) column types."""
        columns_to_types = self._columns_to_types(query_or_df, columns_to_types)
        return (
            self._get_source_queries(
                query_or_df, columns_to_types, target_table=target_table, batch_size=batch_size
            ),
            columns_to_types,
        )

    @t.overload
    def _columns_to_types(
        self, query_or_df: DF, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
    ) -> t.Dict[str, exp.DataType]: ...
234 235 @t.overload 236 def _columns_to_types( 237 self, query_or_df: Query, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None 238 ) -> t.Optional[t.Dict[str, exp.DataType]]: ... 239 240 def _columns_to_types( 241 self, query_or_df: QueryOrDF, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None 242 ) -> t.Optional[t.Dict[str, exp.DataType]]: 243 if columns_to_types: 244 return columns_to_types 245 if self.is_pandas_df(query_or_df): 246 return columns_to_types_from_df(t.cast(pd.DataFrame, query_or_df)) 247 return columns_to_types 248 249 def recycle(self) -> None: 250 """Closes all open connections and releases all allocated resources associated with any thread 251 except the calling one.""" 252 self._connection_pool.close_all(exclude_calling_thread=True) 253 254 def close(self) -> t.Any: 255 """Closes all open connections and releases all allocated resources.""" 256 self._connection_pool.close_all() 257 258 def get_current_catalog(self) -> t.Optional[str]: 259 """Returns the catalog name of the current connection.""" 260 raise NotImplementedError() 261 262 def set_current_catalog(self, catalog: str) -> None: 263 """Sets the catalog name of the current connection.""" 264 raise NotImplementedError() 265 266 def get_catalog_type(self, catalog: t.Optional[str]) -> str: 267 """Intended to be overridden for data virtualization systems like Trino that, 268 depending on the target catalog, require slightly different properties to be set when creating / updating tables 269 """ 270 if self.CATALOG_SUPPORT.is_unsupported: 271 raise UnsupportedCatalogOperationError( 272 f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}" 273 ) 274 return self.DEFAULT_CATALOG_TYPE 275 276 @property 277 def current_catalog_type(self) -> str: 278 return self.get_catalog_type(self.get_current_catalog()) 279 280 def replace_query( 281 self, 282 table_name: TableName, 283 query_or_df: QueryOrDF, 284 columns_to_types: t.Optional[t.Dict[str, 
exp.DataType]] = None, 285 table_description: t.Optional[str] = None, 286 column_descriptions: t.Optional[t.Dict[str, str]] = None, 287 **kwargs: t.Any, 288 ) -> None: 289 """Replaces an existing table with a query. 290 291 For partition based engines (hive, spark), insert override is used. For other systems, create or replace is used. 292 293 Args: 294 table_name: The name of the table (eg. prod.table) 295 query_or_df: The SQL query to run or a dataframe. 296 columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type. 297 Expected to be ordered to match the order of values in the dataframe. 298 kwargs: Optional create table properties. 299 """ 300 target_table = exp.to_table(table_name) 301 source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( 302 query_or_df, columns_to_types, target_table=target_table 303 ) 304 columns_to_types = columns_to_types or self.columns(target_table) 305 query = source_queries[0].query_factory() 306 self_referencing = any( 307 quote_identifiers(table) == quote_identifiers(target_table) 308 for table in query.find_all(exp.Table) 309 ) 310 # If a query references itself then it must have a table created regardless of approach used. 
311 if self_referencing: 312 self._create_table_from_columns( 313 target_table, 314 columns_to_types, 315 exists=True, 316 table_description=table_description, 317 column_descriptions=column_descriptions, 318 ) 319 # All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we 320 # use `CREATE OR REPLACE TABLE AS` if the engine supports it 321 if self.SUPPORTS_REPLACE_TABLE or not self.table_exists(target_table): 322 return self._create_table_from_source_queries( 323 target_table, 324 source_queries, 325 columns_to_types, 326 replace=self.SUPPORTS_REPLACE_TABLE, 327 table_description=table_description, 328 column_descriptions=column_descriptions, 329 **kwargs, 330 ) 331 else: 332 if self_referencing: 333 with self.temp_table( 334 self._select_columns(columns_to_types).from_(target_table), 335 name=target_table, 336 columns_to_types=columns_to_types, 337 **kwargs, 338 ) as temp_table: 339 for source_query in source_queries: 340 source_query.add_transform( 341 lambda node: ( # type: ignore 342 temp_table # type: ignore 343 if isinstance(node, exp.Table) 344 and quote_identifiers(node) == quote_identifiers(target_table) 345 else node 346 ) 347 ) 348 return self._insert_overwrite_by_condition( 349 target_table, 350 source_queries, 351 columns_to_types, 352 ) 353 return self._insert_overwrite_by_condition( 354 target_table, 355 source_queries, 356 columns_to_types, 357 ) 358 359 def create_index( 360 self, 361 table_name: TableName, 362 index_name: str, 363 columns: t.Tuple[str, ...], 364 exists: bool = True, 365 ) -> None: 366 """Creates a new index for the given table if supported 367 368 Args: 369 table_name: The name of the target table. 370 index_name: The name of the index. 371 columns: The list of columns that constitute the index. 372 exists: Indicates whether to include the IF NOT EXISTS check. 
373 """ 374 if not self.SUPPORTS_INDEXES: 375 return 376 377 expression = exp.Create( 378 this=exp.Index( 379 this=exp.to_identifier(index_name), 380 table=exp.to_table(table_name), 381 params=exp.IndexParameters(columns=[exp.to_column(c) for c in columns]), 382 ), 383 kind="INDEX", 384 exists=exists, 385 ) 386 self.execute(expression) 387 388 def create_table( 389 self, 390 table_name: TableName, 391 columns_to_types: t.Dict[str, exp.DataType], 392 primary_key: t.Optional[t.Tuple[str, ...]] = None, 393 exists: bool = True, 394 table_description: t.Optional[str] = None, 395 column_descriptions: t.Optional[t.Dict[str, str]] = None, 396 **kwargs: t.Any, 397 ) -> None: 398 """Create a table using a DDL statement 399 400 Args: 401 table_name: The name of the table to create. Can be fully qualified or just table name. 402 columns_to_types: A mapping between the column name and its data type. 403 primary_key: Determines the table primary key. 404 exists: Indicates whether to include the IF NOT EXISTS check. 405 table_description: Optional table description from MODEL DDL. 406 column_descriptions: Optional column descriptions from model query. 407 kwargs: Optional create table properties. 408 """ 409 self._create_table_from_columns( 410 table_name, 411 columns_to_types, 412 primary_key, 413 exists, 414 table_description, 415 column_descriptions, 416 **kwargs, 417 ) 418 419 def ctas( 420 self, 421 table_name: TableName, 422 query_or_df: QueryOrDF, 423 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 424 exists: bool = True, 425 table_description: t.Optional[str] = None, 426 column_descriptions: t.Optional[t.Dict[str, str]] = None, 427 **kwargs: t.Any, 428 ) -> None: 429 """Create a table using a CTAS statement 430 431 Args: 432 table_name: The name of the table to create. Can be fully qualified or just table name. 433 query_or_df: The SQL query to run or a dataframe for the CTAS. 434 columns_to_types: A mapping between the column name and its data type. 
Required if using a DataFrame. 435 exists: Indicates whether to include the IF NOT EXISTS check. 436 table_description: Optional table description from MODEL DDL. 437 column_descriptions: Optional column descriptions from model query. 438 kwargs: Optional create table properties. 439 """ 440 source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( 441 query_or_df, columns_to_types, target_table=table_name 442 ) 443 return self._create_table_from_source_queries( 444 table_name, 445 source_queries, 446 columns_to_types, 447 exists, 448 table_description=table_description, 449 column_descriptions=column_descriptions, 450 **kwargs, 451 ) 452 453 def create_state_table( 454 self, 455 table_name: str, 456 columns_to_types: t.Dict[str, exp.DataType], 457 primary_key: t.Optional[t.Tuple[str, ...]] = None, 458 ) -> None: 459 """Create a table to store SQLMesh internal state. 460 461 Args: 462 table_name: The name of the table to create. Can be fully qualified or just table name. 463 columns_to_types: A mapping between the column name and its data type. 464 primary_key: Determines the table primary key. 465 """ 466 self.create_table( 467 table_name, 468 columns_to_types, 469 primary_key=primary_key, 470 ) 471 472 def _create_table_from_columns( 473 self, 474 table_name: TableName, 475 columns_to_types: t.Dict[str, exp.DataType], 476 primary_key: t.Optional[t.Tuple[str, ...]] = None, 477 exists: bool = True, 478 table_description: t.Optional[str] = None, 479 column_descriptions: t.Optional[t.Dict[str, str]] = None, 480 **kwargs: t.Any, 481 ) -> None: 482 """ 483 Create a table using a DDL statement. 484 485 Args: 486 table_name: The name of the table to create. Can be fully qualified or just table name. 487 columns_to_types: Mapping between the column name and its data type. 488 primary_key: Determines the table primary key. 489 exists: Indicates whether to include the IF NOT EXISTS check. 
490 table_description: Optional table description from MODEL DDL. 491 column_descriptions: Optional column descriptions from model query. 492 kwargs: Optional create table properties. 493 """ 494 table = exp.to_table(table_name) 495 496 if not columns_to_types_all_known(columns_to_types): 497 # It is ok if the columns types are not known if the table already exists and IF NOT EXISTS is set 498 if exists and self.table_exists(table_name): 499 return 500 raise SQLMeshError( 501 "Cannot create a table without knowing the column types. " 502 "Try casting the columns to an expected type or defining the columns in the model metadata. " 503 f"Columns to types: {columns_to_types}" 504 ) 505 506 primary_key_expression = ( 507 [exp.PrimaryKey(expressions=[exp.to_column(k) for k in primary_key])] 508 if primary_key and self.SUPPORTS_INDEXES 509 else [] 510 ) 511 512 schema = self._build_schema_exp( 513 table, 514 columns_to_types, 515 column_descriptions, 516 primary_key_expression, 517 ) 518 519 self._create_table( 520 schema, 521 None, 522 exists=exists, 523 columns_to_types=columns_to_types, 524 table_description=table_description, 525 **kwargs, 526 ) 527 528 # Register comments with commands if the engine doesn't support comments in the schema or CREATE 529 if ( 530 table_description 531 and self.COMMENT_CREATION_TABLE.is_comment_command_only 532 and self.comments_enabled 533 ): 534 self._create_table_comment(table_name, table_description) 535 if ( 536 column_descriptions 537 and self.COMMENT_CREATION_TABLE.is_comment_command_only 538 and self.comments_enabled 539 ): 540 self._create_column_comments(table_name, column_descriptions) 541 542 def _build_schema_exp( 543 self, 544 table: exp.Table, 545 columns_to_types: t.Dict[str, exp.DataType], 546 column_descriptions: t.Optional[t.Dict[str, str]] = None, 547 expressions: t.Optional[t.List[exp.PrimaryKey]] = None, 548 is_view: bool = False, 549 ) -> exp.Schema: 550 """ 551 Build a schema expression for a table, columns, 
column comments, and additional schema properties. 552 """ 553 expressions = expressions or [] 554 engine_supports_schema_comments = ( 555 self.COMMENT_CREATION_VIEW.supports_schema_def 556 if is_view 557 else self.COMMENT_CREATION_TABLE.supports_schema_def 558 ) 559 return exp.Schema( 560 this=table, 561 expressions=[ 562 exp.ColumnDef( 563 this=exp.to_identifier(column), 564 kind=None if is_view else kind, # don't include column data type for views 565 constraints=( 566 self._build_col_comment_exp(column, column_descriptions) 567 if column_descriptions 568 and engine_supports_schema_comments 569 and self.comments_enabled 570 else None 571 ), 572 ) 573 for column, kind in columns_to_types.items() 574 ] 575 + expressions, 576 ) 577 578 def _build_col_comment_exp( 579 self, col_name: str, column_descriptions: t.Dict[str, str] 580 ) -> t.List[exp.ColumnConstraint]: 581 comment = column_descriptions.get(col_name, None) 582 if comment: 583 return [ 584 exp.ColumnConstraint( 585 kind=exp.CommentColumnConstraint( 586 this=exp.Literal.string(self._truncate_column_comment(comment)) 587 ) 588 ) 589 ] 590 return [] 591 592 def _create_table_from_source_queries( 593 self, 594 table_name: TableName, 595 source_queries: t.List[SourceQuery], 596 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 597 exists: bool = True, 598 replace: bool = False, 599 table_description: t.Optional[str] = None, 600 column_descriptions: t.Optional[t.Dict[str, str]] = None, 601 **kwargs: t.Any, 602 ) -> None: 603 table = exp.to_table(table_name) 604 605 # CTAS calls do not usually include a schema expression. However, most engines 606 # permit them in CTAS expressions, and they allow us to register all column comments 607 # in a single call rather than in a separate comment command call for each column. 608 # 609 # This block conditionally builds a schema expression with column comments if the engine 610 # supports it and we have columns_to_types. 
column_to_types is required because the 611 # schema expression must include at least column name, data type, and the comment - 612 # for example, `(colname INTEGER COMMENT 'comment')`. 613 # 614 # column_to_types will be available when loading from a DataFrame (by converting from 615 # pandas to SQL types), when a model is "annotated" by explicitly specifying column 616 # types, and for evaluation methods like `LogicalReplaceQueryMixin.replace_query()` 617 # calls and SCD Type 2 model calls. 618 schema = None 619 columns_to_types_known = columns_to_types and columns_to_types_all_known(columns_to_types) 620 if ( 621 column_descriptions 622 and columns_to_types_known 623 and self.COMMENT_CREATION_TABLE.is_in_schema_def_ctas 624 and self.comments_enabled 625 ): 626 schema = self._build_schema_exp(table, columns_to_types, column_descriptions) # type: ignore 627 628 with self.transaction(condition=len(source_queries) > 1): 629 for i, source_query in enumerate(source_queries): 630 with source_query as query: 631 if i == 0: 632 self._create_table( 633 schema if schema else table, 634 query, 635 columns_to_types=columns_to_types, 636 exists=exists, 637 replace=replace, 638 table_description=table_description, 639 **kwargs, 640 ) 641 else: 642 self._insert_append_query( 643 table_name, query, columns_to_types or self.columns(table) 644 ) 645 646 # Register comments with commands if the engine supports comments and we weren't able to 647 # register them with the CTAS call's schema expression. 
648 if ( 649 table_description 650 and self.COMMENT_CREATION_TABLE.is_comment_command_only 651 and self.comments_enabled 652 ): 653 self._create_table_comment(table_name, table_description) 654 if column_descriptions and schema is None and self.comments_enabled: 655 self._create_column_comments(table_name, column_descriptions) 656 657 def _create_table( 658 self, 659 table_name_or_schema: t.Union[exp.Schema, TableName], 660 expression: t.Optional[exp.Expression], 661 exists: bool = True, 662 replace: bool = False, 663 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 664 table_description: t.Optional[str] = None, 665 column_descriptions: t.Optional[t.Dict[str, str]] = None, 666 **kwargs: t.Any, 667 ) -> None: 668 self.execute( 669 self._build_create_table_exp( 670 table_name_or_schema, 671 expression=expression, 672 exists=exists, 673 replace=replace, 674 columns_to_types=columns_to_types, 675 table_description=( 676 table_description 677 if self.COMMENT_CREATION_TABLE.supports_schema_def and self.comments_enabled 678 else None 679 ), 680 **kwargs, 681 ) 682 ) 683 684 def _build_create_table_exp( 685 self, 686 table_name_or_schema: t.Union[exp.Schema, TableName], 687 expression: t.Optional[exp.Expression], 688 exists: bool = True, 689 replace: bool = False, 690 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 691 **kwargs: t.Any, 692 ) -> exp.Create: 693 exists = False if replace else exists 694 catalog_name = None 695 if not isinstance(table_name_or_schema, exp.Schema): 696 table_name_or_schema = exp.to_table(table_name_or_schema) 697 catalog_name = table_name_or_schema.catalog 698 else: 699 if isinstance(table_name_or_schema.this, exp.Table): 700 catalog_name = table_name_or_schema.this.catalog 701 702 properties = ( 703 self._build_table_properties_exp( 704 **kwargs, catalog_name=catalog_name, columns_to_types=columns_to_types 705 ) 706 if kwargs 707 else None 708 ) 709 return exp.Create( 710 this=table_name_or_schema, 711 
kind="TABLE", 712 replace=replace, 713 exists=exists, 714 expression=expression, 715 properties=properties, 716 ) 717 718 def create_table_like( 719 self, 720 target_table_name: TableName, 721 source_table_name: TableName, 722 exists: bool = True, 723 ) -> None: 724 """ 725 Create a table like another table or view. 726 """ 727 target_table = exp.to_table(target_table_name) 728 source_table = exp.to_table(source_table_name) 729 create_expression = exp.Create( 730 this=target_table, 731 kind="TABLE", 732 exists=exists, 733 properties=exp.Properties( 734 expressions=[ 735 exp.LikeProperty(this=source_table), 736 ] 737 ), 738 ) 739 self.execute(create_expression) 740 741 def clone_table( 742 self, 743 target_table_name: TableName, 744 source_table_name: TableName, 745 replace: bool = False, 746 clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 747 **kwargs: t.Any, 748 ) -> None: 749 """Creates a table with the target name by cloning the source table. 750 751 Args: 752 target_table_name: The name of the table that should be created. 753 source_table_name: The name of the source table that should be cloned. 754 replace: Whether or not to replace an existing table. 755 """ 756 if not self.SUPPORTS_CLONING: 757 raise NotImplementedError(f"Engine does not support cloning: {type(self)}") 758 self.execute( 759 exp.Create( 760 this=exp.to_table(target_table_name), 761 kind="TABLE", 762 replace=replace, 763 clone=exp.Clone( 764 this=exp.to_table(source_table_name), 765 **(clone_kwargs or {}), 766 ), 767 **kwargs, 768 ) 769 ) 770 771 def drop_table(self, table_name: TableName, exists: bool = True) -> None: 772 """Drops a table. 773 774 Args: 775 table_name: The name of the table to drop. 776 exists: If exists, defaults to True. 
777 """ 778 drop_expression = exp.Drop(this=exp.to_table(table_name), kind="TABLE", exists=exists) 779 self.execute(drop_expression) 780 781 def alter_table( 782 self, 783 current_table_name: TableName, 784 target_table_name: TableName, 785 ) -> None: 786 """ 787 Performs the required alter statements to change the current table into the structure of the target table. 788 """ 789 with self.transaction(): 790 for alter_expression in self.SCHEMA_DIFFER.compare_columns( 791 current_table_name, 792 self.columns(current_table_name), 793 self.columns(target_table_name), 794 ): 795 self.execute(alter_expression) 796 797 def create_view( 798 self, 799 view_name: TableName, 800 query_or_df: QueryOrDF, 801 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 802 replace: bool = True, 803 materialized: bool = False, 804 table_description: t.Optional[str] = None, 805 column_descriptions: t.Optional[t.Dict[str, str]] = None, 806 view_properties: t.Optional[t.Dict[str, exp.Expression]] = None, 807 **create_kwargs: t.Any, 808 ) -> None: 809 """Create a view with a query or dataframe. 810 811 If a dataframe is passed in, it will be converted into a literal values statement. 812 This should only be done if the dataframe is very small! 813 814 Args: 815 view_name: The view name. 816 query_or_df: A query or dataframe. 817 columns_to_types: Columns to use in the view statement. 818 replace: Whether or not to replace an existing view defaults to True. 819 materialized: Whether to create a a materialized view. Only used for engines that support this feature. 820 table_description: Optional table description from MODEL DDL. 821 column_descriptions: Optional column descriptions from model query. 822 view_properties: Optional view properties to add to the view. 
823 create_kwargs: Additional kwargs to pass into the Create expression 824 """ 825 if self.is_pandas_df(query_or_df): 826 values = list(t.cast(pd.DataFrame, query_or_df).itertuples(index=False, name=None)) 827 columns_to_types = columns_to_types or self._columns_to_types(query_or_df) 828 if not columns_to_types: 829 raise SQLMeshError("columns_to_types must be provided for dataframes") 830 query_or_df = self._values_to_sql( 831 values, 832 columns_to_types, 833 batch_start=0, 834 batch_end=len(values), 835 ) 836 837 source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( 838 query_or_df, columns_to_types, batch_size=0, target_table=view_name 839 ) 840 if len(source_queries) != 1: 841 raise SQLMeshError("Only one source query is supported for creating views") 842 843 schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name) 844 if columns_to_types: 845 schema = self._build_schema_exp( 846 exp.to_table(view_name), columns_to_types, column_descriptions, is_view=True 847 ) 848 849 properties = create_kwargs.pop("properties", None) 850 if not properties: 851 properties = exp.Properties(expressions=[]) 852 853 if materialized and self.SUPPORTS_MATERIALIZED_VIEWS: 854 properties.append("expressions", exp.MaterializedProperty()) 855 856 if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema): 857 schema = schema.this 858 859 create_view_properties = self._build_view_properties_exp( 860 view_properties, 861 ( 862 table_description 863 if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled 864 else None 865 ), 866 ) 867 if create_view_properties: 868 for view_property in create_view_properties.expressions: 869 properties.append("expressions", view_property) 870 871 if properties.expressions: 872 create_kwargs["properties"] = properties 873 874 with source_queries[0] as query: 875 self.execute( 876 exp.Create( 877 this=schema, 878 kind="VIEW", 879 replace=replace, 880 expression=query, 881 
**create_kwargs, 882 ), 883 quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS, 884 ) 885 886 # Register table comment with commands if the engine doesn't support doing it in CREATE 887 if ( 888 table_description 889 and self.COMMENT_CREATION_VIEW.is_comment_command_only 890 and self.comments_enabled 891 ): 892 self._create_table_comment(view_name, table_description, "VIEW") 893 # Register column comments with commands if the engine doesn't support doing it in 894 # CREATE or we couldn't do it in the CREATE schema definition because we don't have 895 # columns_to_types 896 if ( 897 column_descriptions 898 and ( 899 self.COMMENT_CREATION_VIEW.is_comment_command_only 900 or ( 901 self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands 902 and not columns_to_types 903 ) 904 ) 905 and self.comments_enabled 906 ): 907 self._create_column_comments(view_name, column_descriptions, "VIEW") 908 909 @set_catalog() 910 def create_schema( 911 self, 912 schema_name: SchemaName, 913 ignore_if_exists: bool = True, 914 warn_on_error: bool = True, 915 ) -> None: 916 """Create a schema from a name or qualified table name.""" 917 try: 918 self.execute( 919 exp.Create( 920 this=to_schema(schema_name), 921 kind="SCHEMA", 922 exists=ignore_if_exists, 923 ) 924 ) 925 except Exception as e: 926 if not warn_on_error: 927 raise 928 logger.warning("Failed to create schema '%s': %s", schema_name, e) 929 930 def drop_schema( 931 self, 932 schema_name: SchemaName, 933 ignore_if_not_exists: bool = True, 934 cascade: bool = False, 935 ) -> None: 936 self.execute( 937 exp.Drop( 938 this=to_schema(schema_name), 939 kind="SCHEMA", 940 exists=ignore_if_not_exists, 941 cascade=cascade, 942 ) 943 ) 944 945 def drop_view( 946 self, 947 view_name: TableName, 948 ignore_if_not_exists: bool = True, 949 materialized: bool = False, 950 **kwargs: t.Any, 951 ) -> None: 952 """Drop a view.""" 953 self.execute( 954 exp.Drop( 955 this=exp.to_table(view_name), 956 exists=ignore_if_not_exists, 957 
                materialized=materialized and self.SUPPORTS_MATERIALIZED_VIEWS,
                kind="VIEW",
                **kwargs,
            )
        )

    def columns(
        self, table_name: TableName, include_pseudo_columns: bool = False
    ) -> t.Dict[str, exp.DataType]:
        """Fetches column names and types for the target table."""
        self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
        describe_output = self.cursor.fetchall()
        return {
            # Note: MySQL returns the column type as bytes.
            column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect)
            # Stop at the first row whose first field starts with '#' —
            # presumably the metadata/partition section some engines append to
            # DESCRIBE output (TODO confirm per engine).
            # NOTE(review): the lambda arg `t` shadows the `typing as t` import
            # inside this expression.
            for column_name, column_type, *_ in itertools.takewhile(
                lambda t: not t[0].startswith("#"),
                describe_output,
            )
            if column_name and column_name.strip() and column_type and column_type.strip()
        }

    def table_exists(self, table_name: TableName) -> bool:
        # DESCRIBE is used as a cheap, engine-agnostic existence probe; any
        # failure is treated as "does not exist".
        try:
            self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
            return True
        except Exception:
            return False

    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
        """Delete rows matching `where` from the target table."""
        self.execute(exp.delete(table_name, where))

    def insert_append(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    ) -> None:
        """Append the rows of a query or DataFrame to the target table."""
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types, target_table=table_name
        )
        self._insert_append_source_queries(table_name, source_queries, columns_to_types)

    def _insert_append_source_queries(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    ) -> None:
        # Only open a transaction when there is actually something to insert.
        with self.transaction(condition=len(source_queries) > 0):
            # Fall back to introspecting the target table when the caller
            # didn't supply a column mapping.
            columns_to_types = columns_to_types or self.columns(table_name)
            for source_query in source_queries:
                with source_query as query:
                    self._insert_append_query(table_name, query, columns_to_types)

    def _insert_append_query(
        self,
        table_name: TableName,
        query: Query,
        columns_to_types: t.Dict[str, exp.DataType],
        order_projections: bool = True,
    ) -> None:
        """Emit a single INSERT of `query` into the target table.

        order_projections: when True, reorder/filter the query's projections to
            match `columns_to_types` before inserting.
        """
        if order_projections:
            query = self._order_projections_and_filter(query, columns_to_types)
        self.execute(exp.insert(query, table_name, columns=list(columns_to_types)))

    def insert_overwrite_by_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        partitioned_by: t.List[exp.Expression],
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    ) -> None:
        # Base adapter has no generic implementation; engine subclasses that
        # support partition overwrite override this.
        raise NotImplementedError(
            "Insert Overwrite by Partition (not time) is not supported by this engine"
        )

    def insert_overwrite_by_time_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        start: TimeLike,
        end: TimeLike,
        time_formatter: t.Callable[
            [TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expression
        ],
        time_column: TimeColumn | exp.Expression | str,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Overwrite the rows of the [start, end] time range with the given source.

        The range is made inclusive and rendered with `time_formatter`, then the
        overwrite is delegated to `_insert_overwrite_by_condition` with a
        BETWEEN predicate on `time_column`.
        """
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types, target_table=table_name
        )
        columns_to_types = columns_to_types or self.columns(table_name)
        low, high = [time_formatter(dt, columns_to_types) for dt in make_inclusive(start, end)]
        if isinstance(time_column, TimeColumn):
            time_column = time_column.column
        where = exp.Between(
            this=exp.to_column(time_column) if isinstance(time_column, str) else time_column,
            low=low,
            high=high,
        )
        self._insert_overwrite_by_condition(table_name, source_queries, columns_to_types, where)

    def _values_to_sql(
        self,
        values: t.List[t.Tuple[t.Any, ...]],
        columns_to_types: t.Dict[str,
exp.DataType], 1065 batch_start: int, 1066 batch_end: int, 1067 alias: str = "t", 1068 ) -> Query: 1069 return select_from_values_for_batch_range( 1070 values=values, 1071 columns_to_types=columns_to_types, 1072 batch_start=batch_start, 1073 batch_end=batch_end, 1074 alias=alias, 1075 ) 1076 1077 def _insert_overwrite_by_condition( 1078 self, 1079 table_name: TableName, 1080 source_queries: t.List[SourceQuery], 1081 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1082 where: t.Optional[exp.Condition] = None, 1083 insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None, 1084 ) -> None: 1085 table = exp.to_table(table_name) 1086 insert_overwrite_strategy = ( 1087 insert_overwrite_strategy_override or self.INSERT_OVERWRITE_STRATEGY 1088 ) 1089 with self.transaction( 1090 condition=len(source_queries) > 0 or insert_overwrite_strategy.is_delete_insert 1091 ): 1092 columns_to_types = columns_to_types or self.columns(table_name) 1093 for i, source_query in enumerate(source_queries): 1094 with source_query as query: 1095 query = self._order_projections_and_filter(query, columns_to_types, where=where) 1096 if i > 0 or insert_overwrite_strategy.is_delete_insert: 1097 if i == 0: 1098 self.delete_from(table_name, where=where or exp.true()) 1099 self._insert_append_query( 1100 table_name, 1101 query, 1102 columns_to_types=columns_to_types, 1103 order_projections=False, 1104 ) 1105 else: 1106 insert_exp = exp.insert( 1107 query, 1108 table, 1109 columns=( 1110 list(columns_to_types) 1111 if not insert_overwrite_strategy.is_replace_where 1112 else None 1113 ), 1114 overwrite=insert_overwrite_strategy.is_insert_overwrite, 1115 ) 1116 if insert_overwrite_strategy.is_replace_where: 1117 insert_exp.set("where", where or exp.true()) 1118 self.execute(insert_exp) 1119 1120 def update_table( 1121 self, 1122 table_name: TableName, 1123 properties: t.Dict[str, t.Any], 1124 where: t.Optional[str | exp.Condition] = None, 1125 ) -> None: 1126 
self.execute(exp.update(table_name, properties, where=where)) 1127 1128 def _merge( 1129 self, 1130 target_table: TableName, 1131 query: Query, 1132 on: exp.Expression, 1133 match_expressions: t.List[exp.When], 1134 ) -> None: 1135 this = exp.alias_(exp.to_table(target_table), alias=MERGE_TARGET_ALIAS, table=True) 1136 using = exp.alias_( 1137 exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True 1138 ) 1139 self.execute( 1140 exp.Merge( 1141 this=this, 1142 using=using, 1143 on=on, 1144 expressions=match_expressions, 1145 ) 1146 ) 1147 1148 def scd_type_2_by_time( 1149 self, 1150 target_table: TableName, 1151 source_table: QueryOrDF, 1152 unique_key: t.Sequence[exp.Expression], 1153 valid_from_col: exp.Column, 1154 valid_to_col: exp.Column, 1155 execution_time: TimeLike, 1156 updated_at_col: exp.Column, 1157 invalidate_hard_deletes: bool = True, 1158 updated_at_as_valid_from: bool = False, 1159 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1160 table_description: t.Optional[str] = None, 1161 column_descriptions: t.Optional[t.Dict[str, str]] = None, 1162 truncate: bool = False, 1163 **kwargs: t.Any, 1164 ) -> None: 1165 self._scd_type_2( 1166 target_table=target_table, 1167 source_table=source_table, 1168 unique_key=unique_key, 1169 valid_from_col=valid_from_col, 1170 valid_to_col=valid_to_col, 1171 execution_time=execution_time, 1172 updated_at_col=updated_at_col, 1173 invalidate_hard_deletes=invalidate_hard_deletes, 1174 updated_at_as_valid_from=updated_at_as_valid_from, 1175 columns_to_types=columns_to_types, 1176 table_description=table_description, 1177 column_descriptions=column_descriptions, 1178 truncate=truncate, 1179 ) 1180 1181 def scd_type_2_by_column( 1182 self, 1183 target_table: TableName, 1184 source_table: QueryOrDF, 1185 unique_key: t.Sequence[exp.Expression], 1186 valid_from_col: exp.Column, 1187 valid_to_col: exp.Column, 1188 execution_time: TimeLike, 1189 check_columns: t.Union[exp.Star, 
t.Sequence[exp.Column]], 1190 invalidate_hard_deletes: bool = True, 1191 execution_time_as_valid_from: bool = False, 1192 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1193 table_description: t.Optional[str] = None, 1194 column_descriptions: t.Optional[t.Dict[str, str]] = None, 1195 truncate: bool = False, 1196 **kwargs: t.Any, 1197 ) -> None: 1198 self._scd_type_2( 1199 target_table=target_table, 1200 source_table=source_table, 1201 unique_key=unique_key, 1202 valid_from_col=valid_from_col, 1203 valid_to_col=valid_to_col, 1204 execution_time=execution_time, 1205 check_columns=check_columns, 1206 columns_to_types=columns_to_types, 1207 invalidate_hard_deletes=invalidate_hard_deletes, 1208 execution_time_as_valid_from=execution_time_as_valid_from, 1209 table_description=table_description, 1210 column_descriptions=column_descriptions, 1211 truncate=truncate, 1212 ) 1213 1214 def _scd_type_2( 1215 self, 1216 target_table: TableName, 1217 source_table: QueryOrDF, 1218 unique_key: t.Sequence[exp.Expression], 1219 valid_from_col: exp.Column, 1220 valid_to_col: exp.Column, 1221 execution_time: TimeLike, 1222 invalidate_hard_deletes: bool = True, 1223 updated_at_col: t.Optional[exp.Column] = None, 1224 check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Column]]] = None, 1225 updated_at_as_valid_from: bool = False, 1226 execution_time_as_valid_from: bool = False, 1227 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1228 table_description: t.Optional[str] = None, 1229 column_descriptions: t.Optional[t.Dict[str, str]] = None, 1230 truncate: bool = False, 1231 ) -> None: 1232 source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( 1233 source_table, columns_to_types, target_table=target_table, batch_size=0 1234 ) 1235 columns_to_types = columns_to_types or self.columns(target_table) 1236 valid_from_name = valid_from_col.name 1237 valid_to_name = valid_to_col.name 1238 updated_at_name = updated_at_col.name if 
updated_at_col else None 1239 if ( 1240 valid_from_name not in columns_to_types 1241 or valid_to_name not in columns_to_types 1242 or not columns_to_types_all_known(columns_to_types) 1243 ): 1244 columns_to_types = self.columns(target_table) 1245 if not columns_to_types: 1246 raise SQLMeshError(f"Could not get columns_to_types. Does {target_table} exist?") 1247 if not unique_key: 1248 raise SQLMeshError("unique_key must be provided for SCD Type 2") 1249 if check_columns and updated_at_col: 1250 raise SQLMeshError( 1251 "Cannot use both `check_columns` and `updated_at_name` for SCD Type 2" 1252 ) 1253 if check_columns and updated_at_as_valid_from: 1254 raise SQLMeshError( 1255 "Cannot use both `check_columns` and `updated_at_as_valid_from` for SCD Type 2" 1256 ) 1257 if execution_time_as_valid_from and not check_columns: 1258 raise SQLMeshError( 1259 "Cannot use `execution_time_as_valid_from` without `check_columns` for SCD Type 2" 1260 ) 1261 if updated_at_name and updated_at_name not in columns_to_types: 1262 raise SQLMeshError( 1263 f"Column {updated_at_name} not found in {target_table}. Table must contain an `updated_at` timestamp for SCD Type 2" 1264 ) 1265 1266 unmanaged_columns = [ 1267 col for col in columns_to_types if col not in {valid_from_name, valid_to_name} 1268 ] 1269 time_data_type = columns_to_types[valid_from_name] 1270 select_source_columns: t.List[t.Union[str, exp.Alias]] = [ 1271 col for col in unmanaged_columns if col != updated_at_name 1272 ] 1273 table_columns = [exp.column(c, quoted=True) for c in columns_to_types] 1274 if updated_at_name: 1275 select_source_columns.append( 1276 exp.cast(updated_at_col, time_data_type).as_(updated_at_col.this) # type: ignore 1277 ) 1278 1279 # If a star is provided, we include all unmanaged columns in the check. 
1280 # This unnecessarily includes unique key columns but since they are used in the join, and therefore we know 1281 # they are equal or not, the extra check is not a problem and we gain simplified logic here. 1282 # If we want to change this, then we just need to check the expressions in unique_key and pull out the 1283 # column names and then remove them from the unmanaged_columns 1284 if check_columns and check_columns == exp.Star(): 1285 check_columns = [exp.column(col) for col in unmanaged_columns] 1286 execution_ts = to_time_column(execution_time, time_data_type) 1287 if updated_at_as_valid_from: 1288 if not updated_at_col: 1289 raise SQLMeshError( 1290 "Cannot use `updated_at_as_valid_from` without `updated_at_name` for SCD Type 2" 1291 ) 1292 update_valid_from_start: t.Union[str, exp.Expression] = updated_at_col 1293 elif execution_time_as_valid_from: 1294 update_valid_from_start = execution_ts 1295 else: 1296 update_valid_from_start = to_time_column("1970-01-01 00:00:00+00:00", time_data_type) 1297 insert_valid_from_start = execution_ts if check_columns else updated_at_col # type: ignore 1298 # joined._exists IS NULL is saying "if the row is deleted" 1299 delete_check = ( 1300 exp.column("_exists", "joined").is_(exp.Null()) if invalidate_hard_deletes else None 1301 ) 1302 prefixed_valid_to_col = valid_to_col.copy() 1303 prefixed_valid_to_col.this.set("this", f"t_{prefixed_valid_to_col.name}") 1304 prefixed_valid_from_col = valid_from_col.copy() 1305 prefixed_valid_from_col.this.set("this", f"t_{valid_from_col.name}") 1306 if check_columns: 1307 row_check_conditions = [] 1308 for col in check_columns: 1309 t_col = col.copy() 1310 t_col.this.set("this", f"t_{col.name}") 1311 row_check_conditions.extend( 1312 [ 1313 col.neq(t_col), 1314 exp.and_(t_col.is_(exp.Null()), col.is_(exp.Null()).not_()), 1315 exp.and_(t_col.is_(exp.Null()).not_(), col.is_(exp.Null())), 1316 ] 1317 ) 1318 row_value_check = exp.or_(*row_check_conditions) 1319 unique_key_conditions = 
[] 1320 for col in unique_key: 1321 t_col = col.copy() 1322 t_col.this.set("this", f"t_{col.name}") 1323 unique_key_conditions.extend( 1324 [t_col.is_(exp.Null()).not_(), col.is_(exp.Null()).not_()] 1325 ) 1326 unique_key_check = exp.and_(*unique_key_conditions) 1327 # unique_key_check is saying "if the row is updated" 1328 # row_value_check is saying "if the row has changed" 1329 updated_row_filter = exp.and_(unique_key_check, row_value_check) 1330 valid_to_case_stmt = ( 1331 exp.Case() 1332 .when( 1333 exp.and_( 1334 exp.or_( 1335 delete_check, 1336 updated_row_filter, 1337 ) 1338 ), 1339 execution_ts, 1340 ) 1341 .else_(prefixed_valid_to_col) 1342 .as_(valid_to_col.this) 1343 ) 1344 valid_from_case_stmt = exp.func( 1345 "COALESCE", 1346 prefixed_valid_from_col, 1347 update_valid_from_start, 1348 ).as_(valid_from_col.this) 1349 else: 1350 assert updated_at_col is not None 1351 prefixed_updated_at_col = updated_at_col.copy() 1352 prefixed_updated_at_col.this.set("this", f"t_{updated_at_col.name}") 1353 updated_row_filter = updated_at_col > prefixed_updated_at_col 1354 1355 valid_to_case_stmt_builder = exp.Case().when(updated_row_filter, updated_at_col) 1356 if delete_check: 1357 valid_to_case_stmt_builder = valid_to_case_stmt_builder.when( 1358 delete_check, execution_ts 1359 ) 1360 valid_to_case_stmt = valid_to_case_stmt_builder.else_(prefixed_valid_to_col).as_( 1361 valid_to_col.this 1362 ) 1363 1364 valid_from_case_stmt = ( 1365 exp.Case() 1366 .when( 1367 exp.and_( 1368 prefixed_valid_from_col.is_(exp.Null()), 1369 exp.column("_exists", "latest_deleted").is_(exp.Null()).not_(), 1370 ), 1371 exp.Case() 1372 .when( 1373 exp.column(valid_to_col.this, "latest_deleted") > updated_at_col, 1374 exp.column(valid_to_col.this, "latest_deleted"), 1375 ) 1376 .else_(updated_at_col), 1377 ) 1378 .when(prefixed_valid_from_col.is_(exp.Null()), update_valid_from_start) 1379 .else_(prefixed_valid_from_col) 1380 ).as_(valid_from_col.this) 1381 1382 existing_rows_query = 
exp.select(*table_columns).from_(target_table) 1383 if truncate: 1384 existing_rows_query = existing_rows_query.limit(0) 1385 1386 with source_queries[0] as source_query: 1387 prefixed_columns_to_types = [] 1388 for column in columns_to_types: 1389 prefixed_col = exp.column(column).copy() 1390 prefixed_col.this.set("this", f"t_{prefixed_col.name}") 1391 prefixed_columns_to_types.append(prefixed_col) 1392 prefixed_unmanaged_columns = [] 1393 for column in unmanaged_columns: 1394 prefixed_col = exp.column(column).copy() 1395 prefixed_col.this.set("this", f"t_{prefixed_col.name}") 1396 prefixed_unmanaged_columns.append(prefixed_col) 1397 query = ( 1398 exp.Select() # type: ignore 1399 .with_( 1400 "source", 1401 exp.select(exp.true().as_("_exists"), *select_source_columns) 1402 .distinct(*unique_key) 1403 .from_(source_query.subquery("raw_source")), # type: ignore 1404 ) 1405 # Historical Records that Do Not Change 1406 .with_( 1407 "static", 1408 existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()), 1409 ) 1410 # Latest Records that can be updated 1411 .with_( 1412 "latest", 1413 existing_rows_query.where(valid_to_col.is_(exp.Null())), 1414 ) 1415 # Deleted records which can be used to determine `valid_from` for undeleted source records 1416 .with_( 1417 "deleted", 1418 exp.select(*[exp.column(col, "static") for col in columns_to_types]) 1419 .from_("static") 1420 .join( 1421 "latest", 1422 on=exp.and_( 1423 *[ 1424 add_table(key, "static").eq(add_table(key, "latest")) 1425 for key in unique_key 1426 ] 1427 ), 1428 join_type="left", 1429 ) 1430 .where(exp.column(valid_to_col.this, "latest").is_(exp.Null())), 1431 ) 1432 # Get the latest `valid_to` deleted record for each unique key 1433 .with_( 1434 "latest_deleted", 1435 exp.select( 1436 exp.true().as_("_exists"), 1437 *(part.as_(f"_key{i}") for i, part in enumerate(unique_key)), 1438 exp.Max(this=valid_to_col).as_(valid_to_col.this), 1439 ) 1440 .from_("deleted") 1441 .group_by(*unique_key), 1442 ) 1443 # 
Do a full join between latest records and source table in order to combine them together 1444 # MySQL doesn't suport full join so going to do a left then right join and remove dups with union 1445 .with_( 1446 "joined", 1447 exp.select( 1448 exp.column("_exists", table="source"), 1449 *( 1450 exp.column(col, table="latest").as_(prefixed_columns_to_types[i].this) 1451 for i, col in enumerate(columns_to_types) 1452 ), 1453 *(exp.column(col, table="source").as_(col) for col in unmanaged_columns), 1454 ) 1455 .from_("latest") 1456 .join( 1457 "source", 1458 on=exp.and_( 1459 *[ 1460 add_table(key, "latest").eq(add_table(key, "source")) 1461 for key in unique_key 1462 ] 1463 ), 1464 join_type="left", 1465 ) 1466 .union( 1467 exp.select( 1468 exp.column("_exists", table="source"), 1469 *( 1470 exp.column(col, table="latest").as_( 1471 prefixed_columns_to_types[i].this 1472 ) 1473 for i, col in enumerate(columns_to_types) 1474 ), 1475 *( 1476 exp.column(col, table="source").as_(col) 1477 for col in unmanaged_columns 1478 ), 1479 ) 1480 .from_("latest") 1481 .join( 1482 "source", 1483 on=exp.and_( 1484 *[ 1485 add_table(key, "latest").eq(add_table(key, "source")) 1486 for key in unique_key 1487 ] 1488 ), 1489 join_type="right", 1490 ) 1491 ), 1492 ) 1493 # Get deleted, new, no longer current, or unchanged records 1494 .with_( 1495 "updated_rows", 1496 exp.select( 1497 *( 1498 exp.func( 1499 "COALESCE", 1500 exp.column(prefixed_unmanaged_columns[i].this, table="joined"), 1501 exp.column(col, table="joined"), 1502 ).as_(col) 1503 for i, col in enumerate(unmanaged_columns) 1504 ), 1505 valid_from_case_stmt, 1506 valid_to_case_stmt, 1507 ) 1508 .from_("joined") 1509 .join( 1510 "latest_deleted", 1511 on=exp.and_( 1512 *[ 1513 add_table(part, "joined").eq( 1514 exp.column(f"_key{i}", "latest_deleted") 1515 ) 1516 for i, part in enumerate(unique_key) 1517 ] 1518 ), 1519 join_type="left", 1520 ), 1521 ) 1522 # Get records that have been "updated" which means inserting a new 
record with previous `valid_from` 1523 .with_( 1524 "inserted_rows", 1525 exp.select( 1526 *unmanaged_columns, 1527 insert_valid_from_start.as_(valid_from_col.this), # type: ignore 1528 to_time_column(exp.null(), time_data_type).as_(valid_to_col.this), 1529 ) 1530 .from_("joined") 1531 .where(updated_row_filter), 1532 ) 1533 .select(*table_columns) 1534 .from_("static") 1535 .union( 1536 exp.select(*table_columns).from_("updated_rows"), 1537 distinct=False, 1538 ) 1539 .union( 1540 exp.select(*table_columns).from_("inserted_rows"), 1541 distinct=False, 1542 ) 1543 ) 1544 1545 self.replace_query( 1546 target_table, 1547 query, 1548 columns_to_types=columns_to_types, 1549 table_description=table_description, 1550 column_descriptions=column_descriptions, 1551 ) 1552 1553 def merge( 1554 self, 1555 target_table: TableName, 1556 source_table: QueryOrDF, 1557 columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 1558 unique_key: t.Sequence[exp.Expression], 1559 when_matched: t.Optional[exp.When] = None, 1560 ) -> None: 1561 source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( 1562 source_table, columns_to_types, target_table=target_table 1563 ) 1564 columns_to_types = columns_to_types or self.columns(target_table) 1565 on = exp.and_( 1566 *( 1567 add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS)) 1568 for part in unique_key 1569 ) 1570 ) 1571 if not when_matched: 1572 when_matched = exp.When( 1573 matched=True, 1574 source=False, 1575 then=exp.Update( 1576 expressions=[ 1577 exp.column(col, MERGE_TARGET_ALIAS).eq(exp.column(col, MERGE_SOURCE_ALIAS)) 1578 for col in columns_to_types 1579 ], 1580 ), 1581 ) 1582 when_not_matched = exp.When( 1583 matched=False, 1584 source=False, 1585 then=exp.Insert( 1586 this=exp.Tuple(expressions=[exp.column(col) for col in columns_to_types]), 1587 expression=exp.Tuple( 1588 expressions=[exp.column(col, MERGE_SOURCE_ALIAS) for col in columns_to_types] 1589 ), 1590 ), 1591 ) 1592 for 
source_query in source_queries: 1593 with source_query as query: 1594 self._merge( 1595 target_table=target_table, 1596 query=query, 1597 on=on, 1598 match_expressions=[when_matched, when_not_matched], 1599 ) 1600 1601 def rename_table( 1602 self, 1603 old_table_name: TableName, 1604 new_table_name: TableName, 1605 ) -> None: 1606 new_table = exp.to_table(new_table_name) 1607 if new_table.catalog: 1608 old_table = exp.to_table(old_table_name) 1609 catalog = old_table.catalog or self.get_current_catalog() 1610 if catalog != new_table.catalog: 1611 raise UnsupportedCatalogOperationError( 1612 "Tried to rename table across catalogs which is not supported" 1613 ) 1614 self._rename_table(old_table_name, new_table_name) 1615 1616 def get_data_objects( 1617 self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None 1618 ) -> t.List[DataObject]: 1619 """Lists all data objects in the target schema. 1620 1621 Args: 1622 schema_name: The name of the schema to list data objects from. 1623 object_names: If provided, only return data objects with these names. 1624 1625 Returns: 1626 A list of data objects in the target schema. 
1627 """ 1628 if object_names is not None: 1629 if not object_names: 1630 return [] 1631 object_names_list = list(object_names) 1632 batches = [ 1633 object_names_list[i : i + self.DATA_OBJECT_FILTER_BATCH_SIZE] 1634 for i in range(0, len(object_names_list), self.DATA_OBJECT_FILTER_BATCH_SIZE) 1635 ] 1636 return [ 1637 obj for batch in batches for obj in self._get_data_objects(schema_name, set(batch)) 1638 ] 1639 return self._get_data_objects(schema_name) 1640 1641 def fetchone( 1642 self, 1643 query: t.Union[exp.Expression, str], 1644 ignore_unsupported_errors: bool = False, 1645 quote_identifiers: bool = False, 1646 ) -> t.Tuple: 1647 with self.transaction(): 1648 self.execute( 1649 query, 1650 ignore_unsupported_errors=ignore_unsupported_errors, 1651 quote_identifiers=quote_identifiers, 1652 ) 1653 return self.cursor.fetchone() 1654 1655 def fetchall( 1656 self, 1657 query: t.Union[exp.Expression, str], 1658 ignore_unsupported_errors: bool = False, 1659 quote_identifiers: bool = False, 1660 ) -> t.List[t.Tuple]: 1661 with self.transaction(): 1662 self.execute( 1663 query, 1664 ignore_unsupported_errors=ignore_unsupported_errors, 1665 quote_identifiers=quote_identifiers, 1666 ) 1667 return self.cursor.fetchall() 1668 1669 def _fetch_native_df( 1670 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 1671 ) -> DF: 1672 """Fetches a DataFrame that can be either Pandas or PySpark from the cursor""" 1673 with self.transaction(): 1674 self.execute(query, quote_identifiers=quote_identifiers) 1675 return self.cursor.fetchdf() 1676 1677 def fetchdf( 1678 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 1679 ) -> pd.DataFrame: 1680 """Fetches a Pandas DataFrame from the cursor""" 1681 df = self._fetch_native_df(query, quote_identifiers=quote_identifiers) 1682 if not isinstance(df, pd.DataFrame): 1683 raise NotImplementedError( 1684 "The cursor's `fetch_native_df` method is not returning a pandas DataFrame. 
Need to update `fetchdf` so a Pandas DataFrame is returned" 1685 ) 1686 return df 1687 1688 def fetch_pyspark_df( 1689 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 1690 ) -> PySparkDataFrame: 1691 """Fetches a PySpark DataFrame from the cursor""" 1692 raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}") 1693 1694 def wap_supported(self, table_name: TableName) -> bool: 1695 """Returns whether WAP for the target table is supported.""" 1696 return False 1697 1698 def wap_table_name(self, table_name: TableName, wap_id: str) -> str: 1699 """Returns the updated table name for the given WAP ID. 1700 1701 Args: 1702 table_name: The name of the target table. 1703 wap_id: The WAP ID to prepare. 1704 1705 Returns: 1706 The updated table name that should be used for writing. 1707 """ 1708 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 1709 1710 def wap_prepare(self, table_name: TableName, wap_id: str) -> str: 1711 """Prepares the target table for WAP and returns the updated table name. 1712 1713 Args: 1714 table_name: The name of the target table. 1715 wap_id: The WAP ID to prepare. 1716 1717 Returns: 1718 The updated table name that should be used for writing. 1719 """ 1720 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 1721 1722 def wap_publish(self, table_name: TableName, wap_id: str) -> None: 1723 """Publishes changes with the given WAP ID to the target table. 1724 1725 Args: 1726 table_name: The name of the target table. 1727 wap_id: The WAP ID to publish. 
1728 """ 1729 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 1730 1731 @contextlib.contextmanager 1732 def transaction( 1733 self, 1734 condition: t.Optional[bool] = None, 1735 ) -> t.Iterator[None]: 1736 """A transaction context manager.""" 1737 if ( 1738 self._connection_pool.is_transaction_active 1739 or not self.SUPPORTS_TRANSACTIONS 1740 or (condition is not None and not condition) 1741 ): 1742 yield 1743 return 1744 self._connection_pool.begin() 1745 try: 1746 yield 1747 except Exception as e: 1748 self._connection_pool.rollback() 1749 raise e 1750 else: 1751 self._connection_pool.commit() 1752 1753 @contextlib.contextmanager 1754 def session(self, properties: SessionProperties) -> t.Iterator[None]: 1755 """A session context manager.""" 1756 if self._is_session_active(): 1757 yield 1758 return 1759 1760 self._begin_session(properties) 1761 try: 1762 yield 1763 finally: 1764 self._end_session() 1765 1766 def _begin_session(self, properties: SessionProperties) -> t.Any: 1767 """Begin a new session.""" 1768 1769 def _end_session(self) -> None: 1770 """End the existing session.""" 1771 1772 def _is_session_active(self) -> bool: 1773 """Indicates whether or not a session is active.""" 1774 return False 1775 1776 def execute( 1777 self, 1778 expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]], 1779 ignore_unsupported_errors: bool = False, 1780 quote_identifiers: bool = True, 1781 **kwargs: t.Any, 1782 ) -> None: 1783 """Execute a sql query.""" 1784 to_sql_kwargs = ( 1785 {"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {} 1786 ) 1787 1788 with self.transaction(): 1789 for e in ensure_list(expressions): 1790 sql = t.cast( 1791 str, 1792 ( 1793 self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs) 1794 if isinstance(e, exp.Expression) 1795 else e 1796 ), 1797 ) 1798 self._log_sql(sql) 1799 self._execute(sql, **kwargs) 1800 1801 def _log_sql(self, sql: str) -> None: 1802 
        logger.log(self._execute_log_level, "Executing SQL: %s", sql)

    def _execute(self, sql: str, **kwargs: t.Any) -> None:
        # Single low-level execution point: all SQL issued by the adapter funnels
        # through the pooled cursor, so subclasses can override just this method.
        self.cursor.execute(sql, **kwargs)

    @contextlib.contextmanager
    def temp_table(
        self,
        query_or_df: QueryOrDF,
        name: TableName = "diff",
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        **kwargs: t.Any,
    ) -> t.Iterator[exp.Table]:
        """A context manager for working with a temp table.

        The table will be created with a random guid and cleaned up after the block.

        Args:
            query_or_df: The query or df to create a temp table for.
            name: The base name of the temp table.
            columns_to_types: A mapping between the column name and its data type.
            kwargs: Optional create table properties forwarded to table creation.

        Yields:
            The table expression
        """
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types=columns_to_types, target_table=name
        )

        with self.transaction():
            table = self._get_temp_table(name)
            if table.db:
                # Ensure the temp table's schema exists before creating the table in it.
                self.create_schema(schema_(table.args["db"], table.args.get("catalog")))
            self._create_table_from_source_queries(
                table,
                source_queries,
                columns_to_types,
                exists=True,
                table_description=None,
                column_descriptions=None,
                **kwargs,
            )

            try:
                yield table
            finally:
                # Always drop the temp table, even if the body raised.
                self.drop_table(table)

    def _table_or_view_properties_to_expressions(
        self, table_or_view_properties: t.Optional[t.Dict[str, exp.Expression]] = None
    ) -> t.List[exp.Property]:
        """Converts model properties (either physical or virtual) to a list of property expressions."""
        if not table_or_view_properties:
            return []
        # Copy each value so downstream mutation of the DDL AST can't corrupt the model's properties.
        return [
            exp.Property(this=key, value=value.copy())
            for key, value in table_or_view_properties.items()
        ]

    def _build_table_properties_exp(
        self,
        catalog_name: t.Optional[str] = None,
        storage_format: t.Optional[str] = None,
        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
        partition_interval_unit: t.Optional[IntervalUnit] = None,
        clustered_by: t.Optional[t.List[str]] = None,
        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
    ) -> t.Optional[exp.Properties]:
        """Creates a SQLGlot table properties expression for ddl.

        The base implementation only emits a table comment property; engine-specific
        subclasses extend this with storage format, partitioning, clustering, etc.
        (the extra parameters exist so subclass overrides share one signature).
        """
        properties: t.List[exp.Expression] = []

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _build_view_properties_exp(
        self,
        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        table_description: t.Optional[str] = None,
    ) -> t.Optional[exp.Properties]:
        """Creates a SQLGlot table properties expression for view.

        Like `_build_table_properties_exp`, the base implementation only handles the
        view comment; `view_properties` is consumed by engine-specific overrides.
        """
        properties: t.List[exp.Expression] = []

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
        # NOTE(review): a length of 0 is falsy and therefore means "no truncation" here,
        # same as None — confirm no engine declares a 0 max length.
        return comment[:length] if length else comment

    def _truncate_table_comment(self, comment: str) -> str:
        # Clamp to the engine's max table-comment length (None = unlimited).
        return self._truncate_comment(comment, self.MAX_TABLE_COMMENT_LENGTH)

    def _truncate_column_comment(self, comment: str) -> str:
        # Clamp to the engine's max column-comment length (None = unlimited).
        return self._truncate_comment(comment, self.MAX_COLUMN_COMMENT_LENGTH)

    def _to_sql(self, expression: exp.Expression, quote: bool = True, **kwargs: t.Any) -> str:
        """
        Converts an expression to a SQL string. Has a set of default kwargs to apply, and then default
        kwargs defined for the given dialect, and then kwargs provided by the user when defining the engine
        adapter, and then finally kwargs provided by the user when calling this method.
        """
        sql_gen_kwargs = {
            "dialect": self.dialect,
            "pretty": False,
            "comments": False,
            **self.sql_gen_kwargs,
            **kwargs,
        }

        # Copy before quoting so the caller's expression is never mutated.
        expression = expression.copy()

        if quote:
            quote_identifiers(expression)

        return expression.sql(**sql_gen_kwargs, copy=False)  # type: ignore

    def _get_data_objects(
        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
    ) -> t.List[DataObject]:
        """
        Returns all the data objects that exist in the given schema and optionally catalog.

        Abstract: every concrete engine adapter must implement this.
        """
        raise NotImplementedError()

    def _get_temp_table(self, table: TableName, table_only: bool = False) -> exp.Table:
        """
        Returns the name of the temp table that should be used for the given table name.

        The name is randomized (`__temp_<name>_<guid>`) so concurrent callers never collide.
        When `table_only` is True the db/catalog qualifiers are stripped.
        """
        table = t.cast(exp.Table, exp.to_table(table).copy())
        table.set(
            "this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}", quoted=True)
        )

        if table_only:
            table.set("db", None)
            table.set("catalog", None)

        return table

    def _order_projections_and_filter(
        self,
        query: Query,
        columns_to_types: t.Dict[str, exp.DataType],
        where: t.Optional[exp.Expression] = None,
    ) -> Query:
        # Rewrap `query` in a subquery so its projections come out in exactly the
        # order of `columns_to_types`, applying `where` on top. Skipped when the
        # query already matches the expected projection order and no filter is needed.
        if not isinstance(query, exp.Query) or (
            not where and query.named_selects == list(columns_to_types)
        ):
            return query

        query = t.cast(exp.Query, query.copy())
        # Hoist any CTEs so they stay attached to the outermost query after wrapping.
        with_ = query.args.pop("with", None)
        query = self._select_columns(columns_to_types).from_(
            query.subquery("_subquery", copy=False), copy=False
        )
        if where:
            query = query.where(where, copy=False)

        if with_:
            query.set("with", with_)

        return query

    def _truncate_table(self, table_name: TableName) -> None:
        table = exp.to_table(table_name)
        self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}")

    def _build_create_comment_table_exp(
        self, table: exp.Table, table_comment: str, table_kind: str
    ) -> exp.Comment | str:
        # Default COMMENT ON <kind> statement; engines with bespoke syntax override this.
        return exp.Comment(
            this=table,
            kind=table_kind,
            expression=exp.Literal.string(self._truncate_table_comment(table_comment)),
        )

    def _create_table_comment(
        self, table_name: TableName, table_comment: str, table_kind: str = "TABLE"
    ) -> None:
        table = exp.to_table(table_name)

        try:
            self.execute(self._build_create_comment_table_exp(table, table_comment, table_kind))
        except Exception:
            # Comment registration is best-effort: failure must never fail the plan/run.
            logger.warning(
                f"Table comment for '{table.alias_or_name}' not registered - this may be due to limited permissions.",
                exc_info=True,
            )

    def _build_create_comment_column_exp(
        self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE"
    ) -> exp.Comment | str:
        # Default COMMENT ON COLUMN statement; engines with bespoke syntax override this.
        return exp.Comment(
            this=exp.column(column_name, *reversed(table.parts)),  # type: ignore
            kind="COLUMN",
            expression=exp.Literal.string(self._truncate_column_comment(column_comment)),
        )

    def _create_column_comments(
        self,
        table_name: TableName,
        column_comments: t.Dict[str, str],
        table_kind: str = "TABLE",
    ) -> None:
        table = exp.to_table(table_name)

        # One statement per column; each is independently best-effort.
        for col, comment in column_comments.items():
            try:
                self.execute(self._build_create_comment_column_exp(table, col, comment, table_kind))
            except Exception:
                logger.warning(
                    f"Column comments for table '{table.alias_or_name}' not registered - this may be due to limited permissions.",
                    exc_info=True,
                )

    def _rename_table(
        self,
        old_table_name: TableName,
        new_table_name: TableName,
    ) -> None:
        self.execute(exp.rename_table(old_table_name, new_table_name))

    @classmethod
    def _select_columns(cls, columns: t.Iterable[str]) -> exp.Select:
        # SELECT with every column quoted, preserving the given order.
        return exp.select(*(exp.column(c, quoted=True) for c in columns))


class EngineAdapterWithIndexSupport(EngineAdapter):
    """EngineAdapter variant for engines that support index creation."""

    SUPPORTS_INDEXES = True


def _decoded_str(value: t.Union[str, bytes]) -> str:
    # Some drivers (e.g. MySQL's) return metadata strings as bytes; normalize to str.
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return value
class EngineAdapter:
    """Base class wrapping a Database API compliant connection.

    The EngineAdapter is an easily-subclassable interface that interacts
    with the underlying engine and data store.

    Args:
        connection_factory: a callable which produces a new Database API-compliant
            connection on every call.
        dialect: The dialect with which this adapter is associated.
        multithreaded: Indicates whether this adapter will be used by more than one thread.
    """

    # Engine capability flags and limits — subclasses override as appropriate.
    DIALECT = ""
    DEFAULT_BATCH_SIZE = 10000
    DATA_OBJECT_FILTER_BATCH_SIZE = 4000
    SUPPORTS_TRANSACTIONS = True
    SUPPORTS_INDEXES = False
    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_CTAS
    COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS
    MAX_TABLE_COMMENT_LENGTH: t.Optional[int] = None
    MAX_COLUMN_COMMENT_LENGTH: t.Optional[int] = None
    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
    SUPPORTS_MATERIALIZED_VIEWS = False
    SUPPORTS_MATERIALIZED_VIEW_SCHEMA = False
    SUPPORTS_CLONING = False
    SCHEMA_DIFFER = SchemaDiffer()
    SUPPORTS_TUPLE_IN = True
    CATALOG_SUPPORT = CatalogSupport.UNSUPPORTED
    SUPPORTS_ROW_LEVEL_OP = True
    HAS_VIEW_BINDING = False
    SUPPORTS_REPLACE_TABLE = True
    # NOTE(review): this captures DIALECT's value ("") at class-creation time, so a
    # subclass that overrides DIALECT must also override DEFAULT_CATALOG_TYPE
    # explicitly — confirm this is intentional.
    DEFAULT_CATALOG_TYPE = DIALECT
    QUOTE_IDENTIFIERS_IN_VIEWS = True

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        dialect: str = "",
        sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
        multithreaded: bool = False,
        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
        default_catalog: t.Optional[str] = None,
        execute_log_level: int = logging.DEBUG,
        register_comments: bool = True,
        **kwargs: t.Any,
    ):
        self.dialect = dialect.lower() or self.DIALECT
        self._connection_pool = create_connection_pool(
            connection_factory, multithreaded, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init
        )
        self.sql_gen_kwargs = sql_gen_kwargs or {}
        self._default_catalog = default_catalog
        self._execute_log_level = execute_log_level
        # Unrecognized kwargs are kept so `with_log_level` can reconstruct the adapter.
        self._extra_config = kwargs
        self.register_comments = register_comments

    def with_log_level(self, level: int) -> EngineAdapter:
        """Returns a copy of this adapter that logs executed SQL at `level`.

        The copy shares this adapter's connection pool; the `lambda: None`
        connection factory is a placeholder that is never invoked.
        """
        adapter = self.__class__(
            lambda: None,
            dialect=self.dialect,
            sql_gen_kwargs=self.sql_gen_kwargs,
            default_catalog=self._default_catalog,
            execute_log_level=level,
            register_comments=self.register_comments,
            **self._extra_config,
        )

        adapter._connection_pool = self._connection_pool

        return adapter

    @property
    def cursor(self) -> t.Any:
        # Thread-appropriate cursor from the pool (per-thread when multithreaded).
        return self._connection_pool.get_cursor()

    @property
    def spark(self) -> t.Optional[PySparkSession]:
        # Only Spark-based adapters return a session; everything else is None.
        return None

    @property
    def comments_enabled(self) -> bool:
        # Comments are registered only when requested AND the engine supports them.
        return self.register_comments and self.COMMENT_CREATION_TABLE.is_supported

    @classmethod
    def is_pandas_df(cls, value: t.Any) -> bool:
        return isinstance(value, pd.DataFrame)

    @classmethod
    def _casted_columns(cls, columns_to_types: t.Dict[str, exp.DataType]) -> t.List[exp.Alias]:
        # `CAST(col AS type) AS col` projections for each column, in order.
        return [
            exp.alias_(exp.cast(exp.column(column), to=kind), column, copy=False)
            for column, kind in columns_to_types.items()
        ]

    @property
    def default_catalog(self) -> t.Optional[str]:
        if self.CATALOG_SUPPORT.is_unsupported:
            return None
        # Fall back to the connection's current catalog when none was configured.
        default_catalog = self._default_catalog or self.get_current_catalog()
        if not default_catalog:
            raise SQLMeshError("Could not determine a default catalog despite it being supported.")
        return default_catalog

    def _get_source_queries(
        self,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        target_table: TableName,
        *,
        batch_size: t.Optional[int] = None,
    ) -> t.List[SourceQuery]:
        """Normalizes a query or DataFrame into a list of `SourceQuery` batches.

        A SQL query becomes a single source query; a DataFrame is split into
        VALUES-based batches of at most `batch_size` rows (0 means no batching).
        """
        batch_size = self.DEFAULT_BATCH_SIZE if batch_size is None else batch_size
        if isinstance(query_or_df, (exp.Query, exp.DerivedTable)):
            return [SourceQuery(query_factory=lambda: query_or_df)]  # type: ignore
        if not columns_to_types:
            raise SQLMeshError(
                "It is expected that if a DF is passed in then columns_to_types is set"
            )
        return self._df_to_source_queries(
            query_or_df, columns_to_types, batch_size, target_table=target_table
        )

    def _df_to_source_queries(
        self,
        df: DF,
        columns_to_types: t.Dict[str, exp.DataType],
        batch_size: int,
        target_table: TableName,
    ) -> t.List[SourceQuery]:
        # Base implementation only handles pandas; Spark/Snowpark adapters override.
        assert isinstance(df, pd.DataFrame)
        num_rows = len(df.index)
        # batch_size == 0 means "one batch with everything".
        batch_size = sys.maxsize if batch_size == 0 else batch_size
        values = list(df.itertuples(index=False, name=None))
        return [
            SourceQuery(
                query_factory=partial(
                    self._values_to_sql,
                    values=values,
                    columns_to_types=columns_to_types,
                    batch_start=i,
                    batch_end=min(i + batch_size, num_rows),
                ),
            )
            for i in range(0, num_rows, batch_size)
        ]

    def _get_source_queries_and_columns_to_types(
        self,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        target_table: TableName,
        *,
        batch_size: t.Optional[int] = None,
    ) -> t.Tuple[t.List[SourceQuery], t.Optional[t.Dict[str, exp.DataType]]]:
        """Convenience wrapper: resolve column types (from the DF if needed), then batch."""
        columns_to_types = self._columns_to_types(query_or_df, columns_to_types)
        return (
            self._get_source_queries(
                query_or_df, columns_to_types, target_table=target_table, batch_size=batch_size
            ),
            columns_to_types,
        )

    @t.overload
    def _columns_to_types(
        self, query_or_df: DF, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
    ) -> t.Dict[str, exp.DataType]: ...

    @t.overload
    def _columns_to_types(
        self, query_or_df: Query, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
    ) -> t.Optional[t.Dict[str, exp.DataType]]: ...

    def _columns_to_types(
        self, query_or_df: QueryOrDF, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
    ) -> t.Optional[t.Dict[str, exp.DataType]]:
        # Explicitly provided types win; otherwise infer from a pandas DataFrame.
        # For plain queries with no explicit types, returns None.
        if columns_to_types:
            return columns_to_types
        if self.is_pandas_df(query_or_df):
            return columns_to_types_from_df(t.cast(pd.DataFrame, query_or_df))
        return columns_to_types

    def recycle(self) -> None:
        """Closes all open connections and releases all allocated resources associated with any thread
        except the calling one."""
        self._connection_pool.close_all(exclude_calling_thread=True)

    def close(self) -> t.Any:
        """Closes all open connections and releases all allocated resources."""
        self._connection_pool.close_all()

    def get_current_catalog(self) -> t.Optional[str]:
        """Returns the catalog name of the current connection."""
        raise NotImplementedError()

    def set_current_catalog(self, catalog: str) -> None:
        """Sets the catalog name of the current connection."""
        raise NotImplementedError()

    def get_catalog_type(self, catalog: t.Optional[str]) -> str:
        """Intended to be overridden for data virtualization systems like Trino that,
        depending on the target catalog, require slightly different properties to be set when creating / updating tables
        """
        if self.CATALOG_SUPPORT.is_unsupported:
            raise UnsupportedCatalogOperationError(
                f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}"
            )
        return self.DEFAULT_CATALOG_TYPE

    @property
    def current_catalog_type(self) -> str:
        return self.get_catalog_type(self.get_current_catalog())

    def replace_query(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Replaces an existing table with a query.

        For partition based engines (hive, spark), insert override is used. For other systems, create or replace is used.

        Args:
            table_name: The name of the table (eg. prod.table)
            query_or_df: The SQL query to run or a dataframe.
            columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type.
                Expected to be ordered to match the order of values in the dataframe.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            kwargs: Optional create table properties.
        """
        target_table = exp.to_table(table_name)
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types, target_table=target_table
        )
        columns_to_types = columns_to_types or self.columns(target_table)
        query = source_queries[0].query_factory()
        # The query is self-referencing when any of its tables resolves (after
        # identifier quoting) to the table being replaced.
        self_referencing = any(
            quote_identifiers(table) == quote_identifiers(target_table)
            for table in query.find_all(exp.Table)
        )
        # If a query references itself then it must have a table created regardless of approach used.
        if self_referencing:
            self._create_table_from_columns(
                target_table,
                columns_to_types,
                exists=True,
                table_description=table_description,
                column_descriptions=column_descriptions,
            )
        # All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we
        # use `CREATE OR REPLACE TABLE AS` if the engine supports it
        if self.SUPPORTS_REPLACE_TABLE or not self.table_exists(target_table):
            return self._create_table_from_source_queries(
                target_table,
                source_queries,
                columns_to_types,
                replace=self.SUPPORTS_REPLACE_TABLE,
                table_description=table_description,
                column_descriptions=column_descriptions,
                **kwargs,
            )
        else:
            if self_referencing:
                # Snapshot the current contents into a temp table and rewrite the
                # source queries to read from it, since the insert-overwrite below
                # would otherwise clobber the data the query still needs to read.
                with self.temp_table(
                    self._select_columns(columns_to_types).from_(target_table),
                    name=target_table,
                    columns_to_types=columns_to_types,
                    **kwargs,
                ) as temp_table:
                    for source_query in source_queries:
                        source_query.add_transform(
                            lambda node: (  # type: ignore
                                temp_table  # type: ignore
                                if isinstance(node, exp.Table)
                                and quote_identifiers(node) == quote_identifiers(target_table)
                                else node
                            )
                        )
                    return self._insert_overwrite_by_condition(
                        target_table,
                        source_queries,
                        columns_to_types,
                    )
            return self._insert_overwrite_by_condition(
                target_table,
                source_queries,
                columns_to_types,
            )

    def create_index(
        self,
        table_name: TableName,
        index_name: str,
        columns: t.Tuple[str, ...],
        exists: bool = True,
    ) -> None:
        """Creates a new index for the given table if supported

        No-op on engines where SUPPORTS_INDEXES is False.

        Args:
            table_name: The name of the target table.
            index_name: The name of the index.
            columns: The list of columns that constitute the index.
            exists: Indicates whether to include the IF NOT EXISTS check.
        """
        if not self.SUPPORTS_INDEXES:
            return

        expression = exp.Create(
            this=exp.Index(
                this=exp.to_identifier(index_name),
                table=exp.to_table(table_name),
                params=exp.IndexParameters(columns=[exp.to_column(c) for c in columns]),
            ),
            kind="INDEX",
            exists=exists,
        )
        self.execute(expression)

    def create_table(
        self,
        table_name: TableName,
        columns_to_types: t.Dict[str, exp.DataType],
        primary_key: t.Optional[t.Tuple[str, ...]] = None,
        exists: bool = True,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Create a table using a DDL statement

        Args:
            table_name: The name of the table to create. Can be fully qualified or just table name.
            columns_to_types: A mapping between the column name and its data type.
            primary_key: Determines the table primary key.
            exists: Indicates whether to include the IF NOT EXISTS check.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            kwargs: Optional create table properties.
        """
        self._create_table_from_columns(
            table_name,
            columns_to_types,
            primary_key,
            exists,
            table_description,
            column_descriptions,
            **kwargs,
        )

    def ctas(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        exists: bool = True,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Create a table using a CTAS statement

        Args:
            table_name: The name of the table to create. Can be fully qualified or just table name.
            query_or_df: The SQL query to run or a dataframe for the CTAS.
            columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame.
            exists: Indicates whether to include the IF NOT EXISTS check.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            kwargs: Optional create table properties.
        """
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types, target_table=table_name
        )
        return self._create_table_from_source_queries(
            table_name,
            source_queries,
            columns_to_types,
            exists,
            table_description=table_description,
            column_descriptions=column_descriptions,
            **kwargs,
        )

    def create_state_table(
        self,
        table_name: str,
        columns_to_types: t.Dict[str, exp.DataType],
        primary_key: t.Optional[t.Tuple[str, ...]] = None,
    ) -> None:
        """Create a table to store SQLMesh internal state.

        Args:
            table_name: The name of the table to create. Can be fully qualified or just table name.
            columns_to_types: A mapping between the column name and its data type.
            primary_key: Determines the table primary key.
        """
        self.create_table(
            table_name,
            columns_to_types,
            primary_key=primary_key,
        )

    def _create_table_from_columns(
        self,
        table_name: TableName,
        columns_to_types: t.Dict[str, exp.DataType],
        primary_key: t.Optional[t.Tuple[str, ...]] = None,
        exists: bool = True,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """
        Create a table using a DDL statement.

        Args:
            table_name: The name of the table to create. Can be fully qualified or just table name.
            columns_to_types: Mapping between the column name and its data type.
            primary_key: Determines the table primary key.
            exists: Indicates whether to include the IF NOT EXISTS check.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            kwargs: Optional create table properties.
        """
        table = exp.to_table(table_name)

        if not columns_to_types_all_known(columns_to_types):
            # It is ok if the columns types are not known if the table already exists and IF NOT EXISTS is set
            if exists and self.table_exists(table_name):
                return
            raise SQLMeshError(
                "Cannot create a table without knowing the column types. "
                "Try casting the columns to an expected type or defining the columns in the model metadata. "
                f"Columns to types: {columns_to_types}"
            )

        # Inline PRIMARY KEY constraints are only emitted on engines that support indexes.
        primary_key_expression = (
            [exp.PrimaryKey(expressions=[exp.to_column(k) for k in primary_key])]
            if primary_key and self.SUPPORTS_INDEXES
            else []
        )

        schema = self._build_schema_exp(
            table,
            columns_to_types,
            column_descriptions,
            primary_key_expression,
        )

        self._create_table(
            schema,
            None,
            exists=exists,
            columns_to_types=columns_to_types,
            table_description=table_description,
            **kwargs,
        )

        # Register comments with commands if the engine doesn't support comments in the schema or CREATE
        if (
            table_description
            and self.COMMENT_CREATION_TABLE.is_comment_command_only
            and self.comments_enabled
        ):
            self._create_table_comment(table_name, table_description)
        if (
            column_descriptions
            and self.COMMENT_CREATION_TABLE.is_comment_command_only
            and self.comments_enabled
        ):
            self._create_column_comments(table_name, column_descriptions)

    def _build_schema_exp(
        self,
        table: exp.Table,
        columns_to_types: t.Dict[str, exp.DataType],
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        expressions: t.Optional[t.List[exp.PrimaryKey]] = None,
        is_view: bool = False,
    ) -> exp.Schema:
        """
        Build a schema expression for a table, columns, column comments, and additional schema properties.
        """
        expressions = expressions or []
        engine_supports_schema_comments = (
            self.COMMENT_CREATION_VIEW.supports_schema_def
            if is_view
            else self.COMMENT_CREATION_TABLE.supports_schema_def
        )
        return exp.Schema(
            this=table,
            expressions=[
                exp.ColumnDef(
                    this=exp.to_identifier(column),
                    kind=None if is_view else kind,  # don't include column data type for views
                    constraints=(
                        # Inline COMMENT constraint only when the engine accepts it here.
                        self._build_col_comment_exp(column, column_descriptions)
                        if column_descriptions
                        and engine_supports_schema_comments
                        and self.comments_enabled
                        else None
                    ),
                )
                for column, kind in columns_to_types.items()
            ]
            + expressions,
        )

    def _build_col_comment_exp(
        self, col_name: str, column_descriptions: t.Dict[str, str]
    ) -> t.List[exp.ColumnConstraint]:
        # Returns a single-element constraint list, or [] when the column has no description.
        comment = column_descriptions.get(col_name, None)
        if comment:
            return [
                exp.ColumnConstraint(
                    kind=exp.CommentColumnConstraint(
                        this=exp.Literal.string(self._truncate_column_comment(comment))
                    )
                )
            ]
        return []

    def _create_table_from_source_queries(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        exists: bool = True,
        replace: bool = False,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        **kwargs: t.Any,
    ) -> None:
        table = exp.to_table(table_name)

        # CTAS calls do not usually include a schema expression. However, most engines
        # permit them in CTAS expressions, and they allow us to register all column comments
        # in a single call rather than in a separate comment command call for each column.
        #
        # This block conditionally builds a schema expression with column comments if the engine
        # supports it and we have columns_to_types.
        # column_to_types is required because the
        # schema expression must include at least column name, data type, and the comment -
        # for example, `(colname INTEGER COMMENT 'comment')`.
        #
        # column_to_types will be available when loading from a DataFrame (by converting from
        # pandas to SQL types), when a model is "annotated" by explicitly specifying column
        # types, and for evaluation methods like `LogicalReplaceQueryMixin.replace_query()`
        # calls and SCD Type 2 model calls.
        schema = None
        columns_to_types_known = columns_to_types and columns_to_types_all_known(columns_to_types)
        if (
            column_descriptions
            and columns_to_types_known
            and self.COMMENT_CREATION_TABLE.is_in_schema_def_ctas
            and self.comments_enabled
        ):
            schema = self._build_schema_exp(table, columns_to_types, column_descriptions)  # type: ignore

        # A transaction is only needed when multiple batches must land atomically.
        with self.transaction(condition=len(source_queries) > 1):
            for i, source_query in enumerate(source_queries):
                with source_query as query:
                    if i == 0:
                        # First batch creates the table (CTAS); the rest append into it.
                        self._create_table(
                            schema if schema else table,
                            query,
                            columns_to_types=columns_to_types,
                            exists=exists,
                            replace=replace,
                            table_description=table_description,
                            **kwargs,
                        )
                    else:
                        self._insert_append_query(
                            table_name, query, columns_to_types or self.columns(table)
                        )

        # Register comments with commands if the engine supports comments and we weren't able to
        # register them with the CTAS call's schema expression.
        if (
            table_description
            and self.COMMENT_CREATION_TABLE.is_comment_command_only
            and self.comments_enabled
        ):
            self._create_table_comment(table_name, table_description)
        if column_descriptions and schema is None and self.comments_enabled:
            self._create_column_comments(table_name, column_descriptions)

    def _create_table(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expression],
        exists: bool = True,
        replace: bool = False,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        **kwargs: t.Any,
    ) -> None:
        # Builds and executes the CREATE statement; the table comment is only passed
        # through when the engine accepts it inside the schema definition.
        self.execute(
            self._build_create_table_exp(
                table_name_or_schema,
                expression=expression,
                exists=exists,
                replace=replace,
                columns_to_types=columns_to_types,
                table_description=(
                    table_description
                    if self.COMMENT_CREATION_TABLE.supports_schema_def and self.comments_enabled
                    else None
                ),
                **kwargs,
            )
        )

    def _build_create_table_exp(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expression],
        exists: bool = True,
        replace: bool = False,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        **kwargs: t.Any,
    ) -> exp.Create:
        # IF NOT EXISTS and OR REPLACE are mutually exclusive; REPLACE wins.
        exists = False if replace else exists
        catalog_name = None
        if not isinstance(table_name_or_schema, exp.Schema):
            table_name_or_schema = exp.to_table(table_name_or_schema)
            catalog_name = table_name_or_schema.catalog
        else:
            if isinstance(table_name_or_schema.this, exp.Table):
                catalog_name = table_name_or_schema.this.catalog

        properties = (
            self._build_table_properties_exp(
                **kwargs, catalog_name=catalog_name, columns_to_types=columns_to_types
            )
            if kwargs
            else None
        )
        return exp.Create(
            this=table_name_or_schema,
            kind="TABLE",
            replace=replace,
            exists=exists,
            expression=expression,
            properties=properties,
        )

    def create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        exists: bool = True,
    ) -> None:
        """
        Create a table like another table or view.
        """
        target_table = exp.to_table(target_table_name)
        source_table = exp.to_table(source_table_name)
        create_expression = exp.Create(
            this=target_table,
            kind="TABLE",
            exists=exists,
            properties=exp.Properties(
                expressions=[
                    exp.LikeProperty(this=source_table),
                ]
            ),
        )
        self.execute(create_expression)

    def clone_table(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        replace: bool = False,
        clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Creates a table with the target name by cloning the source table.

        Raises NotImplementedError on engines where SUPPORTS_CLONING is False.

        Args:
            target_table_name: The name of the table that should be created.
            source_table_name: The name of the source table that should be cloned.
            replace: Whether or not to replace an existing table.
            clone_kwargs: Optional keyword arguments forwarded to the Clone expression.
        """
        if not self.SUPPORTS_CLONING:
            raise NotImplementedError(f"Engine does not support cloning: {type(self)}")
        self.execute(
            exp.Create(
                this=exp.to_table(target_table_name),
                kind="TABLE",
                replace=replace,
                clone=exp.Clone(
                    this=exp.to_table(source_table_name),
                    **(clone_kwargs or {}),
                ),
                **kwargs,
            )
        )

    def drop_table(self, table_name: TableName, exists: bool = True) -> None:
        """Drops a table.

        Args:
            table_name: The name of the table to drop.
            exists: If exists, defaults to True.
        """
        drop_expression = exp.Drop(this=exp.to_table(table_name), kind="TABLE", exists=exists)
        self.execute(drop_expression)

    def alter_table(
        self,
        current_table_name: TableName,
        target_table_name: TableName,
    ) -> None:
        """
        Performs the required alter statements to change the current table into the structure of the target table.

        All alterations run inside a single transaction.
        """
        with self.transaction():
            for alter_expression in self.SCHEMA_DIFFER.compare_columns(
                current_table_name,
                self.columns(current_table_name),
                self.columns(target_table_name),
            ):
                self.execute(alter_expression)

    def create_view(
        self,
        view_name: TableName,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        replace: bool = True,
        materialized: bool = False,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        **create_kwargs: t.Any,
    ) -> None:
        """Create a view with a query or dataframe.

        If a dataframe is passed in, it will be converted into a literal values statement.
        This should only be done if the dataframe is very small!

        Args:
            view_name: The view name.
            query_or_df: A query or dataframe.
            columns_to_types: Columns to use in the view statement.
            replace: Whether or not to replace an existing view. Defaults to True.
            materialized: Whether to create a materialized view. Only used for engines that support this feature.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            view_properties: Optional view properties to add to the view.
            create_kwargs: Additional kwargs to pass into the Create expression
        """
        if self.is_pandas_df(query_or_df):
            # Inline the dataframe as a VALUES statement inside the view definition.
            values = list(t.cast(pd.DataFrame, query_or_df).itertuples(index=False, name=None))
            columns_to_types = columns_to_types or self._columns_to_types(query_or_df)
            if not columns_to_types:
                raise SQLMeshError("columns_to_types must be provided for dataframes")
            query_or_df = self._values_to_sql(
                values,
                columns_to_types,
                batch_start=0,
                batch_end=len(values),
            )

        # batch_size=0 forces a single source query — a view has exactly one definition.
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types, batch_size=0, target_table=view_name
        )
        if len(source_queries) != 1:
            raise SQLMeshError("Only one source query is supported for creating views")

        schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name)
        if columns_to_types:
            schema = self._build_schema_exp(
                exp.to_table(view_name), columns_to_types, column_descriptions, is_view=True
            )

        properties = create_kwargs.pop("properties", None)
        if not properties:
            properties = exp.Properties(expressions=[])

        if materialized and self.SUPPORTS_MATERIALIZED_VIEWS:
            properties.append("expressions", exp.MaterializedProperty())

        # Some engines don't accept a column list on materialized views.
        if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema):
            schema = schema.this

        create_view_properties = self._build_view_properties_exp(
            view_properties,
            (
                table_description
                if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled
                else None
            ),
        )
        if create_view_properties:
            for view_property in create_view_properties.expressions:
                properties.append("expressions", view_property)

        if properties.expressions:
            create_kwargs["properties"] = properties

        with source_queries[0] as query:
            self.execute(
                exp.Create(
                    this=schema,
                    kind="VIEW",
                    replace=replace,
                    expression=query,
                    **create_kwargs,
                ),
                quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
            )

        # Register table comment with commands if the engine doesn't support doing it in CREATE
        if (
            table_description
            and self.COMMENT_CREATION_VIEW.is_comment_command_only
            and self.comments_enabled
        ):
            self._create_table_comment(view_name, table_description, "VIEW")
        # Register column comments with commands if the engine doesn't support doing it in
        # CREATE or we couldn't do it in the CREATE schema definition because we don't have
        # columns_to_types
        if (
            column_descriptions
            and (
                self.COMMENT_CREATION_VIEW.is_comment_command_only
                or (
                    self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands
                    and not columns_to_types
                )
            )
            and self.comments_enabled
        ):
            self._create_column_comments(view_name, column_descriptions, "VIEW")

    @set_catalog()
    def create_schema(
        self,
        schema_name: SchemaName,
        ignore_if_exists: bool = True,
        warn_on_error: bool = True,
    ) -> None:
        """Create a schema from a name or qualified table name.

        By default failures are logged as warnings rather than raised
        (set warn_on_error=False to propagate the exception).
        """
        try:
            self.execute(
                exp.Create(
                    this=to_schema(schema_name),
                    kind="SCHEMA",
                    exists=ignore_if_exists,
                )
            )
        except Exception as e:
            if not warn_on_error:
                raise
            logger.warning("Failed to create schema '%s': %s", schema_name, e)

    def drop_schema(
        self,
        schema_name: SchemaName,
        ignore_if_not_exists: bool = True,
        cascade: bool = False,
    ) -> None:
        """Drop a schema, optionally cascading to the objects it contains."""
        self.execute(
            exp.Drop(
                this=to_schema(schema_name),
                kind="SCHEMA",
                exists=ignore_if_not_exists,
                cascade=cascade,
            )
        )

    def drop_view(
        self,
        view_name: TableName,
        ignore_if_not_exists: bool = True,
        materialized: bool = False,
        **kwargs: t.Any,
    ) -> None:
        """Drop a view."""
        self.execute(
            exp.Drop(
                this=exp.to_table(view_name),
                exists=ignore_if_not_exists,
materialized=materialized and self.SUPPORTS_MATERIALIZED_VIEWS, 959 kind="VIEW", 960 **kwargs, 961 ) 962 ) 963 964 def columns( 965 self, table_name: TableName, include_pseudo_columns: bool = False 966 ) -> t.Dict[str, exp.DataType]: 967 """Fetches column names and types for the target table.""" 968 self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE")) 969 describe_output = self.cursor.fetchall() 970 return { 971 # Note: MySQL returns the column type as bytes. 972 column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect) 973 for column_name, column_type, *_ in itertools.takewhile( 974 lambda t: not t[0].startswith("#"), 975 describe_output, 976 ) 977 if column_name and column_name.strip() and column_type and column_type.strip() 978 } 979 980 def table_exists(self, table_name: TableName) -> bool: 981 try: 982 self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE")) 983 return True 984 except Exception: 985 return False 986 987 def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None: 988 self.execute(exp.delete(table_name, where)) 989 990 def insert_append( 991 self, 992 table_name: TableName, 993 query_or_df: QueryOrDF, 994 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 995 ) -> None: 996 source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( 997 query_or_df, columns_to_types, target_table=table_name 998 ) 999 self._insert_append_source_queries(table_name, source_queries, columns_to_types) 1000 1001 def _insert_append_source_queries( 1002 self, 1003 table_name: TableName, 1004 source_queries: t.List[SourceQuery], 1005 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1006 ) -> None: 1007 with self.transaction(condition=len(source_queries) > 0): 1008 columns_to_types = columns_to_types or self.columns(table_name) 1009 for source_query in source_queries: 1010 with source_query as query: 1011 
self._insert_append_query(table_name, query, columns_to_types) 1012 1013 def _insert_append_query( 1014 self, 1015 table_name: TableName, 1016 query: Query, 1017 columns_to_types: t.Dict[str, exp.DataType], 1018 order_projections: bool = True, 1019 ) -> None: 1020 if order_projections: 1021 query = self._order_projections_and_filter(query, columns_to_types) 1022 self.execute(exp.insert(query, table_name, columns=list(columns_to_types))) 1023 1024 def insert_overwrite_by_partition( 1025 self, 1026 table_name: TableName, 1027 query_or_df: QueryOrDF, 1028 partitioned_by: t.List[exp.Expression], 1029 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1030 ) -> None: 1031 raise NotImplementedError( 1032 "Insert Overwrite by Partition (not time) is not supported by this engine" 1033 ) 1034 1035 def insert_overwrite_by_time_partition( 1036 self, 1037 table_name: TableName, 1038 query_or_df: QueryOrDF, 1039 start: TimeLike, 1040 end: TimeLike, 1041 time_formatter: t.Callable[ 1042 [TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expression 1043 ], 1044 time_column: TimeColumn | exp.Expression | str, 1045 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1046 **kwargs: t.Any, 1047 ) -> None: 1048 source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( 1049 query_or_df, columns_to_types, target_table=table_name 1050 ) 1051 columns_to_types = columns_to_types or self.columns(table_name) 1052 low, high = [time_formatter(dt, columns_to_types) for dt in make_inclusive(start, end)] 1053 if isinstance(time_column, TimeColumn): 1054 time_column = time_column.column 1055 where = exp.Between( 1056 this=exp.to_column(time_column) if isinstance(time_column, str) else time_column, 1057 low=low, 1058 high=high, 1059 ) 1060 self._insert_overwrite_by_condition(table_name, source_queries, columns_to_types, where) 1061 1062 def _values_to_sql( 1063 self, 1064 values: t.List[t.Tuple[t.Any, ...]], 1065 columns_to_types: t.Dict[str, 
exp.DataType], 1066 batch_start: int, 1067 batch_end: int, 1068 alias: str = "t", 1069 ) -> Query: 1070 return select_from_values_for_batch_range( 1071 values=values, 1072 columns_to_types=columns_to_types, 1073 batch_start=batch_start, 1074 batch_end=batch_end, 1075 alias=alias, 1076 ) 1077 1078 def _insert_overwrite_by_condition( 1079 self, 1080 table_name: TableName, 1081 source_queries: t.List[SourceQuery], 1082 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1083 where: t.Optional[exp.Condition] = None, 1084 insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None, 1085 ) -> None: 1086 table = exp.to_table(table_name) 1087 insert_overwrite_strategy = ( 1088 insert_overwrite_strategy_override or self.INSERT_OVERWRITE_STRATEGY 1089 ) 1090 with self.transaction( 1091 condition=len(source_queries) > 0 or insert_overwrite_strategy.is_delete_insert 1092 ): 1093 columns_to_types = columns_to_types or self.columns(table_name) 1094 for i, source_query in enumerate(source_queries): 1095 with source_query as query: 1096 query = self._order_projections_and_filter(query, columns_to_types, where=where) 1097 if i > 0 or insert_overwrite_strategy.is_delete_insert: 1098 if i == 0: 1099 self.delete_from(table_name, where=where or exp.true()) 1100 self._insert_append_query( 1101 table_name, 1102 query, 1103 columns_to_types=columns_to_types, 1104 order_projections=False, 1105 ) 1106 else: 1107 insert_exp = exp.insert( 1108 query, 1109 table, 1110 columns=( 1111 list(columns_to_types) 1112 if not insert_overwrite_strategy.is_replace_where 1113 else None 1114 ), 1115 overwrite=insert_overwrite_strategy.is_insert_overwrite, 1116 ) 1117 if insert_overwrite_strategy.is_replace_where: 1118 insert_exp.set("where", where or exp.true()) 1119 self.execute(insert_exp) 1120 1121 def update_table( 1122 self, 1123 table_name: TableName, 1124 properties: t.Dict[str, t.Any], 1125 where: t.Optional[str | exp.Condition] = None, 1126 ) -> None: 1127 
        self.execute(exp.update(table_name, properties, where=where))

    def _merge(
        self,
        target_table: TableName,
        query: Query,
        on: exp.Expression,
        match_expressions: t.List[exp.When],
    ) -> None:
        """Executes a MERGE of `query` into the target table using the given ON condition
        and WHEN clauses. Target/source are aliased with the module-level MERGE aliases."""
        this = exp.alias_(exp.to_table(target_table), alias=MERGE_TARGET_ALIAS, table=True)
        using = exp.alias_(
            exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
        )
        self.execute(
            exp.Merge(
                this=this,
                using=using,
                on=on,
                expressions=match_expressions,
            )
        )

    def scd_type_2_by_time(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expression],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: TimeLike,
        updated_at_col: exp.Column,
        invalidate_hard_deletes: bool = True,
        updated_at_as_valid_from: bool = False,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
        **kwargs: t.Any,
    ) -> None:
        """SCD Type 2 merge that detects changes via an `updated_at` timestamp column."""
        self._scd_type_2(
            target_table=target_table,
            source_table=source_table,
            unique_key=unique_key,
            valid_from_col=valid_from_col,
            valid_to_col=valid_to_col,
            execution_time=execution_time,
            updated_at_col=updated_at_col,
            invalidate_hard_deletes=invalidate_hard_deletes,
            updated_at_as_valid_from=updated_at_as_valid_from,
            columns_to_types=columns_to_types,
            table_description=table_description,
            column_descriptions=column_descriptions,
            truncate=truncate,
        )

    def scd_type_2_by_column(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expression],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: TimeLike,
        check_columns: t.Union[exp.Star, t.Sequence[exp.Column]],
        invalidate_hard_deletes: bool = True,
        execution_time_as_valid_from: bool = False,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
        **kwargs: t.Any,
    ) -> None:
        """SCD Type 2 merge that detects changes by comparing the given columns (or all
        unmanaged columns when a star is passed)."""
        self._scd_type_2(
            target_table=target_table,
            source_table=source_table,
            unique_key=unique_key,
            valid_from_col=valid_from_col,
            valid_to_col=valid_to_col,
            execution_time=execution_time,
            check_columns=check_columns,
            columns_to_types=columns_to_types,
            invalidate_hard_deletes=invalidate_hard_deletes,
            execution_time_as_valid_from=execution_time_as_valid_from,
            table_description=table_description,
            column_descriptions=column_descriptions,
            truncate=truncate,
        )

    def _scd_type_2(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expression],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: TimeLike,
        invalidate_hard_deletes: bool = True,
        updated_at_col: t.Optional[exp.Column] = None,
        check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Column]]] = None,
        updated_at_as_valid_from: bool = False,
        execution_time_as_valid_from: bool = False,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
    ) -> None:
        """Shared SCD Type 2 implementation. Exactly one of `updated_at_col` (by-time) or
        `check_columns` (by-column) drives change detection; the argument combinations are
        validated below before any SQL is built."""
        # batch_size=0: the full source must be available as a single query for the CTE pipeline.
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            source_table, columns_to_types, target_table=target_table, batch_size=0
        )
        columns_to_types = columns_to_types or self.columns(target_table)
        valid_from_name = valid_from_col.name
        valid_to_name = valid_to_col.name
        updated_at_name = updated_at_col.name if updated_at_col else None
        # Fall back to introspecting the target table when the provided mapping is missing
        # the managed columns or contains unknown types.
        if (
            valid_from_name not in columns_to_types
            or valid_to_name not in columns_to_types
            or not columns_to_types_all_known(columns_to_types)
        ):
            columns_to_types = self.columns(target_table)
        if not columns_to_types:
            raise SQLMeshError(f"Could not get columns_to_types. Does {target_table} exist?")
        if not unique_key:
            raise SQLMeshError("unique_key must be provided for SCD Type 2")
        if check_columns and updated_at_col:
            raise SQLMeshError(
                "Cannot use both `check_columns` and `updated_at_name` for SCD Type 2"
            )
        if check_columns and updated_at_as_valid_from:
            raise SQLMeshError(
                "Cannot use both `check_columns` and `updated_at_as_valid_from` for SCD Type 2"
            )
        if execution_time_as_valid_from and not check_columns:
            raise SQLMeshError(
                "Cannot use `execution_time_as_valid_from` without `check_columns` for SCD Type 2"
            )
        if updated_at_name and updated_at_name not in columns_to_types:
            raise SQLMeshError(
                f"Column {updated_at_name} not found in {target_table}. Table must contain an `updated_at` timestamp for SCD Type 2"
            )

        # "Unmanaged" columns are everything except the SCD-managed valid_from/valid_to pair.
        unmanaged_columns = [
            col for col in columns_to_types if col not in {valid_from_name, valid_to_name}
        ]
        time_data_type = columns_to_types[valid_from_name]
        select_source_columns: t.List[t.Union[str, exp.Alias]] = [
            col for col in unmanaged_columns if col != updated_at_name
        ]
        table_columns = [exp.column(c, quoted=True) for c in columns_to_types]
        if updated_at_name:
            # Cast updated_at to the same type as valid_from so comparisons are well-typed.
            select_source_columns.append(
                exp.cast(updated_at_col, time_data_type).as_(updated_at_col.this)  # type: ignore
            )

        # If a star is provided, we include all unmanaged columns in the check.
        # This unnecessarily includes unique key columns but since they are used in the join, and therefore we know
        # they are equal or not, the extra check is not a problem and we gain simplified logic here.
        # If we want to change this, then we just need to check the expressions in unique_key and pull out the
        # column names and then remove them from the unmanaged_columns
        if check_columns and check_columns == exp.Star():
            check_columns = [exp.column(col) for col in unmanaged_columns]
        execution_ts = to_time_column(execution_time, time_data_type)
        # Decide what `valid_from` should be for rows that replace an existing current row.
        if updated_at_as_valid_from:
            if not updated_at_col:
                raise SQLMeshError(
                    "Cannot use `updated_at_as_valid_from` without `updated_at_name` for SCD Type 2"
                )
            update_valid_from_start: t.Union[str, exp.Expression] = updated_at_col
        elif execution_time_as_valid_from:
            update_valid_from_start = execution_ts
        else:
            # Default: epoch start, i.e. the row is treated as valid "from the beginning".
            update_valid_from_start = to_time_column("1970-01-01 00:00:00+00:00", time_data_type)
        insert_valid_from_start = execution_ts if check_columns else updated_at_col  # type: ignore
        # joined._exists IS NULL is saying "if the row is deleted"
        delete_check = (
            exp.column("_exists", "joined").is_(exp.Null()) if invalidate_hard_deletes else None
        )
        # "t_"-prefixed columns refer to the target/latest side of the joined CTE below.
        prefixed_valid_to_col = valid_to_col.copy()
        prefixed_valid_to_col.this.set("this", f"t_{prefixed_valid_to_col.name}")
        prefixed_valid_from_col = valid_from_col.copy()
        prefixed_valid_from_col.this.set("this", f"t_{valid_from_col.name}")
        if check_columns:
            # By-column change detection: a row "changed" when any checked column differs,
            # including NULL <-> non-NULL transitions which plain `!=` would miss.
            row_check_conditions = []
            for col in check_columns:
                t_col = col.copy()
                t_col.this.set("this", f"t_{col.name}")
                row_check_conditions.extend(
                    [
                        col.neq(t_col),
                        exp.and_(t_col.is_(exp.Null()), col.is_(exp.Null()).not_()),
                        exp.and_(t_col.is_(exp.Null()).not_(), col.is_(exp.Null())),
                    ]
                )
            row_value_check = exp.or_(*row_check_conditions)
            unique_key_conditions = []
            for col in unique_key:
                t_col = col.copy()
                t_col.this.set("this", f"t_{col.name}")
                unique_key_conditions.extend(
                    [t_col.is_(exp.Null()).not_(), col.is_(exp.Null()).not_()]
                )
            unique_key_check = exp.and_(*unique_key_conditions)
            # unique_key_check is saying "if the row is updated"
            # row_value_check is saying "if the row has changed"
            updated_row_filter = exp.and_(unique_key_check, row_value_check)
            valid_to_case_stmt = (
                exp.Case()
                .when(
                    exp.and_(
                        exp.or_(
                            delete_check,
                            updated_row_filter,
                        )
                    ),
                    execution_ts,
                )
                .else_(prefixed_valid_to_col)
                .as_(valid_to_col.this)
            )
            valid_from_case_stmt = exp.func(
                "COALESCE",
                prefixed_valid_from_col,
                update_valid_from_start,
            ).as_(valid_from_col.this)
        else:
            # By-time change detection: a row "changed" when the source updated_at is newer.
            assert updated_at_col is not None
            prefixed_updated_at_col = updated_at_col.copy()
            prefixed_updated_at_col.this.set("this", f"t_{updated_at_col.name}")
            updated_row_filter = updated_at_col > prefixed_updated_at_col

            valid_to_case_stmt_builder = exp.Case().when(updated_row_filter, updated_at_col)
            if delete_check:
                valid_to_case_stmt_builder = valid_to_case_stmt_builder.when(
                    delete_check, execution_ts
                )
            valid_to_case_stmt = valid_to_case_stmt_builder.else_(prefixed_valid_to_col).as_(
                valid_to_col.this
            )

            # Rows that reappear after a hard delete resume from the deleted row's valid_to
            # (when it is later than updated_at) so intervals never overlap.
            valid_from_case_stmt = (
                exp.Case()
                .when(
                    exp.and_(
                        prefixed_valid_from_col.is_(exp.Null()),
                        exp.column("_exists", "latest_deleted").is_(exp.Null()).not_(),
                    ),
                    exp.Case()
                    .when(
                        exp.column(valid_to_col.this, "latest_deleted") > updated_at_col,
                        exp.column(valid_to_col.this, "latest_deleted"),
                    )
                    .else_(updated_at_col),
                )
                .when(prefixed_valid_from_col.is_(exp.Null()), update_valid_from_start)
                .else_(prefixed_valid_from_col)
            ).as_(valid_from_col.this)

        existing_rows_query = exp.select(*table_columns).from_(target_table)
        if truncate:
            # truncate=True rebuilds the table from the source only (existing rows dropped).
            existing_rows_query = existing_rows_query.limit(0)

        with source_queries[0] as source_query:
            prefixed_columns_to_types = []
            for column in columns_to_types:
                prefixed_col = exp.column(column).copy()
                prefixed_col.this.set("this", f"t_{prefixed_col.name}")
                prefixed_columns_to_types.append(prefixed_col)
            prefixed_unmanaged_columns = []
            for column in unmanaged_columns:
                prefixed_col = exp.column(column).copy()
                prefixed_col.this.set("this", f"t_{prefixed_col.name}")
                prefixed_unmanaged_columns.append(prefixed_col)
            query = (
                exp.Select()  # type: ignore
                .with_(
                    "source",
                    exp.select(exp.true().as_("_exists"), *select_source_columns)
                    .distinct(*unique_key)
                    .from_(source_query.subquery("raw_source")),  # type: ignore
                )
                # Historical Records that Do Not Change
                .with_(
                    "static",
                    existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()),
                )
                # Latest Records that can be updated
                .with_(
                    "latest",
                    existing_rows_query.where(valid_to_col.is_(exp.Null())),
                )
                # Deleted records which can be used to determine `valid_from` for undeleted source records
                .with_(
                    "deleted",
                    exp.select(*[exp.column(col, "static") for col in columns_to_types])
                    .from_("static")
                    .join(
                        "latest",
                        on=exp.and_(
                            *[
                                add_table(key, "static").eq(add_table(key, "latest"))
                                for key in unique_key
                            ]
                        ),
                        join_type="left",
                    )
                    .where(exp.column(valid_to_col.this, "latest").is_(exp.Null())),
                )
                # Get the latest `valid_to` deleted record for each unique key
                .with_(
                    "latest_deleted",
                    exp.select(
                        exp.true().as_("_exists"),
                        *(part.as_(f"_key{i}") for i, part in enumerate(unique_key)),
                        exp.Max(this=valid_to_col).as_(valid_to_col.this),
                    )
                    .from_("deleted")
                    .group_by(*unique_key),
                )
                # Do a full join between latest records and source table in order to combine them together
                # MySQL doesn't suport full join so going to do a left then right join and remove dups with union
                .with_(
                    "joined",
                    exp.select(
                        exp.column("_exists", table="source"),
                        *(
                            exp.column(col, table="latest").as_(prefixed_columns_to_types[i].this)
                            for i, col in enumerate(columns_to_types)
                        ),
                        *(exp.column(col, table="source").as_(col) for col in unmanaged_columns),
                    )
                    .from_("latest")
                    .join(
                        "source",
                        on=exp.and_(
                            *[
                                add_table(key, "latest").eq(add_table(key, "source"))
                                for key in unique_key
                            ]
                        ),
                        join_type="left",
                    )
                    .union(
                        exp.select(
                            exp.column("_exists", table="source"),
                            *(
                                exp.column(col, table="latest").as_(
                                    prefixed_columns_to_types[i].this
                                )
                                for i, col in enumerate(columns_to_types)
                            ),
                            *(
                                exp.column(col, table="source").as_(col)
                                for col in unmanaged_columns
                            ),
                        )
                        .from_("latest")
                        .join(
                            "source",
                            on=exp.and_(
                                *[
                                    add_table(key, "latest").eq(add_table(key, "source"))
                                    for key in unique_key
                                ]
                            ),
                            join_type="right",
                        )
                    ),
                )
                # Get deleted, new, no longer current, or unchanged records
                .with_(
                    "updated_rows",
                    exp.select(
                        *(
                            exp.func(
                                "COALESCE",
                                exp.column(prefixed_unmanaged_columns[i].this, table="joined"),
                                exp.column(col, table="joined"),
                            ).as_(col)
                            for i, col in enumerate(unmanaged_columns)
                        ),
                        valid_from_case_stmt,
                        valid_to_case_stmt,
                    )
                    .from_("joined")
                    .join(
                        "latest_deleted",
                        on=exp.and_(
                            *[
                                add_table(part, "joined").eq(
                                    exp.column(f"_key{i}", "latest_deleted")
                                )
                                for i, part in enumerate(unique_key)
                            ]
                        ),
                        join_type="left",
                    ),
                )
                # Get records that have been "updated" which means inserting a new record with previous `valid_from`
                .with_(
                    "inserted_rows",
                    exp.select(
                        *unmanaged_columns,
                        insert_valid_from_start.as_(valid_from_col.this),  # type: ignore
                        to_time_column(exp.null(), time_data_type).as_(valid_to_col.this),
                    )
                    .from_("joined")
                    .where(updated_row_filter),
                )
                .select(*table_columns)
                .from_("static")
                .union(
                    exp.select(*table_columns).from_("updated_rows"),
                    distinct=False,
                )
                .union(
                    exp.select(*table_columns).from_("inserted_rows"),
                    distinct=False,
                )
            )

            self.replace_query(
                target_table,
                query,
                columns_to_types=columns_to_types,
                table_description=table_description,
                column_descriptions=column_descriptions,
            )

    def merge(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        unique_key: t.Sequence[exp.Expression],
        when_matched: t.Optional[exp.When] = None,
    ) -> None:
        """Merges a query or dataframe into the target table keyed on `unique_key`.

        By default matched rows are fully updated and unmatched rows are inserted; a
        custom `when_matched` clause overrides the update behavior.
        """
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            source_table, columns_to_types, target_table=target_table
        )
        columns_to_types = columns_to_types or self.columns(target_table)
        on = exp.and_(
            *(
                add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS))
                for part in unique_key
            )
        )
        if not when_matched:
            when_matched = exp.When(
                matched=True,
                source=False,
                then=exp.Update(
                    expressions=[
                        exp.column(col, MERGE_TARGET_ALIAS).eq(exp.column(col, MERGE_SOURCE_ALIAS))
                        for col in columns_to_types
                    ],
                ),
            )
        when_not_matched = exp.When(
            matched=False,
            source=False,
            then=exp.Insert(
                this=exp.Tuple(expressions=[exp.column(col) for col in columns_to_types]),
                expression=exp.Tuple(
                    expressions=[exp.column(col, MERGE_SOURCE_ALIAS) for col in columns_to_types]
                ),
            ),
        )
        for source_query in source_queries:
            with source_query as query:
                self._merge(
                    target_table=target_table,
                    query=query,
                    on=on,
                    match_expressions=[when_matched, when_not_matched],
                )

    def rename_table(
        self,
        old_table_name: TableName,
        new_table_name: TableName,
    ) -> None:
        """Renames a table, rejecting renames that would cross catalogs."""
        new_table = exp.to_table(new_table_name)
        if new_table.catalog:
            old_table = exp.to_table(old_table_name)
            catalog = old_table.catalog or self.get_current_catalog()
            if catalog != new_table.catalog:
                raise UnsupportedCatalogOperationError(
                    "Tried to rename table across catalogs which is not supported"
                )
        self._rename_table(old_table_name, new_table_name)

    def get_data_objects(
        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
    ) -> t.List[DataObject]:
        """Lists all data objects in the target schema.

        Args:
            schema_name: The name of the schema to list data objects from.
            object_names: If provided, only return data objects with these names.

        Returns:
            A list of data objects in the target schema.
        """
        if object_names is not None:
            if not object_names:
                return []
            # Query in batches so engines with filter-size limits aren't overwhelmed.
            object_names_list = list(object_names)
            batches = [
                object_names_list[i : i + self.DATA_OBJECT_FILTER_BATCH_SIZE]
                for i in range(0, len(object_names_list), self.DATA_OBJECT_FILTER_BATCH_SIZE)
            ]
            return [
                obj for batch in batches for obj in self._get_data_objects(schema_name, set(batch))
            ]
        return self._get_data_objects(schema_name)

    def fetchone(
        self,
        query: t.Union[exp.Expression, str],
        ignore_unsupported_errors: bool = False,
        quote_identifiers: bool = False,
    ) -> t.Tuple:
        """Executes the query and returns the first row from the cursor.

        NOTE(review): DBAPI cursors return None when there are no rows — the declared
        return type assumes at least one row; confirm with callers.
        """
        with self.transaction():
            self.execute(
                query,
                ignore_unsupported_errors=ignore_unsupported_errors,
                quote_identifiers=quote_identifiers,
            )
            return self.cursor.fetchone()

    def fetchall(
        self,
        query: t.Union[exp.Expression, str],
        ignore_unsupported_errors: bool = False,
        quote_identifiers: bool = False,
    ) -> t.List[t.Tuple]:
        """Executes the query and returns all rows from the cursor."""
        with self.transaction():
            self.execute(
                query,
                ignore_unsupported_errors=ignore_unsupported_errors,
                quote_identifiers=quote_identifiers,
            )
            return self.cursor.fetchall()

    def _fetch_native_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> DF:
        """Fetches a DataFrame that can be either Pandas or PySpark from the cursor"""
        with self.transaction():
            self.execute(query, quote_identifiers=quote_identifiers)
            return self.cursor.fetchdf()

    def fetchdf(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> pd.DataFrame:
        """Fetches a Pandas DataFrame from the cursor"""
        df = self._fetch_native_df(query, quote_identifiers=quote_identifiers)
        if not isinstance(df, pd.DataFrame):
            raise NotImplementedError(
                "The cursor's `fetch_native_df` method is not returning a pandas DataFrame. Need to update `fetchdf` so a Pandas DataFrame is returned"
            )
        return df

    def fetch_pyspark_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> PySparkDataFrame:
        """Fetches a PySpark DataFrame from the cursor"""
        raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}")

    def wap_supported(self, table_name: TableName) -> bool:
        """Returns whether WAP for the target table is supported."""
        return False

    def wap_table_name(self, table_name: TableName, wap_id: str) -> str:
        """Returns the updated table name for the given WAP ID.

        Args:
            table_name: The name of the target table.
            wap_id: The WAP ID to prepare.

        Returns:
            The updated table name that should be used for writing.
        """
        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")

    def wap_prepare(self, table_name: TableName, wap_id: str) -> str:
        """Prepares the target table for WAP and returns the updated table name.

        Args:
            table_name: The name of the target table.
            wap_id: The WAP ID to prepare.

        Returns:
            The updated table name that should be used for writing.
        """
        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")

    def wap_publish(self, table_name: TableName, wap_id: str) -> None:
        """Publishes changes with the given WAP ID to the target table.

        Args:
            table_name: The name of the target table.
            wap_id: The WAP ID to publish.
1729 """ 1730 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 1731 1732 @contextlib.contextmanager 1733 def transaction( 1734 self, 1735 condition: t.Optional[bool] = None, 1736 ) -> t.Iterator[None]: 1737 """A transaction context manager.""" 1738 if ( 1739 self._connection_pool.is_transaction_active 1740 or not self.SUPPORTS_TRANSACTIONS 1741 or (condition is not None and not condition) 1742 ): 1743 yield 1744 return 1745 self._connection_pool.begin() 1746 try: 1747 yield 1748 except Exception as e: 1749 self._connection_pool.rollback() 1750 raise e 1751 else: 1752 self._connection_pool.commit() 1753 1754 @contextlib.contextmanager 1755 def session(self, properties: SessionProperties) -> t.Iterator[None]: 1756 """A session context manager.""" 1757 if self._is_session_active(): 1758 yield 1759 return 1760 1761 self._begin_session(properties) 1762 try: 1763 yield 1764 finally: 1765 self._end_session() 1766 1767 def _begin_session(self, properties: SessionProperties) -> t.Any: 1768 """Begin a new session.""" 1769 1770 def _end_session(self) -> None: 1771 """End the existing session.""" 1772 1773 def _is_session_active(self) -> bool: 1774 """Indicates whether or not a session is active.""" 1775 return False 1776 1777 def execute( 1778 self, 1779 expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]], 1780 ignore_unsupported_errors: bool = False, 1781 quote_identifiers: bool = True, 1782 **kwargs: t.Any, 1783 ) -> None: 1784 """Execute a sql query.""" 1785 to_sql_kwargs = ( 1786 {"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {} 1787 ) 1788 1789 with self.transaction(): 1790 for e in ensure_list(expressions): 1791 sql = t.cast( 1792 str, 1793 ( 1794 self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs) 1795 if isinstance(e, exp.Expression) 1796 else e 1797 ), 1798 ) 1799 self._log_sql(sql) 1800 self._execute(sql, **kwargs) 1801 1802 def _log_sql(self, sql: str) -> None: 1803 
        logger.log(self._execute_log_level, "Executing SQL: %s", sql)

    def _execute(self, sql: str, **kwargs: t.Any) -> None:
        # Single funnel point for raw statement execution; subclasses override as needed.
        self.cursor.execute(sql, **kwargs)

    @contextlib.contextmanager
    def temp_table(
        self,
        query_or_df: QueryOrDF,
        name: TableName = "diff",
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        **kwargs: t.Any,
    ) -> t.Iterator[exp.Table]:
        """A context manager for working a temp table.

        The table will be created with a random guid and cleaned up after the block.

        Args:
            query_or_df: The query or df to create a temp table for.
            name: The base name of the temp table.
            columns_to_types: A mapping between the column name and its data type.

        Yields:
            The table expression
        """
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types=columns_to_types, target_table=name
        )

        with self.transaction():
            table = self._get_temp_table(name)
            if table.db:
                self.create_schema(schema_(table.args["db"], table.args.get("catalog")))
            self._create_table_from_source_queries(
                table,
                source_queries,
                columns_to_types,
                exists=True,
                table_description=None,
                column_descriptions=None,
                **kwargs,
            )

        # Always drop the temp table, even if the caller's block raises.
        try:
            yield table
        finally:
            self.drop_table(table)

    def _table_or_view_properties_to_expressions(
        self, table_or_view_properties: t.Optional[t.Dict[str, exp.Expression]] = None
    ) -> t.List[exp.Property]:
        """Converts model properties (either physical or virtual) to a list of property expressions."""
        if not table_or_view_properties:
            return []
        # Values are copied so callers' expressions aren't mutated by later SQL generation.
        return [
            exp.Property(this=key, value=value.copy())
            for key, value in table_or_view_properties.items()
        ]

    def _build_table_properties_exp(
        self,
        catalog_name: t.Optional[str] = None,
        storage_format: t.Optional[str] = None,
        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
        partition_interval_unit: t.Optional[IntervalUnit] = None,
        clustered_by: t.Optional[t.List[str]] = None,
        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
    ) -> t.Optional[exp.Properties]:
        """Creates a SQLGlot table properties expression for ddl.

        The base implementation only handles the table comment; engine-specific
        subclasses are expected to consume the remaining parameters.
        """
        properties: t.List[exp.Expression] = []

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _build_view_properties_exp(
        self,
        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        table_description: t.Optional[str] = None,
    ) -> t.Optional[exp.Properties]:
        """Creates a SQLGlot table properties expression for view"""
        properties: t.List[exp.Expression] = []

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
        # A falsy length (None or 0) means the engine imposes no comment length limit.
        return comment[:length] if length else comment

    def _truncate_table_comment(self, comment: str) -> str:
        return self._truncate_comment(comment, self.MAX_TABLE_COMMENT_LENGTH)

    def _truncate_column_comment(self, comment: str) -> str:
        return self._truncate_comment(comment, self.MAX_COLUMN_COMMENT_LENGTH)

    def _to_sql(self, expression: exp.Expression, quote: bool = True, **kwargs: t.Any) -> str:
        """
        Converts an expression to a SQL string. Has a set of default kwargs to apply, and then default
        kwargs defined for the given dialect, and then kwargs provided by the user when defining the engine
        adapter, and then finally kwargs provided by the user when calling this method.
        """
        sql_gen_kwargs = {
            "dialect": self.dialect,
            "pretty": False,
            "comments": False,
            **self.sql_gen_kwargs,
            **kwargs,
        }

        # Copy before quoting so the caller's expression tree is never mutated.
        expression = expression.copy()

        if quote:
            quote_identifiers(expression)

        return expression.sql(**sql_gen_kwargs, copy=False)  # type: ignore

    def _get_data_objects(
        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
    ) -> t.List[DataObject]:
        """
        Returns all the data objects that exist in the given schema and optionally catalog.
        """
        raise NotImplementedError()

    def _get_temp_table(self, table: TableName, table_only: bool = False) -> exp.Table:
        """
        Returns the name of the temp table that should be used for the given table name.
1947 """ 1948 table = t.cast(exp.Table, exp.to_table(table).copy()) 1949 table.set( 1950 "this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}", quoted=True) 1951 ) 1952 1953 if table_only: 1954 table.set("db", None) 1955 table.set("catalog", None) 1956 1957 return table 1958 1959 def _order_projections_and_filter( 1960 self, 1961 query: Query, 1962 columns_to_types: t.Dict[str, exp.DataType], 1963 where: t.Optional[exp.Expression] = None, 1964 ) -> Query: 1965 if not isinstance(query, exp.Query) or ( 1966 not where and query.named_selects == list(columns_to_types) 1967 ): 1968 return query 1969 1970 query = t.cast(exp.Query, query.copy()) 1971 with_ = query.args.pop("with", None) 1972 query = self._select_columns(columns_to_types).from_( 1973 query.subquery("_subquery", copy=False), copy=False 1974 ) 1975 if where: 1976 query = query.where(where, copy=False) 1977 1978 if with_: 1979 query.set("with", with_) 1980 1981 return query 1982 1983 def _truncate_table(self, table_name: TableName) -> None: 1984 table = exp.to_table(table_name) 1985 self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}") 1986 1987 def _build_create_comment_table_exp( 1988 self, table: exp.Table, table_comment: str, table_kind: str 1989 ) -> exp.Comment | str: 1990 return exp.Comment( 1991 this=table, 1992 kind=table_kind, 1993 expression=exp.Literal.string(self._truncate_table_comment(table_comment)), 1994 ) 1995 1996 def _create_table_comment( 1997 self, table_name: TableName, table_comment: str, table_kind: str = "TABLE" 1998 ) -> None: 1999 table = exp.to_table(table_name) 2000 2001 try: 2002 self.execute(self._build_create_comment_table_exp(table, table_comment, table_kind)) 2003 except Exception: 2004 logger.warning( 2005 f"Table comment for '{table.alias_or_name}' not registered - this may be due to limited permissions.", 2006 exc_info=True, 2007 ) 2008 2009 def _build_create_comment_column_exp( 2010 self, table: exp.Table, column_name: str, 
column_comment: str, table_kind: str = "TABLE" 2011 ) -> exp.Comment | str: 2012 return exp.Comment( 2013 this=exp.column(column_name, *reversed(table.parts)), # type: ignore 2014 kind="COLUMN", 2015 expression=exp.Literal.string(self._truncate_column_comment(column_comment)), 2016 ) 2017 2018 def _create_column_comments( 2019 self, 2020 table_name: TableName, 2021 column_comments: t.Dict[str, str], 2022 table_kind: str = "TABLE", 2023 ) -> None: 2024 table = exp.to_table(table_name) 2025 2026 for col, comment in column_comments.items(): 2027 try: 2028 self.execute(self._build_create_comment_column_exp(table, col, comment, table_kind)) 2029 except Exception: 2030 logger.warning( 2031 f"Column comments for table '{table.alias_or_name}' not registered - this may be due to limited permissions.", 2032 exc_info=True, 2033 ) 2034 2035 def _rename_table( 2036 self, 2037 old_table_name: TableName, 2038 new_table_name: TableName, 2039 ) -> None: 2040 self.execute(exp.rename_table(old_table_name, new_table_name)) 2041 2042 @classmethod 2043 def _select_columns(cls, columns: t.Iterable[str]) -> exp.Select: 2044 return exp.select(*(exp.column(c, quoted=True) for c in columns))
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
102 def __init__( 103 self, 104 connection_factory: t.Callable[[], t.Any], 105 dialect: str = "", 106 sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None, 107 multithreaded: bool = False, 108 cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 109 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 110 default_catalog: t.Optional[str] = None, 111 execute_log_level: int = logging.DEBUG, 112 register_comments: bool = True, 113 **kwargs: t.Any, 114 ): 115 self.dialect = dialect.lower() or self.DIALECT 116 self._connection_pool = create_connection_pool( 117 connection_factory, multithreaded, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init 118 ) 119 self.sql_gen_kwargs = sql_gen_kwargs or {} 120 self._default_catalog = default_catalog 121 self._execute_log_level = execute_log_level 122 self._extra_config = kwargs 123 self.register_comments = register_comments
125 def with_log_level(self, level: int) -> EngineAdapter: 126 adapter = self.__class__( 127 lambda: None, 128 dialect=self.dialect, 129 sql_gen_kwargs=self.sql_gen_kwargs, 130 default_catalog=self._default_catalog, 131 execute_log_level=level, 132 register_comments=self.register_comments, 133 **self._extra_config, 134 ) 135 136 adapter._connection_pool = self._connection_pool 137 138 return adapter
250 def recycle(self) -> None: 251 """Closes all open connections and releases all allocated resources associated with any thread 252 except the calling one.""" 253 self._connection_pool.close_all(exclude_calling_thread=True)
Closes all open connections and releases all allocated resources associated with any thread except the calling one.
255 def close(self) -> t.Any: 256 """Closes all open connections and releases all allocated resources.""" 257 self._connection_pool.close_all()
Closes all open connections and releases all allocated resources.
259 def get_current_catalog(self) -> t.Optional[str]: 260 """Returns the catalog name of the current connection.""" 261 raise NotImplementedError()
Returns the catalog name of the current connection.
263 def set_current_catalog(self, catalog: str) -> None: 264 """Sets the catalog name of the current connection.""" 265 raise NotImplementedError()
Sets the catalog name of the current connection.
267 def get_catalog_type(self, catalog: t.Optional[str]) -> str: 268 """Intended to be overridden for data virtualization systems like Trino that, 269 depending on the target catalog, require slightly different properties to be set when creating / updating tables 270 """ 271 if self.CATALOG_SUPPORT.is_unsupported: 272 raise UnsupportedCatalogOperationError( 273 f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}" 274 ) 275 return self.DEFAULT_CATALOG_TYPE
Intended to be overridden for data virtualization systems like Trino that, depending on the target catalog, require slightly different properties to be set when creating / updating tables
    def replace_query(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Replaces an existing table with a query.

        For partition based engines (hive, spark), insert overwrite is used. For other systems, create or replace is used.

        Args:
            table_name: The name of the table (eg. prod.table)
            query_or_df: The SQL query to run or a dataframe.
            columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type.
                Expected to be ordered to match the order of values in the dataframe.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            kwargs: Optional create table properties.
        """
        target_table = exp.to_table(table_name)
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types, target_table=target_table
        )
        columns_to_types = columns_to_types or self.columns(target_table)
        query = source_queries[0].query_factory()
        # Detect whether the query reads from the same table it replaces.
        self_referencing = any(
            quote_identifiers(table) == quote_identifiers(target_table)
            for table in query.find_all(exp.Table)
        )
        # If a query references itself then it must have a table created regardless of approach used.
        if self_referencing:
            self._create_table_from_columns(
                target_table,
                columns_to_types,
                exists=True,
                table_description=table_description,
                column_descriptions=column_descriptions,
            )
        # All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we
        # use `CREATE OR REPLACE TABLE AS` if the engine supports it
        if self.SUPPORTS_REPLACE_TABLE or not self.table_exists(target_table):
            return self._create_table_from_source_queries(
                target_table,
                source_queries,
                columns_to_types,
                replace=self.SUPPORTS_REPLACE_TABLE,
                table_description=table_description,
                column_descriptions=column_descriptions,
                **kwargs,
            )
        else:
            if self_referencing:
                # Snapshot the target into a temp table and rewrite the source
                # queries to read from it, so the overwrite doesn't read from
                # the table it is replacing.
                with self.temp_table(
                    self._select_columns(columns_to_types).from_(target_table),
                    name=target_table,
                    columns_to_types=columns_to_types,
                    **kwargs,
                ) as temp_table:
                    for source_query in source_queries:
                        source_query.add_transform(
                            lambda node: (  # type: ignore
                                temp_table  # type: ignore
                                if isinstance(node, exp.Table)
                                and quote_identifiers(node) == quote_identifiers(target_table)
                                else node
                            )
                        )
                    return self._insert_overwrite_by_condition(
                        target_table,
                        source_queries,
                        columns_to_types,
                    )
            return self._insert_overwrite_by_condition(
                target_table,
                source_queries,
                columns_to_types,
            )
Replaces an existing table with a query.
For partition based engines (hive, spark), insert overwrite is used. For other systems, create or replace is used.
Arguments:
- table_name: The name of the table (eg. prod.table)
- query_or_df: The SQL query to run or a dataframe.
- columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type. Expected to be ordered to match the order of values in the dataframe.
- kwargs: Optional create table properties.
360 def create_index( 361 self, 362 table_name: TableName, 363 index_name: str, 364 columns: t.Tuple[str, ...], 365 exists: bool = True, 366 ) -> None: 367 """Creates a new index for the given table if supported 368 369 Args: 370 table_name: The name of the target table. 371 index_name: The name of the index. 372 columns: The list of columns that constitute the index. 373 exists: Indicates whether to include the IF NOT EXISTS check. 374 """ 375 if not self.SUPPORTS_INDEXES: 376 return 377 378 expression = exp.Create( 379 this=exp.Index( 380 this=exp.to_identifier(index_name), 381 table=exp.to_table(table_name), 382 params=exp.IndexParameters(columns=[exp.to_column(c) for c in columns]), 383 ), 384 kind="INDEX", 385 exists=exists, 386 ) 387 self.execute(expression)
Creates a new index for the given table if supported
Arguments:
- table_name: The name of the target table.
- index_name: The name of the index.
- columns: The list of columns that constitute the index.
- exists: Indicates whether to include the IF NOT EXISTS check.
    def create_table(
        self,
        table_name: TableName,
        columns_to_types: t.Dict[str, exp.DataType],
        primary_key: t.Optional[t.Tuple[str, ...]] = None,
        exists: bool = True,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Create a table using a DDL statement

        Args:
            table_name: The name of the table to create. Can be fully qualified or just table name.
            columns_to_types: A mapping between the column name and its data type.
            primary_key: Determines the table primary key.
            exists: Indicates whether to include the IF NOT EXISTS check.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            kwargs: Optional create table properties.
        """
        # Thin wrapper: all work is delegated to the explicit-schema helper.
        self._create_table_from_columns(
            table_name,
            columns_to_types,
            primary_key,
            exists,
            table_description,
            column_descriptions,
            **kwargs,
        )
Create a table using a DDL statement
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- columns_to_types: A mapping between the column name and its data type.
- primary_key: Determines the table primary key.
- exists: Indicates whether to include the IF NOT EXISTS check.
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- kwargs: Optional create table properties.
    def ctas(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        exists: bool = True,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Create a table using a CTAS statement

        Args:
            table_name: The name of the table to create. Can be fully qualified or just table name.
            query_or_df: The SQL query to run or a dataframe for the CTAS.
            columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame.
            exists: Indicates whether to include the IF NOT EXISTS check.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            kwargs: Optional create table properties.
        """
        # Normalize the input (query or dataframe) into source queries first,
        # then create the table from them.
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types, target_table=table_name
        )
        return self._create_table_from_source_queries(
            table_name,
            source_queries,
            columns_to_types,
            exists,
            table_description=table_description,
            column_descriptions=column_descriptions,
            **kwargs,
        )
Create a table using a CTAS statement
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- query_or_df: The SQL query to run or a dataframe for the CTAS.
- columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame.
- exists: Indicates whether to include the IF NOT EXISTS check.
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- kwargs: Optional create table properties.
    def create_state_table(
        self,
        table_name: str,
        columns_to_types: t.Dict[str, exp.DataType],
        primary_key: t.Optional[t.Tuple[str, ...]] = None,
    ) -> None:
        """Create a table to store SQLMesh internal state.

        Args:
            table_name: The name of the table to create. Can be fully qualified or just table name.
            columns_to_types: A mapping between the column name and its data type.
            primary_key: Determines the table primary key.
        """
        # Uses create_table's defaults (e.g. IF NOT EXISTS), with no
        # description/comment metadata for internal state tables.
        self.create_table(
            table_name,
            columns_to_types,
            primary_key=primary_key,
        )
Create a table to store SQLMesh internal state.
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- columns_to_types: A mapping between the column name and its data type.
- primary_key: Determines the table primary key.
719 def create_table_like( 720 self, 721 target_table_name: TableName, 722 source_table_name: TableName, 723 exists: bool = True, 724 ) -> None: 725 """ 726 Create a table like another table or view. 727 """ 728 target_table = exp.to_table(target_table_name) 729 source_table = exp.to_table(source_table_name) 730 create_expression = exp.Create( 731 this=target_table, 732 kind="TABLE", 733 exists=exists, 734 properties=exp.Properties( 735 expressions=[ 736 exp.LikeProperty(this=source_table), 737 ] 738 ), 739 ) 740 self.execute(create_expression)
Create a table like another table or view.
742 def clone_table( 743 self, 744 target_table_name: TableName, 745 source_table_name: TableName, 746 replace: bool = False, 747 clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 748 **kwargs: t.Any, 749 ) -> None: 750 """Creates a table with the target name by cloning the source table. 751 752 Args: 753 target_table_name: The name of the table that should be created. 754 source_table_name: The name of the source table that should be cloned. 755 replace: Whether or not to replace an existing table. 756 """ 757 if not self.SUPPORTS_CLONING: 758 raise NotImplementedError(f"Engine does not support cloning: {type(self)}") 759 self.execute( 760 exp.Create( 761 this=exp.to_table(target_table_name), 762 kind="TABLE", 763 replace=replace, 764 clone=exp.Clone( 765 this=exp.to_table(source_table_name), 766 **(clone_kwargs or {}), 767 ), 768 **kwargs, 769 ) 770 )
Creates a table with the target name by cloning the source table.
Arguments:
- target_table_name: The name of the table that should be created.
- source_table_name: The name of the source table that should be cloned.
- replace: Whether or not to replace an existing table.
772 def drop_table(self, table_name: TableName, exists: bool = True) -> None: 773 """Drops a table. 774 775 Args: 776 table_name: The name of the table to drop. 777 exists: If exists, defaults to True. 778 """ 779 drop_expression = exp.Drop(this=exp.to_table(table_name), kind="TABLE", exists=exists) 780 self.execute(drop_expression)
Drops a table.
Arguments:
- table_name: The name of the table to drop.
- exists: If exists, defaults to True.
782 def alter_table( 783 self, 784 current_table_name: TableName, 785 target_table_name: TableName, 786 ) -> None: 787 """ 788 Performs the required alter statements to change the current table into the structure of the target table. 789 """ 790 with self.transaction(): 791 for alter_expression in self.SCHEMA_DIFFER.compare_columns( 792 current_table_name, 793 self.columns(current_table_name), 794 self.columns(target_table_name), 795 ): 796 self.execute(alter_expression)
Performs the required alter statements to change the current table into the structure of the target table.
    def create_view(
        self,
        view_name: TableName,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        replace: bool = True,
        materialized: bool = False,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        **create_kwargs: t.Any,
    ) -> None:
        """Create a view with a query or dataframe.

        If a dataframe is passed in, it will be converted into a literal values statement.
        This should only be done if the dataframe is very small!

        Args:
            view_name: The view name.
            query_or_df: A query or dataframe.
            columns_to_types: Columns to use in the view statement.
            replace: Whether or not to replace an existing view defaults to True.
            materialized: Whether to create a materialized view. Only used for engines that support this feature.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            view_properties: Optional view properties to add to the view.
            create_kwargs: Additional kwargs to pass into the Create expression
        """
        # Dataframes are inlined as a VALUES statement (hence: small data only).
        if self.is_pandas_df(query_or_df):
            values = list(t.cast(pd.DataFrame, query_or_df).itertuples(index=False, name=None))
            columns_to_types = columns_to_types or self._columns_to_types(query_or_df)
            if not columns_to_types:
                raise SQLMeshError("columns_to_types must be provided for dataframes")
            query_or_df = self._values_to_sql(
                values,
                columns_to_types,
                batch_start=0,
                batch_end=len(values),
            )

        # batch_size=0 disables batching: a view must be defined by one query.
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types, batch_size=0, target_table=view_name
        )
        if len(source_queries) != 1:
            raise SQLMeshError("Only one source query is supported for creating views")

        # When column types are known, emit an explicit schema (which may also
        # carry column comments).
        schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name)
        if columns_to_types:
            schema = self._build_schema_exp(
                exp.to_table(view_name), columns_to_types, column_descriptions, is_view=True
            )

        properties = create_kwargs.pop("properties", None)
        if not properties:
            properties = exp.Properties(expressions=[])

        if materialized and self.SUPPORTS_MATERIALIZED_VIEWS:
            properties.append("expressions", exp.MaterializedProperty())

            if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema):
                schema = schema.this

        # The table description goes into the CREATE only when the engine
        # supports comments in the schema definition.
        create_view_properties = self._build_view_properties_exp(
            view_properties,
            (
                table_description
                if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled
                else None
            ),
        )
        if create_view_properties:
            for view_property in create_view_properties.expressions:
                properties.append("expressions", view_property)

        if properties.expressions:
            create_kwargs["properties"] = properties

        with source_queries[0] as query:
            self.execute(
                exp.Create(
                    this=schema,
                    kind="VIEW",
                    replace=replace,
                    expression=query,
                    **create_kwargs,
                ),
                quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
            )

        # Register table comment with commands if the engine doesn't support doing it in CREATE
        if (
            table_description
            and self.COMMENT_CREATION_VIEW.is_comment_command_only
            and self.comments_enabled
        ):
            self._create_table_comment(view_name, table_description, "VIEW")
        # Register column comments with commands if the engine doesn't support doing it in
        # CREATE or we couldn't do it in the CREATE schema definition because we don't have
        # columns_to_types
        if (
            column_descriptions
            and (
                self.COMMENT_CREATION_VIEW.is_comment_command_only
                or (
                    self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands
                    and not columns_to_types
                )
            )
            and self.comments_enabled
        ):
            self._create_column_comments(view_name, column_descriptions, "VIEW")
Create a view with a query or dataframe.
If a dataframe is passed in, it will be converted into a literal values statement. This should only be done if the dataframe is very small!
Arguments:
- view_name: The view name.
- query_or_df: A query or dataframe.
- columns_to_types: Columns to use in the view statement.
- replace: Whether or not to replace an existing view defaults to True.
- materialized: Whether to create a materialized view. Only used for engines that support this feature.
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- view_properties: Optional view properties to add to the view.
- create_kwargs: Additional kwargs to pass into the Create expression
910 @set_catalog() 911 def create_schema( 912 self, 913 schema_name: SchemaName, 914 ignore_if_exists: bool = True, 915 warn_on_error: bool = True, 916 ) -> None: 917 """Create a schema from a name or qualified table name.""" 918 try: 919 self.execute( 920 exp.Create( 921 this=to_schema(schema_name), 922 kind="SCHEMA", 923 exists=ignore_if_exists, 924 ) 925 ) 926 except Exception as e: 927 if not warn_on_error: 928 raise 929 logger.warning("Failed to create schema '%s': %s", schema_name, e)
Create a schema from a name or qualified table name.
946 def drop_view( 947 self, 948 view_name: TableName, 949 ignore_if_not_exists: bool = True, 950 materialized: bool = False, 951 **kwargs: t.Any, 952 ) -> None: 953 """Drop a view.""" 954 self.execute( 955 exp.Drop( 956 this=exp.to_table(view_name), 957 exists=ignore_if_not_exists, 958 materialized=materialized and self.SUPPORTS_MATERIALIZED_VIEWS, 959 kind="VIEW", 960 **kwargs, 961 ) 962 )
Drop a view.
964 def columns( 965 self, table_name: TableName, include_pseudo_columns: bool = False 966 ) -> t.Dict[str, exp.DataType]: 967 """Fetches column names and types for the target table.""" 968 self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE")) 969 describe_output = self.cursor.fetchall() 970 return { 971 # Note: MySQL returns the column type as bytes. 972 column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect) 973 for column_name, column_type, *_ in itertools.takewhile( 974 lambda t: not t[0].startswith("#"), 975 describe_output, 976 ) 977 if column_name and column_name.strip() and column_type and column_type.strip() 978 }
Fetches column names and types for the target table.
    def insert_append(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    ) -> None:
        """Appends the result of a query or dataframe to the target table.

        Args:
            table_name: The table to insert into.
            query_or_df: The SQL query to run or a dataframe.
            columns_to_types: A mapping between the column name and its data type.
                Derived from the source when omitted.
        """
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types, target_table=table_name
        )
        self._insert_append_source_queries(table_name, source_queries, columns_to_types)
1024 def insert_overwrite_by_partition( 1025 self, 1026 table_name: TableName, 1027 query_or_df: QueryOrDF, 1028 partitioned_by: t.List[exp.Expression], 1029 columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1030 ) -> None: 1031 raise NotImplementedError( 1032 "Insert Overwrite by Partition (not time) is not supported by this engine" 1033 )
    def insert_overwrite_by_time_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        start: TimeLike,
        end: TimeLike,
        time_formatter: t.Callable[
            [TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expression
        ],
        time_column: TimeColumn | exp.Expression | str,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Overwrite the rows of a table whose time column falls in [start, end].

        Args:
            table_name: The table to overwrite into.
            query_or_df: The SQL query to run or a dataframe providing new rows.
            start: Inclusive start of the time range.
            end: End of the time range (made inclusive via make_inclusive).
            time_formatter: Renders a time value as an expression comparable to
                the time column.
            time_column: The column (or TimeColumn spec) holding the partition time.
            columns_to_types: A mapping between the column name and its data type.
                Looked up from the table when omitted.
            kwargs: Extra engine-specific arguments (unused in the base implementation).
        """
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df, columns_to_types, target_table=table_name
        )
        columns_to_types = columns_to_types or self.columns(table_name)
        low, high = [time_formatter(dt, columns_to_types) for dt in make_inclusive(start, end)]
        # A TimeColumn wrapper carries the actual column expression.
        if isinstance(time_column, TimeColumn):
            time_column = time_column.column
        where = exp.Between(
            this=exp.to_column(time_column) if isinstance(time_column, str) else time_column,
            low=low,
            high=high,
        )
        self._insert_overwrite_by_condition(table_name, source_queries, columns_to_types, where)
    def scd_type_2_by_time(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expression],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: TimeLike,
        updated_at_col: exp.Column,
        invalidate_hard_deletes: bool = True,
        updated_at_as_valid_from: bool = False,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
        **kwargs: t.Any,
    ) -> None:
        """Apply an SCD Type 2 merge where change detection is driven by an
        updated-at timestamp column.

        Delegates to the shared `_scd_type_2` implementation; see that method
        for the detailed semantics of each argument. `kwargs` is accepted for
        interface compatibility and not forwarded.
        """
        self._scd_type_2(
            target_table=target_table,
            source_table=source_table,
            unique_key=unique_key,
            valid_from_col=valid_from_col,
            valid_to_col=valid_to_col,
            execution_time=execution_time,
            updated_at_col=updated_at_col,
            invalidate_hard_deletes=invalidate_hard_deletes,
            updated_at_as_valid_from=updated_at_as_valid_from,
            columns_to_types=columns_to_types,
            table_description=table_description,
            column_descriptions=column_descriptions,
            truncate=truncate,
        )
    def scd_type_2_by_column(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expression],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: TimeLike,
        check_columns: t.Union[exp.Star, t.Sequence[exp.Column]],
        invalidate_hard_deletes: bool = True,
        execution_time_as_valid_from: bool = False,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
        **kwargs: t.Any,
    ) -> None:
        """Apply an SCD Type 2 merge where change detection is driven by
        comparing the given check columns (or all columns via Star).

        Delegates to the shared `_scd_type_2` implementation; see that method
        for the detailed semantics of each argument. `kwargs` is accepted for
        interface compatibility and not forwarded.
        """
        self._scd_type_2(
            target_table=target_table,
            source_table=source_table,
            unique_key=unique_key,
            valid_from_col=valid_from_col,
            valid_to_col=valid_to_col,
            execution_time=execution_time,
            check_columns=check_columns,
            columns_to_types=columns_to_types,
            invalidate_hard_deletes=invalidate_hard_deletes,
            execution_time_as_valid_from=execution_time_as_valid_from,
            table_description=table_description,
            column_descriptions=column_descriptions,
            truncate=truncate,
        )
    def merge(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        unique_key: t.Sequence[exp.Expression],
        when_matched: t.Optional[exp.When] = None,
    ) -> None:
        """MERGE the source into the target keyed on `unique_key`.

        Matched rows are updated (all columns by default, or per the caller's
        `when_matched` clause); unmatched rows are inserted.

        Args:
            target_table: The table merged into.
            source_table: The SQL query or dataframe providing new rows.
            columns_to_types: A mapping between the column name and its data type.
                Looked up from the target table when omitted.
            unique_key: Expressions forming the join key between target and source.
            when_matched: Optional custom WHEN MATCHED clause.
        """
        source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
            source_table, columns_to_types, target_table=target_table
        )
        columns_to_types = columns_to_types or self.columns(target_table)
        # ON condition: equality of each key part between target and source aliases.
        on = exp.and_(
            *(
                add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS))
                for part in unique_key
            )
        )
        if not when_matched:
            # Default WHEN MATCHED: update every column from the source.
            when_matched = exp.When(
                matched=True,
                source=False,
                then=exp.Update(
                    expressions=[
                        exp.column(col, MERGE_TARGET_ALIAS).eq(exp.column(col, MERGE_SOURCE_ALIAS))
                        for col in columns_to_types
                    ],
                ),
            )
        # WHEN NOT MATCHED: insert all columns from the source.
        when_not_matched = exp.When(
            matched=False,
            source=False,
            then=exp.Insert(
                this=exp.Tuple(expressions=[exp.column(col) for col in columns_to_types]),
                expression=exp.Tuple(
                    expressions=[exp.column(col, MERGE_SOURCE_ALIAS) for col in columns_to_types]
                ),
            ),
        )
        for source_query in source_queries:
            with source_query as query:
                self._merge(
                    target_table=target_table,
                    query=query,
                    on=on,
                    match_expressions=[when_matched, when_not_matched],
                )
def rename_table(
    self,
    old_table_name: TableName,
    new_table_name: TableName,
) -> None:
    """Renames a table, rejecting renames that would cross catalogs.

    Raises:
        UnsupportedCatalogOperationError: if the new name targets a catalog
            different from the one the old table resides in.
    """
    target = exp.to_table(new_table_name)
    if target.catalog:
        # When the old name is unqualified, assume the connection's current catalog.
        source_catalog = exp.to_table(old_table_name).catalog or self.get_current_catalog()
        if source_catalog != target.catalog:
            raise UnsupportedCatalogOperationError(
                "Tried to rename table across catalogs which is not supported"
            )
    self._rename_table(old_table_name, new_table_name)
def get_data_objects(
    self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
) -> t.List[DataObject]:
    """Lists all data objects in the target schema.

    Args:
        schema_name: The name of the schema to list data objects from.
        object_names: If provided, only return data objects with these names.

    Returns:
        A list of data objects in the target schema.
    """
    if object_names is None:
        return self._get_data_objects(schema_name)
    if not object_names:
        return []
    # Filter by name in batches of DATA_OBJECT_FILTER_BATCH_SIZE rather than
    # passing the whole name set to the engine at once.
    names = list(object_names)
    step = self.DATA_OBJECT_FILTER_BATCH_SIZE
    results: t.List[DataObject] = []
    for start in range(0, len(names), step):
        results.extend(self._get_data_objects(schema_name, set(names[start : start + step])))
    return results
Lists all data objects in the target schema.
Arguments:
- schema_name: The name of the schema to list data objects from.
- object_names: If provided, only return data objects with these names.
Returns:
A list of data objects in the target schema.
def fetchone(
    self,
    query: t.Union[exp.Expression, str],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = False,
) -> t.Tuple:
    """Executes the query inside a transaction and returns the first row."""
    with self.transaction():
        self.execute(
            query,
            ignore_unsupported_errors=ignore_unsupported_errors,
            quote_identifiers=quote_identifiers,
        )
        row = self.cursor.fetchone()
    return row
def fetchall(
    self,
    query: t.Union[exp.Expression, str],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = False,
) -> t.List[t.Tuple]:
    """Executes the query inside a transaction and returns all rows."""
    with self.transaction():
        self.execute(
            query,
            ignore_unsupported_errors=ignore_unsupported_errors,
            quote_identifiers=quote_identifiers,
        )
        rows = self.cursor.fetchall()
    return rows
def fetchdf(
    self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
) -> pd.DataFrame:
    """Fetches a Pandas DataFrame from the cursor"""
    result = self._fetch_native_df(query, quote_identifiers=quote_identifiers)
    if isinstance(result, pd.DataFrame):
        return result
    # The native fetch produced some other DataFrame type (e.g. engine-specific).
    raise NotImplementedError(
        "The cursor's `fetch_native_df` method is not returning a pandas DataFrame. Need to update `fetchdf` so a Pandas DataFrame is returned"
    )
Fetches a Pandas DataFrame from the cursor
def fetch_pyspark_df(
    self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
) -> PySparkDataFrame:
    """Fetches a PySpark DataFrame from the cursor"""
    # The base adapter has no PySpark support; adapters that do are expected to override.
    message = f"Engine does not support PySpark DataFrames: {type(self)}"
    raise NotImplementedError(message)
Fetches a PySpark DataFrame from the cursor
def wap_supported(self, table_name: TableName) -> bool:
    """Returns whether WAP for the target table is supported."""
    # The base adapter reports no Write-Audit-Publish support; subclasses may override.
    return False
Returns whether WAP for the target table is supported.
def wap_table_name(self, table_name: TableName, wap_id: str) -> str:
    """Returns the updated table name for the given WAP ID.

    Args:
        table_name: The name of the target table.
        wap_id: The WAP ID.

    Returns:
        The updated table name that should be used for writing.
    """
    # Not implemented in the base adapter; WAP-capable adapters override this.
    message = f"Engine does not support WAP: {type(self)}"
    raise NotImplementedError(message)
Returns the updated table name for the given WAP ID.
Arguments:
- table_name: The name of the target table.
- wap_id: The WAP ID to prepare.
Returns:
The updated table name that should be used for writing.
def wap_prepare(self, table_name: TableName, wap_id: str) -> str:
    """Prepares the target table for WAP and returns the updated table name.

    Args:
        table_name: The name of the target table.
        wap_id: The WAP ID to prepare.

    Returns:
        The updated table name that should be used for writing.
    """
    # Not implemented in the base adapter; WAP-capable adapters override this.
    message = f"Engine does not support WAP: {type(self)}"
    raise NotImplementedError(message)
Prepares the target table for WAP and returns the updated table name.
Arguments:
- table_name: The name of the target table.
- wap_id: The WAP ID to prepare.
Returns:
The updated table name that should be used for writing.
def wap_publish(self, table_name: TableName, wap_id: str) -> None:
    """Publishes changes with the given WAP ID to the target table.

    Args:
        table_name: The name of the target table.
        wap_id: The WAP ID to publish.
    """
    # Not implemented in the base adapter; WAP-capable adapters override this.
    message = f"Engine does not support WAP: {type(self)}"
    raise NotImplementedError(message)
Publishes changes with the given WAP ID to the target table.
Arguments:
- table_name: The name of the target table.
- wap_id: The WAP ID to publish.
@contextlib.contextmanager
def transaction(
    self,
    condition: t.Optional[bool] = None,
) -> t.Iterator[None]:
    """A transaction context manager.

    Begins a transaction on entry, commits on a clean exit, and rolls back if
    the wrapped block raises. Acts as a no-op passthrough when a transaction
    is already active, when the engine doesn't support transactions, or when
    an explicit ``condition`` evaluates to False.

    Args:
        condition: When provided and falsy, skip transaction handling entirely.
    """
    if (
        self._connection_pool.is_transaction_active
        or not self.SUPPORTS_TRANSACTIONS
        or (condition is not None and not condition)
    ):
        yield
        return
    self._connection_pool.begin()
    try:
        yield
    except Exception:
        self._connection_pool.rollback()
        # Bare `raise` (instead of `raise e`) preserves the original traceback.
        raise
    else:
        self._connection_pool.commit()
A transaction context manager.
@contextlib.contextmanager
def session(self, properties: SessionProperties) -> t.Iterator[None]:
    """A session context manager."""
    if not self._is_session_active():
        self._begin_session(properties)
        try:
            yield
        finally:
            # Always tear the session down, even if the wrapped block raised.
            self._end_session()
    else:
        # Reuse the already-active session rather than nesting a new one.
        yield
A session context manager.
def execute(
    self,
    expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = True,
    **kwargs: t.Any,
) -> None:
    """Execute a sql query."""
    to_sql_kwargs: t.Dict[str, t.Any] = {}
    if ignore_unsupported_errors:
        to_sql_kwargs["unsupported_level"] = ErrorLevel.IGNORE

    with self.transaction():
        for expression in ensure_list(expressions):
            # Raw strings are passed through untouched; expressions are rendered to SQL.
            if isinstance(expression, exp.Expression):
                sql = t.cast(
                    str, self._to_sql(expression, quote=quote_identifiers, **to_sql_kwargs)
                )
            else:
                sql = expression
            self._log_sql(sql)
            self._execute(sql, **kwargs)
Execute a sql query.
@contextlib.contextmanager
def temp_table(
    self,
    query_or_df: QueryOrDF,
    name: TableName = "diff",
    columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    **kwargs: t.Any,
) -> t.Iterator[exp.Table]:
    """A context manager for working with a temp table.

    The table will be created with a random guid and cleaned up after the block.

    Args:
        query_or_df: The query or df to create a temp table for.
        name: The base name of the temp table.
        columns_to_types: A mapping between the column name and its data type.

    Yields:
        The table expression
    """
    queries, columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df, columns_to_types=columns_to_types, target_table=name
    )

    with self.transaction():
        temp_tbl = self._get_temp_table(name)
        if temp_tbl.db:
            # Ensure the target schema (and catalog, if qualified) exists first.
            self.create_schema(schema_(temp_tbl.args["db"], temp_tbl.args.get("catalog")))
        self._create_table_from_source_queries(
            temp_tbl,
            queries,
            columns_to_types,
            exists=True,
            table_description=None,
            column_descriptions=None,
            **kwargs,
        )

    try:
        yield temp_tbl
    finally:
        # Best-effort cleanup regardless of what the caller's block did.
        self.drop_table(temp_tbl)
A context manager for working with a temp table.
The table will be created with a random guid and cleaned up after the block.
Arguments:
- query_or_df: The query or df to create a temp table for.
- name: The base name of the temp table.
- columns_to_types: A mapping between the column name and its data type.
Yields:
The table expression
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- EngineAdapter
- EngineAdapter
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- INSERT_OVERWRITE_STRATEGY
- CATALOG_SUPPORT
- with_log_level
- is_pandas_df
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- replace_query
- create_index
- create_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_table
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- transaction
- session
- execute
- temp_table