EngineAdapter
Engine adapters are how SQLMesh connects to and interacts with various data stores. They allow SQLMesh to generalize its functionality across different engines that have Python Database API 2.0-compliant connections. Rather than executing queries directly against your data stores, SQLMesh components such as the SnapshotEvaluator delegate query execution to engine adapters, so these components can remain engine-agnostic.
1""" 2# EngineAdapter 3 4Engine adapters are how SQLMesh connects and interacts with various data stores. They allow SQLMesh to 5generalize its functionality to different engines that have Python Database API 2.0-compliant 6connections. Rather than executing queries directly against your data stores, SQLMesh components such as 7the SnapshotEvaluator delegate them to engine adapters so these components can be engine-agnostic. 8""" 9 10from __future__ import annotations 11 12import contextlib 13import itertools 14import logging 15import sys 16import typing as t 17from functools import cached_property, partial 18 19from sqlglot import Dialect, exp 20from sqlglot.errors import ErrorLevel 21from sqlglot.helper import ensure_list, seq_get 22from sqlglot.optimizer.qualify_columns import quote_identifiers 23 24from sqlmesh.core.dialect import ( 25 add_table, 26 schema_, 27 select_from_values_for_batch_range, 28 to_schema, 29) 30from sqlmesh.core.engine_adapter.shared import ( 31 CatalogSupport, 32 CommentCreationTable, 33 CommentCreationView, 34 DataObject, 35 DataObjectType, 36 EngineRunMode, 37 InsertOverwriteStrategy, 38 SourceQuery, 39 set_catalog, 40) 41from sqlmesh.core.model.kind import TimeColumn 42from sqlmesh.core.schema_diff import SchemaDiffer, TableAlterOperation 43from sqlmesh.core.snapshot.execution_tracker import QueryExecutionTracker 44from sqlmesh.utils import ( 45 CorrelationId, 46 columns_to_types_all_known, 47 random_id, 48 get_source_columns_to_types, 49) 50from sqlmesh.utils.connection_pool import ConnectionPool, create_connection_pool 51from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column 52from sqlmesh.utils.errors import ( 53 MissingDefaultCatalogError, 54 SQLMeshError, 55 UnsupportedCatalogOperationError, 56) 57from sqlmesh.utils.pandas import columns_to_types_from_df 58 59if t.TYPE_CHECKING: 60 import pandas as pd 61 62 from sqlmesh.core._typing import SchemaName, SessionProperties, TableName 63 from 
sqlmesh.core.engine_adapter._typing import ( 64 DF, 65 BigframeSession, 66 GrantsConfig, 67 PySparkDataFrame, 68 PySparkSession, 69 Query, 70 QueryOrDF, 71 SnowparkSession, 72 ) 73 from sqlmesh.core.node import IntervalUnit 74 75logger = logging.getLogger(__name__) 76 77MERGE_TARGET_ALIAS = "__MERGE_TARGET__" 78MERGE_SOURCE_ALIAS = "__MERGE_SOURCE__" 79 80KEY_FOR_CREATABLE_TYPE = "CREATABLE_TYPE" 81 82 83@set_catalog() 84class EngineAdapter: 85 """Base class wrapping a Database API compliant connection. 86 87 The EngineAdapter is an easily-subclassable interface that interacts 88 with the underlying engine and data store. 89 90 Args: 91 connection_factory_or_pool: a callable which produces a new Database API-compliant 92 connection on every call. 93 dialect: The dialect with which this adapter is associated. 94 multithreaded: Indicates whether this adapter will be used by more than one thread. 95 """ 96 97 DIALECT = "" 98 DEFAULT_BATCH_SIZE = 10000 99 DATA_OBJECT_FILTER_BATCH_SIZE = 4000 100 SUPPORTS_TRANSACTIONS = True 101 SUPPORTS_INDEXES = False 102 COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_CTAS 103 COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS 104 MAX_TABLE_COMMENT_LENGTH: t.Optional[int] = None 105 MAX_COLUMN_COMMENT_LENGTH: t.Optional[int] = None 106 INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT 107 SUPPORTS_MATERIALIZED_VIEWS = False 108 SUPPORTS_MATERIALIZED_VIEW_SCHEMA = False 109 SUPPORTS_VIEW_SCHEMA = True 110 SUPPORTS_CLONING = False 111 SUPPORTS_MANAGED_MODELS = False 112 SUPPORTS_CREATE_DROP_CATALOG = False 113 SUPPORTED_DROP_CASCADE_OBJECT_KINDS: t.List[str] = [] 114 SCHEMA_DIFFER_KWARGS: t.Dict[str, t.Any] = {} 115 SUPPORTS_TUPLE_IN = True 116 HAS_VIEW_BINDING = False 117 SUPPORTS_REPLACE_TABLE = True 118 SUPPORTS_GRANTS = False 119 DEFAULT_CATALOG_TYPE = DIALECT 120 QUOTE_IDENTIFIERS_IN_VIEWS = True 121 MAX_IDENTIFIER_LENGTH: t.Optional[int] = None 122 ATTACH_CORRELATION_ID = True 123 
SUPPORTS_QUERY_EXECUTION_TRACKING = False 124 SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS = False 125 126 def __init__( 127 self, 128 connection_factory_or_pool: t.Union[t.Callable[[], t.Any], ConnectionPool], 129 dialect: str = "", 130 sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None, 131 multithreaded: bool = False, 132 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 133 default_catalog: t.Optional[str] = None, 134 execute_log_level: int = logging.DEBUG, 135 register_comments: bool = True, 136 pre_ping: bool = False, 137 pretty_sql: bool = False, 138 shared_connection: bool = False, 139 correlation_id: t.Optional[CorrelationId] = None, 140 schema_differ_overrides: t.Optional[t.Dict[str, t.Any]] = None, 141 query_execution_tracker: t.Optional[QueryExecutionTracker] = None, 142 **kwargs: t.Any, 143 ): 144 self.dialect = dialect.lower() or self.DIALECT 145 self._connection_pool = ( 146 connection_factory_or_pool 147 if isinstance(connection_factory_or_pool, ConnectionPool) 148 else create_connection_pool( 149 connection_factory_or_pool, 150 multithreaded, 151 shared_connection=shared_connection, 152 cursor_init=cursor_init, 153 ) 154 ) 155 self._sql_gen_kwargs = sql_gen_kwargs or {} 156 self._default_catalog = default_catalog 157 self._execute_log_level = execute_log_level 158 self._extra_config = kwargs 159 self._register_comments = register_comments 160 self._pre_ping = pre_ping 161 self._pretty_sql = pretty_sql 162 self._multithreaded = multithreaded 163 self.correlation_id = correlation_id 164 self._schema_differ_overrides = schema_differ_overrides 165 self._query_execution_tracker = query_execution_tracker 166 self._data_object_cache: t.Dict[str, t.Optional[DataObject]] = {} 167 168 def with_settings(self, **kwargs: t.Any) -> EngineAdapter: 169 extra_kwargs = { 170 "null_connection": True, 171 "execute_log_level": kwargs.pop("execute_log_level", self._execute_log_level), 172 "correlation_id": kwargs.pop("correlation_id", 
self.correlation_id), 173 "query_execution_tracker": kwargs.pop( 174 "query_execution_tracker", self._query_execution_tracker 175 ), 176 **self._extra_config, 177 **kwargs, 178 } 179 180 adapter = self.__class__( 181 self._connection_pool, 182 dialect=self.dialect, 183 sql_gen_kwargs=self._sql_gen_kwargs, 184 default_catalog=self._default_catalog, 185 register_comments=self._register_comments, 186 multithreaded=self._multithreaded, 187 pretty_sql=self._pretty_sql, 188 **extra_kwargs, 189 ) 190 191 return adapter 192 193 @property 194 def cursor(self) -> t.Any: 195 return self._connection_pool.get_cursor() 196 197 @property 198 def connection(self) -> t.Any: 199 return self._connection_pool.get() 200 201 @property 202 def spark(self) -> t.Optional[PySparkSession]: 203 return None 204 205 @property 206 def snowpark(self) -> t.Optional[SnowparkSession]: 207 return None 208 209 @property 210 def bigframe(self) -> t.Optional[BigframeSession]: 211 return None 212 213 @property 214 def comments_enabled(self) -> bool: 215 return self._register_comments and self.COMMENT_CREATION_TABLE.is_supported 216 217 @property 218 def catalog_support(self) -> CatalogSupport: 219 return CatalogSupport.UNSUPPORTED 220 221 @cached_property 222 def schema_differ(self) -> SchemaDiffer: 223 return SchemaDiffer( 224 **{ 225 **self.SCHEMA_DIFFER_KWARGS, 226 **(self._schema_differ_overrides or {}), 227 } 228 ) 229 230 @property 231 def _catalog_type_overrides(self) -> t.Dict[str, str]: 232 return self._extra_config.get("catalog_type_overrides") or {} 233 234 @classmethod 235 def _casted_columns( 236 cls, 237 target_columns_to_types: t.Dict[str, exp.DataType], 238 source_columns: t.Optional[t.List[str]] = None, 239 ) -> t.List[exp.Expr]: 240 source_columns_lookup = set(source_columns or target_columns_to_types) 241 return [ 242 exp.alias_( 243 exp.cast( 244 exp.column(column, quoted=True) 245 if column in source_columns_lookup 246 else exp.Null(), 247 to=kind, 248 ), 249 column, 250 copy=False, 
251 quoted=True, 252 ) 253 for column, kind in target_columns_to_types.items() 254 ] 255 256 @property 257 def default_catalog(self) -> t.Optional[str]: 258 if self.catalog_support.is_unsupported: 259 return None 260 default_catalog = self._default_catalog or self.get_current_catalog() 261 if not default_catalog: 262 raise MissingDefaultCatalogError( 263 "Could not determine a default catalog despite it being supported." 264 ) 265 return default_catalog 266 267 @property 268 def engine_run_mode(self) -> EngineRunMode: 269 return EngineRunMode.SINGLE_MODE_ENGINE 270 271 def _get_source_queries( 272 self, 273 query_or_df: QueryOrDF, 274 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 275 target_table: TableName, 276 *, 277 batch_size: t.Optional[int] = None, 278 source_columns: t.Optional[t.List[str]] = None, 279 ) -> t.List[SourceQuery]: 280 import pandas as pd 281 282 batch_size = self.DEFAULT_BATCH_SIZE if batch_size is None else batch_size 283 if isinstance(query_or_df, exp.Query): 284 query_factory = lambda: query_or_df 285 if source_columns: 286 source_columns_lookup = set(source_columns) 287 if not target_columns_to_types: 288 raise SQLMeshError("columns_to_types must be set if source_columns is set") 289 if not set(target_columns_to_types).issubset(source_columns_lookup): 290 select_columns = [ 291 exp.column(c, quoted=True) 292 if c in source_columns_lookup 293 else exp.cast(exp.Null(), target_columns_to_types[c], copy=False).as_( 294 c, copy=False, quoted=True 295 ) 296 for c in target_columns_to_types 297 ] 298 query_factory = ( 299 lambda: exp.Select() 300 .select(*select_columns) 301 .from_(query_or_df.subquery("select_source_columns")) 302 ) 303 return [SourceQuery(query_factory=query_factory)] # type: ignore 304 305 if not target_columns_to_types: 306 raise SQLMeshError( 307 "It is expected that if a DataFrame is passed in then columns_to_types is set" 308 ) 309 310 if isinstance(query_or_df, pd.DataFrame) and query_or_df.empty: 311 
raise SQLMeshError( 312 "Cannot construct source query from an empty DataFrame. This error is commonly " 313 "related to Python models that produce no data. For such models, consider yielding " 314 "from an empty generator if the resulting set is empty, i.e. use `yield from ()`." 315 ) 316 317 return self._df_to_source_queries( 318 query_or_df, 319 target_columns_to_types, 320 batch_size, 321 target_table=target_table, 322 source_columns=source_columns, 323 ) 324 325 def _df_to_source_queries( 326 self, 327 df: DF, 328 target_columns_to_types: t.Dict[str, exp.DataType], 329 batch_size: int, 330 target_table: TableName, 331 source_columns: t.Optional[t.List[str]] = None, 332 ) -> t.List[SourceQuery]: 333 import pandas as pd 334 335 assert isinstance(df, pd.DataFrame) 336 num_rows = len(df.index) 337 batch_size = sys.maxsize if batch_size == 0 else batch_size 338 339 # we need to ensure that the order of the columns in columns_to_types columns matches the order of the values 340 # they can differ if a user specifies columns() on a python model in a different order than what's in the DataFrame's emitted by that model 341 df = df[list(source_columns or target_columns_to_types)] 342 values = list(df.itertuples(index=False, name=None)) 343 344 return [ 345 SourceQuery( 346 query_factory=partial( 347 self._values_to_sql, 348 values=values, # type: ignore 349 target_columns_to_types=target_columns_to_types, 350 batch_start=i, 351 batch_end=min(i + batch_size, num_rows), 352 source_columns=source_columns, 353 ), 354 ) 355 for i in range(0, num_rows, batch_size) 356 ] 357 358 def _get_source_queries_and_columns_to_types( 359 self, 360 query_or_df: QueryOrDF, 361 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 362 target_table: TableName, 363 *, 364 batch_size: t.Optional[int] = None, 365 source_columns: t.Optional[t.List[str]] = None, 366 ) -> t.Tuple[t.List[SourceQuery], t.Optional[t.Dict[str, exp.DataType]]]: 367 target_columns_to_types, source_columns = 
self._columns_to_types( 368 query_or_df, target_columns_to_types, source_columns 369 ) 370 source_queries = self._get_source_queries( 371 query_or_df, 372 target_columns_to_types, 373 target_table=target_table, 374 batch_size=batch_size, 375 source_columns=source_columns, 376 ) 377 return source_queries, target_columns_to_types 378 379 @t.overload 380 def _columns_to_types( 381 self, 382 query_or_df: DF, 383 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 384 source_columns: t.Optional[t.List[str]] = None, 385 ) -> t.Tuple[t.Dict[str, exp.DataType], t.List[str]]: ... 386 387 @t.overload 388 def _columns_to_types( 389 self, 390 query_or_df: Query, 391 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 392 source_columns: t.Optional[t.List[str]] = None, 393 ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: ... 394 395 def _columns_to_types( 396 self, 397 query_or_df: QueryOrDF, 398 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 399 source_columns: t.Optional[t.List[str]] = None, 400 ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: 401 import pandas as pd 402 403 if not target_columns_to_types and isinstance(query_or_df, pd.DataFrame): 404 target_columns_to_types = columns_to_types_from_df(t.cast(pd.DataFrame, query_or_df)) 405 if not source_columns and target_columns_to_types: 406 source_columns = list(target_columns_to_types) 407 # source columns should only contain columns that are defined in the target. 
If there are extras then 408 # that means they are intended to be ignored and will be excluded 409 source_columns = ( 410 [x for x in source_columns if x in target_columns_to_types] 411 if source_columns and target_columns_to_types 412 else None 413 ) 414 return target_columns_to_types, source_columns 415 416 def recycle(self) -> None: 417 """Closes all open connections and releases all allocated resources associated with any thread 418 except the calling one.""" 419 self._connection_pool.close_all(exclude_calling_thread=True) 420 421 def close(self) -> t.Any: 422 """Closes all open connections and releases all allocated resources.""" 423 self._connection_pool.close_all() 424 425 def get_current_catalog(self) -> t.Optional[str]: 426 """Returns the catalog name of the current connection.""" 427 raise NotImplementedError() 428 429 def set_current_catalog(self, catalog: str) -> None: 430 """Sets the catalog name of the current connection.""" 431 raise NotImplementedError() 432 433 def get_catalog_type(self, catalog: t.Optional[str]) -> str: 434 """Intended to be overridden for data virtualization systems like Trino that, 435 depending on the target catalog, require slightly different properties to be set when creating / updating tables 436 """ 437 if self.catalog_support.is_unsupported: 438 raise UnsupportedCatalogOperationError( 439 f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}" 440 ) 441 return ( 442 self._catalog_type_overrides.get(catalog, self.DEFAULT_CATALOG_TYPE) 443 if catalog 444 else self.DEFAULT_CATALOG_TYPE 445 ) 446 447 def get_catalog_type_from_table(self, table: TableName) -> str: 448 """Get the catalog type from a table name if it has a catalog specified, otherwise return the current catalog type""" 449 catalog = exp.to_table(table).catalog or self.get_current_catalog() 450 return self.get_catalog_type(catalog) 451 452 @property 453 def current_catalog_type(self) -> str: 454 # `get_catalog_type_from_table` should be 
used over this property. Reason is that the table that is the target 455 # of the operation is what matters and not the catalog type of the connection. 456 # This still remains for legacy reasons and should be refactored out. 457 return self.get_catalog_type(self.get_current_catalog()) 458 459 def replace_query( 460 self, 461 table_name: TableName, 462 query_or_df: QueryOrDF, 463 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 464 table_description: t.Optional[str] = None, 465 column_descriptions: t.Optional[t.Dict[str, str]] = None, 466 source_columns: t.Optional[t.List[str]] = None, 467 supports_replace_table_override: t.Optional[bool] = None, 468 **kwargs: t.Any, 469 ) -> None: 470 """Replaces an existing table with a query. 471 472 For partition based engines (hive, spark), insert override is used. For other systems, create or replace is used. 473 474 Args: 475 table_name: The name of the table (eg. prod.table) 476 query_or_df: The SQL query to run or a dataframe. 477 target_columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type. 478 Expected to be ordered to match the order of values in the dataframe. 479 kwargs: Optional create table properties. 
480 """ 481 target_table = exp.to_table(table_name) 482 483 target_data_object = self.get_data_object(target_table) 484 table_exists = target_data_object is not None 485 if self.drop_data_object_on_type_mismatch(target_data_object, DataObjectType.TABLE): 486 table_exists = False 487 488 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 489 query_or_df, 490 target_columns_to_types, 491 target_table=target_table, 492 source_columns=source_columns, 493 ) 494 if not target_columns_to_types and table_exists: 495 target_columns_to_types = self.columns(target_table) 496 query = source_queries[0].query_factory() 497 self_referencing = any( 498 quote_identifiers(table) == quote_identifiers(target_table) 499 for table in query.find_all(exp.Table) 500 ) 501 # If a query references itself then it must have a table created regardless of approach used. 502 if self_referencing: 503 if not target_columns_to_types: 504 raise SQLMeshError( 505 f"Cannot create a self-referencing table {target_table.sql(dialect=self.dialect)} without knowing the column types. " 506 "Try casting the columns to an expected type or defining the columns in the model metadata. 
" 507 ) 508 self._create_table_from_columns( 509 target_table, 510 target_columns_to_types, 511 exists=True, 512 table_description=table_description, 513 column_descriptions=column_descriptions, 514 **kwargs, 515 ) 516 # All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we 517 # use `CREATE OR REPLACE TABLE AS` if the engine supports it 518 supports_replace_table = ( 519 self.SUPPORTS_REPLACE_TABLE 520 if supports_replace_table_override is None 521 else supports_replace_table_override 522 ) 523 if supports_replace_table or not table_exists: 524 return self._create_table_from_source_queries( 525 target_table, 526 source_queries, 527 target_columns_to_types, 528 replace=supports_replace_table, 529 table_description=table_description, 530 column_descriptions=column_descriptions, 531 **kwargs, 532 ) 533 if self_referencing: 534 assert target_columns_to_types is not None 535 with self.temp_table( 536 self._select_columns(target_columns_to_types).from_(target_table), 537 name=target_table, 538 target_columns_to_types=target_columns_to_types, 539 **kwargs, 540 ) as temp_table: 541 for source_query in source_queries: 542 source_query.add_transform( 543 lambda node: ( # type: ignore 544 temp_table # type: ignore 545 if isinstance(node, exp.Table) 546 and quote_identifiers(node) == quote_identifiers(target_table) 547 else node 548 ) 549 ) 550 return self._insert_overwrite_by_condition( 551 target_table, 552 source_queries, 553 target_columns_to_types, 554 **kwargs, 555 ) 556 return self._insert_overwrite_by_condition( 557 target_table, 558 source_queries, 559 target_columns_to_types, 560 **kwargs, 561 ) 562 563 def create_index( 564 self, 565 table_name: TableName, 566 index_name: str, 567 columns: t.Tuple[str, ...], 568 exists: bool = True, 569 ) -> None: 570 """Creates a new index for the given table if supported 571 572 Args: 573 table_name: The name of the target table. 574 index_name: The name of the index. 
575 columns: The list of columns that constitute the index. 576 exists: Indicates whether to include the IF NOT EXISTS check. 577 """ 578 if not self.SUPPORTS_INDEXES: 579 return 580 581 expression = exp.Create( 582 this=exp.Index( 583 this=exp.to_identifier(index_name), 584 table=exp.to_table(table_name), 585 params=exp.IndexParameters(columns=[exp.to_column(c) for c in columns]), 586 ), 587 kind="INDEX", 588 exists=exists, 589 ) 590 self.execute(expression) 591 592 def _pop_creatable_type_from_properties( 593 self, 594 properties: t.Dict[str, exp.Expr], 595 ) -> t.Optional[exp.Property]: 596 """Pop out the creatable_type from the properties dictionary (if exists (return it/remove it) else return none). 597 It also checks that none of the expressions are MATERIALIZE as that conflicts with the `materialize` parameter. 598 """ 599 for key in list(properties.keys()): 600 upper_key = key.upper() 601 if upper_key == KEY_FOR_CREATABLE_TYPE: 602 value = properties.pop(key).name 603 parsed_properties = exp.maybe_parse( 604 value, into=exp.Properties, dialect=self.dialect 605 ) 606 property, *others = parsed_properties.expressions 607 if others: 608 # Multiple properties are unsupported today, can look into it in the future if needed 609 raise SQLMeshError( 610 f"Invalid creatable_type value with multiple properties: {value}" 611 ) 612 if isinstance(property, exp.MaterializedProperty): 613 raise SQLMeshError( 614 f"Cannot use {value} as a creatable_type as it conflicts with the `materialize` parameter." 
615 ) 616 return property 617 return None 618 619 def create_table( 620 self, 621 table_name: TableName, 622 target_columns_to_types: t.Dict[str, exp.DataType], 623 primary_key: t.Optional[t.Tuple[str, ...]] = None, 624 exists: bool = True, 625 table_description: t.Optional[str] = None, 626 column_descriptions: t.Optional[t.Dict[str, str]] = None, 627 **kwargs: t.Any, 628 ) -> None: 629 """Create a table using a DDL statement 630 631 Args: 632 table_name: The name of the table to create. Can be fully qualified or just table name. 633 target_columns_to_types: A mapping between the column name and its data type. 634 primary_key: Determines the table primary key. 635 exists: Indicates whether to include the IF NOT EXISTS check. 636 table_description: Optional table description from MODEL DDL. 637 column_descriptions: Optional column descriptions from model query. 638 kwargs: Optional create table properties. 639 """ 640 self._create_table_from_columns( 641 table_name, 642 target_columns_to_types, 643 primary_key, 644 exists, 645 table_description, 646 column_descriptions, 647 **kwargs, 648 ) 649 650 def create_managed_table( 651 self, 652 table_name: TableName, 653 query: Query, 654 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 655 partitioned_by: t.Optional[t.List[exp.Expr]] = None, 656 clustered_by: t.Optional[t.List[exp.Expr]] = None, 657 table_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 658 table_description: t.Optional[str] = None, 659 column_descriptions: t.Optional[t.Dict[str, str]] = None, 660 source_columns: t.Optional[t.List[str]] = None, 661 **kwargs: t.Any, 662 ) -> None: 663 """Create a managed table using a query. 664 665 "Managed" means that once the table is created, the data is kept up to date by the underlying database engine and not SQLMesh. 666 667 Args: 668 table_name: The name of the table to create. Can be fully qualified or just table name. 
669 query: The SQL query for the engine to base the managed table on 670 target_columns_to_types: A mapping between the column name and its data type. 671 partitioned_by: The partition columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour)) 672 clustered_by: The cluster columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour)) 673 table_properties: Optional mapping of engine-specific properties to be set on the managed table 674 table_description: Optional table description from MODEL DDL. 675 column_descriptions: Optional column descriptions from model query. 676 kwargs: Optional create table properties. 677 """ 678 raise NotImplementedError(f"Engine does not support managed tables: {type(self)}") 679 680 def ctas( 681 self, 682 table_name: TableName, 683 query_or_df: QueryOrDF, 684 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 685 exists: bool = True, 686 table_description: t.Optional[str] = None, 687 column_descriptions: t.Optional[t.Dict[str, str]] = None, 688 source_columns: t.Optional[t.List[str]] = None, 689 **kwargs: t.Any, 690 ) -> None: 691 """Create a table using a CTAS statement 692 693 Args: 694 table_name: The name of the table to create. Can be fully qualified or just table name. 695 query_or_df: The SQL query to run or a dataframe for the CTAS. 696 target_columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame. 697 exists: Indicates whether to include the IF NOT EXISTS check. 698 table_description: Optional table description from MODEL DDL. 699 column_descriptions: Optional column descriptions from model query. 700 kwargs: Optional create table properties. 
701 """ 702 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 703 query_or_df, 704 target_columns_to_types, 705 target_table=table_name, 706 source_columns=source_columns, 707 ) 708 return self._create_table_from_source_queries( 709 table_name, 710 source_queries, 711 target_columns_to_types, 712 exists, 713 table_description=table_description, 714 column_descriptions=column_descriptions, 715 **kwargs, 716 ) 717 718 def create_state_table( 719 self, 720 table_name: str, 721 target_columns_to_types: t.Dict[str, exp.DataType], 722 primary_key: t.Optional[t.Tuple[str, ...]] = None, 723 ) -> None: 724 """Create a table to store SQLMesh internal state. 725 726 Args: 727 table_name: The name of the table to create. Can be fully qualified or just table name. 728 target_columns_to_types: A mapping between the column name and its data type. 729 primary_key: Determines the table primary key. 730 """ 731 self.create_table( 732 table_name, 733 target_columns_to_types, 734 primary_key=primary_key, 735 ) 736 737 def _create_table_from_columns( 738 self, 739 table_name: TableName, 740 target_columns_to_types: t.Dict[str, exp.DataType], 741 primary_key: t.Optional[t.Tuple[str, ...]] = None, 742 exists: bool = True, 743 table_description: t.Optional[str] = None, 744 column_descriptions: t.Optional[t.Dict[str, str]] = None, 745 **kwargs: t.Any, 746 ) -> None: 747 """ 748 Create a table using a DDL statement. 749 750 Args: 751 table_name: The name of the table to create. Can be fully qualified or just table name. 752 target_columns_to_types: Mapping between the column name and its data type. 753 primary_key: Determines the table primary key. 754 exists: Indicates whether to include the IF NOT EXISTS check. 755 table_description: Optional table description from MODEL DDL. 756 column_descriptions: Optional column descriptions from model query. 757 kwargs: Optional create table properties. 
758 """ 759 table = exp.to_table(table_name) 760 761 if not columns_to_types_all_known(target_columns_to_types): 762 # It is ok if the columns types are not known if the table already exists and IF NOT EXISTS is set 763 if exists and self.table_exists(table_name): 764 return 765 raise SQLMeshError( 766 "Cannot create a table without knowing the column types. " 767 "Try casting the columns to an expected type or defining the columns in the model metadata. " 768 f"Columns to types: {target_columns_to_types}" 769 ) 770 771 primary_key_expression = ( 772 [exp.PrimaryKey(expressions=[exp.to_column(k) for k in primary_key])] 773 if primary_key and self.SUPPORTS_INDEXES 774 else [] 775 ) 776 777 schema = self._build_schema_exp( 778 table, 779 target_columns_to_types, 780 column_descriptions, 781 primary_key_expression, 782 ) 783 784 self._create_table( 785 schema, 786 None, 787 exists=exists, 788 target_columns_to_types=target_columns_to_types, 789 table_description=table_description, 790 **kwargs, 791 ) 792 793 # Register comments with commands if the engine doesn't support comments in the schema or CREATE 794 if ( 795 table_description 796 and self.COMMENT_CREATION_TABLE.is_comment_command_only 797 and self.comments_enabled 798 ): 799 self._create_table_comment(table_name, table_description) 800 if ( 801 column_descriptions 802 and self.COMMENT_CREATION_TABLE.is_comment_command_only 803 and self.comments_enabled 804 ): 805 self._create_column_comments(table_name, column_descriptions) 806 807 def _build_schema_exp( 808 self, 809 table: exp.Table, 810 target_columns_to_types: t.Dict[str, exp.DataType], 811 column_descriptions: t.Optional[t.Dict[str, str]] = None, 812 expressions: t.Optional[t.List[exp.PrimaryKey]] = None, 813 is_view: bool = False, 814 materialized: bool = False, 815 ) -> exp.Schema: 816 """ 817 Build a schema expression for a table, columns, column comments, and additional schema properties. 
818 """ 819 expressions = expressions or [] 820 821 return exp.Schema( 822 this=table, 823 expressions=self._build_column_defs( 824 target_columns_to_types=target_columns_to_types, 825 column_descriptions=column_descriptions, 826 is_view=is_view, 827 materialized=materialized, 828 ) 829 + expressions, 830 ) 831 832 def _build_column_defs( 833 self, 834 target_columns_to_types: t.Dict[str, exp.DataType], 835 column_descriptions: t.Optional[t.Dict[str, str]] = None, 836 is_view: bool = False, 837 materialized: bool = False, 838 ) -> t.List[exp.ColumnDef]: 839 engine_supports_schema_comments = ( 840 self.COMMENT_CREATION_VIEW.supports_schema_def 841 if is_view 842 else self.COMMENT_CREATION_TABLE.supports_schema_def 843 ) 844 return [ 845 self._build_column_def( 846 column, 847 column_descriptions=column_descriptions, 848 engine_supports_schema_comments=engine_supports_schema_comments, 849 col_type=None if is_view else kind, # don't include column data type for views 850 ) 851 for column, kind in target_columns_to_types.items() 852 ] 853 854 def _build_column_def( 855 self, 856 col_name: str, 857 column_descriptions: t.Optional[t.Dict[str, str]] = None, 858 engine_supports_schema_comments: bool = False, 859 col_type: t.Optional[exp.DATA_TYPE] = None, 860 nested_names: t.List[str] = [], 861 ) -> exp.ColumnDef: 862 return exp.ColumnDef( 863 this=exp.to_identifier(col_name), 864 kind=col_type, 865 constraints=( 866 self._build_col_comment_exp(col_name, column_descriptions) 867 if engine_supports_schema_comments and self.comments_enabled and column_descriptions 868 else None 869 ), 870 ) 871 872 def _build_col_comment_exp( 873 self, col_name: str, column_descriptions: t.Dict[str, str] 874 ) -> t.List[exp.ColumnConstraint]: 875 comment = column_descriptions.get(col_name, None) 876 if comment: 877 return [ 878 exp.ColumnConstraint( 879 kind=exp.CommentColumnConstraint( 880 this=exp.Literal.string(self._truncate_column_comment(comment)) 881 ) 882 ) 883 ] 884 return [] 885 
    def _create_table_from_source_queries(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        exists: bool = True,
        replace: bool = False,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        table_kind: t.Optional[str] = None,
        track_rows_processed: bool = True,
        **kwargs: t.Any,
    ) -> None:
        """Create ``table_name`` from one or more source queries.

        The first source query is used in the CREATE statement (CTAS); every following query
        is appended with an INSERT. A transaction is opened only when there is more than one
        source query. Comments are embedded in the CTAS schema expression when possible,
        otherwise they are registered afterwards with separate comment commands.

        Args:
            table_name: The target table name.
            source_queries: Queries whose results populate the table; the first creates it.
            target_columns_to_types: Optional column name -> type mapping. When every type is
                known, each query's projections are ordered and coerced to it.
            exists: Indicates whether to include the IF NOT EXISTS check.
            replace: Whether or not to replace an existing table.
            table_description: Optional table comment.
            column_descriptions: Optional column name -> comment mapping.
            table_kind: Optional object kind for the CREATE statement ("TABLE" if not given).
            track_rows_processed: Forwarded to the execution of the CREATE and INSERT statements.
            **kwargs: Extra table properties forwarded to ``_create_table``.
        """
        table = exp.to_table(table_name)

        # CTAS calls do not usually include a schema expression. However, most engines
        # permit them in CTAS expressions, and they allow us to register all column comments
        # in a single call rather than in a separate comment command call for each column.
        #
        # This block conditionally builds a schema expression with column comments if the engine
        # supports it and we have target_columns_to_types. target_columns_to_types is required because
        # the schema expression must include at least column name, data type, and the comment -
        # for example, `(colname INTEGER COMMENT 'comment')`.
        #
        # target_columns_to_types will be available when loading from a DataFrame (by converting from
        # pandas to SQL types), when a model is "annotated" by explicitly specifying column
        # types, and for evaluation methods like `LogicalReplaceQueryMixin.replace_query()`
        # calls and SCD Type 2 model calls.
        schema = None
        target_columns_to_types_known = target_columns_to_types and columns_to_types_all_known(
            target_columns_to_types
        )
        if (
            column_descriptions
            and target_columns_to_types_known
            and self.COMMENT_CREATION_TABLE.is_in_schema_def_ctas
            and self.comments_enabled
        ):
            schema = self._build_schema_exp(table, target_columns_to_types, column_descriptions)  # type: ignore

        with self.transaction(condition=len(source_queries) > 1):
            for i, source_query in enumerate(source_queries):
                with source_query as query:
                    if target_columns_to_types and target_columns_to_types_known:
                        query = self._order_projections_and_filter(
                            query, target_columns_to_types, coerce_types=True
                        )
                    if i == 0:
                        # The first query creates the table (with the comment-bearing schema if built).
                        self._create_table(
                            schema if schema else table,
                            query,
                            target_columns_to_types=target_columns_to_types,
                            exists=exists,
                            replace=replace,
                            table_description=table_description,
                            table_kind=table_kind,
                            track_rows_processed=track_rows_processed,
                            **kwargs,
                        )
                    else:
                        # Subsequent queries are appended to the table that now exists.
                        # NOTE(review): when no target_columns_to_types are given, self.columns(table)
                        # is re-evaluated for every appended query.
                        self._insert_append_query(
                            table_name,
                            query,
                            target_columns_to_types or self.columns(table),
                            track_rows_processed=track_rows_processed,
                        )

        # Register comments with commands if the engine supports comments and we weren't able to
        # register them with the CTAS call's schema expression.
        if (
            table_description
            and self.COMMENT_CREATION_TABLE.is_comment_command_only
            and self.comments_enabled
        ):
            self._create_table_comment(table_name, table_description)
        if column_descriptions and schema is None and self.comments_enabled:
            self._create_column_comments(table_name, column_descriptions)

    def _create_table(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expr],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        table_kind: t.Optional[str] = None,
        track_rows_processed: bool = True,
        **kwargs: t.Any,
    ) -> None:
        """Execute the CREATE statement built by ``_build_create_table_exp`` and clear the cache.

        ``table_description`` is only embedded in the statement when the engine supports comments
        in the schema definition and comments are enabled. ``column_descriptions`` is accepted
        but not used by this implementation.
        """
        self.execute(
            self._build_create_table_exp(
                table_name_or_schema,
                expression=expression,
                exists=exists,
                replace=replace,
                target_columns_to_types=target_columns_to_types,
                table_description=(
                    table_description
                    if self.COMMENT_CREATION_TABLE.supports_schema_def and self.comments_enabled
                    else None
                ),
                table_kind=table_kind,
                **kwargs,
            ),
            track_rows_processed=track_rows_processed,
        )
        # Extract table name to clear cache
        table_name = (
            table_name_or_schema.this
            if isinstance(table_name_or_schema, exp.Schema)
            else table_name_or_schema
        )
        self._clear_data_object_cache(table_name)

    def _build_create_table_exp(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expr],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> exp.Create:
        """Build (without executing) the ``exp.Create`` expression for a table.

        ``exists`` is forced to False whenever ``replace`` is set. Table properties are built
        only when extra ``kwargs`` or a ``table_description`` are supplied, and the target's
        catalog name is passed to the properties builder.

        Returns:
            The CREATE expression; its kind is ``table_kind`` or "TABLE".
        """
        exists = False if replace else exists
        catalog_name = None
        if not isinstance(table_name_or_schema, exp.Schema):
            table_name_or_schema = exp.to_table(table_name_or_schema)
            catalog_name = table_name_or_schema.catalog
        else:
            # A Schema wraps the table in `.this`; only read the catalog when it is a Table.
            if isinstance(table_name_or_schema.this, exp.Table):
                catalog_name = table_name_or_schema.this.catalog

        properties = (
            self._build_table_properties_exp(
                **kwargs,
                catalog_name=catalog_name,
                target_columns_to_types=target_columns_to_types,
                table_description=table_description,
                table_kind=table_kind,
            )
            if kwargs or table_description
            else None
        )
        return exp.Create(
            this=table_name_or_schema,
            kind=table_kind or "TABLE",
            replace=replace,
            exists=exists,
            expression=expression,
            properties=properties,
        )

    def create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        exists: bool = True,
        **kwargs: t.Any,
    ) -> None:
        """Create a table to store SQLMesh internal state based on the definition of another table, including any
        column attributes and indexes defined in the original table.

        Args:
            target_table_name: The name of the table to create. Can be fully qualified or just table name.
            source_table_name: The name of the table to base the new table on.
            exists: Forwarded to ``_create_table_like``.
            **kwargs: Forwarded to ``_create_table_like``.
        """
        self._create_table_like(target_table_name, source_table_name, exists=exists, **kwargs)
        self._clear_data_object_cache(target_table_name)

    def clone_table(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        replace: bool = False,
        exists: bool = True,
        clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Creates a table with the target name by cloning the source table.

        Args:
            target_table_name: The name of the table that should be created.
            source_table_name: The name of the source table that should be cloned.
            replace: Whether or not to replace an existing table.
            exists: Indicates whether to include the IF NOT EXISTS check.
            clone_kwargs: Optional extra arguments set on the ``exp.Clone`` expression.
            **kwargs: Extra arguments set on the ``exp.Create`` expression
                (``rendered_physical_properties`` is discarded).

        Raises:
            NotImplementedError: If the engine does not support cloning.
        """
        if not self.SUPPORTS_CLONING:
            raise NotImplementedError(f"Engine does not support cloning: {type(self)}")

        # Not a valid Create argument, so drop it before splatting kwargs below.
        kwargs.pop("rendered_physical_properties", None)
        self.execute(
            exp.Create(
                this=exp.to_table(target_table_name),
                kind="TABLE",
                replace=replace,
                exists=exists,
                clone=exp.Clone(
                    this=exp.to_table(source_table_name),
                    **(clone_kwargs or {}),
                ),
                **kwargs,
            )
        )
        self._clear_data_object_cache(target_table_name)

    def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool = True) -> None:
        """Drops a data object of arbitrary type.

        Args:
            data_object: The data object to drop.
            ignore_if_not_exists: If True, no error will be raised if the data object does not exist.

        Raises:
            SQLMeshError: If the object is not a view, materialized view, table or managed table.
        """
        if data_object.type.is_view:
            self.drop_view(data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists)
        elif data_object.type.is_materialized_view:
            self.drop_view(
                data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists, materialized=True
            )
        elif data_object.type.is_table:
            self.drop_table(data_object.to_table(), exists=ignore_if_not_exists)
        elif data_object.type.is_managed_table:
            self.drop_managed_table(data_object.to_table(), exists=ignore_if_not_exists)
        else:
            raise SQLMeshError(
                f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'"
            )

    def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
        """Drops a table.

        Args:
            table_name: The name of the table to drop.
            exists: If exists, defaults to True.
            **kwargs: Forwarded to ``_drop_object`` (e.g. ``kind``, ``cascade``).
        """
        self._drop_object(name=table_name, exists=exists, **kwargs)

    def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None:
        """Drops a managed table.

        Args:
            table_name: The name of the table to drop.
            exists: If exists, defaults to True.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"Engine does not support managed tables: {type(self)}")

    def _drop_object(
        self,
        name: TableName | SchemaName,
        exists: bool = True,
        kind: str = "TABLE",
        cascade: bool = False,
        **drop_args: t.Any,
    ) -> None:
        """Drops an object.

        An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind.

        Args:
            name: The name of the table to drop.
            exists: If exists, defaults to True.
            kind: What kind of object to drop. Defaults to TABLE
            cascade: Whether or not to DROP ... CASCADE.
                Note that this is ignored for :kind's that are not present in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS
            **drop_args: Any extra arguments to set on the Drop expression
        """
        if cascade and kind.upper() in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS:
            drop_args["cascade"] = cascade

        self.execute(exp.Drop(this=exp.to_table(name), kind=kind, exists=exists, **drop_args))
        self._clear_data_object_cache(name)

    def get_alter_operations(
        self,
        current_table_name: TableName,
        target_table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterOperation]:
        """
        Determines the alter statements needed to change the current table into the structure of the target table.

        Both tables' columns are fetched with ``self.columns`` and compared by ``self.schema_differ``.

        Args:
            current_table_name: The table that would be altered.
            target_table_name: The table whose structure should be matched.
            ignore_destructive: Forwarded to ``compare_columns``.
            ignore_additive: Forwarded to ``compare_columns``.
        """
        return t.cast(
            t.List[TableAlterOperation],
            self.schema_differ.compare_columns(
                current_table_name,
                self.columns(current_table_name),
                self.columns(target_table_name),
                ignore_destructive=ignore_destructive,
                ignore_additive=ignore_additive,
            ),
        )

    def alter_table(
        self,
        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
    ) -> None:
        """
        Performs the alter statements to change the current table into the structure of the target table.

        All statements are executed inside a single ``self.transaction()``.

        Args:
            alter_expressions: ALTER expressions, or operations whose ``.expression`` is executed.
        """
        with self.transaction():
            for alter_expression in [
                x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
            ]:
                self.execute(alter_expression)

    def create_view(
        self,
        view_name: TableName,
        query_or_df: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        replace: bool = True,
        materialized: bool = False,
        materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **create_kwargs: t.Any,
    ) -> None:
        """Create a view with a query or dataframe.

        If a dataframe is passed in, it will be converted into a literal values statement.
        This should only be done if the dataframe is very small!

        Args:
            view_name: The view name.
            query_or_df: A query or dataframe.
            target_columns_to_types: Columns to use in the view statement.
            replace: Whether or not to replace an existing view defaults to True.
            materialized: Whether to create a materialized view. Only used for engines that support this feature.
            materialized_properties: Optional materialized view properties to add to the view.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            view_properties: Optional view properties to add to the view.
            source_columns: Optional list of the source's column names.
            create_kwargs: Additional kwargs to pass into the Create expression

        Raises:
            SQLMeshError: If materialized properties are given for a non-materialized view, if a
                dataframe's column types cannot be determined, or if more than one source query results.
        """
        import pandas as pd

        if materialized_properties and not materialized:
            raise SQLMeshError("Materialized properties are only supported for materialized views")

        query_or_df = self._native_df_to_pandas_df(query_or_df)

        if isinstance(query_or_df, pd.DataFrame):
            # Inline the dataframe rows as a literal VALUES query.
            values: t.List[t.Tuple[t.Any, ...]] = list(
                query_or_df.itertuples(index=False, name=None)
            )
            target_columns_to_types, source_columns = self._columns_to_types(
                query_or_df, target_columns_to_types, source_columns
            )
            if not target_columns_to_types:
                raise SQLMeshError("columns_to_types must be provided for dataframes")
            source_columns_to_types = get_source_columns_to_types(
                target_columns_to_types, source_columns
            )
            query_or_df = self._values_to_sql(
                values,
                source_columns_to_types,
                batch_start=0,
                batch_end=len(values),
            )

        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
            batch_size=0,
            target_table=view_name,
            source_columns=source_columns,
        )
        if len(source_queries) != 1:
            raise SQLMeshError("Only one source query is supported for creating views")

        schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name)
        if target_columns_to_types:
            schema = self._build_schema_exp(
                exp.to_table(view_name),
                target_columns_to_types,
                column_descriptions,
                is_view=True,
                materialized=materialized,
            )

        properties = create_kwargs.pop("properties", None)
        if not properties:
            properties = exp.Properties(expressions=[])

        if view_properties:
            table_type = self._pop_creatable_type_from_properties(view_properties)
            if table_type:
                properties.append("expressions", table_type)

        if materialized and self.SUPPORTS_MATERIALIZED_VIEWS:
            properties.append("expressions", exp.MaterializedProperty())

            # Fall back to the bare table name when the engine rejects a column list here.
            if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema):
                schema = schema.this

        if not self.SUPPORTS_VIEW_SCHEMA and isinstance(schema, exp.Schema):
            schema = schema.this

        if materialized_properties:
            # NOTE(review): these pops mutate the caller's dict. Also, "catalog_name" is only set
            # after `_build_partitioned_by_exp` has been called (and only when a partition
            # property was produced), so only `_build_clustered_by_exp` can receive it. Confirm
            # both are intended.
            partitioned_by = materialized_properties.pop("partitioned_by", None)
            clustered_by = materialized_properties.pop("clustered_by", None)
            if (
                partitioned_by
                and (
                    partitioned_by_prop := self._build_partitioned_by_exp(
                        partitioned_by, **materialized_properties
                    )
                )
                is not None
            ):
                materialized_properties["catalog_name"] = exp.to_table(view_name).catalog
                properties.append("expressions", partitioned_by_prop)
            if (
                clustered_by
                and (
                    clustered_by_prop := self._build_clustered_by_exp(
                        clustered_by, **materialized_properties
                    )
                )
                is not None
            ):
                properties.append("expressions", clustered_by_prop)

        create_view_properties = self._build_view_properties_exp(
            view_properties,
            (
                table_description
                if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled
                else None
            ),
            physical_cluster=create_kwargs.pop("physical_cluster", None),
        )
        if create_view_properties:
            for view_property in create_view_properties.expressions:
                # Small hack to make sure SECURE goes at the beginning before materialized as required by Snowflake
                if isinstance(view_property, exp.SecureProperty):
                    properties.set("expressions", view_property, index=0, overwrite=False)
                else:
                    properties.append("expressions", view_property)

        # Only attach properties when there is at least one.
        if properties.expressions:
            create_kwargs["properties"] = properties

        if replace:
            # An existing object of the other kind (view vs materialized view) is dropped first.
            self.drop_data_object_on_type_mismatch(
                self.get_data_object(view_name),
                DataObjectType.VIEW if not materialized else DataObjectType.MATERIALIZED_VIEW,
            )

        with source_queries[0] as query:
            self.execute(
                exp.Create(
                    this=schema,
                    kind="VIEW",
                    replace=replace,
                    expression=query,
                    **create_kwargs,
                ),
                quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
            )

        self._clear_data_object_cache(view_name)

        # Register table comment with commands if the engine doesn't support doing it in CREATE
        if (
            table_description
            and self.COMMENT_CREATION_VIEW.is_comment_command_only
            and self.comments_enabled
        ):
            self._create_table_comment(view_name, table_description, "VIEW")
        # Register column comments with commands if the engine doesn't support doing it in
        # CREATE or we couldn't do it in the CREATE schema definition because we don't have
        # columns_to_types
        if (
            column_descriptions
            and (
                self.COMMENT_CREATION_VIEW.is_comment_command_only
                or (
                    self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands
                    and not target_columns_to_types
                )
            )
            and self.comments_enabled
        ):
            self._create_column_comments(view_name, column_descriptions, "VIEW", materialized)

    @set_catalog()
    def create_schema(
        self,
        schema_name: SchemaName,
        ignore_if_exists: bool = True,
        warn_on_error: bool = True,
        properties: t.Optional[t.List[exp.Expr]] = None,
    ) -> None:
        """Create a schema (kind "SCHEMA") via ``_create_schema``.

        Args:
            schema_name: The schema name.
            ignore_if_exists: Whether to include the IF NOT EXISTS check.
            warn_on_error: If True, failures are logged as warnings instead of raised.
            properties: Optional properties for the CREATE statement.
        """
        properties = properties or []
        return self._create_schema(
            schema_name=schema_name,
            ignore_if_exists=ignore_if_exists,
            warn_on_error=warn_on_error,
            properties=properties,
            kind="SCHEMA",
        )

    def _create_schema(
        self,
        schema_name: SchemaName,
        ignore_if_exists: bool,
        warn_on_error: bool,
        properties: t.List[exp.Expr],
        kind: str,
    ) -> None:
        """Create a schema from a name or qualified table name.

        Any exception from execution is logged as a warning when ``warn_on_error`` is True,
        and re-raised otherwise.
        """
        try:
            self.execute(
                exp.Create(
                    this=to_schema(schema_name),
                    kind=kind,
                    exists=ignore_if_exists,
                    properties=exp.Properties(  # this renders as '' (empty string) if expressions is empty
                        expressions=properties
                    ),
                )
            )
        except Exception as e:
            if not warn_on_error:
                raise
            logger.warning("Failed to create %s '%s': %s", kind.lower(), schema_name, e)

    def drop_schema(
        self,
        schema_name: SchemaName,
        ignore_if_not_exists: bool = True,
        cascade: bool = False,
        **drop_args: t.Any,
    ) -> None:
        """Drop a schema.

        Args:
            schema_name: The schema name.
            ignore_if_not_exists: Whether to include the IF EXISTS check.
            cascade: Whether to DROP ... CASCADE (when the engine supports it for schemas).
            **drop_args: Extra arguments set on the Drop expression.
        """
        return self._drop_object(
            name=schema_name,
            exists=ignore_if_not_exists,
            kind="SCHEMA",
            cascade=cascade,
            **drop_args,
        )

    def drop_view(
        self,
        view_name: TableName,
        ignore_if_not_exists: bool = True,
        materialized: bool = False,
        **kwargs: t.Any,
    ) -> None:
        """Drop a view.

        The ``materialized`` flag is only set on the DROP when the engine supports
        materialized views.
        """
        self._drop_object(
            name=view_name,
            exists=ignore_if_not_exists,
            kind="VIEW",
            materialized=materialized and self.SUPPORTS_MATERIALIZED_VIEWS,
            **kwargs,
        )

    def create_catalog(self, catalog_name: str | exp.Identifier) -> None:
        """Parse ``catalog_name`` in this adapter's dialect and create the catalog."""
        return self._create_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect))

    def _create_catalog(self, catalog_name: exp.Identifier) -> None:
        """Engine hook for catalog creation; this base implementation always raises SQLMeshError."""
        raise SQLMeshError(
            f"Unable to create catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine."
        )

    def drop_catalog(self, catalog_name: str | exp.Identifier) -> None:
        """Parse ``catalog_name`` in this adapter's dialect and drop the catalog."""
        return self._drop_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect))

    def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
        """Engine hook for catalog removal; this base implementation always raises SQLMeshError."""
        raise SQLMeshError(
            f"Unable to drop catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine."
        )

    def columns(
        self, table_name: TableName, include_pseudo_columns: bool = False
    ) -> t.Dict[str, exp.DataType]:
        """Fetches column names and types for the target table.

        Runs ``DESCRIBE TABLE`` and reads rows until the first one whose first field starts
        with "#". Rows with a blank name or type are skipped. ``include_pseudo_columns`` is
        not used by this base implementation.
        """
        self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
        describe_output = self.cursor.fetchall()
        return {
            # Note: MySQL returns the column type as bytes.
            column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect)
            for column_name, column_type, *_ in itertools.takewhile(
                lambda t: not t[0].startswith("#"),
                describe_output,
            )
            if column_name and column_name.strip() and column_type and column_type.strip()
        }

    def table_exists(self, table_name: TableName) -> bool:
        """Return whether ``table_name`` exists.

        The data object cache is consulted first. On a miss, ``DESCRIBE TABLE`` is executed and
        any exception it raises is treated as "does not exist".
        """
        table = exp.to_table(table_name)
        data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
        if data_object_cache_key in self._data_object_cache:
            logger.debug("Table existence cache hit: %s", data_object_cache_key)
            return self._data_object_cache[data_object_cache_key] is not None

        try:
            self.execute(exp.Describe(this=table, kind="TABLE"))
            return True
        except Exception:
            return False

    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None:
        """Execute ``DELETE FROM table_name WHERE where``."""
        self.execute(exp.delete(table_name, where))

    def insert_append(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        track_rows_processed: bool = True,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """Append the rows of a query or dataframe to ``table_name``.

        Args:
            table_name: The target table.
            query_or_df: The query or dataframe to insert.
            target_columns_to_types: Optional column name -> type mapping of the target.
            track_rows_processed: Forwarded to statement execution.
            source_columns: Optional list of the source's column names.
        """
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
            target_table=table_name,
            source_columns=source_columns,
        )
        self._insert_append_source_queries(
            table_name, source_queries, target_columns_to_types, track_rows_processed
        )

    def _insert_append_source_queries(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        track_rows_processed: bool = True,
    ) -> None:
        """Append every source query to ``table_name`` inside one transaction.

        If no column mapping is given, the target table's columns are fetched with ``self.columns``.
        """
        with self.transaction(condition=len(source_queries) > 0):
            target_columns_to_types = target_columns_to_types or self.columns(table_name)
            for source_query in source_queries:
                with source_query as query:
                    self._insert_append_query(
                        table_name,
                        query,
                        target_columns_to_types,
                        track_rows_processed=track_rows_processed,
                    )

    def _insert_append_query(
        self,
        table_name: TableName,
        query: Query,
        target_columns_to_types: t.Dict[str, exp.DataType],
        order_projections: bool = True,
        track_rows_processed: bool = True,
    ) -> None:
        """Execute ``INSERT INTO table_name (<columns>) <query>``.

        When ``order_projections`` is True, the query's projections are first ordered to match
        ``target_columns_to_types`` via ``_order_projections_and_filter``.
        """
        if order_projections:
            query = self._order_projections_and_filter(query, target_columns_to_types)
        self.execute(
            exp.insert(query, table_name, columns=list(target_columns_to_types)),
            track_rows_processed=track_rows_processed,
        )

    def insert_overwrite_by_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        partitioned_by: t.List[exp.Expr],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """Overwrite the partitions of ``table_name`` covered by the incoming data.

        Engines whose strategy is INSERT OVERWRITE use ``_insert_overwrite_by_condition``
        without a WHERE clause. All other engines fall back to ``_replace_by_key``, using the
        ``partitioned_by`` expressions as a non-unique key.
        """
        if self.INSERT_OVERWRITE_STRATEGY.is_insert_overwrite:
            target_table = exp.to_table(table_name)
            source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
                query_or_df,
                target_columns_to_types,
                target_table=target_table,
                source_columns=source_columns,
            )
            self._insert_overwrite_by_condition(
                table_name, source_queries, target_columns_to_types=target_columns_to_types
            )
        else:
            self._replace_by_key(
                table_name,
                query_or_df,
                target_columns_to_types,
                partitioned_by,
                is_unique_key=False,
                source_columns=source_columns,
            )

    def insert_overwrite_by_time_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        start: TimeLike,
        end: TimeLike,
        time_formatter: t.Callable[[TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expr],
        time_column: TimeColumn | exp.Expr | str,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Overwrite rows whose ``time_column`` falls between ``start`` and ``end``.

        The bounds come from ``make_inclusive(start, end, dialect)`` and are rendered with
        ``time_formatter``. They form a ``time_column BETWEEN low AND high`` condition that is
        passed to ``_insert_overwrite_by_time_partition``. If the column types are missing or
        not all known, they are fetched from the target table.
        """
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
            target_table=table_name,
            source_columns=source_columns,
        )
        if not target_columns_to_types or not columns_to_types_all_known(target_columns_to_types):
            target_columns_to_types = self.columns(table_name)
        low, high = [
            time_formatter(dt, target_columns_to_types)
            for dt in make_inclusive(start, end, self.dialect)
        ]
        if isinstance(time_column, TimeColumn):
            time_column = time_column.column
        where = exp.Between(
            this=exp.to_column(time_column) if isinstance(time_column, str) else time_column,
            low=low,
            high=high,
        )
        return self._insert_overwrite_by_time_partition(
            table_name, source_queries, target_columns_to_types, where, **kwargs
        )

    def _insert_overwrite_by_time_partition(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        target_columns_to_types: t.Dict[str, exp.DataType],
        where: exp.Condition,
        **kwargs: t.Any,
    ) -> None:
        """Engine hook for time-partition overwrites; delegates to ``_insert_overwrite_by_condition``."""
        return self._insert_overwrite_by_condition(
            table_name, source_queries, target_columns_to_types, where, **kwargs
        )

    def _values_to_sql(
        self,
        values: t.List[t.Tuple[t.Any, ...]],
        target_columns_to_types: t.Dict[str, exp.DataType],
        batch_start: int,
        batch_end: int,
        alias: str = "t",
        source_columns: t.Optional[t.List[str]] = None,
    ) -> Query:
        """Build a SELECT over literal VALUES for ``values[batch_start:batch_end]``.

        This is a thin wrapper around ``select_from_values_for_batch_range``.
        """
        return select_from_values_for_batch_range(
            values=values,
            target_columns_to_types=target_columns_to_types,
            batch_start=batch_start,
            batch_end=batch_end,
            alias=alias,
            source_columns=source_columns,
        )

    def _insert_overwrite_by_condition(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        where: t.Optional[exp.Condition] = None,
        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
        **kwargs: t.Any,
    ) -> None:
        """Replace the rows matching ``where`` (all rows if None) with the source queries' results.

        The strategy (``insert_overwrite_strategy_override`` or ``self.INSERT_OVERWRITE_STRATEGY``)
        selects one of the following:

        - Delete-insert (and every query after the first, for any strategy): DELETE the matching
          rows once, then INSERT.
        - Merge: a MERGE that deletes unmatched target rows satisfying ``where`` and inserts all
          source rows.
        - Otherwise: an INSERT with ``overwrite`` set per the strategy, plus a WHERE for
          replace-where.
        """
        table = exp.to_table(table_name)
        insert_overwrite_strategy = (
            insert_overwrite_strategy_override or self.INSERT_OVERWRITE_STRATEGY
        )
        # NOTE(review): the DELETE for delete-insert is issued inside the loop below, so with an
        # empty source_queries list no rows are deleted even though a transaction is opened.
        with self.transaction(
            condition=len(source_queries) > 0 or insert_overwrite_strategy.is_delete_insert
        ):
            target_columns_to_types = target_columns_to_types or self.columns(table_name)
            for i, source_query in enumerate(source_queries):
                with source_query as query:
                    query = self._order_projections_and_filter(
                        query, target_columns_to_types, where=where
                    )
                    if i > 0 or insert_overwrite_strategy.is_delete_insert:
                        if i == 0:
                            self.delete_from(table_name, where=where or exp.true())
                        self._insert_append_query(
                            table_name,
                            query,
                            target_columns_to_types=target_columns_to_types,
                            order_projections=False,
                        )
                    elif insert_overwrite_strategy.is_merge:
                        columns = [exp.column(col) for col in target_columns_to_types]
                        when_not_matched_by_source = exp.When(
                            matched=False,
                            source=True,
                            condition=where,
                            then=exp.Delete(),
                        )
                        when_not_matched_by_target = exp.When(
                            matched=False,
                            source=False,
                            then=exp.Insert(
                                this=exp.Tuple(expressions=columns),
                                expression=exp.Tuple(expressions=columns),
                            ),
                        )
                        # ON FALSE: no row ever matches, so every source row is inserted and
                        # every target row satisfying `where` is deleted.
                        self._merge(
                            target_table=table_name,
                            query=query,
                            on=exp.false(),
                            whens=exp.Whens(
                                expressions=[when_not_matched_by_source, when_not_matched_by_target]
                            ),
                        )
                    else:
                        insert_exp = exp.insert(
                            query,
                            table,
                            columns=(
                                list(target_columns_to_types)
                                if not insert_overwrite_strategy.is_replace_where
                                else None
                            ),
                            overwrite=insert_overwrite_strategy.is_insert_overwrite,
                        )
                        if insert_overwrite_strategy.is_replace_where:
                            insert_exp.set("where", where or exp.true())
                        self.execute(insert_exp, track_rows_processed=True)

    def update_table(
        self,
        table_name: TableName,
        properties: t.Dict[str, t.Any],
        where: t.Optional[str | exp.Condition] = None,
    ) -> None:
        """Execute ``UPDATE table_name SET <properties> [WHERE where]``."""
        self.execute(exp.update(table_name, properties, where=where))

    def _merge(
        self,
        target_table: TableName,
        query: Query,
        on: exp.Expr,
        whens: exp.Whens,
    ) -> None:
        """Execute ``MERGE INTO target USING (query) ON on <whens>``.

        The target and source are aliased with the module constants MERGE_TARGET_ALIAS and
        MERGE_SOURCE_ALIAS, so ``on`` and ``whens`` may reference those aliases.
        """
        this = exp.alias_(exp.to_table(target_table), alias=MERGE_TARGET_ALIAS, table=True)
        using = exp.alias_(
            exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
        )
        self.execute(
            exp.Merge(this=this, using=using, on=on, whens=whens), track_rows_processed=True
        )

    def scd_type_2_by_time(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expr],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: t.Union[TimeLike, exp.Column],
        updated_at_col: exp.Column,
        invalidate_hard_deletes: bool = True,
        updated_at_as_valid_from: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """SCD Type 2 load driven by an ``updated_at`` column; delegates to ``_scd_type_2``.

        ``updated_at_col`` must exist in the target table; ``_scd_type_2`` raises SQLMeshError
        otherwise.
        """
        self._scd_type_2(
            target_table=target_table,
            source_table=source_table,
            unique_key=unique_key,
            valid_from_col=valid_from_col,
            valid_to_col=valid_to_col,
            execution_time=execution_time,
            updated_at_col=updated_at_col,
            invalidate_hard_deletes=invalidate_hard_deletes,
            updated_at_as_valid_from=updated_at_as_valid_from,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            column_descriptions=column_descriptions,
            truncate=truncate,
            source_columns=source_columns,
            **kwargs,
        )

    def scd_type_2_by_column(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expr],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: t.Union[TimeLike, exp.Column],
        check_columns: t.Union[exp.Star, t.Sequence[exp.Expr]],
        invalidate_hard_deletes: bool = True,
        execution_time_as_valid_from: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """SCD Type 2 load driven by changes in ``check_columns``; delegates to ``_scd_type_2``.

        A star (or a one-element list holding a star) for ``check_columns`` is expanded by
        ``_scd_type_2`` to all unmanaged columns.
        """
        self._scd_type_2(
            target_table=target_table,
            source_table=source_table,
            unique_key=unique_key,
            valid_from_col=valid_from_col,
            valid_to_col=valid_to_col,
            execution_time=execution_time,
            check_columns=check_columns,
            target_columns_to_types=target_columns_to_types,
            invalidate_hard_deletes=invalidate_hard_deletes,
            execution_time_as_valid_from=execution_time_as_valid_from,
            table_description=table_description,
            column_descriptions=column_descriptions,
            truncate=truncate,
            source_columns=source_columns,
            **kwargs,
        )

    def _scd_type_2(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expr],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: t.Union[TimeLike, exp.Column],
        invalidate_hard_deletes: bool = True,
        updated_at_col: t.Optional[exp.Column] = None,
        check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Expr]]] = None,
        updated_at_as_valid_from: bool = False,
        execution_time_as_valid_from: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Shared implementation behind ``scd_type_2_by_time`` and ``scd_type_2_by_column``."""

        def remove_managed_columns(
            cols_to_types: t.Dict[str, exp.DataType],
        ) -> t.Dict[str, exp.DataType]:
            # Strip the valid_from / valid_to columns that this method maintains itself.
            return {
                k: v for k, v in cols_to_types.items() if k not in {valid_from_name, valid_to_name}
            }

        valid_from_name = valid_from_col.name
        valid_to_name = valid_to_col.name
        target_columns_to_types = target_columns_to_types or self.columns(target_table)
        # Re-read the table's columns if the provided mapping lacks the managed columns or
        # contains unknown types.
        if (
            valid_from_name not in target_columns_to_types
            or valid_to_name not in target_columns_to_types
            or not columns_to_types_all_known(target_columns_to_types)
        ):
            target_columns_to_types = self.columns(target_table)
        unmanaged_columns_to_types = (
            remove_managed_columns(target_columns_to_types) if target_columns_to_types else None
        )
        source_queries, unmanaged_columns_to_types = self._get_source_queries_and_columns_to_types(
            source_table,
            unmanaged_columns_to_types,
            target_table=target_table,
            batch_size=0,
            source_columns=source_columns,
        )
        updated_at_name = updated_at_col.name if updated_at_col else None
        if not target_columns_to_types:
            raise SQLMeshError(f"Could not get columns_to_types. Does {target_table} exist?")
        unmanaged_columns_to_types = unmanaged_columns_to_types or remove_managed_columns(
            target_columns_to_types
        )
        # Validate the mutually exclusive / dependent configuration options.
        if not unique_key:
            raise SQLMeshError("unique_key must be provided for SCD Type 2")
        if check_columns and updated_at_col:
            raise SQLMeshError(
                "Cannot use both `check_columns` and `updated_at_name` for SCD Type 2"
            )
        if check_columns and updated_at_as_valid_from:
            raise SQLMeshError(
                "Cannot use both `check_columns` and `updated_at_as_valid_from` for SCD Type 2"
            )
        if execution_time_as_valid_from and not check_columns:
            raise SQLMeshError(
                "Cannot use `execution_time_as_valid_from` without `check_columns` for SCD Type 2"
            )
        if updated_at_name and updated_at_name not in target_columns_to_types:
            raise SQLMeshError(
                f"Column {updated_at_name} not found in {target_table}. Table must contain an `updated_at` timestamp for SCD Type 2"
            )
        # The valid_from column's type is used as the type for all time values below.
        time_data_type = target_columns_to_types[valid_from_name]
        select_source_columns: t.List[t.Union[str, exp.Alias]] = [
            col for col in unmanaged_columns_to_types if col != updated_at_name
        ]
        table_columns = [exp.column(c, quoted=True) for c in target_columns_to_types]
        if updated_at_name:
            # Re-add updated_at cast to the valid_from column's type.
            select_source_columns.append(
                exp.cast(updated_at_col, time_data_type).as_(updated_at_col.this)  # type: ignore
            )

        # If a star is provided, we include all unmanaged columns in the check.
        # This unnecessarily includes unique key columns but since they are used in the join, and therefore we know
        # they are equal or not, the extra check is not a problem and we gain simplified logic here.
        # If we want to change this, then we just need to check the expressions in unique_key and pull out the
        # column names and then remove them from the unmanaged_columns
        if check_columns:
            # Handle both Star directly and [Star()] (which can happen during serialization/deserialization)
            if isinstance(seq_get(ensure_list(check_columns), 0), exp.Star):
                check_columns = [exp.column(col) for col in unmanaged_columns_to_types]
        execution_ts = (
            exp.cast(execution_time, time_data_type, dialect=self.dialect)
            if isinstance(execution_time, exp.Column)
            else to_time_column(execution_time, time_data_type, self.dialect, nullable=True)
        )
        if updated_at_as_valid_from:
            if not updated_at_col:
                raise SQLMeshError(
                    "Cannot use `updated_at_as_valid_from` without `updated_at_name` for SCD Type 2"
                )
            update_valid_from_start: t.Union[str, exp.Expr] = updated_at_col
        # If using check_columns and the user doesn't always want execution_time for valid from
        # then we only use epoch 0 if we are truncating the table and loading rows for the first time.
        # All future new rows should have execution time.
1913 elif check_columns and (execution_time_as_valid_from or not truncate): 1914 update_valid_from_start = execution_ts 1915 else: 1916 update_valid_from_start = to_time_column( 1917 "1970-01-01 00:00:00+00:00", time_data_type, self.dialect, nullable=True 1918 ) 1919 insert_valid_from_start = execution_ts if check_columns else updated_at_col # type: ignore 1920 # joined._exists IS NULL is saying "if the row is deleted" 1921 delete_check = ( 1922 exp.column("_exists", "joined").is_(exp.Null()) if invalidate_hard_deletes else None 1923 ) 1924 prefixed_valid_to_col = valid_to_col.copy() 1925 prefixed_valid_to_col.this.set("this", f"t_{prefixed_valid_to_col.name}") 1926 prefixed_valid_from_col = valid_from_col.copy() 1927 prefixed_valid_from_col.this.set("this", f"t_{valid_from_col.name}") 1928 if check_columns: 1929 row_check_conditions = [] 1930 for col in check_columns: 1931 col_qualified = col.copy() 1932 col_qualified.set("table", exp.to_identifier("joined")) 1933 1934 t_col = col_qualified.copy() 1935 for column in t_col.find_all(exp.Column): 1936 column.this.set("this", f"t_{column.name}") 1937 1938 row_check_conditions.extend( 1939 [ 1940 col_qualified.neq(t_col), 1941 exp.and_(t_col.is_(exp.Null()), col_qualified.is_(exp.Null()).not_()), 1942 exp.and_(t_col.is_(exp.Null()).not_(), col_qualified.is_(exp.Null())), 1943 ] 1944 ) 1945 row_value_check = exp.or_(*row_check_conditions) 1946 unique_key_conditions = [] 1947 for key in unique_key: 1948 key_qualified = key.copy() 1949 key_qualified.set("table", exp.to_identifier("joined")) 1950 t_key = key_qualified.copy() 1951 for col in t_key.find_all(exp.Column): 1952 col.this.set("this", f"t_{col.name}") 1953 unique_key_conditions.extend( 1954 [t_key.is_(exp.Null()).not_(), key_qualified.is_(exp.Null()).not_()] 1955 ) 1956 unique_key_check = exp.and_(*unique_key_conditions) 1957 # unique_key_check is saying "if the row is updated" 1958 # row_value_check is saying "if the row has changed" 1959 updated_row_filter = 
exp.and_(unique_key_check, row_value_check) 1960 valid_to_case_stmt = ( 1961 exp.Case() 1962 .when( 1963 exp.and_( 1964 exp.or_( 1965 delete_check, 1966 updated_row_filter, 1967 ) 1968 ), 1969 execution_ts, 1970 ) 1971 .else_(prefixed_valid_to_col) 1972 .as_(valid_to_col.this) 1973 ) 1974 valid_from_case_stmt = exp.func( 1975 "COALESCE", 1976 prefixed_valid_from_col, 1977 update_valid_from_start, 1978 ).as_(valid_from_col.this) 1979 else: 1980 assert updated_at_col is not None 1981 updated_at_col_qualified = updated_at_col.copy() 1982 updated_at_col_qualified.set("table", exp.to_identifier("joined")) 1983 prefixed_updated_at_col = updated_at_col_qualified.copy() 1984 prefixed_updated_at_col.this.set("this", f"t_{updated_at_col_qualified.name}") 1985 updated_row_filter = updated_at_col_qualified > prefixed_updated_at_col 1986 1987 valid_to_case_stmt_builder = exp.Case().when( 1988 updated_row_filter, updated_at_col_qualified 1989 ) 1990 if delete_check: 1991 valid_to_case_stmt_builder = valid_to_case_stmt_builder.when( 1992 delete_check, execution_ts 1993 ) 1994 valid_to_case_stmt = valid_to_case_stmt_builder.else_(prefixed_valid_to_col).as_( 1995 valid_to_col.this 1996 ) 1997 1998 valid_from_case_stmt = ( 1999 exp.Case() 2000 .when( 2001 exp.and_( 2002 prefixed_valid_from_col.is_(exp.Null()), 2003 exp.column("_exists", "latest_deleted").is_(exp.Null()).not_(), 2004 ), 2005 exp.Case() 2006 .when( 2007 exp.column(valid_to_col.this, "latest_deleted") > updated_at_col, 2008 exp.column(valid_to_col.this, "latest_deleted"), 2009 ) 2010 .else_(updated_at_col), 2011 ) 2012 .when(prefixed_valid_from_col.is_(exp.Null()), update_valid_from_start) 2013 .else_(prefixed_valid_from_col) 2014 ).as_(valid_from_col.this) 2015 2016 existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_( 2017 target_table 2018 ) 2019 if truncate: 2020 existing_rows_query = existing_rows_query.limit(0) 2021 2022 with source_queries[0] as source_query: 2023 
prefixed_columns_to_types = [] 2024 for column in target_columns_to_types: 2025 prefixed_col = exp.column(column).copy() 2026 prefixed_col.this.set("this", f"t_{prefixed_col.name}") 2027 prefixed_columns_to_types.append(prefixed_col) 2028 prefixed_unmanaged_columns = [] 2029 for column in unmanaged_columns_to_types: 2030 prefixed_col = exp.column(column).copy() 2031 prefixed_col.this.set("this", f"t_{prefixed_col.name}") 2032 prefixed_unmanaged_columns.append(prefixed_col) 2033 query = ( 2034 exp.Select() # type: ignore 2035 .select(*table_columns) 2036 .from_("static") 2037 .union( 2038 exp.select(*table_columns).from_("updated_rows"), 2039 distinct=False, 2040 ) 2041 .union( 2042 exp.select(*table_columns).from_("inserted_rows"), 2043 distinct=False, 2044 ) 2045 .with_( 2046 "source", 2047 exp.select(exp.true().as_("_exists"), *select_source_columns) 2048 .distinct(*unique_key) 2049 .from_( 2050 self.use_server_nulls_for_unmatched_after_join(source_query).subquery( # type: ignore 2051 "raw_source" 2052 ) 2053 ), 2054 ) 2055 # Historical Records that Do Not Change 2056 .with_( 2057 "static", 2058 existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()), 2059 ) 2060 # Latest Records that can be updated 2061 .with_( 2062 "latest", 2063 existing_rows_query.where(valid_to_col.is_(exp.Null())), 2064 ) 2065 # Deleted records which can be used to determine `valid_from` for undeleted source records 2066 .with_( 2067 "deleted", 2068 exp.select(*[exp.column(col, "static") for col in target_columns_to_types]) 2069 .from_("static") 2070 .join( 2071 "latest", 2072 on=exp.and_( 2073 *[ 2074 add_table(key, "static").eq(add_table(key, "latest")) 2075 for key in unique_key 2076 ] 2077 ), 2078 join_type="left", 2079 ) 2080 .where(exp.column(valid_to_col.this, "latest").is_(exp.Null())), 2081 ) 2082 # Get the latest `valid_to` deleted record for each unique key 2083 .with_( 2084 "latest_deleted", 2085 exp.select( 2086 exp.true().as_("_exists"), 2087 *(part.as_(f"_key{i}") for 
i, part in enumerate(unique_key)), 2088 exp.Max(this=valid_to_col).as_(valid_to_col.this), 2089 ) 2090 .from_("deleted") 2091 .group_by(*unique_key), 2092 ) 2093 # Do a full join between latest records and source table in order to combine them together 2094 # MySQL doesn't support full join so going to do a left then right join and remove dups with union 2095 # We do a left/right and filter right on only matching to remove the need to do union distinct 2096 # which allows scd type 2 to be compatible with unhashable data types 2097 .with_( 2098 "joined", 2099 exp.select( 2100 exp.column("_exists", table="source").as_("_exists"), 2101 *( 2102 exp.column(col, table="latest").as_(prefixed_columns_to_types[i].this) 2103 for i, col in enumerate(target_columns_to_types) 2104 ), 2105 *( 2106 exp.column(col, table="source").as_(col) 2107 for col in unmanaged_columns_to_types 2108 ), 2109 ) 2110 .from_("latest") 2111 .join( 2112 "source", 2113 on=exp.and_( 2114 *[ 2115 add_table(key, "latest").eq(add_table(key, "source")) 2116 for key in unique_key 2117 ] 2118 ), 2119 join_type="left", 2120 ) 2121 .union( 2122 exp.select( 2123 exp.column("_exists", table="source").as_("_exists"), 2124 *( 2125 exp.column(col, table="latest").as_( 2126 prefixed_columns_to_types[i].this 2127 ) 2128 for i, col in enumerate(target_columns_to_types) 2129 ), 2130 *( 2131 exp.column(col, table="source").as_(col) 2132 for col in unmanaged_columns_to_types 2133 ), 2134 ) 2135 .from_("latest") 2136 .join( 2137 "source", 2138 on=exp.and_( 2139 *[ 2140 add_table(key, "latest").eq(add_table(key, "source")) 2141 for key in unique_key 2142 ] 2143 ), 2144 join_type="right", 2145 ) 2146 .where(exp.column("_exists", table="latest").is_(exp.Null())), 2147 distinct=False, 2148 ), 2149 ) 2150 # Get deleted, new, no longer current, or unchanged records 2151 .with_( 2152 "updated_rows", 2153 exp.select( 2154 *( 2155 exp.func( 2156 "COALESCE", 2157 exp.column(prefixed_unmanaged_columns[i].this, table="joined"), 2158 
exp.column(col, table="joined"), 2159 ).as_(col) 2160 for i, col in enumerate(unmanaged_columns_to_types) 2161 ), 2162 valid_from_case_stmt, 2163 valid_to_case_stmt, 2164 ) 2165 .from_("joined") 2166 .join( 2167 "latest_deleted", 2168 on=exp.and_( 2169 *[ 2170 add_table(part, "joined").eq( 2171 exp.column(f"_key{i}", "latest_deleted") 2172 ) 2173 for i, part in enumerate(unique_key) 2174 ] 2175 ), 2176 join_type="left", 2177 ), 2178 ) 2179 # Get records that have been "updated" which means inserting a new record with previous `valid_from` 2180 .with_( 2181 "inserted_rows", 2182 exp.select( 2183 *unmanaged_columns_to_types, 2184 insert_valid_from_start.as_(valid_from_col.this), # type: ignore 2185 to_time_column(exp.null(), time_data_type, self.dialect, nullable=True).as_( 2186 valid_to_col.this 2187 ), 2188 ) 2189 .from_("joined") 2190 .where(updated_row_filter), 2191 ) 2192 ) 2193 2194 self.replace_query( 2195 target_table, 2196 self.ensure_nulls_for_unmatched_after_join(query), 2197 target_columns_to_types=target_columns_to_types, 2198 table_description=table_description, 2199 column_descriptions=column_descriptions, 2200 **kwargs, 2201 ) 2202 2203 def merge( 2204 self, 2205 target_table: TableName, 2206 source_table: QueryOrDF, 2207 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 2208 unique_key: t.Sequence[exp.Expr], 2209 when_matched: t.Optional[exp.Whens] = None, 2210 merge_filter: t.Optional[exp.Expr] = None, 2211 source_columns: t.Optional[t.List[str]] = None, 2212 **kwargs: t.Any, 2213 ) -> None: 2214 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 2215 source_table, 2216 target_columns_to_types, 2217 target_table=target_table, 2218 source_columns=source_columns, 2219 ) 2220 target_columns_to_types = target_columns_to_types or self.columns(target_table) 2221 on = exp.and_( 2222 *( 2223 add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS)) 2224 for part in unique_key 2225 ) 2226 
) 2227 if merge_filter: 2228 on = exp.and_(merge_filter, on) 2229 2230 if not when_matched: 2231 match_expressions = [ 2232 exp.When( 2233 matched=True, 2234 source=False, 2235 then=exp.Update( 2236 expressions=[ 2237 exp.column(col, MERGE_TARGET_ALIAS).eq( 2238 exp.column(col, MERGE_SOURCE_ALIAS) 2239 ) 2240 for col in target_columns_to_types 2241 ], 2242 ), 2243 ) 2244 ] 2245 else: 2246 match_expressions = when_matched.copy().expressions 2247 2248 match_expressions.append( 2249 exp.When( 2250 matched=False, 2251 source=False, 2252 then=exp.Insert( 2253 this=exp.Tuple( 2254 expressions=[exp.column(col) for col in target_columns_to_types] 2255 ), 2256 expression=exp.Tuple( 2257 expressions=[ 2258 exp.column(col, MERGE_SOURCE_ALIAS) for col in target_columns_to_types 2259 ] 2260 ), 2261 ), 2262 ) 2263 ) 2264 for source_query in source_queries: 2265 with source_query as query: 2266 self._merge( 2267 target_table=target_table, 2268 query=query, 2269 on=on, 2270 whens=exp.Whens(expressions=match_expressions), 2271 ) 2272 2273 def rename_table( 2274 self, 2275 old_table_name: TableName, 2276 new_table_name: TableName, 2277 ) -> None: 2278 new_table = exp.to_table(new_table_name) 2279 if new_table.catalog: 2280 old_table = exp.to_table(old_table_name) 2281 catalog = old_table.catalog or self.get_current_catalog() 2282 if catalog != new_table.catalog: 2283 raise UnsupportedCatalogOperationError( 2284 "Tried to rename table across catalogs which is not supported" 2285 ) 2286 self._rename_table(old_table_name, new_table_name) 2287 self._clear_data_object_cache(old_table_name) 2288 self._clear_data_object_cache(new_table_name) 2289 2290 def get_data_object( 2291 self, target_name: TableName, safe_to_cache: bool = False 2292 ) -> t.Optional[DataObject]: 2293 target_table = exp.to_table(target_name) 2294 existing_data_objects = self.get_data_objects( 2295 schema_(target_table.db, target_table.catalog), 2296 {target_table.name}, 2297 safe_to_cache=safe_to_cache, 2298 ) 2299 if 
existing_data_objects: 2300 return existing_data_objects[0] 2301 return None 2302 2303 def get_data_objects( 2304 self, 2305 schema_name: SchemaName, 2306 object_names: t.Optional[t.Set[str]] = None, 2307 safe_to_cache: bool = False, 2308 ) -> t.List[DataObject]: 2309 """Lists all data objects in the target schema. 2310 2311 Args: 2312 schema_name: The name of the schema to list data objects from. 2313 object_names: If provided, only return data objects with these names. 2314 safe_to_cache: Whether it is safe to cache the results of this call. 2315 2316 Returns: 2317 A list of data objects in the target schema. 2318 """ 2319 if object_names is not None: 2320 if not object_names: 2321 return [] 2322 2323 # Check cache for each object name 2324 target_schema = to_schema(schema_name) 2325 cached_objects = [] 2326 missing_names = set() 2327 2328 for name in object_names: 2329 cache_key = _get_data_object_cache_key( 2330 target_schema.catalog, target_schema.db, name 2331 ) 2332 if cache_key in self._data_object_cache: 2333 logger.debug("Data object cache hit: %s", cache_key) 2334 data_object = self._data_object_cache[cache_key] 2335 # If the object is none, then the table was previously looked for but not found 2336 if data_object: 2337 cached_objects.append(data_object) 2338 else: 2339 logger.debug("Data object cache miss: %s", cache_key) 2340 missing_names.add(name) 2341 2342 # Fetch missing objects from database 2343 if missing_names: 2344 object_names_list = list(missing_names) 2345 batches = [ 2346 object_names_list[i : i + self.DATA_OBJECT_FILTER_BATCH_SIZE] 2347 for i in range(0, len(object_names_list), self.DATA_OBJECT_FILTER_BATCH_SIZE) 2348 ] 2349 2350 fetched_objects = [] 2351 fetched_object_names = set() 2352 for batch in batches: 2353 objects = self._get_data_objects(schema_name, set(batch)) 2354 for obj in objects: 2355 if safe_to_cache: 2356 cache_key = _get_data_object_cache_key( 2357 obj.catalog, obj.schema_name, obj.name 2358 ) 2359 
self._data_object_cache[cache_key] = obj 2360 fetched_objects.append(obj) 2361 fetched_object_names.add(obj.name) 2362 2363 if safe_to_cache: 2364 for missing_name in missing_names - fetched_object_names: 2365 cache_key = _get_data_object_cache_key( 2366 target_schema.catalog, target_schema.db, missing_name 2367 ) 2368 self._data_object_cache[cache_key] = None 2369 2370 return cached_objects + fetched_objects 2371 2372 return cached_objects 2373 2374 fetched_objects = self._get_data_objects(schema_name) 2375 if safe_to_cache: 2376 for obj in fetched_objects: 2377 cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name) 2378 self._data_object_cache[cache_key] = obj 2379 return fetched_objects 2380 2381 def fetchone( 2382 self, 2383 query: t.Union[exp.Expr, str], 2384 ignore_unsupported_errors: bool = False, 2385 quote_identifiers: bool = False, 2386 ) -> t.Optional[t.Tuple]: 2387 with self.transaction(): 2388 self.execute( 2389 query, 2390 ignore_unsupported_errors=ignore_unsupported_errors, 2391 quote_identifiers=quote_identifiers, 2392 ) 2393 return self.cursor.fetchone() 2394 2395 def fetchall( 2396 self, 2397 query: t.Union[exp.Expr, str], 2398 ignore_unsupported_errors: bool = False, 2399 quote_identifiers: bool = False, 2400 ) -> t.List[t.Tuple]: 2401 with self.transaction(): 2402 self.execute( 2403 query, 2404 ignore_unsupported_errors=ignore_unsupported_errors, 2405 quote_identifiers=quote_identifiers, 2406 ) 2407 return self.cursor.fetchall() 2408 2409 def _fetch_native_df( 2410 self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False 2411 ) -> DF: 2412 """Fetches a DataFrame that can be either Pandas or PySpark from the cursor""" 2413 with self.transaction(): 2414 self.execute(query, quote_identifiers=quote_identifiers) 2415 return self.cursor.fetchdf() 2416 2417 def _native_df_to_pandas_df( 2418 self, 2419 query_or_df: QueryOrDF, 2420 ) -> t.Union[Query, pd.DataFrame]: 2421 """ 2422 Take a "native" DataFrame (eg Pyspark, 
Bigframe, Snowpark etc) and convert it to Pandas 2423 """ 2424 import pandas as pd 2425 2426 if isinstance(query_or_df, (exp.Query, pd.DataFrame)): 2427 return query_or_df 2428 2429 # EngineAdapter subclasses that have native DataFrame types should override this 2430 raise NotImplementedError(f"Unable to convert {type(query_or_df)} to Pandas") 2431 2432 def fetchdf( 2433 self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False 2434 ) -> pd.DataFrame: 2435 """Fetches a Pandas DataFrame from the cursor""" 2436 import pandas as pd 2437 2438 df = self._fetch_native_df(query, quote_identifiers=quote_identifiers) 2439 if not isinstance(df, pd.DataFrame): 2440 raise NotImplementedError( 2441 "The cursor's `fetch_native_df` method is not returning a pandas DataFrame. Need to update `fetchdf` so a Pandas DataFrame is returned" 2442 ) 2443 return df 2444 2445 def fetch_pyspark_df( 2446 self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False 2447 ) -> PySparkDataFrame: 2448 """Fetches a PySpark DataFrame from the cursor""" 2449 raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}") 2450 2451 @property 2452 def wap_enabled(self) -> bool: 2453 """Returns whether WAP is enabled for this engine.""" 2454 return self._extra_config.get("wap_enabled", False) 2455 2456 def wap_supported(self, table_name: TableName) -> bool: 2457 """Returns whether WAP for the target table is supported.""" 2458 return False 2459 2460 def wap_table_name(self, table_name: TableName, wap_id: str) -> str: 2461 """Returns the updated table name for the given WAP ID. 2462 2463 Args: 2464 table_name: The name of the target table. 2465 wap_id: The WAP ID to prepare. 2466 2467 Returns: 2468 The updated table name that should be used for writing. 
2469 """ 2470 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 2471 2472 def wap_prepare(self, table_name: TableName, wap_id: str) -> str: 2473 """Prepares the target table for WAP and returns the updated table name. 2474 2475 Args: 2476 table_name: The name of the target table. 2477 wap_id: The WAP ID to prepare. 2478 2479 Returns: 2480 The updated table name that should be used for writing. 2481 """ 2482 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 2483 2484 def wap_publish(self, table_name: TableName, wap_id: str) -> None: 2485 """Publishes changes with the given WAP ID to the target table. 2486 2487 Args: 2488 table_name: The name of the target table. 2489 wap_id: The WAP ID to publish. 2490 """ 2491 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 2492 2493 def sync_grants_config( 2494 self, 2495 table: exp.Table, 2496 grants_config: GrantsConfig, 2497 table_type: DataObjectType = DataObjectType.TABLE, 2498 ) -> None: 2499 """Applies the grants_config to a table authoritatively. 2500 It first compares the specified grants against the current grants, and then 2501 applies the diffs to the table by revoking and granting privileges as needed. 2502 2503 Args: 2504 table: The table/view to apply grants to. 2505 grants_config: Dictionary mapping privileges to lists of grantees. 2506 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 
2507 """ 2508 if not self.SUPPORTS_GRANTS: 2509 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 2510 2511 current_grants = self._get_current_grants_config(table) 2512 new_grants, revoked_grants = self._diff_grants_configs(grants_config, current_grants) 2513 revoke_exprs = self._revoke_grants_config_expr(table, revoked_grants, table_type) 2514 grant_exprs = self._apply_grants_config_expr(table, new_grants, table_type) 2515 dcl_exprs = revoke_exprs + grant_exprs 2516 2517 if dcl_exprs: 2518 self.execute(dcl_exprs) 2519 2520 @contextlib.contextmanager 2521 def transaction( 2522 self, 2523 condition: t.Optional[bool] = None, 2524 ) -> t.Iterator[None]: 2525 """A transaction context manager.""" 2526 if ( 2527 self._connection_pool.is_transaction_active 2528 or not self.SUPPORTS_TRANSACTIONS 2529 or (condition is not None and not condition) 2530 ): 2531 yield 2532 return 2533 2534 if self._pre_ping: 2535 try: 2536 logger.debug("Pinging the database to check the connection") 2537 self.ping() 2538 except Exception: 2539 logger.info("Connection to the database was lost. 
Reconnecting...") 2540 self._connection_pool.close() 2541 2542 self._connection_pool.begin() 2543 try: 2544 yield 2545 except Exception as e: 2546 self._connection_pool.rollback() 2547 raise e 2548 else: 2549 self._connection_pool.commit() 2550 2551 @contextlib.contextmanager 2552 def session(self, properties: SessionProperties) -> t.Iterator[None]: 2553 """A session context manager.""" 2554 if self._is_session_active(): 2555 yield 2556 return 2557 2558 self._begin_session(properties) 2559 try: 2560 yield 2561 finally: 2562 self._end_session() 2563 2564 def _begin_session(self, properties: SessionProperties) -> t.Any: 2565 """Begin a new session.""" 2566 2567 def _end_session(self) -> None: 2568 """End the existing session.""" 2569 2570 def _is_session_active(self) -> bool: 2571 """Indicates whether or not a session is active.""" 2572 return False 2573 2574 def execute( 2575 self, 2576 expressions: t.Union[str, exp.Expr, t.Sequence[exp.Expr]], 2577 ignore_unsupported_errors: bool = False, 2578 quote_identifiers: bool = True, 2579 track_rows_processed: bool = False, 2580 **kwargs: t.Any, 2581 ) -> None: 2582 """Execute a sql query.""" 2583 to_sql_kwargs = ( 2584 {"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {} 2585 ) 2586 with self.transaction(): 2587 for e in ensure_list(expressions): 2588 if isinstance(e, exp.Expr): 2589 self._check_identifier_length(e) 2590 sql = self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs) 2591 else: 2592 sql = t.cast(str, e) 2593 2594 sql = self._attach_correlation_id(sql) 2595 2596 self._log_sql( 2597 sql, 2598 expression=e if isinstance(e, exp.Expr) else None, 2599 quote_identifiers=quote_identifiers, 2600 ) 2601 self._execute(sql, track_rows_processed, **kwargs) 2602 2603 def _attach_correlation_id(self, sql: str) -> str: 2604 if self.ATTACH_CORRELATION_ID and self.correlation_id: 2605 return f"/* {self.correlation_id} */ {sql}" 2606 return sql 2607 2608 def _log_sql( 2609 self, 2610 sql: str, 2611 
expression: t.Optional[exp.Expr] = None, 2612 quote_identifiers: bool = True, 2613 ) -> None: 2614 if not logger.isEnabledFor(self._execute_log_level): 2615 return 2616 2617 sql_to_log = sql 2618 if expression is not None and not isinstance(expression, exp.Query): 2619 values = expression.find(exp.Values) 2620 if values: 2621 values.set("expressions", [exp.to_identifier("<REDACTED VALUES>")]) 2622 sql_to_log = self._to_sql(expression, quote=quote_identifiers) 2623 2624 logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log) 2625 2626 def _record_execution_stats( 2627 self, sql: str, rowcount: t.Optional[int] = None, bytes_processed: t.Optional[int] = None 2628 ) -> None: 2629 if self._query_execution_tracker: 2630 self._query_execution_tracker.record_execution(sql, rowcount, bytes_processed) 2631 2632 def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any) -> None: 2633 self.cursor.execute(sql, **kwargs) 2634 2635 if ( 2636 self.SUPPORTS_QUERY_EXECUTION_TRACKING 2637 and track_rows_processed 2638 and self._query_execution_tracker 2639 and self._query_execution_tracker.is_tracking() 2640 ): 2641 if ( 2642 rowcount := getattr(self.cursor, "rowcount", None) 2643 ) is not None and rowcount is not None: 2644 try: 2645 self._record_execution_stats(sql, int(rowcount)) 2646 except (TypeError, ValueError): 2647 return 2648 2649 @contextlib.contextmanager 2650 def temp_table( 2651 self, 2652 query_or_df: QueryOrDF, 2653 name: TableName = "diff", 2654 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2655 source_columns: t.Optional[t.List[str]] = None, 2656 **kwargs: t.Any, 2657 ) -> t.Iterator[exp.Table]: 2658 """A context manager for working a temp table. 2659 2660 The table will be created with a random guid and cleaned up after the block. 2661 2662 Args: 2663 query_or_df: The query or df to create a temp table for. 2664 name: The base name of the temp table. 
2665 target_columns_to_types: A mapping between the column name and its data type. 2666 2667 Yields: 2668 The table expression 2669 """ 2670 name = exp.to_table(name) 2671 # ensure that we use default catalog if none is not specified 2672 if isinstance(name, exp.Table) and not name.catalog and name.db and self.default_catalog: 2673 name.set("catalog", exp.parse_identifier(self.default_catalog)) 2674 2675 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 2676 query_or_df, 2677 target_columns_to_types=target_columns_to_types, 2678 target_table=name, 2679 source_columns=source_columns, 2680 ) 2681 2682 with self.transaction(): 2683 table = self._get_temp_table(name) 2684 if table.db: 2685 self.create_schema(schema_(table.args["db"], table.args.get("catalog"))) 2686 self._create_table_from_source_queries( 2687 table, 2688 source_queries, 2689 target_columns_to_types, 2690 exists=True, 2691 table_description=None, 2692 column_descriptions=None, 2693 track_rows_processed=False, 2694 **kwargs, 2695 ) 2696 2697 try: 2698 yield table 2699 finally: 2700 self.drop_table(table) 2701 2702 def _table_or_view_properties_to_expressions( 2703 self, table_or_view_properties: t.Optional[t.Dict[str, exp.Expr]] = None 2704 ) -> t.List[exp.Property]: 2705 """Converts model properties (either physical or virtual) to a list of property expressions.""" 2706 if not table_or_view_properties: 2707 return [] 2708 return [ 2709 exp.Property(this=key, value=value.copy()) 2710 for key, value in table_or_view_properties.items() 2711 ] 2712 2713 def _build_partitioned_by_exp( 2714 self, 2715 partitioned_by: t.List[exp.Expr], 2716 *, 2717 partition_interval_unit: t.Optional[IntervalUnit] = None, 2718 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2719 catalog_name: t.Optional[str] = None, 2720 **kwargs: t.Any, 2721 ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]: 2722 return None 2723 2724 def _build_clustered_by_exp( 
2725 self, 2726 clustered_by: t.List[exp.Expr], 2727 **kwargs: t.Any, 2728 ) -> t.Optional[exp.Cluster]: 2729 return None 2730 2731 def _build_table_properties_exp( 2732 self, 2733 catalog_name: t.Optional[str] = None, 2734 table_format: t.Optional[str] = None, 2735 storage_format: t.Optional[str] = None, 2736 partitioned_by: t.Optional[t.List[exp.Expr]] = None, 2737 partition_interval_unit: t.Optional[IntervalUnit] = None, 2738 clustered_by: t.Optional[t.List[exp.Expr]] = None, 2739 table_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 2740 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2741 table_description: t.Optional[str] = None, 2742 table_kind: t.Optional[str] = None, 2743 **kwargs: t.Any, 2744 ) -> t.Optional[exp.Properties]: 2745 """Creates a SQLGlot table properties expression for ddl.""" 2746 properties: t.List[exp.Expr] = [] 2747 2748 if table_description: 2749 properties.append( 2750 exp.SchemaCommentProperty( 2751 this=exp.Literal.string(self._truncate_table_comment(table_description)) 2752 ) 2753 ) 2754 2755 if table_properties: 2756 table_type = self._pop_creatable_type_from_properties(table_properties) 2757 properties.extend(ensure_list(table_type)) 2758 2759 if properties: 2760 return exp.Properties(expressions=properties) 2761 return None 2762 2763 def _build_view_properties_exp( 2764 self, 2765 view_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 2766 table_description: t.Optional[str] = None, 2767 **kwargs: t.Any, 2768 ) -> t.Optional[exp.Properties]: 2769 """Creates a SQLGlot table properties expression for view""" 2770 properties: t.List[exp.Expr] = [] 2771 2772 if table_description: 2773 properties.append( 2774 exp.SchemaCommentProperty( 2775 this=exp.Literal.string(self._truncate_table_comment(table_description)) 2776 ) 2777 ) 2778 2779 if properties: 2780 return exp.Properties(expressions=properties) 2781 return None 2782 2783 def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str: 
2784 return comment[:length] if length else comment 2785 2786 def _truncate_table_comment(self, comment: str) -> str: 2787 return self._truncate_comment(comment, self.MAX_TABLE_COMMENT_LENGTH) 2788 2789 def _truncate_column_comment(self, comment: str) -> str: 2790 return self._truncate_comment(comment, self.MAX_COLUMN_COMMENT_LENGTH) 2791 2792 def _to_sql(self, expression: exp.Expr, quote: bool = True, **kwargs: t.Any) -> str: 2793 """ 2794 Converts an expression to a SQL string. Has a set of default kwargs to apply, and then default 2795 kwargs defined for the given dialect, and then kwargs provided by the user when defining the engine 2796 adapter, and then finally kwargs provided by the user when calling this method. 2797 """ 2798 sql_gen_kwargs = { 2799 "dialect": self.dialect, 2800 "pretty": self._pretty_sql, 2801 "comments": False, 2802 **self._sql_gen_kwargs, 2803 **kwargs, 2804 } 2805 2806 expression = expression.copy() 2807 2808 if quote: 2809 quote_identifiers(expression) 2810 2811 return expression.sql(**sql_gen_kwargs, copy=False) # type: ignore 2812 2813 def _clear_data_object_cache(self, table_name: t.Optional[TableName] = None) -> None: 2814 """Clears the cache entry for the given table name, or clears the entire cache if table_name is None.""" 2815 if table_name is None: 2816 logger.debug("Clearing entire data object cache") 2817 self._data_object_cache.clear() 2818 else: 2819 table = exp.to_table(table_name) 2820 cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name) 2821 logger.debug("Clearing data object cache key: %s", cache_key) 2822 self._data_object_cache.pop(cache_key, None) 2823 2824 def _get_data_objects( 2825 self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None 2826 ) -> t.List[DataObject]: 2827 """ 2828 Returns all the data objects that exist in the given schema and optionally catalog. 
2829 """ 2830 raise NotImplementedError() 2831 2832 def _get_temp_table( 2833 self, table: TableName, table_only: bool = False, quoted: bool = True 2834 ) -> exp.Table: 2835 """ 2836 Returns the name of the temp table that should be used for the given table name. 2837 """ 2838 table = t.cast(exp.Table, exp.to_table(table).copy()) 2839 table.set( 2840 "this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}", quoted=quoted) 2841 ) 2842 2843 if table_only: 2844 table.set("db", None) 2845 table.set("catalog", None) 2846 2847 return table 2848 2849 def _order_projections_and_filter( 2850 self, 2851 query: Query, 2852 target_columns_to_types: t.Dict[str, exp.DataType], 2853 where: t.Optional[exp.Expr] = None, 2854 coerce_types: bool = False, 2855 ) -> Query: 2856 if not isinstance(query, exp.Query) or ( 2857 not where and not coerce_types and query.named_selects == list(target_columns_to_types) 2858 ): 2859 return query 2860 2861 query = t.cast(exp.Query, query.copy()) 2862 with_ = query.args.pop("with_", None) 2863 2864 select_exprs: t.List[exp.Expr] = [ 2865 exp.column(c, quoted=True) for c in target_columns_to_types 2866 ] 2867 if coerce_types and columns_to_types_all_known(target_columns_to_types): 2868 select_exprs = [ 2869 exp.cast(select_exprs[i], col_tpe).as_(col, quoted=True) 2870 for i, (col, col_tpe) in enumerate(target_columns_to_types.items()) 2871 ] 2872 2873 query = exp.select(*select_exprs).from_(query.subquery("_subquery", copy=False), copy=False) 2874 if where: 2875 query = query.where(where, copy=False) 2876 2877 if with_: 2878 query.set("with_", with_) 2879 2880 return query 2881 2882 def _truncate_table(self, table_name: TableName) -> None: 2883 table = exp.to_table(table_name) 2884 self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}") 2885 2886 def drop_data_object_on_type_mismatch( 2887 self, data_object: t.Optional[DataObject], expected_type: DataObjectType 2888 ) -> bool: 2889 """Drops a data object if 
it exists and is not of the expected type. 2890 2891 Args: 2892 data_object: The data object to check. 2893 expected_type: The expected type of the data object. 2894 2895 Returns: 2896 True if the data object was dropped, False otherwise. 2897 """ 2898 if data_object is None or data_object.type == expected_type: 2899 return False 2900 2901 logger.warning( 2902 "Target data object '%s' is a %s and not a %s, dropping it", 2903 data_object.to_table().sql(dialect=self.dialect), 2904 data_object.type.value, 2905 expected_type.value, 2906 ) 2907 self.drop_data_object(data_object) 2908 return True 2909 2910 def _replace_by_key( 2911 self, 2912 target_table: TableName, 2913 source_table: QueryOrDF, 2914 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 2915 key: t.Sequence[exp.Expr], 2916 is_unique_key: bool, 2917 source_columns: t.Optional[t.List[str]] = None, 2918 ) -> None: 2919 if target_columns_to_types is None: 2920 target_columns_to_types = self.columns(target_table) 2921 2922 temp_table = self._get_temp_table(target_table) 2923 key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0] 2924 column_names = list(target_columns_to_types or []) 2925 2926 with self.transaction(): 2927 self.ctas( 2928 temp_table, 2929 source_table, 2930 target_columns_to_types=target_columns_to_types, 2931 exists=False, 2932 source_columns=source_columns, 2933 ) 2934 2935 try: 2936 delete_query = exp.select(key_exp).from_(temp_table) 2937 insert_query = self._select_columns(target_columns_to_types).from_(temp_table) 2938 if not is_unique_key: 2939 delete_query = delete_query.distinct() 2940 else: 2941 insert_query = insert_query.distinct(*key) 2942 2943 insert_statement = exp.insert( 2944 insert_query, 2945 target_table, 2946 columns=column_names, 2947 ) 2948 delete_filter = key_exp.isin(query=delete_query) 2949 2950 if not self.INSERT_OVERWRITE_STRATEGY.is_replace_where: 2951 self.delete_from(target_table, delete_filter) 2952 else: 2953 
insert_statement.set("where", delete_filter) 2954 insert_statement.set("this", exp.to_table(target_table)) 2955 2956 self.execute(insert_statement, track_rows_processed=True) 2957 finally: 2958 self.drop_table(temp_table) 2959 2960 def _build_create_comment_table_exp( 2961 self, table: exp.Table, table_comment: str, table_kind: str 2962 ) -> exp.Comment | str: 2963 return exp.Comment( 2964 this=table, 2965 kind=table_kind, 2966 expression=exp.Literal.string(self._truncate_table_comment(table_comment)), 2967 ) 2968 2969 def _create_table_comment( 2970 self, table_name: TableName, table_comment: str, table_kind: str = "TABLE" 2971 ) -> None: 2972 table = exp.to_table(table_name) 2973 2974 try: 2975 self.execute(self._build_create_comment_table_exp(table, table_comment, table_kind)) 2976 except Exception: 2977 logger.warning( 2978 f"Table comment for '{table.alias_or_name}' not registered - this may be due to limited permissions", 2979 exc_info=True, 2980 ) 2981 2982 def _build_create_comment_column_exp( 2983 self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" 2984 ) -> exp.Comment | str: 2985 return exp.Comment( 2986 this=exp.column(column_name, *reversed(table.parts)), # type: ignore 2987 kind="COLUMN", 2988 expression=exp.Literal.string(self._truncate_column_comment(column_comment)), 2989 ) 2990 2991 def _create_column_comments( 2992 self, 2993 table_name: TableName, 2994 column_comments: t.Dict[str, str], 2995 table_kind: str = "TABLE", 2996 materialized_view: bool = False, 2997 ) -> None: 2998 table = exp.to_table(table_name) 2999 3000 for col, comment in column_comments.items(): 3001 try: 3002 self.execute(self._build_create_comment_column_exp(table, col, comment, table_kind)) 3003 except Exception: 3004 logger.warning( 3005 f"Column comments for column '{col}' in table '{table.alias_or_name}' not registered - this may be due to limited permissions", 3006 exc_info=True, 3007 ) 3008 3009 def _create_table_like( 3010 self, 3011 
target_table_name: TableName, 3012 source_table_name: TableName, 3013 exists: bool, 3014 **kwargs: t.Any, 3015 ) -> None: 3016 self.create_table(target_table_name, self.columns(source_table_name), exists=exists) 3017 3018 def _rename_table( 3019 self, 3020 old_table_name: TableName, 3021 new_table_name: TableName, 3022 ) -> None: 3023 self.execute(exp.rename_table(old_table_name, new_table_name)) 3024 3025 def ensure_nulls_for_unmatched_after_join( 3026 self, 3027 query: Query, 3028 ) -> Query: 3029 return query 3030 3031 def use_server_nulls_for_unmatched_after_join( 3032 self, 3033 query: Query, 3034 ) -> Query: 3035 return query 3036 3037 def ping(self) -> None: 3038 try: 3039 self._execute(exp.select("1").sql(dialect=self.dialect)) 3040 finally: 3041 self._connection_pool.close_cursor() 3042 3043 @classmethod 3044 def _select_columns( 3045 cls, columns: t.Iterable[str], source_columns: t.Optional[t.List[str]] = None 3046 ) -> exp.Select: 3047 return exp.select( 3048 *( 3049 exp.column(c, quoted=True) 3050 if c in (source_columns or columns) 3051 else exp.alias_(exp.Null(), c, quoted=True) 3052 for c in columns 3053 ) 3054 ) 3055 3056 def _check_identifier_length(self, expression: exp.Expr) -> None: 3057 if self.MAX_IDENTIFIER_LENGTH is None or not isinstance(expression, exp.DDL): 3058 return 3059 3060 for identifier in expression.find_all(exp.Identifier): 3061 name = identifier.name 3062 name_length = len(name) 3063 if name_length > self.MAX_IDENTIFIER_LENGTH: 3064 raise SQLMeshError( 3065 f"Identifier name '{name}' (length {name_length}) exceeds {self.dialect.capitalize()}'s max identifier limit of {self.MAX_IDENTIFIER_LENGTH} characters" 3066 ) 3067 3068 def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]: 3069 raise NotImplementedError() 3070 3071 @classmethod 3072 def _diff_grants_configs( 3073 cls, new_config: GrantsConfig, old_config: GrantsConfig 3074 ) -> t.Tuple[GrantsConfig, GrantsConfig]: 3075 """Compute additions and 
removals between two grants configurations. 3076 3077 This method compares new (desired) and old (current) GrantsConfigs case-insensitively 3078 for both privilege keys and grantees, while preserving original casing 3079 in the output GrantsConfigs. 3080 3081 Args: 3082 new_config: Desired grants configuration (specified by the user). 3083 old_config: Current grants configuration (returned by the database). 3084 3085 Returns: 3086 A tuple of (additions, removals) GrantsConfig where: 3087 - additions contains privileges/grantees present in new_config but not in old_config 3088 - additions uses keys and grantee strings from new_config (user-specified casing) 3089 - removals contains privileges/grantees present in old_config but not in new_config 3090 - removals uses keys and grantee strings from old_config (database-returned casing) 3091 3092 Notes: 3093 - Comparison is case-insensitive using casefold(); original casing is preserved in results. 3094 - Overlapping grantees (case-insensitive) are excluded from the results. 3095 """ 3096 3097 def _diffs(config1: GrantsConfig, config2: GrantsConfig) -> GrantsConfig: 3098 diffs: GrantsConfig = {} 3099 cf_config2 = {k.casefold(): {g.casefold() for g in v} for k, v in config2.items()} 3100 for key, grantees in config1.items(): 3101 cf_key = key.casefold() 3102 3103 # Missing key (add all grantees) 3104 if cf_key not in cf_config2: 3105 diffs[key] = grantees.copy() 3106 continue 3107 3108 # Include only grantees not in config2 3109 cf_grantees2 = cf_config2[cf_key] 3110 diff_grantees = [] 3111 for grantee in grantees: 3112 if grantee.casefold() not in cf_grantees2: 3113 diff_grantees.append(grantee) 3114 if diff_grantees: 3115 diffs[key] = diff_grantees 3116 return diffs 3117 3118 return _diffs(new_config, old_config), _diffs(old_config, new_config) 3119 3120 def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig: 3121 """Returns current grants for a table as a dictionary. 
3122 3123 This method queries the database and returns the current grants/permissions 3124 for the given table, parsed into a dictionary format. The it handles 3125 case-insensitive comparison between these current grants and the desired 3126 grants from model configuration. 3127 3128 Args: 3129 table: The table/view to query grants for. 3130 3131 Returns: 3132 Dictionary mapping permissions to lists of grantees. Permission names 3133 should be returned as the database provides them (typically uppercase 3134 for standard SQL permissions, but engine-specific roles may vary). 3135 3136 Raises: 3137 NotImplementedError: If the engine does not support grants. 3138 """ 3139 if not self.SUPPORTS_GRANTS: 3140 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3141 raise NotImplementedError("Subclass must implement get_current_grants") 3142 3143 def _apply_grants_config_expr( 3144 self, 3145 table: exp.Table, 3146 grants_config: GrantsConfig, 3147 table_type: DataObjectType = DataObjectType.TABLE, 3148 ) -> t.List[exp.Expr]: 3149 """Returns SQLGlot Grant expressions to apply grants to a table. 3150 3151 Args: 3152 table: The table/view to grant permissions on. 3153 grants_config: Dictionary mapping permissions to lists of grantees. 3154 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 3155 3156 Returns: 3157 List of SQLGlot expressions for grant operations. 3158 3159 Raises: 3160 NotImplementedError: If the engine does not support grants. 3161 """ 3162 if not self.SUPPORTS_GRANTS: 3163 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3164 raise NotImplementedError("Subclass must implement _apply_grants_config_expr") 3165 3166 def _revoke_grants_config_expr( 3167 self, 3168 table: exp.Table, 3169 grants_config: GrantsConfig, 3170 table_type: DataObjectType = DataObjectType.TABLE, 3171 ) -> t.List[exp.Expr]: 3172 """Returns SQLGlot expressions to revoke grants from a table. 
3173 3174 Args: 3175 table: The table/view to revoke permissions from. 3176 grants_config: Dictionary mapping permissions to lists of grantees. 3177 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 3178 3179 Returns: 3180 List of SQLGlot expressions for revoke operations. 3181 3182 Raises: 3183 NotImplementedError: If the engine does not support grants. 3184 """ 3185 if not self.SUPPORTS_GRANTS: 3186 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3187 raise NotImplementedError("Subclass must implement _revoke_grants_config_expr") 3188 3189 3190class EngineAdapterWithIndexSupport(EngineAdapter): 3191 SUPPORTS_INDEXES = True 3192 3193 3194def _decoded_str(value: t.Union[str, bytes]) -> str: 3195 if isinstance(value, bytes): 3196 return value.decode("utf-8") 3197 return value 3198 3199 3200def _get_data_object_cache_key(catalog: t.Optional[str], schema_name: str, object_name: str) -> str: 3201 """Returns a cache key for a data object based on its fully qualified name.""" 3202 catalog = f"{catalog}." if catalog else "" 3203 return f"{catalog}{schema_name}.{object_name}"
84@set_catalog() 85class EngineAdapter: 86 """Base class wrapping a Database API compliant connection. 87 88 The EngineAdapter is an easily-subclassable interface that interacts 89 with the underlying engine and data store. 90 91 Args: 92 connection_factory_or_pool: a callable which produces a new Database API-compliant 93 connection on every call. 94 dialect: The dialect with which this adapter is associated. 95 multithreaded: Indicates whether this adapter will be used by more than one thread. 96 """ 97 98 DIALECT = "" 99 DEFAULT_BATCH_SIZE = 10000 100 DATA_OBJECT_FILTER_BATCH_SIZE = 4000 101 SUPPORTS_TRANSACTIONS = True 102 SUPPORTS_INDEXES = False 103 COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_CTAS 104 COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS 105 MAX_TABLE_COMMENT_LENGTH: t.Optional[int] = None 106 MAX_COLUMN_COMMENT_LENGTH: t.Optional[int] = None 107 INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT 108 SUPPORTS_MATERIALIZED_VIEWS = False 109 SUPPORTS_MATERIALIZED_VIEW_SCHEMA = False 110 SUPPORTS_VIEW_SCHEMA = True 111 SUPPORTS_CLONING = False 112 SUPPORTS_MANAGED_MODELS = False 113 SUPPORTS_CREATE_DROP_CATALOG = False 114 SUPPORTED_DROP_CASCADE_OBJECT_KINDS: t.List[str] = [] 115 SCHEMA_DIFFER_KWARGS: t.Dict[str, t.Any] = {} 116 SUPPORTS_TUPLE_IN = True 117 HAS_VIEW_BINDING = False 118 SUPPORTS_REPLACE_TABLE = True 119 SUPPORTS_GRANTS = False 120 DEFAULT_CATALOG_TYPE = DIALECT 121 QUOTE_IDENTIFIERS_IN_VIEWS = True 122 MAX_IDENTIFIER_LENGTH: t.Optional[int] = None 123 ATTACH_CORRELATION_ID = True 124 SUPPORTS_QUERY_EXECUTION_TRACKING = False 125 SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS = False 126 127 def __init__( 128 self, 129 connection_factory_or_pool: t.Union[t.Callable[[], t.Any], ConnectionPool], 130 dialect: str = "", 131 sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None, 132 multithreaded: bool = False, 133 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 134 
default_catalog: t.Optional[str] = None, 135 execute_log_level: int = logging.DEBUG, 136 register_comments: bool = True, 137 pre_ping: bool = False, 138 pretty_sql: bool = False, 139 shared_connection: bool = False, 140 correlation_id: t.Optional[CorrelationId] = None, 141 schema_differ_overrides: t.Optional[t.Dict[str, t.Any]] = None, 142 query_execution_tracker: t.Optional[QueryExecutionTracker] = None, 143 **kwargs: t.Any, 144 ): 145 self.dialect = dialect.lower() or self.DIALECT 146 self._connection_pool = ( 147 connection_factory_or_pool 148 if isinstance(connection_factory_or_pool, ConnectionPool) 149 else create_connection_pool( 150 connection_factory_or_pool, 151 multithreaded, 152 shared_connection=shared_connection, 153 cursor_init=cursor_init, 154 ) 155 ) 156 self._sql_gen_kwargs = sql_gen_kwargs or {} 157 self._default_catalog = default_catalog 158 self._execute_log_level = execute_log_level 159 self._extra_config = kwargs 160 self._register_comments = register_comments 161 self._pre_ping = pre_ping 162 self._pretty_sql = pretty_sql 163 self._multithreaded = multithreaded 164 self.correlation_id = correlation_id 165 self._schema_differ_overrides = schema_differ_overrides 166 self._query_execution_tracker = query_execution_tracker 167 self._data_object_cache: t.Dict[str, t.Optional[DataObject]] = {} 168 169 def with_settings(self, **kwargs: t.Any) -> EngineAdapter: 170 extra_kwargs = { 171 "null_connection": True, 172 "execute_log_level": kwargs.pop("execute_log_level", self._execute_log_level), 173 "correlation_id": kwargs.pop("correlation_id", self.correlation_id), 174 "query_execution_tracker": kwargs.pop( 175 "query_execution_tracker", self._query_execution_tracker 176 ), 177 **self._extra_config, 178 **kwargs, 179 } 180 181 adapter = self.__class__( 182 self._connection_pool, 183 dialect=self.dialect, 184 sql_gen_kwargs=self._sql_gen_kwargs, 185 default_catalog=self._default_catalog, 186 register_comments=self._register_comments, 187 
multithreaded=self._multithreaded, 188 pretty_sql=self._pretty_sql, 189 **extra_kwargs, 190 ) 191 192 return adapter 193 194 @property 195 def cursor(self) -> t.Any: 196 return self._connection_pool.get_cursor() 197 198 @property 199 def connection(self) -> t.Any: 200 return self._connection_pool.get() 201 202 @property 203 def spark(self) -> t.Optional[PySparkSession]: 204 return None 205 206 @property 207 def snowpark(self) -> t.Optional[SnowparkSession]: 208 return None 209 210 @property 211 def bigframe(self) -> t.Optional[BigframeSession]: 212 return None 213 214 @property 215 def comments_enabled(self) -> bool: 216 return self._register_comments and self.COMMENT_CREATION_TABLE.is_supported 217 218 @property 219 def catalog_support(self) -> CatalogSupport: 220 return CatalogSupport.UNSUPPORTED 221 222 @cached_property 223 def schema_differ(self) -> SchemaDiffer: 224 return SchemaDiffer( 225 **{ 226 **self.SCHEMA_DIFFER_KWARGS, 227 **(self._schema_differ_overrides or {}), 228 } 229 ) 230 231 @property 232 def _catalog_type_overrides(self) -> t.Dict[str, str]: 233 return self._extra_config.get("catalog_type_overrides") or {} 234 235 @classmethod 236 def _casted_columns( 237 cls, 238 target_columns_to_types: t.Dict[str, exp.DataType], 239 source_columns: t.Optional[t.List[str]] = None, 240 ) -> t.List[exp.Expr]: 241 source_columns_lookup = set(source_columns or target_columns_to_types) 242 return [ 243 exp.alias_( 244 exp.cast( 245 exp.column(column, quoted=True) 246 if column in source_columns_lookup 247 else exp.Null(), 248 to=kind, 249 ), 250 column, 251 copy=False, 252 quoted=True, 253 ) 254 for column, kind in target_columns_to_types.items() 255 ] 256 257 @property 258 def default_catalog(self) -> t.Optional[str]: 259 if self.catalog_support.is_unsupported: 260 return None 261 default_catalog = self._default_catalog or self.get_current_catalog() 262 if not default_catalog: 263 raise MissingDefaultCatalogError( 264 "Could not determine a default catalog 
despite it being supported." 265 ) 266 return default_catalog 267 268 @property 269 def engine_run_mode(self) -> EngineRunMode: 270 return EngineRunMode.SINGLE_MODE_ENGINE 271 272 def _get_source_queries( 273 self, 274 query_or_df: QueryOrDF, 275 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 276 target_table: TableName, 277 *, 278 batch_size: t.Optional[int] = None, 279 source_columns: t.Optional[t.List[str]] = None, 280 ) -> t.List[SourceQuery]: 281 import pandas as pd 282 283 batch_size = self.DEFAULT_BATCH_SIZE if batch_size is None else batch_size 284 if isinstance(query_or_df, exp.Query): 285 query_factory = lambda: query_or_df 286 if source_columns: 287 source_columns_lookup = set(source_columns) 288 if not target_columns_to_types: 289 raise SQLMeshError("columns_to_types must be set if source_columns is set") 290 if not set(target_columns_to_types).issubset(source_columns_lookup): 291 select_columns = [ 292 exp.column(c, quoted=True) 293 if c in source_columns_lookup 294 else exp.cast(exp.Null(), target_columns_to_types[c], copy=False).as_( 295 c, copy=False, quoted=True 296 ) 297 for c in target_columns_to_types 298 ] 299 query_factory = ( 300 lambda: exp.Select() 301 .select(*select_columns) 302 .from_(query_or_df.subquery("select_source_columns")) 303 ) 304 return [SourceQuery(query_factory=query_factory)] # type: ignore 305 306 if not target_columns_to_types: 307 raise SQLMeshError( 308 "It is expected that if a DataFrame is passed in then columns_to_types is set" 309 ) 310 311 if isinstance(query_or_df, pd.DataFrame) and query_or_df.empty: 312 raise SQLMeshError( 313 "Cannot construct source query from an empty DataFrame. This error is commonly " 314 "related to Python models that produce no data. For such models, consider yielding " 315 "from an empty generator if the resulting set is empty, i.e. use `yield from ()`." 
316 ) 317 318 return self._df_to_source_queries( 319 query_or_df, 320 target_columns_to_types, 321 batch_size, 322 target_table=target_table, 323 source_columns=source_columns, 324 ) 325 326 def _df_to_source_queries( 327 self, 328 df: DF, 329 target_columns_to_types: t.Dict[str, exp.DataType], 330 batch_size: int, 331 target_table: TableName, 332 source_columns: t.Optional[t.List[str]] = None, 333 ) -> t.List[SourceQuery]: 334 import pandas as pd 335 336 assert isinstance(df, pd.DataFrame) 337 num_rows = len(df.index) 338 batch_size = sys.maxsize if batch_size == 0 else batch_size 339 340 # we need to ensure that the order of the columns in columns_to_types columns matches the order of the values 341 # they can differ if a user specifies columns() on a python model in a different order than what's in the DataFrame's emitted by that model 342 df = df[list(source_columns or target_columns_to_types)] 343 values = list(df.itertuples(index=False, name=None)) 344 345 return [ 346 SourceQuery( 347 query_factory=partial( 348 self._values_to_sql, 349 values=values, # type: ignore 350 target_columns_to_types=target_columns_to_types, 351 batch_start=i, 352 batch_end=min(i + batch_size, num_rows), 353 source_columns=source_columns, 354 ), 355 ) 356 for i in range(0, num_rows, batch_size) 357 ] 358 359 def _get_source_queries_and_columns_to_types( 360 self, 361 query_or_df: QueryOrDF, 362 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 363 target_table: TableName, 364 *, 365 batch_size: t.Optional[int] = None, 366 source_columns: t.Optional[t.List[str]] = None, 367 ) -> t.Tuple[t.List[SourceQuery], t.Optional[t.Dict[str, exp.DataType]]]: 368 target_columns_to_types, source_columns = self._columns_to_types( 369 query_or_df, target_columns_to_types, source_columns 370 ) 371 source_queries = self._get_source_queries( 372 query_or_df, 373 target_columns_to_types, 374 target_table=target_table, 375 batch_size=batch_size, 376 source_columns=source_columns, 377 ) 378 
return source_queries, target_columns_to_types 379 380 @t.overload 381 def _columns_to_types( 382 self, 383 query_or_df: DF, 384 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 385 source_columns: t.Optional[t.List[str]] = None, 386 ) -> t.Tuple[t.Dict[str, exp.DataType], t.List[str]]: ... 387 388 @t.overload 389 def _columns_to_types( 390 self, 391 query_or_df: Query, 392 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 393 source_columns: t.Optional[t.List[str]] = None, 394 ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: ... 395 396 def _columns_to_types( 397 self, 398 query_or_df: QueryOrDF, 399 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 400 source_columns: t.Optional[t.List[str]] = None, 401 ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: 402 import pandas as pd 403 404 if not target_columns_to_types and isinstance(query_or_df, pd.DataFrame): 405 target_columns_to_types = columns_to_types_from_df(t.cast(pd.DataFrame, query_or_df)) 406 if not source_columns and target_columns_to_types: 407 source_columns = list(target_columns_to_types) 408 # source columns should only contain columns that are defined in the target. 
If there are extras then 409 # that means they are intended to be ignored and will be excluded 410 source_columns = ( 411 [x for x in source_columns if x in target_columns_to_types] 412 if source_columns and target_columns_to_types 413 else None 414 ) 415 return target_columns_to_types, source_columns 416 417 def recycle(self) -> None: 418 """Closes all open connections and releases all allocated resources associated with any thread 419 except the calling one.""" 420 self._connection_pool.close_all(exclude_calling_thread=True) 421 422 def close(self) -> t.Any: 423 """Closes all open connections and releases all allocated resources.""" 424 self._connection_pool.close_all() 425 426 def get_current_catalog(self) -> t.Optional[str]: 427 """Returns the catalog name of the current connection.""" 428 raise NotImplementedError() 429 430 def set_current_catalog(self, catalog: str) -> None: 431 """Sets the catalog name of the current connection.""" 432 raise NotImplementedError() 433 434 def get_catalog_type(self, catalog: t.Optional[str]) -> str: 435 """Intended to be overridden for data virtualization systems like Trino that, 436 depending on the target catalog, require slightly different properties to be set when creating / updating tables 437 """ 438 if self.catalog_support.is_unsupported: 439 raise UnsupportedCatalogOperationError( 440 f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}" 441 ) 442 return ( 443 self._catalog_type_overrides.get(catalog, self.DEFAULT_CATALOG_TYPE) 444 if catalog 445 else self.DEFAULT_CATALOG_TYPE 446 ) 447 448 def get_catalog_type_from_table(self, table: TableName) -> str: 449 """Get the catalog type from a table name if it has a catalog specified, otherwise return the current catalog type""" 450 catalog = exp.to_table(table).catalog or self.get_current_catalog() 451 return self.get_catalog_type(catalog) 452 453 @property 454 def current_catalog_type(self) -> str: 455 # `get_catalog_type_from_table` should be 
used over this property. Reason is that the table that is the target 456 # of the operation is what matters and not the catalog type of the connection. 457 # This still remains for legacy reasons and should be refactored out. 458 return self.get_catalog_type(self.get_current_catalog()) 459 460 def replace_query( 461 self, 462 table_name: TableName, 463 query_or_df: QueryOrDF, 464 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 465 table_description: t.Optional[str] = None, 466 column_descriptions: t.Optional[t.Dict[str, str]] = None, 467 source_columns: t.Optional[t.List[str]] = None, 468 supports_replace_table_override: t.Optional[bool] = None, 469 **kwargs: t.Any, 470 ) -> None: 471 """Replaces an existing table with a query. 472 473 For partition based engines (hive, spark), insert override is used. For other systems, create or replace is used. 474 475 Args: 476 table_name: The name of the table (eg. prod.table) 477 query_or_df: The SQL query to run or a dataframe. 478 target_columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type. 479 Expected to be ordered to match the order of values in the dataframe. 480 kwargs: Optional create table properties. 
481 """ 482 target_table = exp.to_table(table_name) 483 484 target_data_object = self.get_data_object(target_table) 485 table_exists = target_data_object is not None 486 if self.drop_data_object_on_type_mismatch(target_data_object, DataObjectType.TABLE): 487 table_exists = False 488 489 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 490 query_or_df, 491 target_columns_to_types, 492 target_table=target_table, 493 source_columns=source_columns, 494 ) 495 if not target_columns_to_types and table_exists: 496 target_columns_to_types = self.columns(target_table) 497 query = source_queries[0].query_factory() 498 self_referencing = any( 499 quote_identifiers(table) == quote_identifiers(target_table) 500 for table in query.find_all(exp.Table) 501 ) 502 # If a query references itself then it must have a table created regardless of approach used. 503 if self_referencing: 504 if not target_columns_to_types: 505 raise SQLMeshError( 506 f"Cannot create a self-referencing table {target_table.sql(dialect=self.dialect)} without knowing the column types. " 507 "Try casting the columns to an expected type or defining the columns in the model metadata. 
" 508 ) 509 self._create_table_from_columns( 510 target_table, 511 target_columns_to_types, 512 exists=True, 513 table_description=table_description, 514 column_descriptions=column_descriptions, 515 **kwargs, 516 ) 517 # All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we 518 # use `CREATE OR REPLACE TABLE AS` if the engine supports it 519 supports_replace_table = ( 520 self.SUPPORTS_REPLACE_TABLE 521 if supports_replace_table_override is None 522 else supports_replace_table_override 523 ) 524 if supports_replace_table or not table_exists: 525 return self._create_table_from_source_queries( 526 target_table, 527 source_queries, 528 target_columns_to_types, 529 replace=supports_replace_table, 530 table_description=table_description, 531 column_descriptions=column_descriptions, 532 **kwargs, 533 ) 534 if self_referencing: 535 assert target_columns_to_types is not None 536 with self.temp_table( 537 self._select_columns(target_columns_to_types).from_(target_table), 538 name=target_table, 539 target_columns_to_types=target_columns_to_types, 540 **kwargs, 541 ) as temp_table: 542 for source_query in source_queries: 543 source_query.add_transform( 544 lambda node: ( # type: ignore 545 temp_table # type: ignore 546 if isinstance(node, exp.Table) 547 and quote_identifiers(node) == quote_identifiers(target_table) 548 else node 549 ) 550 ) 551 return self._insert_overwrite_by_condition( 552 target_table, 553 source_queries, 554 target_columns_to_types, 555 **kwargs, 556 ) 557 return self._insert_overwrite_by_condition( 558 target_table, 559 source_queries, 560 target_columns_to_types, 561 **kwargs, 562 ) 563 564 def create_index( 565 self, 566 table_name: TableName, 567 index_name: str, 568 columns: t.Tuple[str, ...], 569 exists: bool = True, 570 ) -> None: 571 """Creates a new index for the given table if supported 572 573 Args: 574 table_name: The name of the target table. 575 index_name: The name of the index. 
576 columns: The list of columns that constitute the index. 577 exists: Indicates whether to include the IF NOT EXISTS check. 578 """ 579 if not self.SUPPORTS_INDEXES: 580 return 581 582 expression = exp.Create( 583 this=exp.Index( 584 this=exp.to_identifier(index_name), 585 table=exp.to_table(table_name), 586 params=exp.IndexParameters(columns=[exp.to_column(c) for c in columns]), 587 ), 588 kind="INDEX", 589 exists=exists, 590 ) 591 self.execute(expression) 592 593 def _pop_creatable_type_from_properties( 594 self, 595 properties: t.Dict[str, exp.Expr], 596 ) -> t.Optional[exp.Property]: 597 """Pop out the creatable_type from the properties dictionary (if exists (return it/remove it) else return none). 598 It also checks that none of the expressions are MATERIALIZE as that conflicts with the `materialize` parameter. 599 """ 600 for key in list(properties.keys()): 601 upper_key = key.upper() 602 if upper_key == KEY_FOR_CREATABLE_TYPE: 603 value = properties.pop(key).name 604 parsed_properties = exp.maybe_parse( 605 value, into=exp.Properties, dialect=self.dialect 606 ) 607 property, *others = parsed_properties.expressions 608 if others: 609 # Multiple properties are unsupported today, can look into it in the future if needed 610 raise SQLMeshError( 611 f"Invalid creatable_type value with multiple properties: {value}" 612 ) 613 if isinstance(property, exp.MaterializedProperty): 614 raise SQLMeshError( 615 f"Cannot use {value} as a creatable_type as it conflicts with the `materialize` parameter." 
616 ) 617 return property 618 return None 619 620 def create_table( 621 self, 622 table_name: TableName, 623 target_columns_to_types: t.Dict[str, exp.DataType], 624 primary_key: t.Optional[t.Tuple[str, ...]] = None, 625 exists: bool = True, 626 table_description: t.Optional[str] = None, 627 column_descriptions: t.Optional[t.Dict[str, str]] = None, 628 **kwargs: t.Any, 629 ) -> None: 630 """Create a table using a DDL statement 631 632 Args: 633 table_name: The name of the table to create. Can be fully qualified or just table name. 634 target_columns_to_types: A mapping between the column name and its data type. 635 primary_key: Determines the table primary key. 636 exists: Indicates whether to include the IF NOT EXISTS check. 637 table_description: Optional table description from MODEL DDL. 638 column_descriptions: Optional column descriptions from model query. 639 kwargs: Optional create table properties. 640 """ 641 self._create_table_from_columns( 642 table_name, 643 target_columns_to_types, 644 primary_key, 645 exists, 646 table_description, 647 column_descriptions, 648 **kwargs, 649 ) 650 651 def create_managed_table( 652 self, 653 table_name: TableName, 654 query: Query, 655 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 656 partitioned_by: t.Optional[t.List[exp.Expr]] = None, 657 clustered_by: t.Optional[t.List[exp.Expr]] = None, 658 table_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 659 table_description: t.Optional[str] = None, 660 column_descriptions: t.Optional[t.Dict[str, str]] = None, 661 source_columns: t.Optional[t.List[str]] = None, 662 **kwargs: t.Any, 663 ) -> None: 664 """Create a managed table using a query. 665 666 "Managed" means that once the table is created, the data is kept up to date by the underlying database engine and not SQLMesh. 667 668 Args: 669 table_name: The name of the table to create. Can be fully qualified or just table name. 
670 query: The SQL query for the engine to base the managed table on 671 target_columns_to_types: A mapping between the column name and its data type. 672 partitioned_by: The partition columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour)) 673 clustered_by: The cluster columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour)) 674 table_properties: Optional mapping of engine-specific properties to be set on the managed table 675 table_description: Optional table description from MODEL DDL. 676 column_descriptions: Optional column descriptions from model query. 677 kwargs: Optional create table properties. 678 """ 679 raise NotImplementedError(f"Engine does not support managed tables: {type(self)}") 680 681 def ctas( 682 self, 683 table_name: TableName, 684 query_or_df: QueryOrDF, 685 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 686 exists: bool = True, 687 table_description: t.Optional[str] = None, 688 column_descriptions: t.Optional[t.Dict[str, str]] = None, 689 source_columns: t.Optional[t.List[str]] = None, 690 **kwargs: t.Any, 691 ) -> None: 692 """Create a table using a CTAS statement 693 694 Args: 695 table_name: The name of the table to create. Can be fully qualified or just table name. 696 query_or_df: The SQL query to run or a dataframe for the CTAS. 697 target_columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame. 698 exists: Indicates whether to include the IF NOT EXISTS check. 699 table_description: Optional table description from MODEL DDL. 700 column_descriptions: Optional column descriptions from model query. 701 kwargs: Optional create table properties. 
702 """ 703 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 704 query_or_df, 705 target_columns_to_types, 706 target_table=table_name, 707 source_columns=source_columns, 708 ) 709 return self._create_table_from_source_queries( 710 table_name, 711 source_queries, 712 target_columns_to_types, 713 exists, 714 table_description=table_description, 715 column_descriptions=column_descriptions, 716 **kwargs, 717 ) 718 719 def create_state_table( 720 self, 721 table_name: str, 722 target_columns_to_types: t.Dict[str, exp.DataType], 723 primary_key: t.Optional[t.Tuple[str, ...]] = None, 724 ) -> None: 725 """Create a table to store SQLMesh internal state. 726 727 Args: 728 table_name: The name of the table to create. Can be fully qualified or just table name. 729 target_columns_to_types: A mapping between the column name and its data type. 730 primary_key: Determines the table primary key. 731 """ 732 self.create_table( 733 table_name, 734 target_columns_to_types, 735 primary_key=primary_key, 736 ) 737 738 def _create_table_from_columns( 739 self, 740 table_name: TableName, 741 target_columns_to_types: t.Dict[str, exp.DataType], 742 primary_key: t.Optional[t.Tuple[str, ...]] = None, 743 exists: bool = True, 744 table_description: t.Optional[str] = None, 745 column_descriptions: t.Optional[t.Dict[str, str]] = None, 746 **kwargs: t.Any, 747 ) -> None: 748 """ 749 Create a table using a DDL statement. 750 751 Args: 752 table_name: The name of the table to create. Can be fully qualified or just table name. 753 target_columns_to_types: Mapping between the column name and its data type. 754 primary_key: Determines the table primary key. 755 exists: Indicates whether to include the IF NOT EXISTS check. 756 table_description: Optional table description from MODEL DDL. 757 column_descriptions: Optional column descriptions from model query. 758 kwargs: Optional create table properties. 
759 """ 760 table = exp.to_table(table_name) 761 762 if not columns_to_types_all_known(target_columns_to_types): 763 # It is ok if the columns types are not known if the table already exists and IF NOT EXISTS is set 764 if exists and self.table_exists(table_name): 765 return 766 raise SQLMeshError( 767 "Cannot create a table without knowing the column types. " 768 "Try casting the columns to an expected type or defining the columns in the model metadata. " 769 f"Columns to types: {target_columns_to_types}" 770 ) 771 772 primary_key_expression = ( 773 [exp.PrimaryKey(expressions=[exp.to_column(k) for k in primary_key])] 774 if primary_key and self.SUPPORTS_INDEXES 775 else [] 776 ) 777 778 schema = self._build_schema_exp( 779 table, 780 target_columns_to_types, 781 column_descriptions, 782 primary_key_expression, 783 ) 784 785 self._create_table( 786 schema, 787 None, 788 exists=exists, 789 target_columns_to_types=target_columns_to_types, 790 table_description=table_description, 791 **kwargs, 792 ) 793 794 # Register comments with commands if the engine doesn't support comments in the schema or CREATE 795 if ( 796 table_description 797 and self.COMMENT_CREATION_TABLE.is_comment_command_only 798 and self.comments_enabled 799 ): 800 self._create_table_comment(table_name, table_description) 801 if ( 802 column_descriptions 803 and self.COMMENT_CREATION_TABLE.is_comment_command_only 804 and self.comments_enabled 805 ): 806 self._create_column_comments(table_name, column_descriptions) 807 808 def _build_schema_exp( 809 self, 810 table: exp.Table, 811 target_columns_to_types: t.Dict[str, exp.DataType], 812 column_descriptions: t.Optional[t.Dict[str, str]] = None, 813 expressions: t.Optional[t.List[exp.PrimaryKey]] = None, 814 is_view: bool = False, 815 materialized: bool = False, 816 ) -> exp.Schema: 817 """ 818 Build a schema expression for a table, columns, column comments, and additional schema properties. 
819 """ 820 expressions = expressions or [] 821 822 return exp.Schema( 823 this=table, 824 expressions=self._build_column_defs( 825 target_columns_to_types=target_columns_to_types, 826 column_descriptions=column_descriptions, 827 is_view=is_view, 828 materialized=materialized, 829 ) 830 + expressions, 831 ) 832 833 def _build_column_defs( 834 self, 835 target_columns_to_types: t.Dict[str, exp.DataType], 836 column_descriptions: t.Optional[t.Dict[str, str]] = None, 837 is_view: bool = False, 838 materialized: bool = False, 839 ) -> t.List[exp.ColumnDef]: 840 engine_supports_schema_comments = ( 841 self.COMMENT_CREATION_VIEW.supports_schema_def 842 if is_view 843 else self.COMMENT_CREATION_TABLE.supports_schema_def 844 ) 845 return [ 846 self._build_column_def( 847 column, 848 column_descriptions=column_descriptions, 849 engine_supports_schema_comments=engine_supports_schema_comments, 850 col_type=None if is_view else kind, # don't include column data type for views 851 ) 852 for column, kind in target_columns_to_types.items() 853 ] 854 855 def _build_column_def( 856 self, 857 col_name: str, 858 column_descriptions: t.Optional[t.Dict[str, str]] = None, 859 engine_supports_schema_comments: bool = False, 860 col_type: t.Optional[exp.DATA_TYPE] = None, 861 nested_names: t.List[str] = [], 862 ) -> exp.ColumnDef: 863 return exp.ColumnDef( 864 this=exp.to_identifier(col_name), 865 kind=col_type, 866 constraints=( 867 self._build_col_comment_exp(col_name, column_descriptions) 868 if engine_supports_schema_comments and self.comments_enabled and column_descriptions 869 else None 870 ), 871 ) 872 873 def _build_col_comment_exp( 874 self, col_name: str, column_descriptions: t.Dict[str, str] 875 ) -> t.List[exp.ColumnConstraint]: 876 comment = column_descriptions.get(col_name, None) 877 if comment: 878 return [ 879 exp.ColumnConstraint( 880 kind=exp.CommentColumnConstraint( 881 this=exp.Literal.string(self._truncate_column_comment(comment)) 882 ) 883 ) 884 ] 885 return [] 886 
887 def _create_table_from_source_queries( 888 self, 889 table_name: TableName, 890 source_queries: t.List[SourceQuery], 891 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 892 exists: bool = True, 893 replace: bool = False, 894 table_description: t.Optional[str] = None, 895 column_descriptions: t.Optional[t.Dict[str, str]] = None, 896 table_kind: t.Optional[str] = None, 897 track_rows_processed: bool = True, 898 **kwargs: t.Any, 899 ) -> None: 900 table = exp.to_table(table_name) 901 902 # CTAS calls do not usually include a schema expression. However, most engines 903 # permit them in CTAS expressions, and they allow us to register all column comments 904 # in a single call rather than in a separate comment command call for each column. 905 # 906 # This block conditionally builds a schema expression with column comments if the engine 907 # supports it and we have columns_to_types. column_to_types is required because the 908 # schema expression must include at least column name, data type, and the comment - 909 # for example, `(colname INTEGER COMMENT 'comment')`. 910 # 911 # column_to_types will be available when loading from a DataFrame (by converting from 912 # pandas to SQL types), when a model is "annotated" by explicitly specifying column 913 # types, and for evaluation methods like `LogicalReplaceQueryMixin.replace_query()` 914 # calls and SCD Type 2 model calls. 
915 schema = None 916 target_columns_to_types_known = target_columns_to_types and columns_to_types_all_known( 917 target_columns_to_types 918 ) 919 if ( 920 column_descriptions 921 and target_columns_to_types_known 922 and self.COMMENT_CREATION_TABLE.is_in_schema_def_ctas 923 and self.comments_enabled 924 ): 925 schema = self._build_schema_exp(table, target_columns_to_types, column_descriptions) # type: ignore 926 927 with self.transaction(condition=len(source_queries) > 1): 928 for i, source_query in enumerate(source_queries): 929 with source_query as query: 930 if target_columns_to_types and target_columns_to_types_known: 931 query = self._order_projections_and_filter( 932 query, target_columns_to_types, coerce_types=True 933 ) 934 if i == 0: 935 self._create_table( 936 schema if schema else table, 937 query, 938 target_columns_to_types=target_columns_to_types, 939 exists=exists, 940 replace=replace, 941 table_description=table_description, 942 table_kind=table_kind, 943 track_rows_processed=track_rows_processed, 944 **kwargs, 945 ) 946 else: 947 self._insert_append_query( 948 table_name, 949 query, 950 target_columns_to_types or self.columns(table), 951 track_rows_processed=track_rows_processed, 952 ) 953 954 # Register comments with commands if the engine supports comments and we weren't able to 955 # register them with the CTAS call's schema expression. 
956 if ( 957 table_description 958 and self.COMMENT_CREATION_TABLE.is_comment_command_only 959 and self.comments_enabled 960 ): 961 self._create_table_comment(table_name, table_description) 962 if column_descriptions and schema is None and self.comments_enabled: 963 self._create_column_comments(table_name, column_descriptions) 964 965 def _create_table( 966 self, 967 table_name_or_schema: t.Union[exp.Schema, TableName], 968 expression: t.Optional[exp.Expr], 969 exists: bool = True, 970 replace: bool = False, 971 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 972 table_description: t.Optional[str] = None, 973 column_descriptions: t.Optional[t.Dict[str, str]] = None, 974 table_kind: t.Optional[str] = None, 975 track_rows_processed: bool = True, 976 **kwargs: t.Any, 977 ) -> None: 978 self.execute( 979 self._build_create_table_exp( 980 table_name_or_schema, 981 expression=expression, 982 exists=exists, 983 replace=replace, 984 target_columns_to_types=target_columns_to_types, 985 table_description=( 986 table_description 987 if self.COMMENT_CREATION_TABLE.supports_schema_def and self.comments_enabled 988 else None 989 ), 990 table_kind=table_kind, 991 **kwargs, 992 ), 993 track_rows_processed=track_rows_processed, 994 ) 995 # Extract table name to clear cache 996 table_name = ( 997 table_name_or_schema.this 998 if isinstance(table_name_or_schema, exp.Schema) 999 else table_name_or_schema 1000 ) 1001 self._clear_data_object_cache(table_name) 1002 1003 def _build_create_table_exp( 1004 self, 1005 table_name_or_schema: t.Union[exp.Schema, TableName], 1006 expression: t.Optional[exp.Expr], 1007 exists: bool = True, 1008 replace: bool = False, 1009 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1010 table_description: t.Optional[str] = None, 1011 table_kind: t.Optional[str] = None, 1012 **kwargs: t.Any, 1013 ) -> exp.Create: 1014 exists = False if replace else exists 1015 catalog_name = None 1016 if not 
isinstance(table_name_or_schema, exp.Schema): 1017 table_name_or_schema = exp.to_table(table_name_or_schema) 1018 catalog_name = table_name_or_schema.catalog 1019 else: 1020 if isinstance(table_name_or_schema.this, exp.Table): 1021 catalog_name = table_name_or_schema.this.catalog 1022 1023 properties = ( 1024 self._build_table_properties_exp( 1025 **kwargs, 1026 catalog_name=catalog_name, 1027 target_columns_to_types=target_columns_to_types, 1028 table_description=table_description, 1029 table_kind=table_kind, 1030 ) 1031 if kwargs or table_description 1032 else None 1033 ) 1034 return exp.Create( 1035 this=table_name_or_schema, 1036 kind=table_kind or "TABLE", 1037 replace=replace, 1038 exists=exists, 1039 expression=expression, 1040 properties=properties, 1041 ) 1042 1043 def create_table_like( 1044 self, 1045 target_table_name: TableName, 1046 source_table_name: TableName, 1047 exists: bool = True, 1048 **kwargs: t.Any, 1049 ) -> None: 1050 """Create a table to store SQLMesh internal state based on the definition of another table, including any 1051 column attributes and indexes defined in the original table. 1052 1053 Args: 1054 target_table_name: The name of the table to create. Can be fully qualified or just table name. 1055 source_table_name: The name of the table to base the new table on. 1056 """ 1057 self._create_table_like(target_table_name, source_table_name, exists=exists, **kwargs) 1058 self._clear_data_object_cache(target_table_name) 1059 1060 def clone_table( 1061 self, 1062 target_table_name: TableName, 1063 source_table_name: TableName, 1064 replace: bool = False, 1065 exists: bool = True, 1066 clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 1067 **kwargs: t.Any, 1068 ) -> None: 1069 """Creates a table with the target name by cloning the source table. 1070 1071 Args: 1072 target_table_name: The name of the table that should be created. 1073 source_table_name: The name of the source table that should be cloned. 
1074 replace: Whether or not to replace an existing table. 1075 exists: Indicates whether to include the IF NOT EXISTS check. 1076 """ 1077 if not self.SUPPORTS_CLONING: 1078 raise NotImplementedError(f"Engine does not support cloning: {type(self)}") 1079 1080 kwargs.pop("rendered_physical_properties", None) 1081 self.execute( 1082 exp.Create( 1083 this=exp.to_table(target_table_name), 1084 kind="TABLE", 1085 replace=replace, 1086 exists=exists, 1087 clone=exp.Clone( 1088 this=exp.to_table(source_table_name), 1089 **(clone_kwargs or {}), 1090 ), 1091 **kwargs, 1092 ) 1093 ) 1094 self._clear_data_object_cache(target_table_name) 1095 1096 def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool = True) -> None: 1097 """Drops a data object of arbitrary type. 1098 1099 Args: 1100 data_object: The data object to drop. 1101 ignore_if_not_exists: If True, no error will be raised if the data object does not exist. 1102 """ 1103 if data_object.type.is_view: 1104 self.drop_view(data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists) 1105 elif data_object.type.is_materialized_view: 1106 self.drop_view( 1107 data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists, materialized=True 1108 ) 1109 elif data_object.type.is_table: 1110 self.drop_table(data_object.to_table(), exists=ignore_if_not_exists) 1111 elif data_object.type.is_managed_table: 1112 self.drop_managed_table(data_object.to_table(), exists=ignore_if_not_exists) 1113 else: 1114 raise SQLMeshError( 1115 f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'" 1116 ) 1117 1118 def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None: 1119 """Drops a table. 1120 1121 Args: 1122 table_name: The name of the table to drop. 1123 exists: If exists, defaults to True. 
1124 """ 1125 self._drop_object(name=table_name, exists=exists, **kwargs) 1126 1127 def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None: 1128 """Drops a managed table. 1129 1130 Args: 1131 table_name: The name of the table to drop. 1132 exists: If exists, defaults to True. 1133 """ 1134 raise NotImplementedError(f"Engine does not support managed tables: {type(self)}") 1135 1136 def _drop_object( 1137 self, 1138 name: TableName | SchemaName, 1139 exists: bool = True, 1140 kind: str = "TABLE", 1141 cascade: bool = False, 1142 **drop_args: t.Any, 1143 ) -> None: 1144 """Drops an object. 1145 1146 An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind. 1147 1148 Args: 1149 name: The name of the table to drop. 1150 exists: If exists, defaults to True. 1151 kind: What kind of object to drop. Defaults to TABLE 1152 cascade: Whether or not to DROP ... CASCADE. 1153 Note that this is ignored for :kind's that are not present in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS 1154 **drop_args: Any extra arguments to set on the Drop expression 1155 """ 1156 if cascade and kind.upper() in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS: 1157 drop_args["cascade"] = cascade 1158 1159 self.execute(exp.Drop(this=exp.to_table(name), kind=kind, exists=exists, **drop_args)) 1160 self._clear_data_object_cache(name) 1161 1162 def get_alter_operations( 1163 self, 1164 current_table_name: TableName, 1165 target_table_name: TableName, 1166 *, 1167 ignore_destructive: bool = False, 1168 ignore_additive: bool = False, 1169 ) -> t.List[TableAlterOperation]: 1170 """ 1171 Determines the alter statements needed to change the current table into the structure of the target table. 
1172 """ 1173 return t.cast( 1174 t.List[TableAlterOperation], 1175 self.schema_differ.compare_columns( 1176 current_table_name, 1177 self.columns(current_table_name), 1178 self.columns(target_table_name), 1179 ignore_destructive=ignore_destructive, 1180 ignore_additive=ignore_additive, 1181 ), 1182 ) 1183 1184 def alter_table( 1185 self, 1186 alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]], 1187 ) -> None: 1188 """ 1189 Performs the alter statements to change the current table into the structure of the target table. 1190 """ 1191 with self.transaction(): 1192 for alter_expression in [ 1193 x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions 1194 ]: 1195 self.execute(alter_expression) 1196 1197 def create_view( 1198 self, 1199 view_name: TableName, 1200 query_or_df: QueryOrDF, 1201 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1202 replace: bool = True, 1203 materialized: bool = False, 1204 materialized_properties: t.Optional[t.Dict[str, t.Any]] = None, 1205 table_description: t.Optional[str] = None, 1206 column_descriptions: t.Optional[t.Dict[str, str]] = None, 1207 view_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 1208 source_columns: t.Optional[t.List[str]] = None, 1209 **create_kwargs: t.Any, 1210 ) -> None: 1211 """Create a view with a query or dataframe. 1212 1213 If a dataframe is passed in, it will be converted into a literal values statement. 1214 This should only be done if the dataframe is very small! 1215 1216 Args: 1217 view_name: The view name. 1218 query_or_df: A query or dataframe. 1219 target_columns_to_types: Columns to use in the view statement. 1220 replace: Whether or not to replace an existing view defaults to True. 1221 materialized: Whether to create a a materialized view. Only used for engines that support this feature. 1222 materialized_properties: Optional materialized view properties to add to the view. 
1223 table_description: Optional table description from MODEL DDL. 1224 column_descriptions: Optional column descriptions from model query. 1225 view_properties: Optional view properties to add to the view. 1226 create_kwargs: Additional kwargs to pass into the Create expression 1227 """ 1228 import pandas as pd 1229 1230 if materialized_properties and not materialized: 1231 raise SQLMeshError("Materialized properties are only supported for materialized views") 1232 1233 query_or_df = self._native_df_to_pandas_df(query_or_df) 1234 1235 if isinstance(query_or_df, pd.DataFrame): 1236 values: t.List[t.Tuple[t.Any, ...]] = list( 1237 query_or_df.itertuples(index=False, name=None) 1238 ) 1239 target_columns_to_types, source_columns = self._columns_to_types( 1240 query_or_df, target_columns_to_types, source_columns 1241 ) 1242 if not target_columns_to_types: 1243 raise SQLMeshError("columns_to_types must be provided for dataframes") 1244 source_columns_to_types = get_source_columns_to_types( 1245 target_columns_to_types, source_columns 1246 ) 1247 query_or_df = self._values_to_sql( 1248 values, 1249 source_columns_to_types, 1250 batch_start=0, 1251 batch_end=len(values), 1252 ) 1253 1254 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 1255 query_or_df, 1256 target_columns_to_types, 1257 batch_size=0, 1258 target_table=view_name, 1259 source_columns=source_columns, 1260 ) 1261 if len(source_queries) != 1: 1262 raise SQLMeshError("Only one source query is supported for creating views") 1263 1264 schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name) 1265 if target_columns_to_types: 1266 schema = self._build_schema_exp( 1267 exp.to_table(view_name), 1268 target_columns_to_types, 1269 column_descriptions, 1270 is_view=True, 1271 materialized=materialized, 1272 ) 1273 1274 properties = create_kwargs.pop("properties", None) 1275 if not properties: 1276 properties = exp.Properties(expressions=[]) 1277 1278 if view_properties: 
1279 table_type = self._pop_creatable_type_from_properties(view_properties) 1280 if table_type: 1281 properties.append("expressions", table_type) 1282 1283 if materialized and self.SUPPORTS_MATERIALIZED_VIEWS: 1284 properties.append("expressions", exp.MaterializedProperty()) 1285 1286 if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema): 1287 schema = schema.this 1288 1289 if not self.SUPPORTS_VIEW_SCHEMA and isinstance(schema, exp.Schema): 1290 schema = schema.this 1291 1292 if materialized_properties: 1293 partitioned_by = materialized_properties.pop("partitioned_by", None) 1294 clustered_by = materialized_properties.pop("clustered_by", None) 1295 if ( 1296 partitioned_by 1297 and ( 1298 partitioned_by_prop := self._build_partitioned_by_exp( 1299 partitioned_by, **materialized_properties 1300 ) 1301 ) 1302 is not None 1303 ): 1304 materialized_properties["catalog_name"] = exp.to_table(view_name).catalog 1305 properties.append("expressions", partitioned_by_prop) 1306 if ( 1307 clustered_by 1308 and ( 1309 clustered_by_prop := self._build_clustered_by_exp( 1310 clustered_by, **materialized_properties 1311 ) 1312 ) 1313 is not None 1314 ): 1315 properties.append("expressions", clustered_by_prop) 1316 1317 create_view_properties = self._build_view_properties_exp( 1318 view_properties, 1319 ( 1320 table_description 1321 if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled 1322 else None 1323 ), 1324 physical_cluster=create_kwargs.pop("physical_cluster", None), 1325 ) 1326 if create_view_properties: 1327 for view_property in create_view_properties.expressions: 1328 # Small hack to make sure SECURE goes at the beginning before materialized as required by Snowflake 1329 if isinstance(view_property, exp.SecureProperty): 1330 properties.set("expressions", view_property, index=0, overwrite=False) 1331 else: 1332 properties.append("expressions", view_property) 1333 1334 if properties.expressions: 1335 
create_kwargs["properties"] = properties 1336 1337 if replace: 1338 self.drop_data_object_on_type_mismatch( 1339 self.get_data_object(view_name), 1340 DataObjectType.VIEW if not materialized else DataObjectType.MATERIALIZED_VIEW, 1341 ) 1342 1343 with source_queries[0] as query: 1344 self.execute( 1345 exp.Create( 1346 this=schema, 1347 kind="VIEW", 1348 replace=replace, 1349 expression=query, 1350 **create_kwargs, 1351 ), 1352 quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS, 1353 ) 1354 1355 self._clear_data_object_cache(view_name) 1356 1357 # Register table comment with commands if the engine doesn't support doing it in CREATE 1358 if ( 1359 table_description 1360 and self.COMMENT_CREATION_VIEW.is_comment_command_only 1361 and self.comments_enabled 1362 ): 1363 self._create_table_comment(view_name, table_description, "VIEW") 1364 # Register column comments with commands if the engine doesn't support doing it in 1365 # CREATE or we couldn't do it in the CREATE schema definition because we don't have 1366 # columns_to_types 1367 if ( 1368 column_descriptions 1369 and ( 1370 self.COMMENT_CREATION_VIEW.is_comment_command_only 1371 or ( 1372 self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands 1373 and not target_columns_to_types 1374 ) 1375 ) 1376 and self.comments_enabled 1377 ): 1378 self._create_column_comments(view_name, column_descriptions, "VIEW", materialized) 1379 1380 @set_catalog() 1381 def create_schema( 1382 self, 1383 schema_name: SchemaName, 1384 ignore_if_exists: bool = True, 1385 warn_on_error: bool = True, 1386 properties: t.Optional[t.List[exp.Expr]] = None, 1387 ) -> None: 1388 properties = properties or [] 1389 return self._create_schema( 1390 schema_name=schema_name, 1391 ignore_if_exists=ignore_if_exists, 1392 warn_on_error=warn_on_error, 1393 properties=properties, 1394 kind="SCHEMA", 1395 ) 1396 1397 def _create_schema( 1398 self, 1399 schema_name: SchemaName, 1400 ignore_if_exists: bool, 1401 warn_on_error: bool, 1402 properties: 
t.List[exp.Expr], 1403 kind: str, 1404 ) -> None: 1405 """Create a schema from a name or qualified table name.""" 1406 try: 1407 self.execute( 1408 exp.Create( 1409 this=to_schema(schema_name), 1410 kind=kind, 1411 exists=ignore_if_exists, 1412 properties=exp.Properties( # this renders as '' (empty string) if expressions is empty 1413 expressions=properties 1414 ), 1415 ) 1416 ) 1417 except Exception as e: 1418 if not warn_on_error: 1419 raise 1420 logger.warning("Failed to create %s '%s': %s", kind.lower(), schema_name, e) 1421 1422 def drop_schema( 1423 self, 1424 schema_name: SchemaName, 1425 ignore_if_not_exists: bool = True, 1426 cascade: bool = False, 1427 **drop_args: t.Dict[str, exp.Expr], 1428 ) -> None: 1429 return self._drop_object( 1430 name=schema_name, 1431 exists=ignore_if_not_exists, 1432 kind="SCHEMA", 1433 cascade=cascade, 1434 **drop_args, 1435 ) 1436 1437 def drop_view( 1438 self, 1439 view_name: TableName, 1440 ignore_if_not_exists: bool = True, 1441 materialized: bool = False, 1442 **kwargs: t.Any, 1443 ) -> None: 1444 """Drop a view.""" 1445 self._drop_object( 1446 name=view_name, 1447 exists=ignore_if_not_exists, 1448 kind="VIEW", 1449 materialized=materialized and self.SUPPORTS_MATERIALIZED_VIEWS, 1450 **kwargs, 1451 ) 1452 1453 def create_catalog(self, catalog_name: str | exp.Identifier) -> None: 1454 return self._create_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect)) 1455 1456 def _create_catalog(self, catalog_name: exp.Identifier) -> None: 1457 raise SQLMeshError( 1458 f"Unable to create catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine." 
)

    def drop_catalog(self, catalog_name: str | exp.Identifier) -> None:
        """Drops a catalog.

        ``catalog_name`` is parsed into an identifier with this adapter's dialect and handed to
        ``_drop_catalog``.
        """
        return self._drop_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect))

    def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
        """Engine-specific catalog drop.

        The base implementation always raises; engines that support automatic catalog
        management presumably override this.

        Raises:
            SQLMeshError: Always, in this base implementation.
        """
        raise SQLMeshError(
            f"Unable to drop catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine."
        )

    def columns(
        self, table_name: TableName, include_pseudo_columns: bool = False
    ) -> t.Dict[str, exp.DataType]:
        """Fetches column names and types for the target table.

        Runs ``DESCRIBE TABLE`` and reads every row from the cursor. Reading stops at the first
        row whose first field starts with ``#``, presumably a section header some engines emit
        (TODO confirm). Rows with an empty or blank name or type are skipped.

        Args:
            table_name: The table to describe.
            include_pseudo_columns: Not used by this base implementation.

        Returns:
            A mapping of column name to the type parsed with this adapter's dialect.
        """
        self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
        describe_output = self.cursor.fetchall()
        return {
            # Note: MySQL returns the column type as bytes.
            column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect)
            for column_name, column_type, *_ in itertools.takewhile(
                # `t` here is the row argument; it shadows the `typing` alias only inside the lambda.
                lambda t: not t[0].startswith("#"),
                describe_output,
            )
            if column_name and column_name.strip() and column_type and column_type.strip()
        }

    def table_exists(self, table_name: TableName) -> bool:
        """Returns whether ``table_name`` exists.

        If the data-object cache has an entry for the table, that entry is used; a cached
        ``None`` means "does not exist". Otherwise a ``DESCRIBE TABLE`` is issued, and any
        exception it raises is treated as "does not exist". This method does not write that
        result to the cache.
        """
        table = exp.to_table(table_name)
        data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
        if data_object_cache_key in self._data_object_cache:
            logger.debug("Table existence cache hit: %s", data_object_cache_key)
            return self._data_object_cache[data_object_cache_key] is not None

        try:
            self.execute(exp.Describe(this=table, kind="TABLE"))
            return True
        except Exception:
            # Deliberately broad: any failure to describe the table is reported as "missing".
            return False

    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None:
        """Executes ``DELETE FROM table_name WHERE where``."""
        self.execute(exp.delete(table_name, where))

    def insert_append(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        track_rows_processed: bool = True,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """Appends the rows produced by ``query_or_df`` to ``table_name``.

        Args:
            table_name: The target table.
            query_or_df: A query or DataFrame supplying the rows.
            target_columns_to_types: Target column names and types. When falsy after source
                preparation, they are read from the table.
            track_rows_processed: Forwarded to ``execute`` for each INSERT.
            source_columns: Forwarded to ``_get_source_queries_and_columns_to_types``.
        """
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
            target_table=table_name,
            source_columns=source_columns,
        )
        self._insert_append_source_queries(
            table_name, source_queries, target_columns_to_types, track_rows_processed
        )

    def _insert_append_source_queries(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        track_rows_processed: bool = True,
    ) -> None:
        """Appends each source query to ``table_name``.

        A transaction is only started when there is at least one source query. The body still
        runs when there are none, so ``self.columns`` may be called even then if no types were
        given.
        """
        with self.transaction(condition=len(source_queries) > 0):
            target_columns_to_types = target_columns_to_types or self.columns(table_name)
            for source_query in source_queries:
                # Entering the SourceQuery yields the query to run for this batch.
                with source_query as query:
                    self._insert_append_query(
                        table_name,
                        query,
                        target_columns_to_types,
                        track_rows_processed=track_rows_processed,
                    )

    def _insert_append_query(
        self,
        table_name: TableName,
        query: Query,
        target_columns_to_types: t.Dict[str, exp.DataType],
        order_projections: bool = True,
        track_rows_processed: bool = True,
    ) -> None:
        """Executes ``INSERT INTO table_name (<target columns>) <query>``.

        Args:
            order_projections: When True, the query is first passed through
                ``_order_projections_and_filter`` so that its projections line up with
                ``target_columns_to_types``.
            track_rows_processed: Forwarded to ``execute``.
        """
        if order_projections:
            query = self._order_projections_and_filter(query, target_columns_to_types)
        self.execute(
            exp.insert(query, table_name, columns=list(target_columns_to_types)),
            track_rows_processed=track_rows_processed,
        )

    def insert_overwrite_by_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        partitioned_by: t.List[exp.Expr],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """Overwrites ``table_name`` with the rows of ``query_or_df``, partition-wise.

        Behavior depends on the strategy:

        - With an insert-overwrite ``INSERT_OVERWRITE_STRATEGY``, the source queries go to
          ``_insert_overwrite_by_condition`` without a ``where`` condition.
        - Otherwise, this falls back to ``_replace_by_key``, using ``partitioned_by`` as a
          non-unique key.
        """
        if self.INSERT_OVERWRITE_STRATEGY.is_insert_overwrite:
            target_table = exp.to_table(table_name)
            source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
                query_or_df,
                target_columns_to_types,
                target_table=target_table,
                source_columns=source_columns,
            )
            self._insert_overwrite_by_condition(
                table_name, source_queries, target_columns_to_types=target_columns_to_types
            )
        else:
            self._replace_by_key(
                table_name,
                query_or_df,
                target_columns_to_types,
                partitioned_by,
                is_unique_key=False,
                source_columns=source_columns,
            )

    def insert_overwrite_by_time_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        start: TimeLike,
        end: TimeLike,
        time_formatter: t.Callable[[TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expr],
        time_column: TimeColumn | exp.Expr | str,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Overwrites the rows of ``table_name`` whose ``time_column`` falls in a time range.

        ``start``/``end`` are converted with ``make_inclusive`` and rendered via
        ``time_formatter``. The resulting ``time_column BETWEEN low AND high`` condition is passed
        to ``_insert_overwrite_by_time_partition``.

        Args:
            time_formatter: Renders a time value as an expression. It receives the target column
                types, which are re-read from the table when missing or not all known.
            time_column: A ``TimeColumn``, an expression, or a column name string.
            **kwargs: Forwarded to ``_insert_overwrite_by_time_partition``.
        """
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
            target_table=table_name,
            source_columns=source_columns,
        )
        if not target_columns_to_types or not columns_to_types_all_known(target_columns_to_types):
            target_columns_to_types = self.columns(table_name)
        low, high = [
            time_formatter(dt, target_columns_to_types)
            for dt in make_inclusive(start, end, self.dialect)
        ]
        if isinstance(time_column, TimeColumn):
            time_column = time_column.column
        where = exp.Between(
            this=exp.to_column(time_column) if isinstance(time_column, str) else time_column,
            low=low,
            high=high,
        )
        return self._insert_overwrite_by_time_partition(
            table_name, source_queries, target_columns_to_types, where, **kwargs
        )

    def _insert_overwrite_by_time_partition(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        target_columns_to_types: t.Dict[str, exp.DataType],
        where: exp.Condition,
        **kwargs: t.Any,
    ) -> None:
        """Overwrites the rows matching the time-range ``where`` condition.

        The base implementation delegates directly to ``_insert_overwrite_by_condition``.
        """
        return self._insert_overwrite_by_condition(
            table_name, source_queries, target_columns_to_types, where, **kwargs
        )

    def _values_to_sql(
        self,
        values: t.List[t.Tuple[t.Any, ...]],
        target_columns_to_types: t.Dict[str, exp.DataType],
        batch_start: int,
        batch_end: int,
        alias: str = "t",
        source_columns: t.Optional[t.List[str]] = None,
    ) -> Query:
        """Builds a query that selects a batch of literal ``values``.

        Delegates to ``select_from_values_for_batch_range``. The batch is delimited by
        ``batch_start``/``batch_end``, and ``alias`` names the values relation.
        """
        return select_from_values_for_batch_range(
            values=values,
            target_columns_to_types=target_columns_to_types,
            batch_start=batch_start,
            batch_end=batch_end,
            alias=alias,
            source_columns=source_columns,
        )

    def _insert_overwrite_by_condition(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        where: t.Optional[exp.Condition] = None,
        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
        **kwargs: t.Any,
    ) -> None:
        """Replaces the rows of ``table_name`` matching ``where`` with the source queries' rows.

        The strategy is ``insert_overwrite_strategy_override`` when given, otherwise
        ``INSERT_OVERWRITE_STRATEGY``. Each source query is handled as follows:

        - Every query after the first, and every query under delete+insert, is appended. Under
          delete+insert, the first query is preceded by ``DELETE ... WHERE where``, or by
          ``WHERE TRUE`` when ``where`` is None.
        - Under merge, the query runs as ``MERGE ... ON FALSE``. Target rows not matched by the
          source and satisfying ``where`` are deleted, and all source rows are inserted.
        - Otherwise an ``INSERT`` is issued, with ``OVERWRITE`` for the insert-overwrite
          strategy. For replace-where, it gets no column list and ``where`` (or ``TRUE``) as
          its ``REPLACE WHERE`` condition.

        Every query is first passed through ``_order_projections_and_filter`` with ``where``.
        """
        table = exp.to_table(table_name)
        insert_overwrite_strategy = (
            insert_overwrite_strategy_override or self.INSERT_OVERWRITE_STRATEGY
        )
        # NOTE(review): the transaction condition includes `is_delete_insert`, but the DELETE below
        # only runs inside the loop (when i == 0). With an empty `source_queries`, nothing is
        # deleted. Confirm whether that is intended.
        with self.transaction(
            condition=len(source_queries) > 0 or insert_overwrite_strategy.is_delete_insert
        ):
            target_columns_to_types = target_columns_to_types or self.columns(table_name)
            for i, source_query in enumerate(source_queries):
                with source_query as query:
                    query = self._order_projections_and_filter(
                        query, target_columns_to_types, where=where
                    )
                    # Batches after the first are appended, so they add to the first batch's
                    # rows instead of overwriting them again.
                    if i > 0 or insert_overwrite_strategy.is_delete_insert:
                        if i == 0:
                            self.delete_from(table_name, where=where or exp.true())
                        self._insert_append_query(
                            table_name,
                            query,
                            target_columns_to_types=target_columns_to_types,
                            order_projections=False,
                        )
                    elif insert_overwrite_strategy.is_merge:
                        columns = [exp.column(col) for col in target_columns_to_types]
                        # WHEN NOT MATCHED BY SOURCE [AND where] THEN DELETE
                        when_not_matched_by_source = exp.When(
                            matched=False,
                            source=True,
                            condition=where,
                            then=exp.Delete(),
                        )
                        # WHEN NOT MATCHED [BY TARGET] THEN INSERT (cols) VALUES (cols)
                        when_not_matched_by_target = exp.When(
                            matched=False,
                            source=False,
                            then=exp.Insert(
                                this=exp.Tuple(expressions=columns),
                                expression=exp.Tuple(expressions=columns),
                            ),
                        )
                        # ON FALSE: no row ever matches, so only the two clauses above apply.
                        self._merge(
                            target_table=table_name,
                            query=query,
                            on=exp.false(),
                            whens=exp.Whens(
                                expressions=[when_not_matched_by_source, when_not_matched_by_target]
                            ),
                        )
                    else:
                        insert_exp = exp.insert(
                            query,
                            table,
                            columns=(
                                list(target_columns_to_types)
                                if not insert_overwrite_strategy.is_replace_where
                                else None
                            ),
                            overwrite=insert_overwrite_strategy.is_insert_overwrite,
                        )
                        if insert_overwrite_strategy.is_replace_where:
                            insert_exp.set("where", where or exp.true())
                        self.execute(insert_exp, track_rows_processed=True)

    def update_table(
        self,
        table_name: TableName,
        properties: t.Dict[str, t.Any],
        where: t.Optional[str | exp.Condition] = None,
    ) -> None:
        """Executes ``UPDATE table_name SET <properties> [WHERE where]``."""
        self.execute(exp.update(table_name, properties, where=where))

    def _merge(
        self,
        target_table: TableName,
        query: Query,
        on: exp.Expr,
        whens: exp.Whens,
    ) -> None:
        """Executes a MERGE with row-count tracking enabled.

        The statement has the form ``MERGE INTO target_table AS <MERGE_TARGET_ALIAS> USING
        (query) AS <MERGE_SOURCE_ALIAS> ON on <whens>``.
        """
        this = exp.alias_(exp.to_table(target_table), alias=MERGE_TARGET_ALIAS, table=True)
        using = exp.alias_(
            exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
        )
        self.execute(
            exp.Merge(this=this, using=using, on=on, whens=whens), track_rows_processed=True
        )

    def scd_type_2_by_time(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expr],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: t.Union[TimeLike, exp.Column],
        updated_at_col: exp.Column,
        invalidate_hard_deletes: bool = True,
        updated_at_as_valid_from: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Applies an SCD Type 2 update that detects changes with ``updated_at_col``.

        This is a thin wrapper that forwards every argument to ``_scd_type_2``.
        """
        self._scd_type_2(
            target_table=target_table,
            source_table=source_table,
            unique_key=unique_key,
            valid_from_col=valid_from_col,
            valid_to_col=valid_to_col,
            execution_time=execution_time,
            updated_at_col=updated_at_col,
            invalidate_hard_deletes=invalidate_hard_deletes,
            updated_at_as_valid_from=updated_at_as_valid_from,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            column_descriptions=column_descriptions,
            truncate=truncate,
            source_columns=source_columns,
            **kwargs,
        )

    def scd_type_2_by_column(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expr],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: t.Union[TimeLike, exp.Column],
        check_columns: t.Union[exp.Star, t.Sequence[exp.Expr]],
        invalidate_hard_deletes: bool = True,
        execution_time_as_valid_from: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Applies an SCD Type 2 update that detects changes by comparing ``check_columns``.

        This is a thin wrapper that forwards every argument to ``_scd_type_2``.
        """
        self._scd_type_2(
            target_table=target_table,
            source_table=source_table,
            unique_key=unique_key,
            valid_from_col=valid_from_col,
            valid_to_col=valid_to_col,
            execution_time=execution_time,
            check_columns=check_columns,
            target_columns_to_types=target_columns_to_types,
            invalidate_hard_deletes=invalidate_hard_deletes,
            execution_time_as_valid_from=execution_time_as_valid_from,
            table_description=table_description,
            column_descriptions=column_descriptions,
            truncate=truncate,
            source_columns=source_columns,
            **kwargs,
        )

    def _scd_type_2(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        unique_key: t.Sequence[exp.Expr],
        valid_from_col: exp.Column,
        valid_to_col: exp.Column,
        execution_time: t.Union[TimeLike, exp.Column],
        invalidate_hard_deletes: bool = True,
        updated_at_col: t.Optional[exp.Column] = None,
        check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Expr]]] = None,
        updated_at_as_valid_from: bool = False,
        execution_time_as_valid_from: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        truncate: bool = False,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Shared SCD Type 2 implementation for the by-time and by-column variants.

        Builds one query that recomputes the full contents of ``target_table`` and writes it
        back with ``replace_query``. Only the first source query is used (sources are prepared
        with ``batch_size=0``). The query is assembled from these CTEs:

        - ``source``: the incoming rows, tagged ``_exists = TRUE`` and made distinct on
          ``unique_key``. The updated-at column, if any, is cast to the ``valid_from`` type.
        - ``static``: existing rows whose ``valid_to`` is not NULL.
        - ``latest``: existing rows whose ``valid_to`` is NULL.
        - ``deleted``: ``static`` rows left-joined to ``latest`` on ``unique_key``, keeping rows
          where ``latest.valid_to`` is NULL.
        - ``latest_deleted``: the max ``valid_to`` of ``deleted`` per key, keyed as ``_key{i}``.
        - ``joined``: ``latest`` LEFT JOIN ``source``, unioned with the ``source`` rows that have
          no ``latest`` match. This emulates a full join. ``latest`` columns are renamed with a
          ``t_`` prefix.
        - ``updated_rows``: every joined row, with recomputed ``valid_from``/``valid_to``.
        - ``inserted_rows``: new current versions for rows matching ``updated_row_filter``.

        The final query is ``static UNION ALL updated_rows UNION ALL inserted_rows``. With
        ``truncate=True``, existing rows are read with ``LIMIT 0`` and are therefore ignored.

        Change detection uses one of two inputs:

        - ``updated_at_col``: a row changed when ``joined.updated_at > joined.t_updated_at``.
        - ``check_columns``: a row changed when any checked column differs (NULL-aware) and the
          key is present on both sides.

        Raises:
            SQLMeshError: If any of the following holds:
                - the target columns cannot be determined;
                - ``unique_key`` is empty;
                - mutually exclusive options are combined;
                - the updated-at column is missing from the target table.
        """

        def remove_managed_columns(
            cols_to_types: t.Dict[str, exp.DataType],
        ) -> t.Dict[str, exp.DataType]:
            # Drops the valid_from/valid_to columns. The names are closed over and are assigned
            # just below, before the first call.
            return {
                k: v for k, v in cols_to_types.items() if k not in {valid_from_name, valid_to_name}
            }

        valid_from_name = valid_from_col.name
        valid_to_name = valid_to_col.name
        target_columns_to_types = target_columns_to_types or self.columns(target_table)
        # Re-read the table schema if the provided types lack the managed columns or contain unknowns.
        if (
            valid_from_name not in target_columns_to_types
            or valid_to_name not in target_columns_to_types
            or not columns_to_types_all_known(target_columns_to_types)
        ):
            target_columns_to_types = self.columns(target_table)
        unmanaged_columns_to_types = (
            remove_managed_columns(target_columns_to_types) if target_columns_to_types else None
        )
        source_queries, unmanaged_columns_to_types = self._get_source_queries_and_columns_to_types(
            source_table,
            unmanaged_columns_to_types,
            target_table=target_table,
            batch_size=0,
            source_columns=source_columns,
        )
        updated_at_name = updated_at_col.name if updated_at_col else None
        if not target_columns_to_types:
            raise SQLMeshError(f"Could not get columns_to_types. Does {target_table} exist?")
        unmanaged_columns_to_types = unmanaged_columns_to_types or remove_managed_columns(
            target_columns_to_types
        )
        # Argument validation.
        if not unique_key:
            raise SQLMeshError("unique_key must be provided for SCD Type 2")
        if check_columns and updated_at_col:
            raise SQLMeshError(
                "Cannot use both `check_columns` and `updated_at_name` for SCD Type 2"
            )
        if check_columns and updated_at_as_valid_from:
            raise SQLMeshError(
                "Cannot use both `check_columns` and `updated_at_as_valid_from` for SCD Type 2"
            )
        if execution_time_as_valid_from and not check_columns:
            raise SQLMeshError(
                "Cannot use `execution_time_as_valid_from` without `check_columns` for SCD Type 2"
            )
        if updated_at_name and updated_at_name not in target_columns_to_types:
            raise SQLMeshError(
                f"Column {updated_at_name} not found in {target_table}. Table must contain an `updated_at` timestamp for SCD Type 2"
            )
        # All timestamps written to valid_from/valid_to are cast to the valid_from column's type.
        time_data_type = target_columns_to_types[valid_from_name]
        select_source_columns: t.List[t.Union[str, exp.Alias]] = [
            col for col in unmanaged_columns_to_types if col != updated_at_name
        ]
        table_columns = [exp.column(c, quoted=True) for c in target_columns_to_types]
        if updated_at_name:
            select_source_columns.append(
                exp.cast(updated_at_col, time_data_type).as_(updated_at_col.this)  # type: ignore
            )

        # If a star is provided, we include all unmanaged columns in the check.
        # This unnecessarily includes unique key columns but since they are used in the join, and therefore we know
        # they are equal or not, the extra check is not a problem and we gain simplified logic here.
        # If we want to change this, then we just need to check the expressions in unique_key and pull out the
        # column names and then remove them from the unmanaged_columns
        if check_columns:
            # Handle both Star directly and [Star()] (which can happen during serialization/deserialization)
            if isinstance(seq_get(ensure_list(check_columns), 0), exp.Star):
                check_columns = [exp.column(col) for col in unmanaged_columns_to_types]
        execution_ts = (
            exp.cast(execution_time, time_data_type, dialect=self.dialect)
            if isinstance(execution_time, exp.Column)
            else to_time_column(execution_time, time_data_type, self.dialect, nullable=True)
        )
        # `update_valid_from_start` is the valid_from given to rows with no current version.
        if updated_at_as_valid_from:
            if not updated_at_col:
                raise SQLMeshError(
                    "Cannot use `updated_at_as_valid_from` without `updated_at_name` for SCD Type 2"
                )
            update_valid_from_start: t.Union[str, exp.Expr] = updated_at_col
        # If using check_columns and the user doesn't always want execution_time for valid from
        # then we only use epoch 0 if we are truncating the table and loading rows for the first time.
        # All future new rows should have execution time.
        elif check_columns and (execution_time_as_valid_from or not truncate):
            update_valid_from_start = execution_ts
        else:
            update_valid_from_start = to_time_column(
                "1970-01-01 00:00:00+00:00", time_data_type, self.dialect, nullable=True
            )
        # valid_from for the new version row written into `inserted_rows`.
        insert_valid_from_start = execution_ts if check_columns else updated_at_col  # type: ignore
        # joined._exists IS NULL is saying "if the row is deleted"
        delete_check = (
            exp.column("_exists", "joined").is_(exp.Null()) if invalidate_hard_deletes else None
        )
        # The `latest` values of valid_to/valid_from, as renamed with the `t_` prefix in `joined`.
        prefixed_valid_to_col = valid_to_col.copy()
        prefixed_valid_to_col.this.set("this", f"t_{prefixed_valid_to_col.name}")
        prefixed_valid_from_col = valid_from_col.copy()
        prefixed_valid_from_col.this.set("this", f"t_{valid_from_col.name}")
        if check_columns:
            # For every checked column, compare the source value (joined.col) with the latest
            # value (joined.t_col). NULL <-> non-NULL transitions also count as a change.
            row_check_conditions = []
            for col in check_columns:
                col_qualified = col.copy()
                col_qualified.set("table", exp.to_identifier("joined"))

                t_col = col_qualified.copy()
                for column in t_col.find_all(exp.Column):
                    column.this.set("this", f"t_{column.name}")

                row_check_conditions.extend(
                    [
                        col_qualified.neq(t_col),
                        exp.and_(t_col.is_(exp.Null()), col_qualified.is_(exp.Null()).not_()),
                        exp.and_(t_col.is_(exp.Null()).not_(), col_qualified.is_(exp.Null())),
                    ]
                )
            row_value_check = exp.or_(*row_check_conditions)
            # Require every key part to be non-NULL on both sides, i.e. the key is in both
            # `latest` and `source`.
            unique_key_conditions = []
            for key in unique_key:
                key_qualified = key.copy()
                key_qualified.set("table", exp.to_identifier("joined"))
                t_key = key_qualified.copy()
                for col in t_key.find_all(exp.Column):
                    col.this.set("this", f"t_{col.name}")
                unique_key_conditions.extend(
                    [t_key.is_(exp.Null()).not_(), key_qualified.is_(exp.Null()).not_()]
                )
            unique_key_check = exp.and_(*unique_key_conditions)
            # unique_key_check is saying "if the row is updated"
            # row_value_check is saying "if the row has changed"
            updated_row_filter = exp.and_(unique_key_check, row_value_check)
            # Close the current version at execution time when deleted or changed.
            valid_to_case_stmt = (
                exp.Case()
                .when(
                    exp.and_(
                        exp.or_(
                            delete_check,
                            updated_row_filter,
                        )
                    ),
                    execution_ts,
                )
                .else_(prefixed_valid_to_col)
                .as_(valid_to_col.this)
            )
            valid_from_case_stmt = exp.func(
                "COALESCE",
                prefixed_valid_from_col,
                update_valid_from_start,
            ).as_(valid_from_col.this)
        else:
            assert updated_at_col is not None
            updated_at_col_qualified = updated_at_col.copy()
            updated_at_col_qualified.set("table", exp.to_identifier("joined"))
            prefixed_updated_at_col = updated_at_col_qualified.copy()
            prefixed_updated_at_col.this.set("this", f"t_{updated_at_col_qualified.name}")
            # A row changed when the source's updated_at is newer than the latest version's.
            updated_row_filter = updated_at_col_qualified > prefixed_updated_at_col

            # Close the current version at the new updated_at, or at execution time on hard delete.
            valid_to_case_stmt_builder = exp.Case().when(
                updated_row_filter, updated_at_col_qualified
            )
            if delete_check:
                valid_to_case_stmt_builder = valid_to_case_stmt_builder.when(
                    delete_check, execution_ts
                )
            valid_to_case_stmt = valid_to_case_stmt_builder.else_(prefixed_valid_to_col).as_(
                valid_to_col.this
            )

            # With no latest valid_from but a previously deleted history for the key, start at
            # the later of that history's last valid_to and the row's updated_at.
            valid_from_case_stmt = (
                exp.Case()
                .when(
                    exp.and_(
                        prefixed_valid_from_col.is_(exp.Null()),
                        exp.column("_exists", "latest_deleted").is_(exp.Null()).not_(),
                    ),
                    exp.Case()
                    .when(
                        exp.column(valid_to_col.this, "latest_deleted") > updated_at_col,
                        exp.column(valid_to_col.this, "latest_deleted"),
                    )
                    .else_(updated_at_col),
                )
                .when(prefixed_valid_from_col.is_(exp.Null()), update_valid_from_start)
                .else_(prefixed_valid_from_col)
            ).as_(valid_from_col.this)

        existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_(
            target_table
        )
        if truncate:
            # Read no existing rows, so the table is rebuilt from the source alone.
            existing_rows_query = existing_rows_query.limit(0)

        with source_queries[0] as source_query:
            # `t_`-prefixed column expressions used to alias `latest` columns inside `joined`.
            prefixed_columns_to_types = []
            for column in target_columns_to_types:
                prefixed_col = exp.column(column).copy()
                prefixed_col.this.set("this", f"t_{prefixed_col.name}")
                prefixed_columns_to_types.append(prefixed_col)
            prefixed_unmanaged_columns = []
            for column in unmanaged_columns_to_types:
                prefixed_col = exp.column(column).copy()
                prefixed_col.this.set("this", f"t_{prefixed_col.name}")
                prefixed_unmanaged_columns.append(prefixed_col)
            query = (
                exp.Select()  # type: ignore
                .select(*table_columns)
                .from_("static")
                .union(
                    exp.select(*table_columns).from_("updated_rows"),
                    distinct=False,
                )
                .union(
                    exp.select(*table_columns).from_("inserted_rows"),
                    distinct=False,
                )
                .with_(
                    "source",
                    exp.select(exp.true().as_("_exists"), *select_source_columns)
                    .distinct(*unique_key)
                    .from_(
                        self.use_server_nulls_for_unmatched_after_join(source_query).subquery(  # type: ignore
                            "raw_source"
                        )
                    ),
                )
                # Historical Records that Do Not Change
                .with_(
                    "static",
                    existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()),
                )
                # Latest Records that can be updated
                .with_(
                    "latest",
                    existing_rows_query.where(valid_to_col.is_(exp.Null())),
                )
                # Deleted records which can be used to determine `valid_from` for undeleted source records
                # NOTE(review): every `latest` row has a NULL valid_to, so `latest.valid_to IS NULL`
                # also holds for keys that do have a latest row. `latest_deleted` is only consulted
                # when the joined row's t_valid_from is NULL, which presumably makes this harmless.
                # Confirm intent.
                .with_(
                    "deleted",
                    exp.select(*[exp.column(col, "static") for col in target_columns_to_types])
                    .from_("static")
                    .join(
                        "latest",
                        on=exp.and_(
                            *[
                                add_table(key, "static").eq(add_table(key, "latest"))
                                for key in unique_key
                            ]
                        ),
                        join_type="left",
                    )
                    .where(exp.column(valid_to_col.this, "latest").is_(exp.Null())),
                )
                # Get the latest `valid_to` deleted record for each unique key
                .with_(
                    "latest_deleted",
                    exp.select(
                        exp.true().as_("_exists"),
                        *(part.as_(f"_key{i}") for i, part in enumerate(unique_key)),
                        exp.Max(this=valid_to_col).as_(valid_to_col.this),
                    )
                    .from_("deleted")
                    .group_by(*unique_key),
                )
                # Do a full join between latest records and source table in order to combine them together
                # MySQL doesn't support full join so going to do a left then right join and remove dups with union
                # We do a left/right and filter right on only matching to remove the need to do union distinct
                # which allows scd type 2 to be compatible with unhashable data types
                .with_(
                    "joined",
                    exp.select(
                        exp.column("_exists", table="source").as_("_exists"),
                        *(
                            exp.column(col, table="latest").as_(prefixed_columns_to_types[i].this)
                            for i, col in enumerate(target_columns_to_types)
                        ),
                        *(
                            exp.column(col, table="source").as_(col)
                            for col in unmanaged_columns_to_types
                        ),
                    )
                    .from_("latest")
                    .join(
                        "source",
                        on=exp.and_(
                            *[
                                add_table(key, "latest").eq(add_table(key, "source"))
                                for key in unique_key
                            ]
                        ),
                        join_type="left",
                    )
                    .union(
                        exp.select(
                            exp.column("_exists", table="source").as_("_exists"),
                            *(
                                exp.column(col, table="latest").as_(
                                    prefixed_columns_to_types[i].this
                                )
                                for i, col in enumerate(target_columns_to_types)
                            ),
                            *(
                                exp.column(col, table="source").as_(col)
                                for col in unmanaged_columns_to_types
                            ),
                        )
                        .from_("latest")
                        .join(
                            "source",
                            on=exp.and_(
                                *[
                                    add_table(key, "latest").eq(add_table(key, "source"))
                                    for key in unique_key
                                ]
                            ),
                            join_type="right",
                        )
                        .where(exp.column("_exists", table="latest").is_(exp.Null())),
                        distinct=False,
                    ),
                )
                # Get deleted, new, no longer current, or unchanged records
                .with_(
                    "updated_rows",
                    exp.select(
                        # Prefer the latest (t_) value; fall back to the source value for new rows.
                        *(
                            exp.func(
                                "COALESCE",
                                exp.column(prefixed_unmanaged_columns[i].this, table="joined"),
                                exp.column(col, table="joined"),
                            ).as_(col)
                            for i, col in enumerate(unmanaged_columns_to_types)
                        ),
                        valid_from_case_stmt,
                        valid_to_case_stmt,
                    )
                    .from_("joined")
                    .join(
                        "latest_deleted",
                        on=exp.and_(
                            *[
                                add_table(part, "joined").eq(
                                    exp.column(f"_key{i}", "latest_deleted")
                                )
                                for i, part in enumerate(unique_key)
                            ]
                        ),
                        join_type="left",
                    ),
                )
                # Get records that have been "updated" which means inserting a new record with previous `valid_from`
                .with_(
                    "inserted_rows",
                    exp.select(
                        *unmanaged_columns_to_types,
                        insert_valid_from_start.as_(valid_from_col.this),  # type: ignore
                        to_time_column(exp.null(), time_data_type, self.dialect, nullable=True).as_(
                            valid_to_col.this
                        ),
                    )
                    .from_("joined")
                    .where(updated_row_filter),
                )
            )

            # Written while the source query's context is still open.
            self.replace_query(
                target_table,
                self.ensure_nulls_for_unmatched_after_join(query),
                target_columns_to_types=target_columns_to_types,
                table_description=table_description,
                column_descriptions=column_descriptions,
                **kwargs,
            )

    def merge(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        unique_key: t.Sequence[exp.Expr],
        when_matched: t.Optional[exp.Whens] = None,
        merge_filter: t.Optional[exp.Expr] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Upserts ``source_table`` into ``target_table`` on ``unique_key``.

        The ON condition equates each ``unique_key`` part between the target and source aliases.
        ``merge_filter``, if given, is ANDed in front of it.

        The WHEN clauses are built as follows:

        - Matched rows: the clauses come from ``when_matched`` (copied) if given. Otherwise
          every target column is updated from the source.
        - Unmatched source rows: always inserted with all target columns.

        One MERGE statement is executed per source query.
        """
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            source_table,
            target_columns_to_types,
            target_table=target_table,
            source_columns=source_columns,
        )
        target_columns_to_types = target_columns_to_types or self.columns(target_table)
        on = exp.and_(
            *(
                add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS))
                for part in unique_key
            )
        )
        if merge_filter:
            on = exp.and_(merge_filter, on)

        if not when_matched:
            match_expressions = [
                exp.When(
                    matched=True,
                    source=False,
                    then=exp.Update(
                        expressions=[
                            exp.column(col, MERGE_TARGET_ALIAS).eq(
                                exp.column(col, MERGE_SOURCE_ALIAS)
                            )
                            for col in target_columns_to_types
                        ],
                    ),
                )
            ]
        else:
            # Copy so that appending the INSERT clause below does not mutate the caller's Whens.
            match_expressions = when_matched.copy().expressions

        match_expressions.append(
            exp.When(
                matched=False,
                source=False,
                then=exp.Insert(
                    this=exp.Tuple(
                        expressions=[exp.column(col) for col in target_columns_to_types]
                    ),
                    expression=exp.Tuple(
                        expressions=[
                            exp.column(col, MERGE_SOURCE_ALIAS) for col in target_columns_to_types
                        ]
                    ),
                ),
            )
        )
        for source_query in source_queries:
            with source_query as query:
                self._merge(
                    target_table=target_table,
                    query=query,
                    on=on,
                    whens=exp.Whens(expressions=match_expressions),
                )

    def rename_table(
        self,
        old_table_name: TableName,
        new_table_name: TableName,
    ) -> None:
        """Renames a table and clears the data-object cache entries for both names.

        Raises:
            UnsupportedCatalogOperationError: If the new name specifies a catalog that differs
                from the old table's catalog. The old catalog falls back to the current catalog.
        """
        new_table = exp.to_table(new_table_name)
        if new_table.catalog:
            old_table = exp.to_table(old_table_name)
            catalog = old_table.catalog or self.get_current_catalog()
            if catalog != new_table.catalog:
                raise UnsupportedCatalogOperationError(
                    "Tried to rename table across catalogs which is not supported"
                )
        self._rename_table(old_table_name, new_table_name)
        self._clear_data_object_cache(old_table_name)
        self._clear_data_object_cache(new_table_name)

    def get_data_object(
        self, target_name: TableName, safe_to_cache: bool = False
    ) -> t.Optional[DataObject]:
        """Returns the data object named ``target_name``, or None if it was not found.

        Looks the name up in its schema via ``get_data_objects``.
        """
        target_table = exp.to_table(target_name)
        existing_data_objects = self.get_data_objects(
            schema_(target_table.db, target_table.catalog),
            {target_table.name},
            safe_to_cache=safe_to_cache,
        )
        if existing_data_objects:
            return existing_data_objects[0]
        return None

    def get_data_objects(
        self,
        schema_name: SchemaName,
        object_names: t.Optional[t.Set[str]] = None,
        safe_to_cache: bool = False,
    ) -> t.List[DataObject]:
        """Lists all data objects in the target schema.

        Args:
            schema_name: The name of the schema to list data objects from.
            object_names: If provided, only return data objects with these names.
            safe_to_cache: Whether it is safe to cache the results of this call.

        Returns:
            A list of data objects in the target schema.

        When ``object_names`` is given:

        - An empty set returns ``[]`` immediately.
        - Each name is first looked up in the data-object cache, where a cached ``None`` means
          "known missing".
        - Only cache misses are queried, in batches of ``DATA_OBJECT_FILTER_BATCH_SIZE``.
        - With ``safe_to_cache``, fetched objects are cached and names that were not found are
          cached as ``None``.

        When ``object_names`` is None, the whole schema is listed without reading the cache.
        """
        if object_names is not None:
            if not object_names:
                return []

            # Check cache for each object name
            target_schema = to_schema(schema_name)
            cached_objects = []
            missing_names = set()

            for name in object_names:
                cache_key = _get_data_object_cache_key(
                    target_schema.catalog, target_schema.db, name
                )
                if cache_key in self._data_object_cache:
                    logger.debug("Data object cache hit: %s", cache_key)
                    data_object = self._data_object_cache[cache_key]
                    # If the object is none, then the table was previously looked for but not found
                    if data_object:
                        cached_objects.append(data_object)
                else:
                    logger.debug("Data object cache miss: %s", cache_key)
                    missing_names.add(name)

            # Fetch missing objects from database
            if missing_names:
                object_names_list = list(missing_names)
                batches = [
                    object_names_list[i : i + self.DATA_OBJECT_FILTER_BATCH_SIZE]
                    for i in range(0, len(object_names_list), self.DATA_OBJECT_FILTER_BATCH_SIZE)
                ]

                fetched_objects = []
                fetched_object_names = set()
                for batch in batches:
                    objects = self._get_data_objects(schema_name, set(batch))
                    for obj in objects:
                        if safe_to_cache:
                            # NOTE(review): this write is keyed from the returned object's
                            # catalog/schema, while the lookups above use the requested schema.
                            # If they differ (e.g. no catalog in `schema_name`), later lookups may
                            # miss. Verify.
                            cache_key = _get_data_object_cache_key(
                                obj.catalog, obj.schema_name, obj.name
                            )
                            self._data_object_cache[cache_key] = obj
                        fetched_objects.append(obj)
                        fetched_object_names.add(obj.name)

                if safe_to_cache:
                    # Remember names that were looked up but not found.
                    for missing_name in missing_names - fetched_object_names:
                        cache_key = _get_data_object_cache_key(
                            target_schema.catalog, target_schema.db, missing_name
                        )
                        self._data_object_cache[cache_key] = None

                return cached_objects + fetched_objects

            return cached_objects

        fetched_objects = self._get_data_objects(schema_name)
        if safe_to_cache:
            for obj in fetched_objects:
                cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name)
                self._data_object_cache[cache_key] = obj
        return fetched_objects

    def fetchone(
        self,
        query: t.Union[exp.Expr, str],
        ignore_unsupported_errors: bool = False,
        quote_identifiers: bool = False,
    ) -> t.Optional[t.Tuple]:
        """Executes ``query`` inside ``transaction()`` and returns the cursor's next row.

        Note that ``quote_identifiers`` defaults to False here, whereas ``execute`` defaults to True.
        """
        with self.transaction():
            self.execute(
                query,
                ignore_unsupported_errors=ignore_unsupported_errors,
                quote_identifiers=quote_identifiers,
            )
            return self.cursor.fetchone()

    def fetchall(
        self,
        query: t.Union[exp.Expr, str],
        ignore_unsupported_errors: bool = False,
        quote_identifiers: bool = False,
    ) -> t.List[t.Tuple]:
        """Executes ``query`` inside ``transaction()`` and returns all rows from the cursor.

        Note that ``quote_identifiers`` defaults to False here, whereas ``execute`` defaults to True.
        """
        with self.transaction():
            self.execute(
                query,
                ignore_unsupported_errors=ignore_unsupported_errors,
                quote_identifiers=quote_identifiers,
            )
            return self.cursor.fetchall()

    def _fetch_native_df(
        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
    ) -> DF:
        """Fetches a DataFrame that can be either Pandas or PySpark from the cursor"""
        with self.transaction():
            self.execute(query, quote_identifiers=quote_identifiers)
            # Relies on the cursor exposing `fetchdf()`; this is not part of DB-API 2.0.
            return self.cursor.fetchdf()

    def _native_df_to_pandas_df(
        self,
        query_or_df: QueryOrDF,
    ) -> t.Union[Query, pd.DataFrame]:
        """
        Take a "native" DataFrame (eg Pyspark, Bigframe, Snowpark etc) and convert it to Pandas

        sqlglot queries and pandas DataFrames are returned unchanged. Any other type raises
        NotImplementedError in this base implementation.
        """
        import pandas as pd

        if isinstance(query_or_df, (exp.Query, pd.DataFrame)):
            return query_or_df

        # EngineAdapter subclasses that have native DataFrame types should override this
        raise NotImplementedError(f"Unable to convert {type(query_or_df)} to Pandas")

    def fetchdf(
        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
    ) -> pd.DataFrame:
        """Fetches a Pandas DataFrame from the cursor

        Raises:
            NotImplementedError: If ``_fetch_native_df`` does not return a pandas DataFrame.
        """
        import pandas as pd

        df = self._fetch_native_df(query, quote_identifiers=quote_identifiers)
        if not isinstance(df, pd.DataFrame):
            # NOTE(review): the message names `fetch_native_df`; the method called is `_fetch_native_df`.
            raise NotImplementedError(
                "The cursor's `fetch_native_df` method is not returning a pandas DataFrame. Need to update `fetchdf` so a Pandas DataFrame is returned"
            )
        return df

    def fetch_pyspark_df(
        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
    ) -> PySparkDataFrame:
        """Fetches a PySpark DataFrame from the cursor

        The base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}")

    @property
    def wap_enabled(self) -> bool:
        """Returns whether WAP is enabled for this engine.

        Read from the ``wap_enabled`` key of the adapter's extra config (default False).
        """
        return self._extra_config.get("wap_enabled", False)

    def wap_supported(self, table_name: TableName) -> bool:
        """Returns whether WAP for the target table is supported.

        Always False in this base implementation.
        """
        return False

    def wap_table_name(self, table_name: TableName, wap_id: str) -> str:
        """Returns the updated table name for the given WAP ID.

        Args:
            table_name: The name of the target table.
            wap_id: The WAP ID to derive the table name for.

        Returns:
            The updated table name that should be used for writing.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")

    def wap_prepare(self, table_name: TableName, wap_id: str) -> str:
        """Prepares the target table for WAP and returns the updated table name.

        Args:
            table_name: The name of the target table.
            wap_id: The WAP ID to prepare.

        Returns:
            The updated table name that should be used for writing.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")

    def wap_publish(self, table_name: TableName, wap_id: str) -> None:
        """Publishes changes with the given WAP ID to the target table.

        Args:
            table_name: The name of the target table.
            wap_id: The WAP ID to publish.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"Engine does not support WAP: {type(self)}")

    def sync_grants_config(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> None:
        """Applies the grants_config to a table authoritatively.
        It first compares the specified grants against the current grants, and then
        applies the diffs to the table by revoking and granting privileges as needed.

        Args:
            table: The table/view to apply grants to.
            grants_config: Dictionary mapping privileges to lists of grantees.
            table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW).

        Raises:
            NotImplementedError: If the engine does not support grants (``SUPPORTS_GRANTS``).
        """
        if not self.SUPPORTS_GRANTS:
            raise NotImplementedError(f"Engine does not support grants: {type(self)}")

        current_grants = self._get_current_grants_config(table)
        new_grants, revoked_grants = self._diff_grants_configs(grants_config, current_grants)
        # Revokes are issued before grants, in a single `execute` call.
        revoke_exprs = self._revoke_grants_config_expr(table, revoked_grants, table_type)
        grant_exprs = self._apply_grants_config_expr(table, new_grants, table_type)
        dcl_exprs = revoke_exprs + grant_exprs

        if dcl_exprs:
            self.execute(dcl_exprs)

    @contextlib.contextmanager
    def transaction(
        self,
        condition: t.Optional[bool] = None,
    ) -> t.Iterator[None]:
        """A transaction context manager.

        No new transaction is started if any of the following holds:

        - a transaction is already active on the connection pool (nesting is flattened);
        - the engine does not support transactions;
        - ``condition`` is given and falsy.

        Otherwise the steps are:

        1. If ``_pre_ping`` is set, ping the database; on failure, close the pool's connection.
        2. Begin a transaction.
        3. Commit when the body completes, or roll back when the body raises an ``Exception``.

        Args:
            condition: When provided and falsy, the body runs without a new transaction.
        """
        if (
            self._connection_pool.is_transaction_active
            or not self.SUPPORTS_TRANSACTIONS
            or (condition is not None and not condition)
        ):
            yield
            return

        if self._pre_ping:
            try:
                logger.debug("Pinging the database to check the connection")
                self.ping()
            except Exception:
                logger.info("Connection to the database was lost. Reconnecting...")
                self._connection_pool.close()

        self._connection_pool.begin()
        try:
            yield
        # NOTE(review): a BaseException that is not an Exception (e.g. KeyboardInterrupt) is not
        # caught here and skips the `else`, so the transaction is neither rolled back nor committed.
        except Exception as e:
            self._connection_pool.rollback()
            raise e
        else:
            self._connection_pool.commit()

    @contextlib.contextmanager
    def session(self, properties: SessionProperties) -> t.Iterator[None]:
        """A session context manager.

        If a session is already active, the body simply runs in it. Otherwise a session is begun
        with ``properties`` and always ended on exit.
        """
        if self._is_session_active():
            yield
            return

        self._begin_session(properties)
        try:
            yield
        finally:
            self._end_session()

    def _begin_session(self, properties: SessionProperties) -> t.Any:
        """Begin a new session. No-op in this base implementation."""

    def _end_session(self) -> None:
        """End the existing session. No-op in this base implementation."""

    def _is_session_active(self) -> bool:
        """Indicates whether or not a session is active. Always False in this base implementation."""
        return False

    def execute(
        self,
        expressions: t.Union[str, exp.Expr, t.Sequence[exp.Expr]],
        ignore_unsupported_errors: bool = False,
        quote_identifiers: bool = True,
        track_rows_processed: bool = False,
        **kwargs: t.Any,
    ) -> None:
        """Execute a sql query.

        Each item is handled in order inside ``transaction()``:

        1. sqlglot expressions get an identifier-length check and are rendered with ``_to_sql``.
           Strings are used as-is.
        2. The correlation id comment is attached, if enabled.
        3. The SQL is logged and passed to ``_execute``.

        Args:
            expressions: A SQL string, a sqlglot expression, or a sequence of expressions.
            ignore_unsupported_errors: Render with ``ErrorLevel.IGNORE`` for unsupported syntax.
            quote_identifiers: Whether identifiers are quoted when rendering expressions.
            track_rows_processed: Forwarded to ``_execute``.
            **kwargs: Forwarded to ``_execute``.
        """
        to_sql_kwargs = (
            {"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {}
        )
        with self.transaction():
            for e in ensure_list(expressions):
                if isinstance(e, exp.Expr):
                    self._check_identifier_length(e)
                    sql = self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs)
                else:
                    sql = t.cast(str, e)

                sql = self._attach_correlation_id(sql)

                self._log_sql(
                    sql,
                    expression=e if isinstance(e, exp.Expr) else None,
                    quote_identifiers=quote_identifiers,
                )
                self._execute(sql, track_rows_processed, **kwargs)

    def _attach_correlation_id(self, sql: str) -> str:
        """Prefixes ``sql`` with a ``/* <correlation_id> */`` comment when enabled.

        The prefix is added only when ``ATTACH_CORRELATION_ID`` is set and a correlation id is
        present. Otherwise ``sql`` is returned unchanged.
        """
        if self.ATTACH_CORRELATION_ID and self.correlation_id:
            return f"/* {self.correlation_id} */ {sql}"
        return sql

    def _log_sql(
        self,
        sql: str,
expression: t.Optional[exp.Expr] = None, 2613 quote_identifiers: bool = True, 2614 ) -> None: 2615 if not logger.isEnabledFor(self._execute_log_level): 2616 return 2617 2618 sql_to_log = sql 2619 if expression is not None and not isinstance(expression, exp.Query): 2620 values = expression.find(exp.Values) 2621 if values: 2622 values.set("expressions", [exp.to_identifier("<REDACTED VALUES>")]) 2623 sql_to_log = self._to_sql(expression, quote=quote_identifiers) 2624 2625 logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log) 2626 2627 def _record_execution_stats( 2628 self, sql: str, rowcount: t.Optional[int] = None, bytes_processed: t.Optional[int] = None 2629 ) -> None: 2630 if self._query_execution_tracker: 2631 self._query_execution_tracker.record_execution(sql, rowcount, bytes_processed) 2632 2633 def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any) -> None: 2634 self.cursor.execute(sql, **kwargs) 2635 2636 if ( 2637 self.SUPPORTS_QUERY_EXECUTION_TRACKING 2638 and track_rows_processed 2639 and self._query_execution_tracker 2640 and self._query_execution_tracker.is_tracking() 2641 ): 2642 if ( 2643 rowcount := getattr(self.cursor, "rowcount", None) 2644 ) is not None and rowcount is not None: 2645 try: 2646 self._record_execution_stats(sql, int(rowcount)) 2647 except (TypeError, ValueError): 2648 return 2649 2650 @contextlib.contextmanager 2651 def temp_table( 2652 self, 2653 query_or_df: QueryOrDF, 2654 name: TableName = "diff", 2655 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2656 source_columns: t.Optional[t.List[str]] = None, 2657 **kwargs: t.Any, 2658 ) -> t.Iterator[exp.Table]: 2659 """A context manager for working a temp table. 2660 2661 The table will be created with a random guid and cleaned up after the block. 2662 2663 Args: 2664 query_or_df: The query or df to create a temp table for. 2665 name: The base name of the temp table. 
2666 target_columns_to_types: A mapping between the column name and its data type. 2667 2668 Yields: 2669 The table expression 2670 """ 2671 name = exp.to_table(name) 2672 # ensure that we use default catalog if none is not specified 2673 if isinstance(name, exp.Table) and not name.catalog and name.db and self.default_catalog: 2674 name.set("catalog", exp.parse_identifier(self.default_catalog)) 2675 2676 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 2677 query_or_df, 2678 target_columns_to_types=target_columns_to_types, 2679 target_table=name, 2680 source_columns=source_columns, 2681 ) 2682 2683 with self.transaction(): 2684 table = self._get_temp_table(name) 2685 if table.db: 2686 self.create_schema(schema_(table.args["db"], table.args.get("catalog"))) 2687 self._create_table_from_source_queries( 2688 table, 2689 source_queries, 2690 target_columns_to_types, 2691 exists=True, 2692 table_description=None, 2693 column_descriptions=None, 2694 track_rows_processed=False, 2695 **kwargs, 2696 ) 2697 2698 try: 2699 yield table 2700 finally: 2701 self.drop_table(table) 2702 2703 def _table_or_view_properties_to_expressions( 2704 self, table_or_view_properties: t.Optional[t.Dict[str, exp.Expr]] = None 2705 ) -> t.List[exp.Property]: 2706 """Converts model properties (either physical or virtual) to a list of property expressions.""" 2707 if not table_or_view_properties: 2708 return [] 2709 return [ 2710 exp.Property(this=key, value=value.copy()) 2711 for key, value in table_or_view_properties.items() 2712 ] 2713 2714 def _build_partitioned_by_exp( 2715 self, 2716 partitioned_by: t.List[exp.Expr], 2717 *, 2718 partition_interval_unit: t.Optional[IntervalUnit] = None, 2719 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2720 catalog_name: t.Optional[str] = None, 2721 **kwargs: t.Any, 2722 ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]: 2723 return None 2724 2725 def _build_clustered_by_exp( 
2726 self, 2727 clustered_by: t.List[exp.Expr], 2728 **kwargs: t.Any, 2729 ) -> t.Optional[exp.Cluster]: 2730 return None 2731 2732 def _build_table_properties_exp( 2733 self, 2734 catalog_name: t.Optional[str] = None, 2735 table_format: t.Optional[str] = None, 2736 storage_format: t.Optional[str] = None, 2737 partitioned_by: t.Optional[t.List[exp.Expr]] = None, 2738 partition_interval_unit: t.Optional[IntervalUnit] = None, 2739 clustered_by: t.Optional[t.List[exp.Expr]] = None, 2740 table_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 2741 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2742 table_description: t.Optional[str] = None, 2743 table_kind: t.Optional[str] = None, 2744 **kwargs: t.Any, 2745 ) -> t.Optional[exp.Properties]: 2746 """Creates a SQLGlot table properties expression for ddl.""" 2747 properties: t.List[exp.Expr] = [] 2748 2749 if table_description: 2750 properties.append( 2751 exp.SchemaCommentProperty( 2752 this=exp.Literal.string(self._truncate_table_comment(table_description)) 2753 ) 2754 ) 2755 2756 if table_properties: 2757 table_type = self._pop_creatable_type_from_properties(table_properties) 2758 properties.extend(ensure_list(table_type)) 2759 2760 if properties: 2761 return exp.Properties(expressions=properties) 2762 return None 2763 2764 def _build_view_properties_exp( 2765 self, 2766 view_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 2767 table_description: t.Optional[str] = None, 2768 **kwargs: t.Any, 2769 ) -> t.Optional[exp.Properties]: 2770 """Creates a SQLGlot table properties expression for view""" 2771 properties: t.List[exp.Expr] = [] 2772 2773 if table_description: 2774 properties.append( 2775 exp.SchemaCommentProperty( 2776 this=exp.Literal.string(self._truncate_table_comment(table_description)) 2777 ) 2778 ) 2779 2780 if properties: 2781 return exp.Properties(expressions=properties) 2782 return None 2783 2784 def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str: 
2785 return comment[:length] if length else comment 2786 2787 def _truncate_table_comment(self, comment: str) -> str: 2788 return self._truncate_comment(comment, self.MAX_TABLE_COMMENT_LENGTH) 2789 2790 def _truncate_column_comment(self, comment: str) -> str: 2791 return self._truncate_comment(comment, self.MAX_COLUMN_COMMENT_LENGTH) 2792 2793 def _to_sql(self, expression: exp.Expr, quote: bool = True, **kwargs: t.Any) -> str: 2794 """ 2795 Converts an expression to a SQL string. Has a set of default kwargs to apply, and then default 2796 kwargs defined for the given dialect, and then kwargs provided by the user when defining the engine 2797 adapter, and then finally kwargs provided by the user when calling this method. 2798 """ 2799 sql_gen_kwargs = { 2800 "dialect": self.dialect, 2801 "pretty": self._pretty_sql, 2802 "comments": False, 2803 **self._sql_gen_kwargs, 2804 **kwargs, 2805 } 2806 2807 expression = expression.copy() 2808 2809 if quote: 2810 quote_identifiers(expression) 2811 2812 return expression.sql(**sql_gen_kwargs, copy=False) # type: ignore 2813 2814 def _clear_data_object_cache(self, table_name: t.Optional[TableName] = None) -> None: 2815 """Clears the cache entry for the given table name, or clears the entire cache if table_name is None.""" 2816 if table_name is None: 2817 logger.debug("Clearing entire data object cache") 2818 self._data_object_cache.clear() 2819 else: 2820 table = exp.to_table(table_name) 2821 cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name) 2822 logger.debug("Clearing data object cache key: %s", cache_key) 2823 self._data_object_cache.pop(cache_key, None) 2824 2825 def _get_data_objects( 2826 self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None 2827 ) -> t.List[DataObject]: 2828 """ 2829 Returns all the data objects that exist in the given schema and optionally catalog. 
2830 """ 2831 raise NotImplementedError() 2832 2833 def _get_temp_table( 2834 self, table: TableName, table_only: bool = False, quoted: bool = True 2835 ) -> exp.Table: 2836 """ 2837 Returns the name of the temp table that should be used for the given table name. 2838 """ 2839 table = t.cast(exp.Table, exp.to_table(table).copy()) 2840 table.set( 2841 "this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}", quoted=quoted) 2842 ) 2843 2844 if table_only: 2845 table.set("db", None) 2846 table.set("catalog", None) 2847 2848 return table 2849 2850 def _order_projections_and_filter( 2851 self, 2852 query: Query, 2853 target_columns_to_types: t.Dict[str, exp.DataType], 2854 where: t.Optional[exp.Expr] = None, 2855 coerce_types: bool = False, 2856 ) -> Query: 2857 if not isinstance(query, exp.Query) or ( 2858 not where and not coerce_types and query.named_selects == list(target_columns_to_types) 2859 ): 2860 return query 2861 2862 query = t.cast(exp.Query, query.copy()) 2863 with_ = query.args.pop("with_", None) 2864 2865 select_exprs: t.List[exp.Expr] = [ 2866 exp.column(c, quoted=True) for c in target_columns_to_types 2867 ] 2868 if coerce_types and columns_to_types_all_known(target_columns_to_types): 2869 select_exprs = [ 2870 exp.cast(select_exprs[i], col_tpe).as_(col, quoted=True) 2871 for i, (col, col_tpe) in enumerate(target_columns_to_types.items()) 2872 ] 2873 2874 query = exp.select(*select_exprs).from_(query.subquery("_subquery", copy=False), copy=False) 2875 if where: 2876 query = query.where(where, copy=False) 2877 2878 if with_: 2879 query.set("with_", with_) 2880 2881 return query 2882 2883 def _truncate_table(self, table_name: TableName) -> None: 2884 table = exp.to_table(table_name) 2885 self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}") 2886 2887 def drop_data_object_on_type_mismatch( 2888 self, data_object: t.Optional[DataObject], expected_type: DataObjectType 2889 ) -> bool: 2890 """Drops a data object if 
it exists and is not of the expected type. 2891 2892 Args: 2893 data_object: The data object to check. 2894 expected_type: The expected type of the data object. 2895 2896 Returns: 2897 True if the data object was dropped, False otherwise. 2898 """ 2899 if data_object is None or data_object.type == expected_type: 2900 return False 2901 2902 logger.warning( 2903 "Target data object '%s' is a %s and not a %s, dropping it", 2904 data_object.to_table().sql(dialect=self.dialect), 2905 data_object.type.value, 2906 expected_type.value, 2907 ) 2908 self.drop_data_object(data_object) 2909 return True 2910 2911 def _replace_by_key( 2912 self, 2913 target_table: TableName, 2914 source_table: QueryOrDF, 2915 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 2916 key: t.Sequence[exp.Expr], 2917 is_unique_key: bool, 2918 source_columns: t.Optional[t.List[str]] = None, 2919 ) -> None: 2920 if target_columns_to_types is None: 2921 target_columns_to_types = self.columns(target_table) 2922 2923 temp_table = self._get_temp_table(target_table) 2924 key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0] 2925 column_names = list(target_columns_to_types or []) 2926 2927 with self.transaction(): 2928 self.ctas( 2929 temp_table, 2930 source_table, 2931 target_columns_to_types=target_columns_to_types, 2932 exists=False, 2933 source_columns=source_columns, 2934 ) 2935 2936 try: 2937 delete_query = exp.select(key_exp).from_(temp_table) 2938 insert_query = self._select_columns(target_columns_to_types).from_(temp_table) 2939 if not is_unique_key: 2940 delete_query = delete_query.distinct() 2941 else: 2942 insert_query = insert_query.distinct(*key) 2943 2944 insert_statement = exp.insert( 2945 insert_query, 2946 target_table, 2947 columns=column_names, 2948 ) 2949 delete_filter = key_exp.isin(query=delete_query) 2950 2951 if not self.INSERT_OVERWRITE_STRATEGY.is_replace_where: 2952 self.delete_from(target_table, delete_filter) 2953 else: 2954 
insert_statement.set("where", delete_filter) 2955 insert_statement.set("this", exp.to_table(target_table)) 2956 2957 self.execute(insert_statement, track_rows_processed=True) 2958 finally: 2959 self.drop_table(temp_table) 2960 2961 def _build_create_comment_table_exp( 2962 self, table: exp.Table, table_comment: str, table_kind: str 2963 ) -> exp.Comment | str: 2964 return exp.Comment( 2965 this=table, 2966 kind=table_kind, 2967 expression=exp.Literal.string(self._truncate_table_comment(table_comment)), 2968 ) 2969 2970 def _create_table_comment( 2971 self, table_name: TableName, table_comment: str, table_kind: str = "TABLE" 2972 ) -> None: 2973 table = exp.to_table(table_name) 2974 2975 try: 2976 self.execute(self._build_create_comment_table_exp(table, table_comment, table_kind)) 2977 except Exception: 2978 logger.warning( 2979 f"Table comment for '{table.alias_or_name}' not registered - this may be due to limited permissions", 2980 exc_info=True, 2981 ) 2982 2983 def _build_create_comment_column_exp( 2984 self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" 2985 ) -> exp.Comment | str: 2986 return exp.Comment( 2987 this=exp.column(column_name, *reversed(table.parts)), # type: ignore 2988 kind="COLUMN", 2989 expression=exp.Literal.string(self._truncate_column_comment(column_comment)), 2990 ) 2991 2992 def _create_column_comments( 2993 self, 2994 table_name: TableName, 2995 column_comments: t.Dict[str, str], 2996 table_kind: str = "TABLE", 2997 materialized_view: bool = False, 2998 ) -> None: 2999 table = exp.to_table(table_name) 3000 3001 for col, comment in column_comments.items(): 3002 try: 3003 self.execute(self._build_create_comment_column_exp(table, col, comment, table_kind)) 3004 except Exception: 3005 logger.warning( 3006 f"Column comments for column '{col}' in table '{table.alias_or_name}' not registered - this may be due to limited permissions", 3007 exc_info=True, 3008 ) 3009 3010 def _create_table_like( 3011 self, 3012 
target_table_name: TableName, 3013 source_table_name: TableName, 3014 exists: bool, 3015 **kwargs: t.Any, 3016 ) -> None: 3017 self.create_table(target_table_name, self.columns(source_table_name), exists=exists) 3018 3019 def _rename_table( 3020 self, 3021 old_table_name: TableName, 3022 new_table_name: TableName, 3023 ) -> None: 3024 self.execute(exp.rename_table(old_table_name, new_table_name)) 3025 3026 def ensure_nulls_for_unmatched_after_join( 3027 self, 3028 query: Query, 3029 ) -> Query: 3030 return query 3031 3032 def use_server_nulls_for_unmatched_after_join( 3033 self, 3034 query: Query, 3035 ) -> Query: 3036 return query 3037 3038 def ping(self) -> None: 3039 try: 3040 self._execute(exp.select("1").sql(dialect=self.dialect)) 3041 finally: 3042 self._connection_pool.close_cursor() 3043 3044 @classmethod 3045 def _select_columns( 3046 cls, columns: t.Iterable[str], source_columns: t.Optional[t.List[str]] = None 3047 ) -> exp.Select: 3048 return exp.select( 3049 *( 3050 exp.column(c, quoted=True) 3051 if c in (source_columns or columns) 3052 else exp.alias_(exp.Null(), c, quoted=True) 3053 for c in columns 3054 ) 3055 ) 3056 3057 def _check_identifier_length(self, expression: exp.Expr) -> None: 3058 if self.MAX_IDENTIFIER_LENGTH is None or not isinstance(expression, exp.DDL): 3059 return 3060 3061 for identifier in expression.find_all(exp.Identifier): 3062 name = identifier.name 3063 name_length = len(name) 3064 if name_length > self.MAX_IDENTIFIER_LENGTH: 3065 raise SQLMeshError( 3066 f"Identifier name '{name}' (length {name_length}) exceeds {self.dialect.capitalize()}'s max identifier limit of {self.MAX_IDENTIFIER_LENGTH} characters" 3067 ) 3068 3069 def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]: 3070 raise NotImplementedError() 3071 3072 @classmethod 3073 def _diff_grants_configs( 3074 cls, new_config: GrantsConfig, old_config: GrantsConfig 3075 ) -> t.Tuple[GrantsConfig, GrantsConfig]: 3076 """Compute additions and 
removals between two grants configurations. 3077 3078 This method compares new (desired) and old (current) GrantsConfigs case-insensitively 3079 for both privilege keys and grantees, while preserving original casing 3080 in the output GrantsConfigs. 3081 3082 Args: 3083 new_config: Desired grants configuration (specified by the user). 3084 old_config: Current grants configuration (returned by the database). 3085 3086 Returns: 3087 A tuple of (additions, removals) GrantsConfig where: 3088 - additions contains privileges/grantees present in new_config but not in old_config 3089 - additions uses keys and grantee strings from new_config (user-specified casing) 3090 - removals contains privileges/grantees present in old_config but not in new_config 3091 - removals uses keys and grantee strings from old_config (database-returned casing) 3092 3093 Notes: 3094 - Comparison is case-insensitive using casefold(); original casing is preserved in results. 3095 - Overlapping grantees (case-insensitive) are excluded from the results. 3096 """ 3097 3098 def _diffs(config1: GrantsConfig, config2: GrantsConfig) -> GrantsConfig: 3099 diffs: GrantsConfig = {} 3100 cf_config2 = {k.casefold(): {g.casefold() for g in v} for k, v in config2.items()} 3101 for key, grantees in config1.items(): 3102 cf_key = key.casefold() 3103 3104 # Missing key (add all grantees) 3105 if cf_key not in cf_config2: 3106 diffs[key] = grantees.copy() 3107 continue 3108 3109 # Include only grantees not in config2 3110 cf_grantees2 = cf_config2[cf_key] 3111 diff_grantees = [] 3112 for grantee in grantees: 3113 if grantee.casefold() not in cf_grantees2: 3114 diff_grantees.append(grantee) 3115 if diff_grantees: 3116 diffs[key] = diff_grantees 3117 return diffs 3118 3119 return _diffs(new_config, old_config), _diffs(old_config, new_config) 3120 3121 def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig: 3122 """Returns current grants for a table as a dictionary. 
3123 3124 This method queries the database and returns the current grants/permissions 3125 for the given table, parsed into a dictionary format. The it handles 3126 case-insensitive comparison between these current grants and the desired 3127 grants from model configuration. 3128 3129 Args: 3130 table: The table/view to query grants for. 3131 3132 Returns: 3133 Dictionary mapping permissions to lists of grantees. Permission names 3134 should be returned as the database provides them (typically uppercase 3135 for standard SQL permissions, but engine-specific roles may vary). 3136 3137 Raises: 3138 NotImplementedError: If the engine does not support grants. 3139 """ 3140 if not self.SUPPORTS_GRANTS: 3141 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3142 raise NotImplementedError("Subclass must implement get_current_grants") 3143 3144 def _apply_grants_config_expr( 3145 self, 3146 table: exp.Table, 3147 grants_config: GrantsConfig, 3148 table_type: DataObjectType = DataObjectType.TABLE, 3149 ) -> t.List[exp.Expr]: 3150 """Returns SQLGlot Grant expressions to apply grants to a table. 3151 3152 Args: 3153 table: The table/view to grant permissions on. 3154 grants_config: Dictionary mapping permissions to lists of grantees. 3155 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 3156 3157 Returns: 3158 List of SQLGlot expressions for grant operations. 3159 3160 Raises: 3161 NotImplementedError: If the engine does not support grants. 3162 """ 3163 if not self.SUPPORTS_GRANTS: 3164 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3165 raise NotImplementedError("Subclass must implement _apply_grants_config_expr") 3166 3167 def _revoke_grants_config_expr( 3168 self, 3169 table: exp.Table, 3170 grants_config: GrantsConfig, 3171 table_type: DataObjectType = DataObjectType.TABLE, 3172 ) -> t.List[exp.Expr]: 3173 """Returns SQLGlot expressions to revoke grants from a table. 
3174 3175 Args: 3176 table: The table/view to revoke permissions from. 3177 grants_config: Dictionary mapping permissions to lists of grantees. 3178 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 3179 3180 Returns: 3181 List of SQLGlot expressions for revoke operations. 3182 3183 Raises: 3184 NotImplementedError: If the engine does not support grants. 3185 """ 3186 if not self.SUPPORTS_GRANTS: 3187 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3188 raise NotImplementedError("Subclass must implement _revoke_grants_config_expr")
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: either a callable that produces a new Database API-compliant connection on every call, or an existing ConnectionPool to reuse as-is.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def __init__(
    self,
    connection_factory_or_pool: t.Union[t.Callable[[], t.Any], ConnectionPool],
    dialect: str = "",
    sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
    multithreaded: bool = False,
    cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    default_catalog: t.Optional[str] = None,
    execute_log_level: int = logging.DEBUG,
    register_comments: bool = True,
    pre_ping: bool = False,
    pretty_sql: bool = False,
    shared_connection: bool = False,
    correlation_id: t.Optional[CorrelationId] = None,
    schema_differ_overrides: t.Optional[t.Dict[str, t.Any]] = None,
    query_execution_tracker: t.Optional[QueryExecutionTracker] = None,
    **kwargs: t.Any,
):
    """Set up the adapter around a connection pool.

    Args:
        connection_factory_or_pool: Either an existing ``ConnectionPool``, which is used as-is,
            or a callable producing new connections, from which a pool is created.
        dialect: SQL dialect of this adapter. It is lower-cased; when empty, the class-level
            ``DIALECT`` is used instead.
        sql_gen_kwargs: Extra keyword arguments applied when rendering expressions to SQL.
        multithreaded: Passed to the created pool; also remembered for derived adapters.
        cursor_init: Optional cursor initializer passed to the created pool.
        default_catalog: Explicit default catalog. When unset, the ``default_catalog`` property
            falls back to ``get_current_catalog()``.
        execute_log_level: Log level used for the "Executing SQL" log lines.
        register_comments: Stored as ``_register_comments`` for use elsewhere in the adapter.
        pre_ping: When true, ``transaction`` pings the database first and closes the pool's
            connection if the ping fails.
        pretty_sql: Passed as ``pretty`` when rendering SQL.
        shared_connection: Passed to the created pool.
        correlation_id: Optional correlation ID, prepended to executed SQL as a comment when
            ``ATTACH_CORRELATION_ID`` is enabled.
        schema_differ_overrides: Stored as ``_schema_differ_overrides`` for use elsewhere in the
            adapter.
        query_execution_tracker: Optional tracker that records execution statistics.
        **kwargs: Remaining engine-specific settings, kept as extra configuration.
    """
    # An explicitly passed dialect wins; otherwise fall back to the class default.
    self.dialect = dialect.lower() or self.DIALECT

    if isinstance(connection_factory_or_pool, ConnectionPool):
        # Reuse the given pool directly (``with_settings`` hands over its own pool this way).
        pool = connection_factory_or_pool
    else:
        pool = create_connection_pool(
            connection_factory_or_pool,
            multithreaded,
            shared_connection=shared_connection,
            cursor_init=cursor_init,
        )
    self._connection_pool = pool

    self._sql_gen_kwargs = sql_gen_kwargs if sql_gen_kwargs else {}
    self._default_catalog = default_catalog
    self._execute_log_level = execute_log_level
    # Keyword arguments not recognized above become engine-specific extra configuration.
    self._extra_config = kwargs
    self._register_comments = register_comments
    self._pre_ping = pre_ping
    self._pretty_sql = pretty_sql
    self._multithreaded = multithreaded
    self.correlation_id = correlation_id
    self._schema_differ_overrides = schema_differ_overrides
    self._query_execution_tracker = query_execution_tracker
    # Looked-up data objects, keyed by a key derived from catalog, schema and object name.
    self._data_object_cache: t.Dict[str, t.Optional[DataObject]] = {}
def with_settings(self, **kwargs: t.Any) -> EngineAdapter:
    """Create a new adapter of the same class that shares this adapter's connection pool.

    Every setting that is not overridden through ``kwargs`` is carried over from this adapter:
    - dialect;
    - SQL generation kwargs;
    - default catalog;
    - comment registration;
    - multithreading;
    - pretty-printing;
    - execute log level;
    - correlation ID;
    - query execution tracker;
    - pre-ping;
    - schema differ overrides;
    - any extra engine-specific configuration.

    Args:
        **kwargs: Settings to override on the new adapter (for example ``execute_log_level``,
            ``correlation_id`` or ``pre_ping``). Any other keys are passed to the constructor.

    Returns:
        A new adapter instance of the same class.
    """
    extra_kwargs = {
        # NOTE(review): ``null_connection`` is not consumed by the base ``__init__``; it ends up
        # in ``_extra_config`` and is presumably read elsewhere.
        "null_connection": True,
        # Each of these defaults to this adapter's current value unless explicitly overridden.
        "execute_log_level": kwargs.pop("execute_log_level", self._execute_log_level),
        "correlation_id": kwargs.pop("correlation_id", self.correlation_id),
        "query_execution_tracker": kwargs.pop(
            "query_execution_tracker", self._query_execution_tracker
        ),
        # Forward these too. Without them, a derived adapter would stop pinging/reconnecting
        # in ``transaction`` and would lose the configured schema differ overrides.
        "pre_ping": kwargs.pop("pre_ping", self._pre_ping),
        "schema_differ_overrides": kwargs.pop(
            "schema_differ_overrides", self._schema_differ_overrides
        ),
        **self._extra_config,
        **kwargs,
    }

    adapter = self.__class__(
        self._connection_pool,
        dialect=self.dialect,
        sql_gen_kwargs=self._sql_gen_kwargs,
        default_catalog=self._default_catalog,
        register_comments=self._register_comments,
        multithreaded=self._multithreaded,
        pretty_sql=self._pretty_sql,
        **extra_kwargs,
    )

    return adapter
@property
def default_catalog(self) -> t.Optional[str]:
    """The catalog used when none is specified.

    Returns None for engines without catalog support. Otherwise the explicitly configured
    default catalog is used, falling back to the connection's current catalog.

    Raises:
        MissingDefaultCatalogError: If catalogs are supported but neither source yields a name.
    """
    if self.catalog_support.is_unsupported:
        return None

    catalog = self._default_catalog
    if not catalog:
        catalog = self.get_current_catalog()
    if catalog:
        return catalog

    raise MissingDefaultCatalogError(
        "Could not determine a default catalog despite it being supported."
    )
def recycle(self) -> None:
    """Close the connections and release the resources held for every thread but the caller's.

    The calling thread's own connection stays open.
    """
    pool = self._connection_pool
    pool.close_all(exclude_calling_thread=True)
Closes all open connections and releases all allocated resources associated with any thread except the calling one.
def close(self) -> t.Any:
    """Close every open connection in the pool and release the resources tied to them."""
    pool = self._connection_pool
    pool.close_all()
Closes all open connections and releases all allocated resources.
def get_current_catalog(self) -> t.Optional[str]:
    """Return the name of the catalog the current connection points at.

    Raises:
        NotImplementedError: Always in this base implementation; adapters that can report the
            current catalog must override it.
    """
    raise NotImplementedError
Returns the catalog name of the current connection.
def set_current_catalog(self, catalog: str) -> None:
    """Point the current connection at the given catalog.

    Raises:
        NotImplementedError: Always in this base implementation; adapters that can switch
            catalogs must override it.
    """
    raise NotImplementedError
Sets the catalog name of the current connection.
def get_catalog_type(self, catalog: t.Optional[str]) -> str:
    """Return the catalog type for the given catalog.

    Intended to be overridden for data virtualization systems like Trino. Depending on the
    target catalog, those systems require slightly different properties to be set when
    creating or updating tables.

    Returns ``DEFAULT_CATALOG_TYPE`` when no catalog is given. Otherwise returns the entry in
    ``_catalog_type_overrides``, or ``DEFAULT_CATALOG_TYPE`` when the catalog has no override.

    Raises:
        UnsupportedCatalogOperationError: Whenever the engine does not support catalogs, even
            if ``catalog`` is empty.
    """
    if self.catalog_support.is_unsupported:
        raise UnsupportedCatalogOperationError(
            f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}"
        )
    if not catalog:
        return self.DEFAULT_CATALOG_TYPE
    return self._catalog_type_overrides.get(catalog, self.DEFAULT_CATALOG_TYPE)
Returns the catalog type for the given catalog. Intended to be overridden for data virtualization systems like Trino, which require slightly different properties to be set when creating or updating tables depending on the target catalog.
def get_catalog_type_from_table(self, table: TableName) -> str:
    """Return the catalog type for the catalog a table lives in.

    The table's own catalog is used when its name specifies one. Otherwise the connection's
    current catalog is used.
    """
    catalog = exp.to_table(table).catalog
    if not catalog:
        catalog = self.get_current_catalog()
    return self.get_catalog_type(catalog)
Returns the catalog type of the table's catalog if the table name specifies one; otherwise returns the catalog type of the current catalog.
@property
def current_catalog_type(self) -> str:
    """Catalog type of the connection's current catalog.

    Prefer ``get_catalog_type_from_table``. What matters is the catalog of the table targeted
    by an operation, not the catalog of the connection. This property remains for legacy
    reasons and should eventually be refactored out.
    """
    connection_catalog = self.get_current_catalog()
    return self.get_catalog_type(connection_catalog)
def replace_query(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    supports_replace_table_override: t.Optional[bool] = None,
    **kwargs: t.Any,
) -> None:
    """Replace the contents of a table with the result of a query or dataframe.

    An existing object with the same name that is not a table is dropped first. The table is
    then (re)created from the query in two cases:
    - the engine supports ``CREATE OR REPLACE TABLE``, or the caller forces it via
      ``supports_replace_table_override``;
    - the target table does not exist yet.

    Otherwise an insert overwrite is used, as on partition-based engines such as Hive and Spark.

    Args:
        table_name: The name of the table (e.g. prod.table).
        query_or_df: The SQL query to run or a dataframe.
        target_columns_to_types: A mapping between the column name and its data type. When a
            dataframe is provided it is expected to be ordered to match the order of values in
            the dataframe.
        table_description: Optional table description.
        column_descriptions: Optional column descriptions.
        source_columns: Optional list of columns available in the source.
        supports_replace_table_override: When not None, used instead of
            ``SUPPORTS_REPLACE_TABLE``.
        kwargs: Optional create table properties.

    Raises:
        SQLMeshError: If the query references the target table itself and no column types are
            known.
    """
    target = exp.to_table(table_name)

    existing = self.get_data_object(target)
    # A same-named object of another type is dropped, so the table then no longer exists.
    dropped_mismatch = self.drop_data_object_on_type_mismatch(existing, DataObjectType.TABLE)
    table_exists = existing is not None and not dropped_mismatch

    source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=target,
        source_columns=source_columns,
    )
    if table_exists and not columns_to_types:
        # Fall back to the existing table's schema when no types are otherwise available.
        columns_to_types = self.columns(target)

    def _references_target(node: exp.Expr) -> bool:
        # Compare with identifiers quoted on both sides.
        return quote_identifiers(node) == quote_identifiers(target)

    first_query = source_queries[0].query_factory()
    self_referencing = False
    for referenced_table in first_query.find_all(exp.Table):
        if _references_target(referenced_table):
            self_referencing = True
            break

    if self_referencing:
        # The query reads from the table it replaces, so that table has to exist regardless of
        # which approach is used below.
        if not columns_to_types:
            raise SQLMeshError(
                f"Cannot create a self-referencing table {target.sql(dialect=self.dialect)} without knowing the column types. "
                "Try casting the columns to an expected type or defining the columns in the model metadata. "
            )
        self._create_table_from_columns(
            target,
            columns_to_types,
            exists=True,
            table_description=table_description,
            column_descriptions=column_descriptions,
            **kwargs,
        )

    if supports_replace_table_override is None:
        use_replace = self.SUPPORTS_REPLACE_TABLE
    else:
        use_replace = supports_replace_table_override

    # CREATE TABLE AS works on every engine for a new table; CREATE OR REPLACE when supported.
    if use_replace or not table_exists:
        return self._create_table_from_source_queries(
            target,
            source_queries,
            columns_to_types,
            replace=use_replace,
            table_description=table_description,
            column_descriptions=column_descriptions,
            **kwargs,
        )

    if not self_referencing:
        return self._insert_overwrite_by_condition(
            target,
            source_queries,
            columns_to_types,
            **kwargs,
        )

    assert columns_to_types is not None
    # Copy the current rows into a temp table and point the source queries at that copy, so the
    # overwrite does not read from the table it is writing to.
    with self.temp_table(
        self._select_columns(columns_to_types).from_(target),
        name=target,
        target_columns_to_types=columns_to_types,
        **kwargs,
    ) as snapshot:
        for source_query in source_queries:
            source_query.add_transform(
                lambda node: (  # type: ignore
                    snapshot  # type: ignore
                    if isinstance(node, exp.Table) and _references_target(node)
                    else node
                )
            )
        return self._insert_overwrite_by_condition(
            target,
            source_queries,
            columns_to_types,
            **kwargs,
        )
503 if self_referencing: 504 if not target_columns_to_types: 505 raise SQLMeshError( 506 f"Cannot create a self-referencing table {target_table.sql(dialect=self.dialect)} without knowing the column types. " 507 "Try casting the columns to an expected type or defining the columns in the model metadata. " 508 ) 509 self._create_table_from_columns( 510 target_table, 511 target_columns_to_types, 512 exists=True, 513 table_description=table_description, 514 column_descriptions=column_descriptions, 515 **kwargs, 516 ) 517 # All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we 518 # use `CREATE OR REPLACE TABLE AS` if the engine supports it 519 supports_replace_table = ( 520 self.SUPPORTS_REPLACE_TABLE 521 if supports_replace_table_override is None 522 else supports_replace_table_override 523 ) 524 if supports_replace_table or not table_exists: 525 return self._create_table_from_source_queries( 526 target_table, 527 source_queries, 528 target_columns_to_types, 529 replace=supports_replace_table, 530 table_description=table_description, 531 column_descriptions=column_descriptions, 532 **kwargs, 533 ) 534 if self_referencing: 535 assert target_columns_to_types is not None 536 with self.temp_table( 537 self._select_columns(target_columns_to_types).from_(target_table), 538 name=target_table, 539 target_columns_to_types=target_columns_to_types, 540 **kwargs, 541 ) as temp_table: 542 for source_query in source_queries: 543 source_query.add_transform( 544 lambda node: ( # type: ignore 545 temp_table # type: ignore 546 if isinstance(node, exp.Table) 547 and quote_identifiers(node) == quote_identifiers(target_table) 548 else node 549 ) 550 ) 551 return self._insert_overwrite_by_condition( 552 target_table, 553 source_queries, 554 target_columns_to_types, 555 **kwargs, 556 ) 557 return self._insert_overwrite_by_condition( 558 target_table, 559 source_queries, 560 target_columns_to_types, 561 **kwargs, 562 )
Replaces an existing table with a query.
For partition-based engines (Hive, Spark), INSERT OVERWRITE is used. For other systems, CREATE OR REPLACE is used.
Arguments:
- table_name: The name of the table (eg. prod.table)
- query_or_df: The SQL query to run or a dataframe.
- target_columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type. Expected to be ordered to match the order of values in the dataframe.
- kwargs: Optional create table properties.
def create_index(
    self,
    table_name: TableName,
    index_name: str,
    columns: t.Tuple[str, ...],
    exists: bool = True,
) -> None:
    """Create an index on a table, but only on engines that support indexes.

    Nothing is executed when the engine's `SUPPORTS_INDEXES` flag is false.

    Args:
        table_name: The table the index is created on.
        index_name: The name to give the index.
        columns: The columns the index is built over, in order.
        exists: Whether to include the IF NOT EXISTS check.
    """
    if self.SUPPORTS_INDEXES:
        index = exp.Index(
            this=exp.to_identifier(index_name),
            table=exp.to_table(table_name),
            params=exp.IndexParameters(columns=[exp.to_column(name) for name in columns]),
        )
        self.execute(exp.Create(this=index, kind="INDEX", exists=exists))
Creates a new index for the given table, if indexes are supported by the engine.
Arguments:
- table_name: The name of the target table.
- index_name: The name of the index.
- columns: The list of columns that constitute the index.
- exists: Indicates whether to include the IF NOT EXISTS check.
def create_table(
    self,
    table_name: TableName,
    target_columns_to_types: t.Dict[str, exp.DataType],
    primary_key: t.Optional[t.Tuple[str, ...]] = None,
    exists: bool = True,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    **kwargs: t.Any,
) -> None:
    """Create a table from an explicit column definition (DDL, no query).

    Args:
        table_name: Name of the table to create, optionally fully qualified.
        target_columns_to_types: Mapping of column name to data type.
        primary_key: Optional primary key columns.
        exists: Whether to include the IF NOT EXISTS check.
        table_description: Optional table description from MODEL DDL.
        column_descriptions: Optional column descriptions from model query.
        kwargs: Optional create table properties.
    """
    self._create_table_from_columns(
        table_name,
        target_columns_to_types,
        primary_key,
        exists=exists,
        table_description=table_description,
        column_descriptions=column_descriptions,
        **kwargs,
    )
Create a table using a DDL statement
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- target_columns_to_types: A mapping between the column name and its data type.
- primary_key: Determines the table primary key.
- exists: Indicates whether to include the IF NOT EXISTS check.
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- kwargs: Optional create table properties.
def create_managed_table(
    self,
    table_name: TableName,
    query: Query,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    partitioned_by: t.Optional[t.List[exp.Expr]] = None,
    clustered_by: t.Optional[t.List[exp.Expr]] = None,
    table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Create a "managed" table from a query.

    A managed table is kept up to date by the database engine itself rather than by SQLMesh.
    This base implementation always raises; engines with managed-table support override it.

    Args:
        table_name: Name of the table to create, optionally fully qualified.
        query: The SQL query the managed table is based on.
        target_columns_to_types: Mapping of column name to data type.
        partitioned_by: Partition columns or engine-specific expressions (eg. (ds, hour)).
        clustered_by: Cluster columns or engine-specific expressions (eg. (ds, hour)).
        table_properties: Optional engine-specific properties for the managed table.
        table_description: Optional table description from MODEL DDL.
        column_descriptions: Optional column descriptions from model query.
        source_columns: Optional list of column names of `query`.
        kwargs: Optional create table properties.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    message = f"Engine does not support managed tables: {type(self)}"
    raise NotImplementedError(message)
Create a managed table using a query.
"Managed" means that once the table is created, the data is kept up to date by the underlying database engine and not SQLMesh.
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- query: The SQL query for the engine to base the managed table on
- target_columns_to_types: A mapping between the column name and its data type.
- partitioned_by: The partition columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour))
- clustered_by: The cluster columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour))
- table_properties: Optional mapping of engine-specific properties to be set on the managed table
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- kwargs: Optional create table properties.
def ctas(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    exists: bool = True,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Create a table with a CREATE TABLE ... AS SELECT statement.

    Args:
        table_name: Name of the table to create, optionally fully qualified.
        query_or_df: The query (or dataframe) that populates the new table.
        target_columns_to_types: Mapping of column name to data type; required for dataframes.
        exists: Whether to include the IF NOT EXISTS check.
        table_description: Optional table description from MODEL DDL.
        column_descriptions: Optional column descriptions from model query.
        source_columns: Optional list of column names of `query_or_df`.
        kwargs: Optional create table properties.
    """
    resolved = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=table_name,
        source_columns=source_columns,
    )
    queries, columns_to_types = resolved
    return self._create_table_from_source_queries(
        table_name,
        queries,
        columns_to_types,
        exists,
        table_description=table_description,
        column_descriptions=column_descriptions,
        **kwargs,
    )
Create a table using a CTAS statement
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- query_or_df: The SQL query to run or a dataframe for the CTAS.
- target_columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame.
- exists: Indicates whether to include the IF NOT EXISTS check.
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- kwargs: Optional create table properties.
def create_state_table(
    self,
    table_name: str,
    target_columns_to_types: t.Dict[str, exp.DataType],
    primary_key: t.Optional[t.Tuple[str, ...]] = None,
) -> None:
    """Create one of SQLMesh's internal state tables.

    This delegates to `create_table` and passes only the primary key as an option.

    Args:
        table_name: Name of the table to create, optionally fully qualified.
        target_columns_to_types: Mapping of column name to data type.
        primary_key: Optional primary key columns.
    """
    options = {"primary_key": primary_key}
    self.create_table(table_name, target_columns_to_types, **options)
Create a table to store SQLMesh internal state.
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- target_columns_to_types: A mapping between the column name and its data type.
- primary_key: Determines the table primary key.
def create_table_like(
    self,
    target_table_name: TableName,
    source_table_name: TableName,
    exists: bool = True,
    **kwargs: t.Any,
) -> None:
    """Create a table that copies another table's definition.

    The copy includes any column attributes and indexes defined on the source table. The data
    object cache entry for the new table is cleared afterwards.

    Args:
        target_table_name: Name of the table to create, optionally fully qualified.
        source_table_name: Name of the table whose definition is copied.
        exists: Forwarded to the engine-specific `_create_table_like`.
        kwargs: Extra arguments forwarded to `_create_table_like`.
    """
    self._create_table_like(
        target_table_name,
        source_table_name,
        exists=exists,
        **kwargs,
    )
    # A new object now exists under this name; drop any stale cached lookup.
    self._clear_data_object_cache(target_table_name)
Create a table to store SQLMesh internal state based on the definition of another table, including any column attributes and indexes defined in the original table.
Arguments:
- target_table_name: The name of the table to create. Can be fully qualified or just table name.
- source_table_name: The name of the table to base the new table on.
def clone_table(
    self,
    target_table_name: TableName,
    source_table_name: TableName,
    replace: bool = False,
    exists: bool = True,
    clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> None:
    """Create `target_table_name` as a clone of `source_table_name`.

    Args:
        target_table_name: Name of the table to create.
        source_table_name: Name of the table to clone.
        replace: Whether or not to replace an existing table.
        exists: Whether to include the IF NOT EXISTS check.
        clone_kwargs: Extra arguments for the `exp.Clone` expression.
        kwargs: Extra arguments for the `exp.Create` expression; `rendered_physical_properties`
            is discarded.

    Raises:
        NotImplementedError: If the engine does not support cloning.
    """
    if not self.SUPPORTS_CLONING:
        raise NotImplementedError(f"Engine does not support cloning: {type(self)}")

    # Dropped so it is not forwarded into the CREATE expression below.
    kwargs.pop("rendered_physical_properties", None)
    target = exp.to_table(target_table_name)
    clone = exp.Clone(this=exp.to_table(source_table_name), **(clone_kwargs or {}))
    create = exp.Create(
        this=target,
        kind="TABLE",
        replace=replace,
        exists=exists,
        clone=clone,
        **kwargs,
    )
    self.execute(create)
    self._clear_data_object_cache(target_table_name)
Creates a table with the target name by cloning the source table.
Arguments:
- target_table_name: The name of the table that should be created.
- source_table_name: The name of the source table that should be cloned.
- replace: Whether or not to replace an existing table.
- exists: Indicates whether to include the IF NOT EXISTS check.
def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool = True) -> None:
    """Drop a data object, dispatching on its type.

    Views, materialized views, tables and managed tables each go to their dedicated drop method.

    Args:
        data_object: The object to drop.
        ignore_if_not_exists: If True, a missing object is not an error.

    Raises:
        SQLMeshError: If the object's type has no drop implementation here.
    """
    kind = data_object.type
    table = data_object.to_table()

    if kind.is_view:
        self.drop_view(table, ignore_if_not_exists=ignore_if_not_exists)
    elif kind.is_materialized_view:
        self.drop_view(table, ignore_if_not_exists=ignore_if_not_exists, materialized=True)
    elif kind.is_table:
        self.drop_table(table, exists=ignore_if_not_exists)
    elif kind.is_managed_table:
        self.drop_managed_table(table, exists=ignore_if_not_exists)
    else:
        raise SQLMeshError(
            f"Can't drop data object '{table.sql(dialect=self.dialect)}' of type '{kind.value}'"
        )
Drops a data object of arbitrary type.
Arguments:
- data_object: The data object to drop.
- ignore_if_not_exists: If True, no error will be raised if the data object does not exist.
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
    """Drop a table.

    Args:
        table_name: The table to drop.
        exists: Whether to include the IF EXISTS check. Defaults to True.
        kwargs: Extra arguments forwarded to `_drop_object`.
    """
    self._drop_object(exists=exists, name=table_name, **kwargs)
Drops a table.
Arguments:
- table_name: The name of the table to drop.
- exists: Whether to include the IF EXISTS check, so that no error is raised if the table does not exist. Defaults to True.
def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None:
    """Drop a managed table.

    This base implementation always raises; engines with managed-table support override it.

    Args:
        table_name: The table to drop.
        exists: Whether to include the IF EXISTS check. Defaults to True.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    engine_type = type(self)
    raise NotImplementedError(f"Engine does not support managed tables: {engine_type}")
Drops a managed table.
Arguments:
- table_name: The name of the table to drop.
- exists: Whether to include the IF EXISTS check. Defaults to True.
def get_alter_operations(
    self,
    current_table_name: TableName,
    target_table_name: TableName,
    *,
    ignore_destructive: bool = False,
    ignore_additive: bool = False,
) -> t.List[TableAlterOperation]:
    """
    Determines the alter statements needed to change the current table into the structure of the target table.

    Both tables' columns are fetched with `columns` and compared by the adapter's schema differ.
    The `ignore_destructive` and `ignore_additive` flags are forwarded to that comparison.
    """
    differ = self.schema_differ
    current_columns = self.columns(current_table_name)
    target_columns = self.columns(target_table_name)
    operations = differ.compare_columns(
        current_table_name,
        current_columns,
        target_columns,
        ignore_destructive=ignore_destructive,
        ignore_additive=ignore_additive,
    )
    return t.cast(t.List[TableAlterOperation], operations)
Determines the alter statements needed to change the current table into the structure of the target table.
def alter_table(
    self,
    alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
) -> None:
    """
    Performs the alter statements to change the current table into the structure of the target table.

    All statements run inside a single `self.transaction()`. A `TableAlterOperation` is executed via
    its `.expression`; plain expressions are executed as given.
    """
    with self.transaction():
        statements = []
        for item in alter_expressions:
            statements.append(item.expression if isinstance(item, TableAlterOperation) else item)
        for statement in statements:
            self.execute(statement)
Performs the alter statements to change the current table into the structure of the target table.
def create_view(
    self,
    view_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    replace: bool = True,
    materialized: bool = False,
    materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **create_kwargs: t.Any,
) -> None:
    """Create a view with a query or dataframe.

    If a dataframe is passed in, it will be converted into a literal values statement.
    This should only be done if the dataframe is very small!

    `materialized_properties` and `view_properties` are copied before keys are popped or added,
    so the caller's dictionaries are left unchanged.

    Args:
        view_name: The view name.
        query_or_df: A query or dataframe.
        target_columns_to_types: Columns to use in the view statement.
        replace: Whether or not to replace an existing view. Defaults to True.
        materialized: Whether to create a materialized view. Only used for engines that support this feature.
        materialized_properties: Optional materialized view properties to add to the view.
        table_description: Optional table description from MODEL DDL.
        column_descriptions: Optional column descriptions from model query.
        view_properties: Optional view properties to add to the view.
        source_columns: Optional list of column names of `query_or_df`.
        create_kwargs: Additional kwargs to pass into the Create expression

    Raises:
        SQLMeshError: If materialized properties are given for a non-materialized view, if a
            dataframe's column types cannot be resolved, or if more than one source query results.
    """
    import pandas as pd

    if materialized_properties and not materialized:
        raise SQLMeshError("Materialized properties are only supported for materialized views")

    # Work on copies: keys are popped/added below and must not leak into dicts the caller may reuse.
    if materialized_properties:
        materialized_properties = dict(materialized_properties)
    if view_properties:
        view_properties = dict(view_properties)

    query_or_df = self._native_df_to_pandas_df(query_or_df)

    if isinstance(query_or_df, pd.DataFrame):
        values: t.List[t.Tuple[t.Any, ...]] = list(
            query_or_df.itertuples(index=False, name=None)
        )
        target_columns_to_types, source_columns = self._columns_to_types(
            query_or_df, target_columns_to_types, source_columns
        )
        if not target_columns_to_types:
            raise SQLMeshError("columns_to_types must be provided for dataframes")
        source_columns_to_types = get_source_columns_to_types(
            target_columns_to_types, source_columns
        )
        query_or_df = self._values_to_sql(
            values,
            source_columns_to_types,
            batch_start=0,
            batch_end=len(values),
        )

    source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        batch_size=0,
        target_table=view_name,
        source_columns=source_columns,
    )
    if len(source_queries) != 1:
        raise SQLMeshError("Only one source query is supported for creating views")

    schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name)
    if target_columns_to_types:
        schema = self._build_schema_exp(
            exp.to_table(view_name),
            target_columns_to_types,
            column_descriptions,
            is_view=True,
            materialized=materialized,
        )

    properties = create_kwargs.pop("properties", None)
    if not properties:
        properties = exp.Properties(expressions=[])

    if view_properties:
        table_type = self._pop_creatable_type_from_properties(view_properties)
        if table_type:
            properties.append("expressions", table_type)

    if materialized and self.SUPPORTS_MATERIALIZED_VIEWS:
        properties.append("expressions", exp.MaterializedProperty())

        if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema):
            schema = schema.this

    if not self.SUPPORTS_VIEW_SCHEMA and isinstance(schema, exp.Schema):
        schema = schema.this

    if materialized_properties:
        partitioned_by = materialized_properties.pop("partitioned_by", None)
        clustered_by = materialized_properties.pop("clustered_by", None)
        if (
            partitioned_by
            and (
                partitioned_by_prop := self._build_partitioned_by_exp(
                    partitioned_by, **materialized_properties
                )
            )
            is not None
        ):
            materialized_properties["catalog_name"] = exp.to_table(view_name).catalog
            properties.append("expressions", partitioned_by_prop)
        if (
            clustered_by
            and (
                clustered_by_prop := self._build_clustered_by_exp(
                    clustered_by, **materialized_properties
                )
            )
            is not None
        ):
            properties.append("expressions", clustered_by_prop)

    create_view_properties = self._build_view_properties_exp(
        view_properties,
        (
            table_description
            if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled
            else None
        ),
        physical_cluster=create_kwargs.pop("physical_cluster", None),
    )
    if create_view_properties:
        for view_property in create_view_properties.expressions:
            # Small hack to make sure SECURE goes at the beginning before materialized as required by Snowflake
            if isinstance(view_property, exp.SecureProperty):
                properties.set("expressions", view_property, index=0, overwrite=False)
            else:
                properties.append("expressions", view_property)

    if properties.expressions:
        create_kwargs["properties"] = properties

    if replace:
        self.drop_data_object_on_type_mismatch(
            self.get_data_object(view_name),
            DataObjectType.VIEW if not materialized else DataObjectType.MATERIALIZED_VIEW,
        )

    with source_queries[0] as query:
        self.execute(
            exp.Create(
                this=schema,
                kind="VIEW",
                replace=replace,
                expression=query,
                **create_kwargs,
            ),
            quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
        )

    self._clear_data_object_cache(view_name)

    # Register table comment with commands if the engine doesn't support doing it in CREATE
    if (
        table_description
        and self.COMMENT_CREATION_VIEW.is_comment_command_only
        and self.comments_enabled
    ):
        self._create_table_comment(view_name, table_description, "VIEW")
    # Register column comments with commands if the engine doesn't support doing it in
    # CREATE or we couldn't do it in the CREATE schema definition because we don't have
    # columns_to_types
    if (
        column_descriptions
        and (
            self.COMMENT_CREATION_VIEW.is_comment_command_only
            or (
                self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands
                and not target_columns_to_types
            )
        )
        and self.comments_enabled
    ):
        self._create_column_comments(view_name, column_descriptions, "VIEW", materialized)
Create a view with a query or dataframe.
If a dataframe is passed in, it will be converted into a literal values statement. This should only be done if the dataframe is very small!
Arguments:
- view_name: The view name.
- query_or_df: A query or dataframe.
- target_columns_to_types: Columns to use in the view statement.
- replace: Whether or not to replace an existing view. Defaults to True.
- materialized: Whether to create a materialized view. Only used for engines that support this feature.
- materialized_properties: Optional materialized view properties to add to the view.
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- view_properties: Optional view properties to add to the view.
- create_kwargs: Additional kwargs to pass into the Create expression
@set_catalog()
def create_schema(
    self,
    schema_name: SchemaName,
    ignore_if_exists: bool = True,
    warn_on_error: bool = True,
    properties: t.Optional[t.List[exp.Expr]] = None,
) -> None:
    """Create a schema.

    Args:
        schema_name: The schema to create.
        ignore_if_exists: Forwarded to `_create_schema`; presumably toggles IF NOT EXISTS.
        warn_on_error: Forwarded to `_create_schema`. NOTE(review): presumably downgrades
            failures to warnings; confirm against `_create_schema`.
        properties: Optional schema properties; `None` is passed on as an empty list.
    """
    schema_properties = properties if properties else []
    return self._create_schema(
        schema_name=schema_name,
        ignore_if_exists=ignore_if_exists,
        warn_on_error=warn_on_error,
        properties=schema_properties,
        kind="SCHEMA",
    )
def drop_schema(
    self,
    schema_name: SchemaName,
    ignore_if_not_exists: bool = True,
    cascade: bool = False,
    **drop_args: t.Any,
) -> None:
    """Drop a schema.

    Args:
        schema_name: The schema to drop.
        ignore_if_not_exists: Whether to include the IF EXISTS check, so a missing schema is
            not an error.
        cascade: Forwarded to `_drop_object` as `cascade` (drop with CASCADE).
        drop_args: Extra arguments forwarded to `_drop_object`. Annotated `t.Any`, matching
            `drop_table`/`drop_view`; an annotation on `**kwargs` describes each value.
    """
    return self._drop_object(
        name=schema_name,
        exists=ignore_if_not_exists,
        kind="SCHEMA",
        cascade=cascade,
        **drop_args,
    )
def drop_view(
    self,
    view_name: TableName,
    ignore_if_not_exists: bool = True,
    materialized: bool = False,
    **kwargs: t.Any,
) -> None:
    """Drop a view.

    The `materialized` flag is only honored on engines whose `SUPPORTS_MATERIALIZED_VIEWS` is set.
    """
    drop_materialized = materialized and self.SUPPORTS_MATERIALIZED_VIEWS
    self._drop_object(
        kind="VIEW",
        name=view_name,
        exists=ignore_if_not_exists,
        materialized=drop_materialized,
        **kwargs,
    )
Drop a view.
def columns(
    self, table_name: TableName, include_pseudo_columns: bool = False
) -> t.Dict[str, exp.DataType]:
    """Fetches column names and types for the target table.

    Runs `DESCRIBE TABLE` and reads the returned rows in order. Reading stops at the first row
    whose first field starts with ``#`` (a section header), and rows with a blank or missing
    name or type are skipped.

    Args:
        table_name: The table to describe.
        include_pseudo_columns: Not used by this base implementation.

    Returns:
        A mapping from column name to its parsed data type, in the order the engine returned them.
    """
    self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
    describe_output = self.cursor.fetchall()

    def _before_section_header(row: t.Sequence[t.Any]) -> bool:
        # A missing (None) name is not a header; the filter below skips such rows instead of
        # letting `.startswith` raise AttributeError.
        return not (row[0] or "").startswith("#")

    return {
        # Note: MySQL returns the column type as bytes.
        column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect)
        for column_name, column_type, *_ in itertools.takewhile(
            _before_section_header, describe_output
        )
        if column_name and column_name.strip() and column_type and column_type.strip()
    }
Fetches column names and types for the target table.
def table_exists(self, table_name: TableName) -> bool:
    """Check whether a table exists.

    The data object cache is consulted first. On a cache miss, `DESCRIBE TABLE` is executed; success
    means the table exists, and any exception is treated as "does not exist".

    Args:
        table_name: The table to look for.

    Returns:
        True if the table exists, False otherwise.
    """
    table = exp.to_table(table_name)
    data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
    if data_object_cache_key in self._data_object_cache:
        logger.debug("Table existence cache hit: %s", data_object_cache_key)
        return self._data_object_cache[data_object_cache_key] is not None

    try:
        self.execute(exp.Describe(this=table, kind="TABLE"))
    except Exception:
        # Best-effort probe: every failure, including unrelated ones such as connection or
        # permission errors, is reported as "missing". Log it so those remain diagnosable.
        logger.debug(
            "DESCRIBE failed for %s; treating the table as missing",
            data_object_cache_key,
            exc_info=True,
        )
        return False
    return True
def insert_append(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    track_rows_processed: bool = True,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """Append the rows of a query or dataframe to an existing table.

    Args:
        table_name: The table to append to.
        query_or_df: The query or dataframe providing the rows.
        target_columns_to_types: Optional mapping of column name to data type.
        track_rows_processed: Forwarded to `_insert_append_source_queries`.
        source_columns: Optional list of column names of `query_or_df`.
    """
    queries, columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=table_name,
        source_columns=source_columns,
    )
    self._insert_append_source_queries(
        table_name,
        queries,
        columns_to_types,
        track_rows_processed,
    )
def insert_overwrite_by_partition(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    partitioned_by: t.List[exp.Expr],
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """Overwrite a partitioned table with the given query or dataframe.

    Engines whose `INSERT_OVERWRITE_STRATEGY` is insert-overwrite use `_insert_overwrite_by_condition`.
    All other engines fall back to `_replace_by_key`, with the `partitioned_by` expressions as a
    non-unique key.

    Args:
        table_name: The table to overwrite.
        query_or_df: The query or dataframe providing the rows.
        partitioned_by: The partition expressions (used only by the fallback path).
        target_columns_to_types: Optional mapping of column name to data type.
        source_columns: Optional list of column names of `query_or_df`.
    """
    if not self.INSERT_OVERWRITE_STRATEGY.is_insert_overwrite:
        self._replace_by_key(
            table_name,
            query_or_df,
            target_columns_to_types,
            partitioned_by,
            is_unique_key=False,
            source_columns=source_columns,
        )
        return

    queries, columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=exp.to_table(table_name),
        source_columns=source_columns,
    )
    self._insert_overwrite_by_condition(
        table_name, queries, target_columns_to_types=columns_to_types
    )
def insert_overwrite_by_time_partition(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    start: TimeLike,
    end: TimeLike,
    time_formatter: t.Callable[[TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expr],
    time_column: TimeColumn | exp.Expr | str,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Overwrite the rows of a table whose time column falls in an inclusive [start, end] range.

    If the column types are missing or not all known, the table's current columns are used. The
    range bounds are rendered by `time_formatter`, and the overwrite condition is
    `time_column BETWEEN low AND high`.

    Args:
        table_name: The table to overwrite.
        query_or_df: The query or dataframe providing the rows.
        start: Start of the time range.
        end: End of the time range (made inclusive via `make_inclusive`).
        time_formatter: Renders a time value as an expression, given the column types.
        time_column: The time column, as a `TimeColumn`, expression, or column name.
        target_columns_to_types: Optional mapping of column name to data type.
        source_columns: Optional list of column names of `query_or_df`.
        kwargs: Forwarded to `_insert_overwrite_by_time_partition`.
    """
    queries, columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=table_name,
        source_columns=source_columns,
    )
    if not (columns_to_types and columns_to_types_all_known(columns_to_types)):
        columns_to_types = self.columns(table_name)

    range_start, range_end = make_inclusive(start, end, self.dialect)
    low = time_formatter(range_start, columns_to_types)
    high = time_formatter(range_end, columns_to_types)

    if isinstance(time_column, TimeColumn):
        time_column = time_column.column
    column_expression = exp.to_column(time_column) if isinstance(time_column, str) else time_column
    condition = exp.Between(this=column_expression, low=low, high=high)

    return self._insert_overwrite_by_time_partition(
        table_name, queries, columns_to_types, condition, **kwargs
    )
def scd_type_2_by_time(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    unique_key: t.Sequence[exp.Expr],
    valid_from_col: exp.Column,
    valid_to_col: exp.Column,
    execution_time: t.Union[TimeLike, exp.Column],
    updated_at_col: exp.Column,
    invalidate_hard_deletes: bool = True,
    updated_at_as_valid_from: bool = False,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    truncate: bool = False,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Apply an SCD Type 2 load keyed on an `updated_at` column.

    Thin wrapper: every argument is forwarded by keyword to `_scd_type_2`.
    """
    self._scd_type_2(
        # Target / source
        target_table=target_table,
        source_table=source_table,
        target_columns_to_types=target_columns_to_types,
        source_columns=source_columns,
        # Row identity and validity window
        unique_key=unique_key,
        valid_from_col=valid_from_col,
        valid_to_col=valid_to_col,
        execution_time=execution_time,
        # Time-based change detection
        updated_at_col=updated_at_col,
        updated_at_as_valid_from=updated_at_as_valid_from,
        invalidate_hard_deletes=invalidate_hard_deletes,
        # Metadata / load behaviour
        table_description=table_description,
        column_descriptions=column_descriptions,
        truncate=truncate,
        **kwargs,
    )
def scd_type_2_by_column(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    unique_key: t.Sequence[exp.Expr],
    valid_from_col: exp.Column,
    valid_to_col: exp.Column,
    execution_time: t.Union[TimeLike, exp.Column],
    check_columns: t.Union[exp.Star, t.Sequence[exp.Expr]],
    invalidate_hard_deletes: bool = True,
    execution_time_as_valid_from: bool = False,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    truncate: bool = False,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Apply an SCD Type 2 load that detects changes by comparing `check_columns`.

    Thin wrapper: every argument is forwarded by keyword to `_scd_type_2`.
    """
    self._scd_type_2(
        # Target / source
        target_table=target_table,
        source_table=source_table,
        target_columns_to_types=target_columns_to_types,
        source_columns=source_columns,
        # Row identity and validity window
        unique_key=unique_key,
        valid_from_col=valid_from_col,
        valid_to_col=valid_to_col,
        execution_time=execution_time,
        # Column-based change detection
        check_columns=check_columns,
        execution_time_as_valid_from=execution_time_as_valid_from,
        invalidate_hard_deletes=invalidate_hard_deletes,
        # Metadata / load behaviour
        table_description=table_description,
        column_descriptions=column_descriptions,
        truncate=truncate,
        **kwargs,
    )
def merge(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expr],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expr] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Merge (upsert) a query or dataframe into a target table.

    Rows are matched on equality of every `unique_key` expression between the target and source
    aliases, further restricted by `merge_filter` when given. Matched rows get `when_matched` if
    provided; otherwise all columns are updated from the source. Unmatched rows are always inserted
    with all columns.

    Args:
        target_table: The table to merge into.
        source_table: The query or dataframe providing the rows.
        target_columns_to_types: Mapping of column name to data type; if empty after source
            resolution, the target table's columns are used.
        unique_key: Expressions identifying a row.
        when_matched: Optional custom WHEN MATCHED clauses (copied before use).
        merge_filter: Optional extra condition ANDed in front of the key match.
        source_columns: Optional list of column names of `source_table`.
    """
    source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
        source_table,
        target_columns_to_types,
        target_table=target_table,
        source_columns=source_columns,
    )
    target_columns_to_types = target_columns_to_types or self.columns(target_table)
    column_names = list(target_columns_to_types)

    key_matches = [
        add_table(key_part, MERGE_TARGET_ALIAS).eq(add_table(key_part, MERGE_SOURCE_ALIAS))
        for key_part in unique_key
    ]
    on = exp.and_(*key_matches)
    if merge_filter:
        on = exp.and_(merge_filter, on)

    if when_matched:
        match_expressions = when_matched.copy().expressions
    else:
        update_all = exp.Update(
            expressions=[
                exp.column(name, MERGE_TARGET_ALIAS).eq(exp.column(name, MERGE_SOURCE_ALIAS))
                for name in column_names
            ],
        )
        match_expressions = [exp.When(matched=True, source=False, then=update_all)]

    insert_all = exp.Insert(
        this=exp.Tuple(expressions=[exp.column(name) for name in column_names]),
        expression=exp.Tuple(
            expressions=[exp.column(name, MERGE_SOURCE_ALIAS) for name in column_names]
        ),
    )
    match_expressions.append(exp.When(matched=False, source=False, then=insert_all))

    for source_query in source_queries:
        with source_query as query:
            self._merge(
                target_table=target_table,
                query=query,
                on=on,
                whens=exp.Whens(expressions=match_expressions),
            )
def rename_table(
    self,
    old_table_name: TableName,
    new_table_name: TableName,
) -> None:
    """Rename a table within the same catalog.

    If the new name carries a catalog, it must equal the old table's catalog. When the old name has
    no catalog, the current catalog is used for the comparison. Cached lookups for both names are
    cleared after the rename.

    Raises:
        UnsupportedCatalogOperationError: If the rename would move the table across catalogs.
    """
    new_catalog = exp.to_table(new_table_name).catalog
    if new_catalog:
        old_catalog = exp.to_table(old_table_name).catalog or self.get_current_catalog()
        if old_catalog != new_catalog:
            raise UnsupportedCatalogOperationError(
                "Tried to rename table across catalogs which is not supported"
            )
    self._rename_table(old_table_name, new_table_name)
    for name in (old_table_name, new_table_name):
        self._clear_data_object_cache(name)
def get_data_object(
    self, target_name: TableName, safe_to_cache: bool = False
) -> t.Optional[DataObject]:
    """Looks up a single data object by its (possibly qualified) name.

    Args:
        target_name: The table name to look up.
        safe_to_cache: Forwarded to ``get_data_objects``.

    Returns:
        The first matching data object, or None if nothing matched.
    """
    table = exp.to_table(target_name)
    matches = self.get_data_objects(
        schema_(table.db, table.catalog),
        {table.name},
        safe_to_cache=safe_to_cache,
    )
    return matches[0] if matches else None
def get_data_objects(
    self,
    schema_name: SchemaName,
    object_names: t.Optional[t.Set[str]] = None,
    safe_to_cache: bool = False,
) -> t.List[DataObject]:
    """Lists all data objects in the target schema.

    When ``object_names`` is given, names already present in the data object cache are
    served from it and only the remaining names are looked up, in batches of at most
    ``DATA_OBJECT_FILTER_BATCH_SIZE`` names per call.

    Args:
        schema_name: The name of the schema to list data objects from.
        object_names: If provided, only return data objects with these names.
        safe_to_cache: Whether it is safe to cache the results of this call. When True,
            fetched objects are cached, and requested names that were not returned are
            cached as ``None``.

    Returns:
        A list of data objects in the target schema. When ``object_names`` is given,
        cache hits come first, followed by freshly fetched objects.
    """
    if object_names is None:
        # Unfiltered listing always queries the engine; the cache is only populated.
        listed = self._get_data_objects(schema_name)
        if safe_to_cache:
            for obj in listed:
                key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name)
                self._data_object_cache[key] = obj
        return listed

    if not object_names:
        return []

    schema = to_schema(schema_name)
    hits: t.List[DataObject] = []
    misses: t.Set[str] = set()
    for object_name in object_names:
        key = _get_data_object_cache_key(schema.catalog, schema.db, object_name)
        if key not in self._data_object_cache:
            logger.debug("Data object cache miss: %s", key)
            misses.add(object_name)
            continue
        logger.debug("Data object cache hit: %s", key)
        cached = self._data_object_cache[key]
        # A cached None records an earlier lookup that found nothing.
        if cached:
            hits.append(cached)

    if not misses:
        return hits

    pending = list(misses)
    step = self.DATA_OBJECT_FILTER_BATCH_SIZE
    fetched: t.List[DataObject] = []
    found_names: t.Set[str] = set()
    for start in range(0, len(pending), step):
        for obj in self._get_data_objects(schema_name, set(pending[start : start + step])):
            if safe_to_cache:
                key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name)
                self._data_object_cache[key] = obj
            fetched.append(obj)
            found_names.add(obj.name)

    if safe_to_cache:
        # Negative-cache the requested names the engine did not return.
        for absent_name in misses - found_names:
            key = _get_data_object_cache_key(schema.catalog, schema.db, absent_name)
            self._data_object_cache[key] = None

    return hits + fetched
Lists all data objects in the target schema.
Arguments:
- schema_name: The name of the schema to list data objects from.
- object_names: If provided, only return data objects with these names.
- safe_to_cache: Whether it is safe to cache the results of this call.
Returns:
A list of data objects in the target schema.
def fetchone(
    self,
    query: t.Union[exp.Expr, str],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = False,
) -> t.Optional[t.Tuple]:
    """Runs ``query`` inside a transaction and returns the cursor's next row.

    Args:
        query: The SQL string or expression to execute.
        ignore_unsupported_errors: Forwarded to ``execute``.
        quote_identifiers: Forwarded to ``execute``.

    Returns:
        The row returned by ``cursor.fetchone()``.
    """
    with self.transaction():
        self.execute(
            query,
            ignore_unsupported_errors=ignore_unsupported_errors,
            quote_identifiers=quote_identifiers,
        )
        row = self.cursor.fetchone()
    return row
def fetchall(
    self,
    query: t.Union[exp.Expr, str],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = False,
) -> t.List[t.Tuple]:
    """Runs ``query`` inside a transaction and returns every remaining cursor row.

    Args:
        query: The SQL string or expression to execute.
        ignore_unsupported_errors: Forwarded to ``execute``.
        quote_identifiers: Forwarded to ``execute``.

    Returns:
        The rows returned by ``cursor.fetchall()``.
    """
    with self.transaction():
        self.execute(
            query,
            ignore_unsupported_errors=ignore_unsupported_errors,
            quote_identifiers=quote_identifiers,
        )
        rows = self.cursor.fetchall()
    return rows
def fetchdf(
    self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
) -> pd.DataFrame:
    """Fetches a Pandas DataFrame from the cursor

    Delegates to ``_fetch_native_df`` and checks that the result is a pandas DataFrame.

    Raises:
        NotImplementedError: If the native fetch returns something other than a DataFrame.
    """
    import pandas as pd

    result = self._fetch_native_df(query, quote_identifiers=quote_identifiers)
    if isinstance(result, pd.DataFrame):
        return result
    raise NotImplementedError(
        "The cursor's `fetch_native_df` method is not returning a pandas DataFrame. Need to update `fetchdf` so a Pandas DataFrame is returned"
    )
Fetches a Pandas DataFrame from the cursor
def fetch_pyspark_df(
    self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
) -> PySparkDataFrame:
    """Fetches a PySpark DataFrame from the cursor

    The base adapter has no PySpark support and always raises.

    Raises:
        NotImplementedError: Always, naming the adapter type.
    """
    message = f"Engine does not support PySpark DataFrames: {type(self)}"
    raise NotImplementedError(message)
Fetches a PySpark DataFrame from the cursor
@property
def wap_enabled(self) -> bool:
    """Returns whether WAP is enabled for this engine.

    Reads the ``wap_enabled`` key of the adapter's extra config, defaulting to False.
    """
    extra_config = self._extra_config
    return extra_config.get("wap_enabled", False)
Returns whether WAP is enabled for this engine.
def wap_supported(self, table_name: TableName) -> bool:
    """Returns whether WAP for the target table is supported.

    The base adapter reports no WAP support for any table.

    Args:
        table_name: The name of the target table (unused here).
    """
    supported = False
    return supported
Returns whether WAP for the target table is supported.
def wap_table_name(self, table_name: TableName, wap_id: str) -> str:
    """Returns the updated table name for the given WAP ID.

    Args:
        table_name: The name of the target table.
        wap_id: The WAP ID to prepare.

    Returns:
        The updated table name that should be used for writing.

    Raises:
        NotImplementedError: Always in the base adapter.
    """
    message = f"Engine does not support WAP: {type(self)}"
    raise NotImplementedError(message)
Returns the updated table name for the given WAP ID.
Arguments:
- table_name: The name of the target table.
- wap_id: The WAP ID to prepare.
Returns:
The updated table name that should be used for writing.
def wap_prepare(self, table_name: TableName, wap_id: str) -> str:
    """Prepares the target table for WAP and returns the updated table name.

    Args:
        table_name: The name of the target table.
        wap_id: The WAP ID to prepare.

    Returns:
        The updated table name that should be used for writing.

    Raises:
        NotImplementedError: Always in the base adapter.
    """
    message = f"Engine does not support WAP: {type(self)}"
    raise NotImplementedError(message)
Prepares the target table for WAP and returns the updated table name.
Arguments:
- table_name: The name of the target table.
- wap_id: The WAP ID to prepare.
Returns:
The updated table name that should be used for writing.
def wap_publish(self, table_name: TableName, wap_id: str) -> None:
    """Publishes changes with the given WAP ID to the target table.

    Args:
        table_name: The name of the target table.
        wap_id: The WAP ID to publish.

    Raises:
        NotImplementedError: Always in the base adapter.
    """
    message = f"Engine does not support WAP: {type(self)}"
    raise NotImplementedError(message)
Publishes changes with the given WAP ID to the target table.
Arguments:
- table_name: The name of the target table.
- wap_id: The WAP ID to publish.
def sync_grants_config(
    self,
    table: exp.Table,
    grants_config: GrantsConfig,
    table_type: DataObjectType = DataObjectType.TABLE,
) -> None:
    """Applies the grants_config to a table authoritatively.

    The desired grants are diffed against the grants currently on the object; the
    resulting REVOKE statements are issued before the GRANT statements, in one
    ``execute`` call, and nothing is executed when there is no difference.

    Args:
        table: The table/view to apply grants to.
        grants_config: Dictionary mapping privileges to lists of grantees.
        table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW).

    Raises:
        NotImplementedError: If the adapter does not support grants.
    """
    if not self.SUPPORTS_GRANTS:
        raise NotImplementedError(f"Engine does not support grants: {type(self)}")

    existing = self._get_current_grants_config(table)
    to_grant, to_revoke = self._diff_grants_configs(grants_config, existing)
    revokes = self._revoke_grants_config_expr(table, to_revoke, table_type)
    grants = self._apply_grants_config_expr(table, to_grant, table_type)
    statements = revokes + grants

    if not statements:
        return
    self.execute(statements)
Applies the grants_config to a table authoritatively. It first compares the specified grants against the current grants, and then applies the diffs to the table by revoking and granting privileges as needed.
Arguments:
- table: The table/view to apply grants to.
- grants_config: Dictionary mapping privileges to lists of grantees.
- table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW).
@contextlib.contextmanager
def transaction(
    self,
    condition: t.Optional[bool] = None,
) -> t.Iterator[None]:
    """A transaction context manager.

    No new transaction is opened when one is already active on the connection pool,
    when the adapter does not support transactions, or when ``condition`` is given
    and falsy. Otherwise the pool's transaction is begun (after an optional
    connectivity ping), committed on normal exit, and rolled back if the block raises.

    Args:
        condition: When not None, a falsy value skips opening a transaction.
    """
    if (
        self._connection_pool.is_transaction_active
        or not self.SUPPORTS_TRANSACTIONS
        or (condition is not None and not condition)
    ):
        yield
        return

    if self._pre_ping:
        try:
            logger.debug("Pinging the database to check the connection")
            self.ping()
        except Exception:
            # Best effort: drop the stale connection so the pool reconnects on begin().
            logger.info("Connection to the database was lost. Reconnecting...")
            self._connection_pool.close()

    self._connection_pool.begin()
    try:
        yield
    except BaseException:
        # Roll back on *any* interruption (including KeyboardInterrupt); otherwise the
        # pool stays in an open transaction and later transaction() calls become no-ops.
        self._connection_pool.rollback()
        raise
    else:
        self._connection_pool.commit()
A transaction context manager.
@contextlib.contextmanager
def session(self, properties: SessionProperties) -> t.Iterator[None]:
    """A session context manager.

    When no session is active, a new one is begun with ``properties`` and is always
    ended when the block exits. If a session is already active, the block simply
    runs inside it.

    Args:
        properties: Session properties passed to ``_begin_session``.
    """
    if not self._is_session_active():
        self._begin_session(properties)
        try:
            yield
        finally:
            self._end_session()
    else:
        yield
A session context manager.
def execute(
    self,
    expressions: t.Union[str, exp.Expr, t.Sequence[exp.Expr]],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = True,
    track_rows_processed: bool = False,
    **kwargs: t.Any,
) -> None:
    """Execute one or more SQL statements inside a transaction.

    sqlglot expressions are checked for identifier length and rendered to SQL; raw
    strings are used as-is. Each statement gets the correlation id attached, is
    logged, and is then executed.

    Args:
        expressions: A SQL string, a single expression, or a sequence of expressions.
        ignore_unsupported_errors: Render with ``ErrorLevel.IGNORE`` for unsupported syntax.
        quote_identifiers: Whether to quote identifiers when rendering expressions.
        track_rows_processed: Forwarded to ``_execute``.
        **kwargs: Forwarded to ``_execute``.
    """
    to_sql_kwargs = {}
    if ignore_unsupported_errors:
        to_sql_kwargs["unsupported_level"] = ErrorLevel.IGNORE

    with self.transaction():
        for statement in ensure_list(expressions):
            parsed = statement if isinstance(statement, exp.Expr) else None
            if parsed is not None:
                self._check_identifier_length(parsed)
                sql = self._to_sql(parsed, quote=quote_identifiers, **to_sql_kwargs)
            else:
                sql = t.cast(str, statement)

            sql = self._attach_correlation_id(sql)
            self._log_sql(sql, expression=parsed, quote_identifiers=quote_identifiers)
            self._execute(sql, track_rows_processed, **kwargs)
Execute one or more SQL statements.
@contextlib.contextmanager
def temp_table(
    self,
    query_or_df: QueryOrDF,
    name: TableName = "diff",
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> t.Iterator[exp.Table]:
    """A context manager for working with a temp table.

    The table is created under a name derived from ``name`` via ``_get_temp_table``
    and is dropped when the block exits, even if the block raises.

    Args:
        query_or_df: The query or df to create a temp table for.
        name: The base name of the temp table.
        target_columns_to_types: A mapping between the column name and its data type.
        source_columns: Optional list of columns present in the source.
        **kwargs: Forwarded to the table creation.

    Yields:
        The table expression
    """
    base_table = exp.to_table(name)
    # A schema-qualified name without a catalog gets the adapter's default catalog.
    if (
        isinstance(base_table, exp.Table)
        and not base_table.catalog
        and base_table.db
        and self.default_catalog
    ):
        base_table.set("catalog", exp.parse_identifier(self.default_catalog))

    queries, columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types=target_columns_to_types,
        target_table=base_table,
        source_columns=source_columns,
    )

    with self.transaction():
        table = self._get_temp_table(base_table)
        if table.db:
            self.create_schema(schema_(table.args["db"], table.args.get("catalog")))
        self._create_table_from_source_queries(
            table,
            queries,
            columns_to_types,
            exists=True,
            table_description=None,
            column_descriptions=None,
            track_rows_processed=False,
            **kwargs,
        )

    try:
        yield table
    finally:
        self.drop_table(table)
A context manager for working with a temp table.
The table will be created with a random guid and cleaned up after the block.
Arguments:
- query_or_df: The query or df to create a temp table for.
- name: The base name of the temp table.
- target_columns_to_types: A mapping between the column name and its data type.
Yields:
The table expression
def drop_data_object_on_type_mismatch(
    self, data_object: t.Optional[DataObject], expected_type: DataObjectType
) -> bool:
    """Drops a data object if it exists and is not of the expected type.

    A warning is logged before the object is dropped.

    Args:
        data_object: The data object to check.
        expected_type: The expected type of the data object.

    Returns:
        True if the data object was dropped, False otherwise.
    """
    if data_object is None:
        return False
    if data_object.type == expected_type:
        return False

    logger.warning(
        "Target data object '%s' is a %s and not a %s, dropping it",
        data_object.to_table().sql(dialect=self.dialect),
        data_object.type.value,
        expected_type.value,
    )
    self.drop_data_object(data_object)
    return True
Drops a data object if it exists and is not of the expected type.
Arguments:
- data_object: The data object to check.
- expected_type: The expected type of the data object.
Returns:
True if the data object was dropped, False otherwise.
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable that produces a new Database API-compliant connection on every call, or an existing connection pool.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts