EngineAdapter
Engine adapters are how SQLMesh connects to and interacts with various data stores. They allow SQLMesh to generalize its functionality to different engines that have Python Database API 2.0-compliant connections. Rather than executing queries directly against your data stores, SQLMesh components such as the SnapshotEvaluator delegate them to engine adapters so these components can be engine-agnostic.
1""" 2# EngineAdapter 3 4Engine adapters are how SQLMesh connects and interacts with various data stores. They allow SQLMesh to 5generalize its functionality to different engines that have Python Database API 2.0-compliant 6connections. Rather than executing queries directly against your data stores, SQLMesh components such as 7the SnapshotEvaluator delegate them to engine adapters so these components can be engine-agnostic. 8""" 9 10from __future__ import annotations 11 12import contextlib 13import itertools 14import logging 15import sys 16import typing as t 17from functools import cached_property, partial 18 19from sqlglot import Dialect, exp 20from sqlglot.errors import ErrorLevel 21from sqlglot.helper import ensure_list, seq_get 22from sqlglot.optimizer.qualify_columns import quote_identifiers 23 24from sqlmesh.core.dialect import ( 25 add_table, 26 schema_, 27 select_from_values_for_batch_range, 28 to_schema, 29) 30from sqlmesh.core.engine_adapter.shared import ( 31 CatalogSupport, 32 CommentCreationTable, 33 CommentCreationView, 34 DataObject, 35 DataObjectType, 36 EngineRunMode, 37 InsertOverwriteStrategy, 38 SourceQuery, 39 set_catalog, 40) 41from sqlmesh.core.model.kind import TimeColumn 42from sqlmesh.core.schema_diff import SchemaDiffer, TableAlterOperation 43from sqlmesh.core.snapshot.execution_tracker import QueryExecutionTracker 44from sqlmesh.utils import ( 45 CorrelationId, 46 columns_to_types_all_known, 47 random_id, 48 get_source_columns_to_types, 49) 50from sqlmesh.utils.connection_pool import ConnectionPool, create_connection_pool 51from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column 52from sqlmesh.utils.errors import ( 53 MissingDefaultCatalogError, 54 SQLMeshError, 55 UnsupportedCatalogOperationError, 56) 57from sqlmesh.utils.pandas import columns_to_types_from_df 58 59if t.TYPE_CHECKING: 60 import pandas as pd 61 62 from sqlmesh.core._typing import SchemaName, SessionProperties, TableName 63 from 
sqlmesh.core.engine_adapter._typing import ( 64 DF, 65 BigframeSession, 66 GrantsConfig, 67 PySparkDataFrame, 68 PySparkSession, 69 Query, 70 QueryOrDF, 71 SnowparkSession, 72 ) 73 from sqlmesh.core.node import IntervalUnit 74 75logger = logging.getLogger(__name__) 76 77MERGE_TARGET_ALIAS = "__MERGE_TARGET__" 78MERGE_SOURCE_ALIAS = "__MERGE_SOURCE__" 79 80KEY_FOR_CREATABLE_TYPE = "CREATABLE_TYPE" 81 82 83@set_catalog() 84class EngineAdapter: 85 """Base class wrapping a Database API compliant connection. 86 87 The EngineAdapter is an easily-subclassable interface that interacts 88 with the underlying engine and data store. 89 90 Args: 91 connection_factory_or_pool: a callable which produces a new Database API-compliant 92 connection on every call. 93 dialect: The dialect with which this adapter is associated. 94 multithreaded: Indicates whether this adapter will be used by more than one thread. 95 """ 96 97 DIALECT = "" 98 DEFAULT_BATCH_SIZE = 10000 99 DATA_OBJECT_FILTER_BATCH_SIZE = 4000 100 SUPPORTS_TRANSACTIONS = True 101 SUPPORTS_INDEXES = False 102 COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_CTAS 103 COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS 104 MAX_TABLE_COMMENT_LENGTH: t.Optional[int] = None 105 MAX_COLUMN_COMMENT_LENGTH: t.Optional[int] = None 106 INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT 107 SUPPORTS_MATERIALIZED_VIEWS = False 108 SUPPORTS_MATERIALIZED_VIEW_SCHEMA = False 109 SUPPORTS_VIEW_SCHEMA = True 110 SUPPORTS_CLONING = False 111 SUPPORTS_MANAGED_MODELS = False 112 SUPPORTS_CREATE_DROP_CATALOG = False 113 SUPPORTED_DROP_CASCADE_OBJECT_KINDS: t.List[str] = [] 114 SCHEMA_DIFFER_KWARGS: t.Dict[str, t.Any] = {} 115 SUPPORTS_TUPLE_IN = True 116 HAS_VIEW_BINDING = False 117 SUPPORTS_REPLACE_TABLE = True 118 SUPPORTS_GRANTS = False 119 DEFAULT_CATALOG_TYPE = DIALECT 120 QUOTE_IDENTIFIERS_IN_VIEWS = True 121 MAX_IDENTIFIER_LENGTH: t.Optional[int] = None 122 ATTACH_CORRELATION_ID = True 123 
SUPPORTS_QUERY_EXECUTION_TRACKING = False 124 SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS = False 125 126 def __init__( 127 self, 128 connection_factory_or_pool: t.Union[t.Callable[[], t.Any], ConnectionPool], 129 dialect: str = "", 130 sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None, 131 multithreaded: bool = False, 132 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 133 default_catalog: t.Optional[str] = None, 134 execute_log_level: int = logging.DEBUG, 135 register_comments: bool = True, 136 pre_ping: bool = False, 137 pretty_sql: bool = False, 138 shared_connection: bool = False, 139 correlation_id: t.Optional[CorrelationId] = None, 140 schema_differ_overrides: t.Optional[t.Dict[str, t.Any]] = None, 141 query_execution_tracker: t.Optional[QueryExecutionTracker] = None, 142 **kwargs: t.Any, 143 ): 144 self.dialect = dialect.lower() or self.DIALECT 145 self._connection_pool = ( 146 connection_factory_or_pool 147 if isinstance(connection_factory_or_pool, ConnectionPool) 148 else create_connection_pool( 149 connection_factory_or_pool, 150 multithreaded, 151 shared_connection=shared_connection, 152 cursor_init=cursor_init, 153 ) 154 ) 155 self._sql_gen_kwargs = sql_gen_kwargs or {} 156 self._default_catalog = default_catalog 157 self._execute_log_level = execute_log_level 158 self._extra_config = kwargs 159 self._register_comments = register_comments 160 self._pre_ping = pre_ping 161 self._pretty_sql = pretty_sql 162 self._multithreaded = multithreaded 163 self.correlation_id = correlation_id 164 self._schema_differ_overrides = schema_differ_overrides 165 self._query_execution_tracker = query_execution_tracker 166 self._data_object_cache: t.Dict[str, t.Optional[DataObject]] = {} 167 168 def with_settings(self, **kwargs: t.Any) -> EngineAdapter: 169 extra_kwargs = { 170 "null_connection": True, 171 "execute_log_level": kwargs.pop("execute_log_level", self._execute_log_level), 172 "correlation_id": kwargs.pop("correlation_id", 
self.correlation_id), 173 "query_execution_tracker": kwargs.pop( 174 "query_execution_tracker", self._query_execution_tracker 175 ), 176 **self._extra_config, 177 **kwargs, 178 } 179 180 adapter = self.__class__( 181 self._connection_pool, 182 dialect=self.dialect, 183 sql_gen_kwargs=self._sql_gen_kwargs, 184 default_catalog=self._default_catalog, 185 register_comments=self._register_comments, 186 multithreaded=self._multithreaded, 187 pretty_sql=self._pretty_sql, 188 **extra_kwargs, 189 ) 190 191 return adapter 192 193 @property 194 def cursor(self) -> t.Any: 195 return self._connection_pool.get_cursor() 196 197 @property 198 def connection(self) -> t.Any: 199 return self._connection_pool.get() 200 201 @property 202 def spark(self) -> t.Optional[PySparkSession]: 203 return None 204 205 @property 206 def snowpark(self) -> t.Optional[SnowparkSession]: 207 return None 208 209 @property 210 def bigframe(self) -> t.Optional[BigframeSession]: 211 return None 212 213 @property 214 def comments_enabled(self) -> bool: 215 return self._register_comments and self.COMMENT_CREATION_TABLE.is_supported 216 217 @property 218 def catalog_support(self) -> CatalogSupport: 219 return CatalogSupport.UNSUPPORTED 220 221 @cached_property 222 def schema_differ(self) -> SchemaDiffer: 223 return SchemaDiffer( 224 **{ 225 **self.SCHEMA_DIFFER_KWARGS, 226 **(self._schema_differ_overrides or {}), 227 } 228 ) 229 230 @property 231 def _catalog_type_overrides(self) -> t.Dict[str, str]: 232 return self._extra_config.get("catalog_type_overrides") or {} 233 234 @classmethod 235 def _casted_columns( 236 cls, 237 target_columns_to_types: t.Dict[str, exp.DataType], 238 source_columns: t.Optional[t.List[str]] = None, 239 ) -> t.List[exp.Alias]: 240 source_columns_lookup = set(source_columns or target_columns_to_types) 241 return [ 242 exp.alias_( 243 exp.cast( 244 exp.column(column, quoted=True) 245 if column in source_columns_lookup 246 else exp.Null(), 247 to=kind, 248 ), 249 column, 250 copy=False, 
251 quoted=True, 252 ) 253 for column, kind in target_columns_to_types.items() 254 ] 255 256 @property 257 def default_catalog(self) -> t.Optional[str]: 258 if self.catalog_support.is_unsupported: 259 return None 260 default_catalog = self._default_catalog or self.get_current_catalog() 261 if not default_catalog: 262 raise MissingDefaultCatalogError( 263 "Could not determine a default catalog despite it being supported." 264 ) 265 return default_catalog 266 267 @property 268 def engine_run_mode(self) -> EngineRunMode: 269 return EngineRunMode.SINGLE_MODE_ENGINE 270 271 def _get_source_queries( 272 self, 273 query_or_df: QueryOrDF, 274 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 275 target_table: TableName, 276 *, 277 batch_size: t.Optional[int] = None, 278 source_columns: t.Optional[t.List[str]] = None, 279 ) -> t.List[SourceQuery]: 280 import pandas as pd 281 282 batch_size = self.DEFAULT_BATCH_SIZE if batch_size is None else batch_size 283 if isinstance(query_or_df, exp.Query): 284 query_factory = lambda: query_or_df 285 if source_columns: 286 source_columns_lookup = set(source_columns) 287 if not target_columns_to_types: 288 raise SQLMeshError("columns_to_types must be set if source_columns is set") 289 if not set(target_columns_to_types).issubset(source_columns_lookup): 290 select_columns = [ 291 exp.column(c, quoted=True) 292 if c in source_columns_lookup 293 else exp.cast(exp.Null(), target_columns_to_types[c], copy=False).as_( 294 c, copy=False, quoted=True 295 ) 296 for c in target_columns_to_types 297 ] 298 query_factory = ( 299 lambda: exp.Select() 300 .select(*select_columns) 301 .from_(query_or_df.subquery("select_source_columns")) 302 ) 303 return [SourceQuery(query_factory=query_factory)] # type: ignore 304 305 if not target_columns_to_types: 306 raise SQLMeshError( 307 "It is expected that if a DataFrame is passed in then columns_to_types is set" 308 ) 309 310 if isinstance(query_or_df, pd.DataFrame) and query_or_df.empty: 311 
raise SQLMeshError( 312 "Cannot construct source query from an empty DataFrame. This error is commonly " 313 "related to Python models that produce no data. For such models, consider yielding " 314 "from an empty generator if the resulting set is empty, i.e. use `yield from ()`." 315 ) 316 317 return self._df_to_source_queries( 318 query_or_df, 319 target_columns_to_types, 320 batch_size, 321 target_table=target_table, 322 source_columns=source_columns, 323 ) 324 325 def _df_to_source_queries( 326 self, 327 df: DF, 328 target_columns_to_types: t.Dict[str, exp.DataType], 329 batch_size: int, 330 target_table: TableName, 331 source_columns: t.Optional[t.List[str]] = None, 332 ) -> t.List[SourceQuery]: 333 import pandas as pd 334 335 assert isinstance(df, pd.DataFrame) 336 num_rows = len(df.index) 337 batch_size = sys.maxsize if batch_size == 0 else batch_size 338 339 # we need to ensure that the order of the columns in columns_to_types columns matches the order of the values 340 # they can differ if a user specifies columns() on a python model in a different order than what's in the DataFrame's emitted by that model 341 df = df[list(source_columns or target_columns_to_types)] 342 values = list(df.itertuples(index=False, name=None)) 343 344 return [ 345 SourceQuery( 346 query_factory=partial( 347 self._values_to_sql, 348 values=values, # type: ignore 349 target_columns_to_types=target_columns_to_types, 350 batch_start=i, 351 batch_end=min(i + batch_size, num_rows), 352 source_columns=source_columns, 353 ), 354 ) 355 for i in range(0, num_rows, batch_size) 356 ] 357 358 def _get_source_queries_and_columns_to_types( 359 self, 360 query_or_df: QueryOrDF, 361 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 362 target_table: TableName, 363 *, 364 batch_size: t.Optional[int] = None, 365 source_columns: t.Optional[t.List[str]] = None, 366 ) -> t.Tuple[t.List[SourceQuery], t.Optional[t.Dict[str, exp.DataType]]]: 367 target_columns_to_types, source_columns = 
self._columns_to_types( 368 query_or_df, target_columns_to_types, source_columns 369 ) 370 source_queries = self._get_source_queries( 371 query_or_df, 372 target_columns_to_types, 373 target_table=target_table, 374 batch_size=batch_size, 375 source_columns=source_columns, 376 ) 377 return source_queries, target_columns_to_types 378 379 @t.overload 380 def _columns_to_types( 381 self, 382 query_or_df: DF, 383 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 384 source_columns: t.Optional[t.List[str]] = None, 385 ) -> t.Tuple[t.Dict[str, exp.DataType], t.List[str]]: ... 386 387 @t.overload 388 def _columns_to_types( 389 self, 390 query_or_df: Query, 391 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 392 source_columns: t.Optional[t.List[str]] = None, 393 ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: ... 394 395 def _columns_to_types( 396 self, 397 query_or_df: QueryOrDF, 398 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 399 source_columns: t.Optional[t.List[str]] = None, 400 ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: 401 import pandas as pd 402 403 if not target_columns_to_types and isinstance(query_or_df, pd.DataFrame): 404 target_columns_to_types = columns_to_types_from_df(t.cast(pd.DataFrame, query_or_df)) 405 if not source_columns and target_columns_to_types: 406 source_columns = list(target_columns_to_types) 407 # source columns should only contain columns that are defined in the target. 
If there are extras then 408 # that means they are intended to be ignored and will be excluded 409 source_columns = ( 410 [x for x in source_columns if x in target_columns_to_types] 411 if source_columns and target_columns_to_types 412 else None 413 ) 414 return target_columns_to_types, source_columns 415 416 def recycle(self) -> None: 417 """Closes all open connections and releases all allocated resources associated with any thread 418 except the calling one.""" 419 self._connection_pool.close_all(exclude_calling_thread=True) 420 421 def close(self) -> t.Any: 422 """Closes all open connections and releases all allocated resources.""" 423 self._connection_pool.close_all() 424 425 def get_current_catalog(self) -> t.Optional[str]: 426 """Returns the catalog name of the current connection.""" 427 raise NotImplementedError() 428 429 def set_current_catalog(self, catalog: str) -> None: 430 """Sets the catalog name of the current connection.""" 431 raise NotImplementedError() 432 433 def get_catalog_type(self, catalog: t.Optional[str]) -> str: 434 """Intended to be overridden for data virtualization systems like Trino that, 435 depending on the target catalog, require slightly different properties to be set when creating / updating tables 436 """ 437 if self.catalog_support.is_unsupported: 438 raise UnsupportedCatalogOperationError( 439 f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}" 440 ) 441 return ( 442 self._catalog_type_overrides.get(catalog, self.DEFAULT_CATALOG_TYPE) 443 if catalog 444 else self.DEFAULT_CATALOG_TYPE 445 ) 446 447 def get_catalog_type_from_table(self, table: TableName) -> str: 448 """Get the catalog type from a table name if it has a catalog specified, otherwise return the current catalog type""" 449 catalog = exp.to_table(table).catalog or self.get_current_catalog() 450 return self.get_catalog_type(catalog) 451 452 @property 453 def current_catalog_type(self) -> str: 454 # `get_catalog_type_from_table` should be 
used over this property. Reason is that the table that is the target 455 # of the operation is what matters and not the catalog type of the connection. 456 # This still remains for legacy reasons and should be refactored out. 457 return self.get_catalog_type(self.get_current_catalog()) 458 459 def replace_query( 460 self, 461 table_name: TableName, 462 query_or_df: QueryOrDF, 463 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 464 table_description: t.Optional[str] = None, 465 column_descriptions: t.Optional[t.Dict[str, str]] = None, 466 source_columns: t.Optional[t.List[str]] = None, 467 supports_replace_table_override: t.Optional[bool] = None, 468 **kwargs: t.Any, 469 ) -> None: 470 """Replaces an existing table with a query. 471 472 For partition based engines (hive, spark), insert override is used. For other systems, create or replace is used. 473 474 Args: 475 table_name: The name of the table (eg. prod.table) 476 query_or_df: The SQL query to run or a dataframe. 477 target_columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type. 478 Expected to be ordered to match the order of values in the dataframe. 479 kwargs: Optional create table properties. 
480 """ 481 target_table = exp.to_table(table_name) 482 483 target_data_object = self.get_data_object(target_table) 484 table_exists = target_data_object is not None 485 if self.drop_data_object_on_type_mismatch(target_data_object, DataObjectType.TABLE): 486 table_exists = False 487 488 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 489 query_or_df, 490 target_columns_to_types, 491 target_table=target_table, 492 source_columns=source_columns, 493 ) 494 if not target_columns_to_types and table_exists: 495 target_columns_to_types = self.columns(target_table) 496 query = source_queries[0].query_factory() 497 self_referencing = any( 498 quote_identifiers(table) == quote_identifiers(target_table) 499 for table in query.find_all(exp.Table) 500 ) 501 # If a query references itself then it must have a table created regardless of approach used. 502 if self_referencing: 503 if not target_columns_to_types: 504 raise SQLMeshError( 505 f"Cannot create a self-referencing table {target_table.sql(dialect=self.dialect)} without knowing the column types. " 506 "Try casting the columns to an expected type or defining the columns in the model metadata. 
" 507 ) 508 self._create_table_from_columns( 509 target_table, 510 target_columns_to_types, 511 exists=True, 512 table_description=table_description, 513 column_descriptions=column_descriptions, 514 **kwargs, 515 ) 516 # All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we 517 # use `CREATE OR REPLACE TABLE AS` if the engine supports it 518 supports_replace_table = ( 519 self.SUPPORTS_REPLACE_TABLE 520 if supports_replace_table_override is None 521 else supports_replace_table_override 522 ) 523 if supports_replace_table or not table_exists: 524 return self._create_table_from_source_queries( 525 target_table, 526 source_queries, 527 target_columns_to_types, 528 replace=supports_replace_table, 529 table_description=table_description, 530 column_descriptions=column_descriptions, 531 **kwargs, 532 ) 533 if self_referencing: 534 assert target_columns_to_types is not None 535 with self.temp_table( 536 self._select_columns(target_columns_to_types).from_(target_table), 537 name=target_table, 538 target_columns_to_types=target_columns_to_types, 539 **kwargs, 540 ) as temp_table: 541 for source_query in source_queries: 542 source_query.add_transform( 543 lambda node: ( # type: ignore 544 temp_table # type: ignore 545 if isinstance(node, exp.Table) 546 and quote_identifiers(node) == quote_identifiers(target_table) 547 else node 548 ) 549 ) 550 return self._insert_overwrite_by_condition( 551 target_table, 552 source_queries, 553 target_columns_to_types, 554 **kwargs, 555 ) 556 return self._insert_overwrite_by_condition( 557 target_table, 558 source_queries, 559 target_columns_to_types, 560 **kwargs, 561 ) 562 563 def create_index( 564 self, 565 table_name: TableName, 566 index_name: str, 567 columns: t.Tuple[str, ...], 568 exists: bool = True, 569 ) -> None: 570 """Creates a new index for the given table if supported 571 572 Args: 573 table_name: The name of the target table. 574 index_name: The name of the index. 
575 columns: The list of columns that constitute the index. 576 exists: Indicates whether to include the IF NOT EXISTS check. 577 """ 578 if not self.SUPPORTS_INDEXES: 579 return 580 581 expression = exp.Create( 582 this=exp.Index( 583 this=exp.to_identifier(index_name), 584 table=exp.to_table(table_name), 585 params=exp.IndexParameters(columns=[exp.to_column(c) for c in columns]), 586 ), 587 kind="INDEX", 588 exists=exists, 589 ) 590 self.execute(expression) 591 592 def _pop_creatable_type_from_properties( 593 self, 594 properties: t.Dict[str, exp.Expression], 595 ) -> t.Optional[exp.Property]: 596 """Pop out the creatable_type from the properties dictionary (if exists (return it/remove it) else return none). 597 It also checks that none of the expressions are MATERIALIZE as that conflicts with the `materialize` parameter. 598 """ 599 for key in list(properties.keys()): 600 upper_key = key.upper() 601 if upper_key == KEY_FOR_CREATABLE_TYPE: 602 value = properties.pop(key).name 603 parsed_properties = exp.maybe_parse( 604 value, into=exp.Properties, dialect=self.dialect 605 ) 606 property, *others = parsed_properties.expressions 607 if others: 608 # Multiple properties are unsupported today, can look into it in the future if needed 609 raise SQLMeshError( 610 f"Invalid creatable_type value with multiple properties: {value}" 611 ) 612 if isinstance(property, exp.MaterializedProperty): 613 raise SQLMeshError( 614 f"Cannot use {value} as a creatable_type as it conflicts with the `materialize` parameter." 
615 ) 616 return property 617 return None 618 619 def create_table( 620 self, 621 table_name: TableName, 622 target_columns_to_types: t.Dict[str, exp.DataType], 623 primary_key: t.Optional[t.Tuple[str, ...]] = None, 624 exists: bool = True, 625 table_description: t.Optional[str] = None, 626 column_descriptions: t.Optional[t.Dict[str, str]] = None, 627 **kwargs: t.Any, 628 ) -> None: 629 """Create a table using a DDL statement 630 631 Args: 632 table_name: The name of the table to create. Can be fully qualified or just table name. 633 target_columns_to_types: A mapping between the column name and its data type. 634 primary_key: Determines the table primary key. 635 exists: Indicates whether to include the IF NOT EXISTS check. 636 table_description: Optional table description from MODEL DDL. 637 column_descriptions: Optional column descriptions from model query. 638 kwargs: Optional create table properties. 639 """ 640 self._create_table_from_columns( 641 table_name, 642 target_columns_to_types, 643 primary_key, 644 exists, 645 table_description, 646 column_descriptions, 647 **kwargs, 648 ) 649 650 def create_managed_table( 651 self, 652 table_name: TableName, 653 query: Query, 654 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 655 partitioned_by: t.Optional[t.List[exp.Expression]] = None, 656 clustered_by: t.Optional[t.List[exp.Expression]] = None, 657 table_properties: t.Optional[t.Dict[str, exp.Expression]] = None, 658 table_description: t.Optional[str] = None, 659 column_descriptions: t.Optional[t.Dict[str, str]] = None, 660 source_columns: t.Optional[t.List[str]] = None, 661 **kwargs: t.Any, 662 ) -> None: 663 """Create a managed table using a query. 664 665 "Managed" means that once the table is created, the data is kept up to date by the underlying database engine and not SQLMesh. 666 667 Args: 668 table_name: The name of the table to create. Can be fully qualified or just table name. 
669 query: The SQL query for the engine to base the managed table on 670 target_columns_to_types: A mapping between the column name and its data type. 671 partitioned_by: The partition columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour)) 672 clustered_by: The cluster columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour)) 673 table_properties: Optional mapping of engine-specific properties to be set on the managed table 674 table_description: Optional table description from MODEL DDL. 675 column_descriptions: Optional column descriptions from model query. 676 kwargs: Optional create table properties. 677 """ 678 raise NotImplementedError(f"Engine does not support managed tables: {type(self)}") 679 680 def ctas( 681 self, 682 table_name: TableName, 683 query_or_df: QueryOrDF, 684 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 685 exists: bool = True, 686 table_description: t.Optional[str] = None, 687 column_descriptions: t.Optional[t.Dict[str, str]] = None, 688 source_columns: t.Optional[t.List[str]] = None, 689 **kwargs: t.Any, 690 ) -> None: 691 """Create a table using a CTAS statement 692 693 Args: 694 table_name: The name of the table to create. Can be fully qualified or just table name. 695 query_or_df: The SQL query to run or a dataframe for the CTAS. 696 target_columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame. 697 exists: Indicates whether to include the IF NOT EXISTS check. 698 table_description: Optional table description from MODEL DDL. 699 column_descriptions: Optional column descriptions from model query. 700 kwargs: Optional create table properties. 
701 """ 702 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 703 query_or_df, 704 target_columns_to_types, 705 target_table=table_name, 706 source_columns=source_columns, 707 ) 708 return self._create_table_from_source_queries( 709 table_name, 710 source_queries, 711 target_columns_to_types, 712 exists, 713 table_description=table_description, 714 column_descriptions=column_descriptions, 715 **kwargs, 716 ) 717 718 def create_state_table( 719 self, 720 table_name: str, 721 target_columns_to_types: t.Dict[str, exp.DataType], 722 primary_key: t.Optional[t.Tuple[str, ...]] = None, 723 ) -> None: 724 """Create a table to store SQLMesh internal state. 725 726 Args: 727 table_name: The name of the table to create. Can be fully qualified or just table name. 728 target_columns_to_types: A mapping between the column name and its data type. 729 primary_key: Determines the table primary key. 730 """ 731 self.create_table( 732 table_name, 733 target_columns_to_types, 734 primary_key=primary_key, 735 ) 736 737 def _create_table_from_columns( 738 self, 739 table_name: TableName, 740 target_columns_to_types: t.Dict[str, exp.DataType], 741 primary_key: t.Optional[t.Tuple[str, ...]] = None, 742 exists: bool = True, 743 table_description: t.Optional[str] = None, 744 column_descriptions: t.Optional[t.Dict[str, str]] = None, 745 **kwargs: t.Any, 746 ) -> None: 747 """ 748 Create a table using a DDL statement. 749 750 Args: 751 table_name: The name of the table to create. Can be fully qualified or just table name. 752 target_columns_to_types: Mapping between the column name and its data type. 753 primary_key: Determines the table primary key. 754 exists: Indicates whether to include the IF NOT EXISTS check. 755 table_description: Optional table description from MODEL DDL. 756 column_descriptions: Optional column descriptions from model query. 757 kwargs: Optional create table properties. 
758 """ 759 table = exp.to_table(table_name) 760 761 if not columns_to_types_all_known(target_columns_to_types): 762 # It is ok if the columns types are not known if the table already exists and IF NOT EXISTS is set 763 if exists and self.table_exists(table_name): 764 return 765 raise SQLMeshError( 766 "Cannot create a table without knowing the column types. " 767 "Try casting the columns to an expected type or defining the columns in the model metadata. " 768 f"Columns to types: {target_columns_to_types}" 769 ) 770 771 primary_key_expression = ( 772 [exp.PrimaryKey(expressions=[exp.to_column(k) for k in primary_key])] 773 if primary_key and self.SUPPORTS_INDEXES 774 else [] 775 ) 776 777 schema = self._build_schema_exp( 778 table, 779 target_columns_to_types, 780 column_descriptions, 781 primary_key_expression, 782 ) 783 784 self._create_table( 785 schema, 786 None, 787 exists=exists, 788 target_columns_to_types=target_columns_to_types, 789 table_description=table_description, 790 **kwargs, 791 ) 792 793 # Register comments with commands if the engine doesn't support comments in the schema or CREATE 794 if ( 795 table_description 796 and self.COMMENT_CREATION_TABLE.is_comment_command_only 797 and self.comments_enabled 798 ): 799 self._create_table_comment(table_name, table_description) 800 if ( 801 column_descriptions 802 and self.COMMENT_CREATION_TABLE.is_comment_command_only 803 and self.comments_enabled 804 ): 805 self._create_column_comments(table_name, column_descriptions) 806 807 def _build_schema_exp( 808 self, 809 table: exp.Table, 810 target_columns_to_types: t.Dict[str, exp.DataType], 811 column_descriptions: t.Optional[t.Dict[str, str]] = None, 812 expressions: t.Optional[t.List[exp.PrimaryKey]] = None, 813 is_view: bool = False, 814 materialized: bool = False, 815 ) -> exp.Schema: 816 """ 817 Build a schema expression for a table, columns, column comments, and additional schema properties. 
818 """ 819 expressions = expressions or [] 820 821 return exp.Schema( 822 this=table, 823 expressions=self._build_column_defs( 824 target_columns_to_types=target_columns_to_types, 825 column_descriptions=column_descriptions, 826 is_view=is_view, 827 materialized=materialized, 828 ) 829 + expressions, 830 ) 831 832 def _build_column_defs( 833 self, 834 target_columns_to_types: t.Dict[str, exp.DataType], 835 column_descriptions: t.Optional[t.Dict[str, str]] = None, 836 is_view: bool = False, 837 materialized: bool = False, 838 ) -> t.List[exp.ColumnDef]: 839 engine_supports_schema_comments = ( 840 self.COMMENT_CREATION_VIEW.supports_schema_def 841 if is_view 842 else self.COMMENT_CREATION_TABLE.supports_schema_def 843 ) 844 return [ 845 self._build_column_def( 846 column, 847 column_descriptions=column_descriptions, 848 engine_supports_schema_comments=engine_supports_schema_comments, 849 col_type=None if is_view else kind, # don't include column data type for views 850 ) 851 for column, kind in target_columns_to_types.items() 852 ] 853 854 def _build_column_def( 855 self, 856 col_name: str, 857 column_descriptions: t.Optional[t.Dict[str, str]] = None, 858 engine_supports_schema_comments: bool = False, 859 col_type: t.Optional[exp.DATA_TYPE] = None, 860 nested_names: t.List[str] = [], 861 ) -> exp.ColumnDef: 862 return exp.ColumnDef( 863 this=exp.to_identifier(col_name), 864 kind=col_type, 865 constraints=( 866 self._build_col_comment_exp(col_name, column_descriptions) 867 if engine_supports_schema_comments and self.comments_enabled and column_descriptions 868 else None 869 ), 870 ) 871 872 def _build_col_comment_exp( 873 self, col_name: str, column_descriptions: t.Dict[str, str] 874 ) -> t.List[exp.ColumnConstraint]: 875 comment = column_descriptions.get(col_name, None) 876 if comment: 877 return [ 878 exp.ColumnConstraint( 879 kind=exp.CommentColumnConstraint( 880 this=exp.Literal.string(self._truncate_column_comment(comment)) 881 ) 882 ) 883 ] 884 return [] 885 
def _create_table_from_source_queries(
    self,
    table_name: TableName,
    source_queries: t.List[SourceQuery],
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    exists: bool = True,
    replace: bool = False,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    table_kind: t.Optional[str] = None,
    track_rows_processed: bool = True,
    **kwargs: t.Any,
) -> None:
    """Create a table populated from one or more source queries.

    The first source query is used in the CREATE TABLE statement. Every following
    source query is appended with an INSERT. A transaction is only opened when there
    is more than one source query.

    Args:
        table_name: The name of the table to create.
        source_queries: The source queries whose results populate the table.
        target_columns_to_types: Optional mapping of target column names to data types.
        exists: Whether to include the IF NOT EXISTS check.
        replace: Whether or not to replace an existing table.
        table_description: Optional table comment.
        column_descriptions: Optional mapping of column names to column comments.
        table_kind: Optional creatable kind; `_build_create_table_exp` falls back to "TABLE".
        track_rows_processed: Forwarded to `execute` for every statement issued here.
        **kwargs: Extra arguments forwarded to `_create_table`.
    """
    table = exp.to_table(table_name)

    # CTAS calls do not usually include a schema expression. However, most engines
    # permit them in CTAS expressions, and they allow us to register all column comments
    # in a single call rather than in a separate comment command call for each column.
    #
    # This block conditionally builds a schema expression with column comments if the engine
    # supports it and we have columns_to_types. column_to_types is required because the
    # schema expression must include at least column name, data type, and the comment -
    # for example, `(colname INTEGER COMMENT 'comment')`.
    #
    # column_to_types will be available when loading from a DataFrame (by converting from
    # pandas to SQL types), when a model is "annotated" by explicitly specifying column
    # types, and for evaluation methods like `LogicalReplaceQueryMixin.replace_query()`
    # calls and SCD Type 2 model calls.
    schema = None
    target_columns_to_types_known = target_columns_to_types and columns_to_types_all_known(
        target_columns_to_types
    )
    if (
        column_descriptions
        and target_columns_to_types_known
        and self.COMMENT_CREATION_TABLE.is_in_schema_def_ctas
        and self.comments_enabled
    ):
        schema = self._build_schema_exp(table, target_columns_to_types, column_descriptions)  # type: ignore

    with self.transaction(condition=len(source_queries) > 1):
        for i, source_query in enumerate(source_queries):
            with source_query as query:
                # Only reorder/coerce projections when every target column type is known.
                if target_columns_to_types and target_columns_to_types_known:
                    query = self._order_projections_and_filter(
                        query, target_columns_to_types, coerce_types=True
                    )
                if i == 0:
                    # The first query creates the table (with the comment-bearing schema if built).
                    self._create_table(
                        schema if schema else table,
                        query,
                        target_columns_to_types=target_columns_to_types,
                        exists=exists,
                        replace=replace,
                        table_description=table_description,
                        table_kind=table_kind,
                        track_rows_processed=track_rows_processed,
                        **kwargs,
                    )
                else:
                    self._insert_append_query(
                        table_name,
                        query,
                        target_columns_to_types or self.columns(table),
                        track_rows_processed=track_rows_processed,
                    )

    # Register comments with commands if the engine supports comments and we weren't able to
    # register them with the CTAS call's schema expression.
    if (
        table_description
        and self.COMMENT_CREATION_TABLE.is_comment_command_only
        and self.comments_enabled
    ):
        self._create_table_comment(table_name, table_description)
    if column_descriptions and schema is None and self.comments_enabled:
        self._create_column_comments(table_name, column_descriptions)


def _create_table(
    self,
    table_name_or_schema: t.Union[exp.Schema, TableName],
    expression: t.Optional[exp.Expression],
    exists: bool = True,
    replace: bool = False,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    table_kind: t.Optional[str] = None,
    track_rows_processed: bool = True,
    **kwargs: t.Any,
) -> None:
    """Execute the CREATE statement from `_build_create_table_exp` and clear the object cache.

    `table_description` is only passed through when `COMMENT_CREATION_TABLE` supports
    schema definitions and comments are enabled. `column_descriptions` is accepted but
    not used by this base implementation.
    """
    self.execute(
        self._build_create_table_exp(
            table_name_or_schema,
            expression=expression,
            exists=exists,
            replace=replace,
            target_columns_to_types=target_columns_to_types,
            table_description=(
                table_description
                if self.COMMENT_CREATION_TABLE.supports_schema_def and self.comments_enabled
                else None
            ),
            table_kind=table_kind,
            **kwargs,
        ),
        track_rows_processed=track_rows_processed,
    )
    # Extract table name to clear cache
    table_name = (
        table_name_or_schema.this
        if isinstance(table_name_or_schema, exp.Schema)
        else table_name_or_schema
    )
    self._clear_data_object_cache(table_name)


def _build_create_table_exp(
    self,
    table_name_or_schema: t.Union[exp.Schema, TableName],
    expression: t.Optional[exp.Expression],
    exists: bool = True,
    replace: bool = False,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    table_kind: t.Optional[str] = None,
    **kwargs: t.Any,
) -> exp.Create:
    """Build, without executing, a CREATE expression for a table or schema expression.

    `exists` is forced to False when `replace` is set. Table properties are only built
    when extra keyword arguments or a table description are supplied. The kind defaults
    to "TABLE".
    """
    # REPLACE and IF NOT EXISTS are mutually exclusive, so REPLACE wins.
    exists = False if replace else exists
    catalog_name = None
    if not isinstance(table_name_or_schema, exp.Schema):
        table_name_or_schema = exp.to_table(table_name_or_schema)
        catalog_name = table_name_or_schema.catalog
    else:
        if isinstance(table_name_or_schema.this, exp.Table):
            catalog_name = table_name_or_schema.this.catalog

    properties = (
        self._build_table_properties_exp(
            **kwargs,
            catalog_name=catalog_name,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            table_kind=table_kind,
        )
        if kwargs or table_description
        else None
    )
    return exp.Create(
        this=table_name_or_schema,
        kind=table_kind or "TABLE",
        replace=replace,
        exists=exists,
        expression=expression,
        properties=properties,
    )


def create_table_like(
    self,
    target_table_name: TableName,
    source_table_name: TableName,
    exists: bool = True,
    **kwargs: t.Any,
) -> None:
    """Create a table to store SQLMesh internal state based on the definition of another table, including any
    column attributes and indexes defined in the original table.

    Args:
        target_table_name: The name of the table to create. Can be fully qualified or just table name.
        source_table_name: The name of the table to base the new table on.
        exists: Forwarded to `_create_table_like`.
        **kwargs: Extra arguments forwarded to `_create_table_like`.
    """
    self._create_table_like(target_table_name, source_table_name, exists=exists, **kwargs)
    self._clear_data_object_cache(target_table_name)


def clone_table(
    self,
    target_table_name: TableName,
    source_table_name: TableName,
    replace: bool = False,
    exists: bool = True,
    clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> None:
    """Creates a table with the target name by cloning the source table.

    Args:
        target_table_name: The name of the table that should be created.
        source_table_name: The name of the source table that should be cloned.
        replace: Whether or not to replace an existing table.
        exists: Indicates whether to include the IF NOT EXISTS check.
        clone_kwargs: Optional extra arguments set on the `exp.Clone` expression.
        **kwargs: Extra arguments set on the `exp.Create` expression. Any
            `rendered_physical_properties` entry is dropped.

    Raises:
        NotImplementedError: If the adapter does not set `SUPPORTS_CLONING`.
    """
    if not self.SUPPORTS_CLONING:
        raise NotImplementedError(f"Engine does not support cloning: {type(self)}")

    # Physical properties are not applicable to a clone, so they are discarded here.
    kwargs.pop("rendered_physical_properties", None)
    self.execute(
        exp.Create(
            this=exp.to_table(target_table_name),
            kind="TABLE",
            replace=replace,
            exists=exists,
            clone=exp.Clone(
                this=exp.to_table(source_table_name),
                **(clone_kwargs or {}),
            ),
            **kwargs,
        )
    )
    self._clear_data_object_cache(target_table_name)


def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool = True) -> None:
    """Drops a data object of arbitrary type.

    Args:
        data_object: The data object to drop.
        ignore_if_not_exists: If True, no error will be raised if the data object does not exist.

    Raises:
        SQLMeshError: If the object is not a view, materialized view, table or managed table.
    """
    if data_object.type.is_view:
        self.drop_view(data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists)
    elif data_object.type.is_materialized_view:
        self.drop_view(
            data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists, materialized=True
        )
    elif data_object.type.is_table:
        self.drop_table(data_object.to_table(), exists=ignore_if_not_exists)
    elif data_object.type.is_managed_table:
        self.drop_managed_table(data_object.to_table(), exists=ignore_if_not_exists)
    else:
        raise SQLMeshError(
            f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'"
        )


def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
    """Drops a table.

    Args:
        table_name: The name of the table to drop.
        exists: If exists, defaults to True.
        **kwargs: Extra arguments forwarded to `_drop_object`.
    """
    self._drop_object(name=table_name, exists=exists, **kwargs)


def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None:
    """Drops a managed table.

    The base adapter always raises; engines with managed tables override this.

    Args:
        table_name: The name of the table to drop.
        exists: If exists, defaults to True.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError(f"Engine does not support managed tables: {type(self)}")


def _drop_object(
    self,
    name: TableName | SchemaName,
    exists: bool = True,
    kind: str = "TABLE",
    cascade: bool = False,
    **drop_args: t.Any,
) -> None:
    """Drops an object and clears its entry from the data object cache.

    An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind.

    Args:
        name: The name of the object to drop.
        exists: Whether to add an IF EXISTS guard. Defaults to True.
        kind: What kind of object to drop. Defaults to TABLE
        cascade: Whether or not to DROP ... CASCADE.
            Note that this is ignored for :kind's that are not present in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS
        **drop_args: Any extra arguments to set on the Drop expression
    """
    if cascade and kind.upper() in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS:
        drop_args["cascade"] = cascade

    self.execute(exp.Drop(this=exp.to_table(name), kind=kind, exists=exists, **drop_args))
    self._clear_data_object_cache(name)


def get_alter_operations(
    self,
    current_table_name: TableName,
    target_table_name: TableName,
    *,
    ignore_destructive: bool = False,
    ignore_additive: bool = False,
) -> t.List[TableAlterOperation]:
    """
    Determines the alter statements needed to change the current table into the structure of the target table.

    Both tables' columns are fetched with `columns()` and compared by `schema_differ`.

    Args:
        current_table_name: The table to be altered.
        target_table_name: The table whose structure should be matched.
        ignore_destructive: Forwarded to `SchemaDiffer.compare_columns`.
        ignore_additive: Forwarded to `SchemaDiffer.compare_columns`.
    """
    return t.cast(
        t.List[TableAlterOperation],
        self.schema_differ.compare_columns(
            current_table_name,
            self.columns(current_table_name),
            self.columns(target_table_name),
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        ),
    )


def alter_table(
    self,
    alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
) -> None:
    """
    Performs the alter statements to change the current table into the structure of the target table.

    All statements run inside a single transaction. `TableAlterOperation` items are
    unwrapped to their `.expression` before execution.
    """
    with self.transaction():
        for alter_expression in [
            x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
        ]:
            self.execute(alter_expression)


def create_view(
    self,
    view_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    replace: bool = True,
    materialized: bool = False,
    materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **create_kwargs: t.Any,
) -> None:
    """Create a view with a query or dataframe.

    If a dataframe is passed in, it will be converted into a literal values statement.
    This should only be done if the dataframe is very small!

    Args:
        view_name: The view name.
        query_or_df: A query or dataframe.
        target_columns_to_types: Columns to use in the view statement.
        replace: Whether or not to replace an existing view. Defaults to True.
        materialized: Whether to create a materialized view. Only used for engines that support this feature.
        materialized_properties: Optional materialized view properties to add to the view.
            Note: the `partitioned_by` / `clustered_by` entries are popped from this dict
            and a `catalog_name` entry may be added, i.e. the caller's dict is mutated.
        table_description: Optional table description from MODEL DDL.
        column_descriptions: Optional column descriptions from model query.
        view_properties: Optional view properties to add to the view.
        source_columns: Optional list of source column names, forwarded to column resolution.
        create_kwargs: Additional kwargs to pass into the Create expression

    Raises:
        SQLMeshError: If materialized properties are given for a non-materialized view, if a
            dataframe is given without resolvable column types, or if the input resolves to
            more than one source query.
    """
    import pandas as pd

    if materialized_properties and not materialized:
        raise SQLMeshError("Materialized properties are only supported for materialized views")

    query_or_df = self._native_df_to_pandas_df(query_or_df)

    if isinstance(query_or_df, pd.DataFrame):
        # Inline the dataframe rows as a VALUES-based query.
        values: t.List[t.Tuple[t.Any, ...]] = list(
            query_or_df.itertuples(index=False, name=None)
        )
        target_columns_to_types, source_columns = self._columns_to_types(
            query_or_df, target_columns_to_types, source_columns
        )
        if not target_columns_to_types:
            raise SQLMeshError("columns_to_types must be provided for dataframes")
        source_columns_to_types = get_source_columns_to_types(
            target_columns_to_types, source_columns
        )
        query_or_df = self._values_to_sql(
            values,
            source_columns_to_types,
            batch_start=0,
            batch_end=len(values),
        )

    source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        batch_size=0,
        target_table=view_name,
        source_columns=source_columns,
    )
    if len(source_queries) != 1:
        raise SQLMeshError("Only one source query is supported for creating views")

    schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name)
    if target_columns_to_types:
        schema = self._build_schema_exp(
            exp.to_table(view_name),
            target_columns_to_types,
            column_descriptions,
            is_view=True,
            materialized=materialized,
        )

    properties = create_kwargs.pop("properties", None)
    if not properties:
        properties = exp.Properties(expressions=[])

    if view_properties:
        table_type = self._pop_creatable_type_from_properties(view_properties)
        if table_type:
            properties.append("expressions", table_type)

    if materialized and self.SUPPORTS_MATERIALIZED_VIEWS:
        properties.append("expressions", exp.MaterializedProperty())

        # Some engines reject a column list on materialized views: fall back to the bare table.
        if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema):
            schema = schema.this

    if not self.SUPPORTS_VIEW_SCHEMA and isinstance(schema, exp.Schema):
        schema = schema.this

    if materialized_properties:
        partitioned_by = materialized_properties.pop("partitioned_by", None)
        clustered_by = materialized_properties.pop("clustered_by", None)
        if (
            partitioned_by
            and (
                partitioned_by_prop := self._build_partitioned_by_exp(
                    partitioned_by, **materialized_properties
                )
            )
            is not None
        ):
            # NOTE(review): `catalog_name` is added only after `_build_partitioned_by_exp`
            # has run, so it reaches `_build_clustered_by_exp` but not the partition
            # builder. Confirm this ordering is intended.
            materialized_properties["catalog_name"] = exp.to_table(view_name).catalog
            properties.append("expressions", partitioned_by_prop)
        if (
            clustered_by
            and (
                clustered_by_prop := self._build_clustered_by_exp(
                    clustered_by, **materialized_properties
                )
            )
            is not None
        ):
            properties.append("expressions", clustered_by_prop)

    create_view_properties = self._build_view_properties_exp(
        view_properties,
        (
            table_description
            if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled
            else None
        ),
        physical_cluster=create_kwargs.pop("physical_cluster", None),
    )
    if create_view_properties:
        for view_property in create_view_properties.expressions:
            # Small hack to make sure SECURE goes at the beginning before materialized as required by Snowflake
            if isinstance(view_property, exp.SecureProperty):
                properties.set("expressions", view_property, index=0, overwrite=False)
            else:
                properties.append("expressions", view_property)

    if properties.expressions:
        create_kwargs["properties"] = properties

    if replace:
        # Drop an existing object of the other kind (view vs. materialized view) first.
        self.drop_data_object_on_type_mismatch(
            self.get_data_object(view_name),
            DataObjectType.VIEW if not materialized else DataObjectType.MATERIALIZED_VIEW,
        )

    with source_queries[0] as query:
        self.execute(
            exp.Create(
                this=schema,
                kind="VIEW",
                replace=replace,
                expression=query,
                **create_kwargs,
            ),
            quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
        )

    self._clear_data_object_cache(view_name)

    # Register table comment with commands if the engine doesn't support doing it in CREATE
    if (
        table_description
        and self.COMMENT_CREATION_VIEW.is_comment_command_only
        and self.comments_enabled
    ):
        self._create_table_comment(view_name, table_description, "VIEW")
    # Register column comments with commands if the engine doesn't support doing it in
    # CREATE or we couldn't do it in the CREATE schema definition because we don't have
    # columns_to_types
    if (
        column_descriptions
        and (
            self.COMMENT_CREATION_VIEW.is_comment_command_only
            or (
                self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands
                and not target_columns_to_types
            )
        )
        and self.comments_enabled
    ):
        self._create_column_comments(view_name, column_descriptions, "VIEW", materialized)


@set_catalog()
def create_schema(
    self,
    schema_name: SchemaName,
    ignore_if_exists: bool = True,
    warn_on_error: bool = True,
    properties: t.Optional[t.List[exp.Expression]] = None,
) -> None:
    """Create a schema (kind "SCHEMA") by delegating to `_create_schema`.

    Args:
        schema_name: The schema to create.
        ignore_if_exists: Whether to include an IF NOT EXISTS check.
        warn_on_error: If True, failures are logged as warnings instead of raised.
        properties: Optional schema properties; defaults to none.
    """
    properties = properties or []
    return self._create_schema(
        schema_name=schema_name,
        ignore_if_exists=ignore_if_exists,
        warn_on_error=warn_on_error,
        properties=properties,
        kind="SCHEMA",
    )


def _create_schema(
    self,
    schema_name: SchemaName,
    ignore_if_exists: bool,
    warn_on_error: bool,
    properties: t.List[exp.Expression],
    kind: str,
) -> None:
    """Create a schema from a name or qualified table name.

    Any exception from `execute` is logged and swallowed when `warn_on_error` is True,
    and re-raised otherwise.
    """
    try:
        self.execute(
            exp.Create(
                this=to_schema(schema_name),
                kind=kind,
                exists=ignore_if_exists,
                properties=exp.Properties(  # this renders as '' (empty string) if expressions is empty
                    expressions=properties
                ),
            )
        )
    except Exception as e:
        if not warn_on_error:
            raise
        logger.warning("Failed to create %s '%s': %s", kind.lower(), schema_name, e)


def drop_schema(
    self,
    schema_name: SchemaName,
    ignore_if_not_exists: bool = True,
    cascade: bool = False,
    **drop_args: t.Any,
) -> None:
    """Drop a schema via `_drop_object` with kind "SCHEMA".

    `cascade` is only applied when "SCHEMA" is listed in
    `SUPPORTED_DROP_CASCADE_OBJECT_KINDS`. Extra keyword arguments are set on the
    Drop expression.
    """
    return self._drop_object(
        name=schema_name,
        exists=ignore_if_not_exists,
        kind="SCHEMA",
        cascade=cascade,
        **drop_args,
    )


def drop_view(
    self,
    view_name: TableName,
    ignore_if_not_exists: bool = True,
    materialized: bool = False,
    **kwargs: t.Any,
) -> None:
    """Drop a view.

    The MATERIALIZED flag is only set when `materialized` is True and the adapter
    supports materialized views. Extra keyword arguments are forwarded to `_drop_object`.
    """
    self._drop_object(
        name=view_name,
        exists=ignore_if_not_exists,
        kind="VIEW",
        materialized=materialized and self.SUPPORTS_MATERIALIZED_VIEWS,
        **kwargs,
    )


def create_catalog(self, catalog_name: str | exp.Identifier) -> None:
    """Parse `catalog_name` in this adapter's dialect and delegate to `_create_catalog`."""
    return self._create_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect))


def _create_catalog(self, catalog_name: exp.Identifier) -> None:
    """Always raises SQLMeshError in the base adapter; engines with catalog management override it."""
    raise SQLMeshError(
        f"Unable to create catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine."
    )


def drop_catalog(self, catalog_name: str | exp.Identifier) -> None:
    """Parse `catalog_name` in this adapter's dialect and delegate to `_drop_catalog`."""
    return self._drop_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect))


def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
    """Always raises SQLMeshError in the base adapter; engines with catalog management override it."""
    raise SQLMeshError(
        f"Unable to drop catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine."
    )


def columns(
    self, table_name: TableName, include_pseudo_columns: bool = False
) -> t.Dict[str, exp.DataType]:
    """Fetches column names and types for the target table.

    Runs DESCRIBE TABLE and reads rows until the first one whose first field starts
    with "#". Rows with an empty name or type are skipped. `include_pseudo_columns`
    is not used by this base implementation.
    """
    self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
    describe_output = self.cursor.fetchall()
    return {
        # Note: MySQL returns the column type as bytes.
        column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect)
        # NOTE(review): "#"-prefixed rows presumably start an extra section some engines
        # append to DESCRIBE output (e.g. partition info); confirm per engine.
        # The lambda parameter `t` shadows the module's `typing as t` alias inside the lambda only.
        for column_name, column_type, *_ in itertools.takewhile(
            lambda t: not t[0].startswith("#"),
            describe_output,
        )
        if column_name and column_name.strip() and column_type and column_type.strip()
    }


def table_exists(self, table_name: TableName) -> bool:
    """Return whether `table_name` exists.

    The data object cache is consulted first. On a cache miss, DESCRIBE TABLE is
    executed and any exception is treated as "does not exist".
    """
    table = exp.to_table(table_name)
    data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
    if data_object_cache_key in self._data_object_cache:
        logger.debug("Table existence cache hit: %s", data_object_cache_key)
        return self._data_object_cache[data_object_cache_key] is not None

    try:
        self.execute(exp.Describe(this=table, kind="TABLE"))
        return True
    except Exception:
        return False


def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
    """Execute DELETE FROM `table_name` WHERE `where`."""
    self.execute(exp.delete(table_name, where))


def insert_append(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    track_rows_processed: bool = True,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """Append the rows of a query or dataframe to `table_name`."""
    source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=table_name,
        source_columns=source_columns,
    )
    self._insert_append_source_queries(
        table_name, source_queries, target_columns_to_types, track_rows_processed
    )


def _insert_append_source_queries(
    self,
    table_name: TableName,
    source_queries: t.List[SourceQuery],
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    track_rows_processed: bool = True,
) -> None:
    """Append every source query to `table_name` inside a transaction.

    The transaction is opened when there is at least one source query. When no column
    types are given, they are fetched with `columns(table_name)`.
    """
    with self.transaction(condition=len(source_queries) > 0):
        target_columns_to_types = target_columns_to_types or self.columns(table_name)
        for source_query in source_queries:
            with source_query as query:
                self._insert_append_query(
                    table_name,
                    query,
                    target_columns_to_types,
                    track_rows_processed=track_rows_processed,
                )


def _insert_append_query(
    self,
    table_name: TableName,
    query: Query,
    target_columns_to_types: t.Dict[str, exp.DataType],
    order_projections: bool = True,
    track_rows_processed: bool = True,
) -> None:
    """Execute INSERT INTO `table_name` (columns...) `query`.

    When `order_projections` is True, the query is first passed through
    `_order_projections_and_filter` so its projections line up with the column list.
    """
    if order_projections:
        query = self._order_projections_and_filter(query, target_columns_to_types)
    self.execute(
        exp.insert(query, table_name, columns=list(target_columns_to_types)),
        track_rows_processed=track_rows_processed,
    )


def insert_overwrite_by_partition(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    partitioned_by: t.List[exp.Expression],
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """Overwrite the partitions of `table_name` covered by the incoming data.

    When the adapter's INSERT_OVERWRITE_STRATEGY is insert-overwrite, an INSERT
    OVERWRITE without a WHERE filter is issued. `partitioned_by` is unused in that
    branch; presumably the engine decides which partitions are replaced (confirm).
    Otherwise the data is written through `_replace_by_key`, using `partitioned_by`
    as a non-unique key.
    """
    if self.INSERT_OVERWRITE_STRATEGY.is_insert_overwrite:
        target_table = exp.to_table(table_name)
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
            target_table=target_table,
            source_columns=source_columns,
        )
        self._insert_overwrite_by_condition(
            table_name, source_queries, target_columns_to_types=target_columns_to_types
        )
    else:
        self._replace_by_key(
            table_name,
            query_or_df,
            target_columns_to_types,
            partitioned_by,
            is_unique_key=False,
            source_columns=source_columns,
        )


def insert_overwrite_by_time_partition(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    start: TimeLike,
    end: TimeLike,
    time_formatter: t.Callable[
        [TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expression
    ],
    time_column: TimeColumn | exp.Expression | str,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Overwrite the rows of `table_name` whose `time_column` lies in [start, end].

    The bounds come from `make_inclusive(start, end, dialect)` and are rendered with
    `time_formatter`. They form a BETWEEN filter that is passed to
    `_insert_overwrite_by_time_partition`. Column types fall back to
    `columns(table_name)` when missing or not fully known.
    """
    source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=table_name,
        source_columns=source_columns,
    )
    if not target_columns_to_types or not columns_to_types_all_known(target_columns_to_types):
        target_columns_to_types = self.columns(table_name)
    low, high = [
        time_formatter(dt, target_columns_to_types)
        for dt in make_inclusive(start, end, self.dialect)
    ]
    if isinstance(time_column, TimeColumn):
        time_column = time_column.column
    where = exp.Between(
        this=exp.to_column(time_column) if isinstance(time_column, str) else time_column,
        low=low,
        high=high,
    )
    return self._insert_overwrite_by_time_partition(
        table_name, source_queries, target_columns_to_types, where, **kwargs
    )


def _insert_overwrite_by_time_partition(
    self,
    table_name: TableName,
    source_queries: t.List[SourceQuery],
    target_columns_to_types: t.Dict[str, exp.DataType],
    where: exp.Condition,
    **kwargs: t.Any,
) -> None:
    """Delegate to `_insert_overwrite_by_condition` with the time-range `where` filter."""
    return self._insert_overwrite_by_condition(
        table_name, source_queries, target_columns_to_types, where, **kwargs
    )


def _values_to_sql(
    self,
    values: t.List[t.Tuple[t.Any, ...]],
    target_columns_to_types: t.Dict[str, exp.DataType],
    batch_start: int,
    batch_end: int,
    alias: str = "t",
    source_columns: t.Optional[t.List[str]] = None,
) -> Query:
    """Build a SELECT over literal VALUES for the given batch range of `values`.

    Delegates to `select_from_values_for_batch_range`.
    """
    return select_from_values_for_batch_range(
        values=values,
        target_columns_to_types=target_columns_to_types,
        batch_start=batch_start,
        batch_end=batch_end,
        alias=alias,
        source_columns=source_columns,
    )


def _insert_overwrite_by_condition(
    self,
    table_name: TableName,
    source_queries: t.List[SourceQuery],
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    where: t.Optional[exp.Condition] = None,
    insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
    **kwargs: t.Any,
) -> None:
    """Replace the rows of `table_name` matching `where` with the source query results.

    The strategy is `insert_overwrite_strategy_override` or the adapter's
    INSERT_OVERWRITE_STRATEGY:
      * delete-insert: before the first query, DELETE rows matching `where`
        (all rows when `where` is None), then INSERT each query.
      * merge: MERGE ... ON FALSE. Target rows matching `where` that are not matched
        by the source are deleted, and every source row is inserted.
      * otherwise: INSERT [OVERWRITE] of the query. With replace-where, `where` (or TRUE)
        is attached and no column list is emitted.
    Source queries after the first are always plainly appended.
    """
    table = exp.to_table(table_name)
    insert_overwrite_strategy = (
        insert_overwrite_strategy_override or self.INSERT_OVERWRITE_STRATEGY
    )
    with self.transaction(
        condition=len(source_queries) > 0 or insert_overwrite_strategy.is_delete_insert
    ):
        target_columns_to_types = target_columns_to_types or self.columns(table_name)
        for i, source_query in enumerate(source_queries):
            with source_query as query:
                query = self._order_projections_and_filter(
                    query, target_columns_to_types, where=where
                )
                if i > 0 or insert_overwrite_strategy.is_delete_insert:
                    if i == 0:
                        self.delete_from(table_name, where=where or exp.true())
                    # Projections were already ordered above, so skip re-ordering.
                    self._insert_append_query(
                        table_name,
                        query,
                        target_columns_to_types=target_columns_to_types,
                        order_projections=False,
                    )
                elif insert_overwrite_strategy.is_merge:
                    columns = [exp.column(col) for col in target_columns_to_types]
                    when_not_matched_by_source = exp.When(
                        matched=False,
                        source=True,
                        condition=where,
                        then=exp.Delete(),
                    )
                    when_not_matched_by_target = exp.When(
                        matched=False,
                        source=False,
                        then=exp.Insert(
                            this=exp.Tuple(expressions=columns),
                            expression=exp.Tuple(expressions=columns),
                        ),
                    )
                    # ON FALSE: nothing ever matches, so only the two NOT MATCHED branches fire.
                    self._merge(
                        target_table=table_name,
                        query=query,
                        on=exp.false(),
                        whens=exp.Whens(
                            expressions=[when_not_matched_by_source, when_not_matched_by_target]
                        ),
                    )
                else:
                    insert_exp = exp.insert(
                        query,
                        table,
                        columns=(
                            list(target_columns_to_types)
                            if not insert_overwrite_strategy.is_replace_where
                            else None
                        ),
                        overwrite=insert_overwrite_strategy.is_insert_overwrite,
                    )
                    if insert_overwrite_strategy.is_replace_where:
                        insert_exp.set("where", where or exp.true())
                    self.execute(insert_exp, track_rows_processed=True)


def update_table(
    self,
    table_name: TableName,
    properties: t.Dict[str, t.Any],
    where: t.Optional[str | exp.Condition] = None,
) -> None:
    """Execute UPDATE `table_name` SET <properties> [WHERE `where`]."""
    self.execute(exp.update(table_name, properties, where=where))


def _merge(
    self,
    target_table: TableName,
    query: Query,
    on: exp.Expression,
    whens: exp.Whens,
) -> None:
    """Execute a MERGE of `query` into `target_table`.

    The target is aliased as MERGE_TARGET_ALIAS and the source subquery as
    MERGE_SOURCE_ALIAS, so `on` and `whens` may reference those aliases.
    """
    this = exp.alias_(exp.to_table(target_table), alias=MERGE_TARGET_ALIAS, table=True)
    using = exp.alias_(
        exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
    )
    self.execute(
        exp.Merge(this=this, using=using, on=on, whens=whens), track_rows_processed=True
    )


def scd_type_2_by_time(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    unique_key: t.Sequence[exp.Expression],
    valid_from_col: exp.Column,
    valid_to_col: exp.Column,
    execution_time: t.Union[TimeLike, exp.Column],
    updated_at_col: exp.Column,
    invalidate_hard_deletes: bool = True,
    updated_at_as_valid_from: bool = False,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    truncate: bool = False,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """SCD Type 2 load that detects changes through `updated_at_col`; delegates to `_scd_type_2`."""
    self._scd_type_2(
        target_table=target_table,
        source_table=source_table,
        unique_key=unique_key,
        valid_from_col=valid_from_col,
        valid_to_col=valid_to_col,
        execution_time=execution_time,
        updated_at_col=updated_at_col,
        invalidate_hard_deletes=invalidate_hard_deletes,
        updated_at_as_valid_from=updated_at_as_valid_from,
        target_columns_to_types=target_columns_to_types,
        table_description=table_description,
        column_descriptions=column_descriptions,
        truncate=truncate,
        source_columns=source_columns,
        **kwargs,
    )


def scd_type_2_by_column(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    unique_key: t.Sequence[exp.Expression],
    valid_from_col: exp.Column,
    valid_to_col: exp.Column,
    execution_time: t.Union[TimeLike, exp.Column],
    check_columns: t.Union[exp.Star, t.Sequence[exp.Expression]],
    invalidate_hard_deletes: bool = True,
    execution_time_as_valid_from: bool = False,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    truncate: bool = False,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """SCD Type 2 load that detects changes through `check_columns`; delegates to `_scd_type_2`."""
    self._scd_type_2(
        target_table=target_table,
        source_table=source_table,
        unique_key=unique_key,
        valid_from_col=valid_from_col,
        valid_to_col=valid_to_col,
        execution_time=execution_time,
        check_columns=check_columns,
        target_columns_to_types=target_columns_to_types,
        invalidate_hard_deletes=invalidate_hard_deletes,
        execution_time_as_valid_from=execution_time_as_valid_from,
        table_description=table_description,
        column_descriptions=column_descriptions,
        truncate=truncate,
        source_columns=source_columns,
        **kwargs,
    )


def _scd_type_2(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    unique_key: t.Sequence[exp.Expression],
    valid_from_col: exp.Column,
    valid_to_col: exp.Column,
    execution_time: t.Union[TimeLike, exp.Column],
    invalidate_hard_deletes: bool = True,
    updated_at_col: t.Optional[exp.Column] = None,
    check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Expression]]] = None,
    updated_at_as_valid_from: bool = False,
    execution_time_as_valid_from: bool = False,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    truncate: bool = False,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Shared implementation behind `scd_type_2_by_time` and `scd_type_2_by_column`."""

    # Strip the SCD-managed valid_from / valid_to columns from a column mapping.
    def remove_managed_columns(
        cols_to_types: t.Dict[str, exp.DataType],
    ) -> t.Dict[str, exp.DataType]:
        return {
            k: v for k, v in cols_to_types.items() if k not in {valid_from_name, valid_to_name}
        }

    valid_from_name = valid_from_col.name
    valid_to_name = valid_to_col.name
    target_columns_to_types = target_columns_to_types or self.columns(target_table)
    # Re-read the table's columns if the provided mapping lacks the managed columns or has unknown types.
    if (
        valid_from_name not in target_columns_to_types
        or valid_to_name not in target_columns_to_types
        or not columns_to_types_all_known(target_columns_to_types)
    ):
        target_columns_to_types = self.columns(target_table)
    unmanaged_columns_to_types = (
        remove_managed_columns(target_columns_to_types) if target_columns_to_types else None
    )
    source_queries, unmanaged_columns_to_types = self._get_source_queries_and_columns_to_types(
        source_table,
        unmanaged_columns_to_types,
        target_table=target_table,
        batch_size=0,
        source_columns=source_columns,
    )
    updated_at_name = updated_at_col.name if updated_at_col else None
    if not target_columns_to_types:
        raise SQLMeshError(f"Could not get columns_to_types. Does {target_table} exist?")
    unmanaged_columns_to_types = unmanaged_columns_to_types or remove_managed_columns(
        target_columns_to_types
    )
    # Validate the combination of SCD Type 2 options.
    if not unique_key:
        raise SQLMeshError("unique_key must be provided for SCD Type 2")
    if check_columns and updated_at_col:
        raise SQLMeshError(
            "Cannot use both `check_columns` and `updated_at_name` for SCD Type 2"
        )
    if check_columns and updated_at_as_valid_from:
        raise SQLMeshError(
            "Cannot use both `check_columns` and `updated_at_as_valid_from` for SCD Type 2"
        )
    if execution_time_as_valid_from and not check_columns:
        raise SQLMeshError(
            "Cannot use `execution_time_as_valid_from` without `check_columns` for SCD Type 2"
        )
    if updated_at_name and updated_at_name not in target_columns_to_types:
        raise SQLMeshError(
            f"Column {updated_at_name} not found in {target_table}. Table must contain an `updated_at` timestamp for SCD Type 2"
        )
    time_data_type = target_columns_to_types[valid_from_name]
    select_source_columns: t.List[t.Union[str, exp.Alias]] = [
        col for col in unmanaged_columns_to_types if col != updated_at_name
    ]
    table_columns = [exp.column(c, quoted=True) for c in target_columns_to_types]
    if updated_at_name:
        # Cast updated_at to the valid_from column's type, keeping its original name.
        select_source_columns.append(
            exp.cast(updated_at_col, time_data_type).as_(updated_at_col.this)  # type: ignore
        )

    # If a star is provided, we include all unmanaged columns in the check.
    # This unnecessarily includes unique key columns but since they are used in the join, and therefore we know
    # they are equal or not, the extra check is not a problem and we gain simplified logic here.
    # If we want to change this, then we just need to check the expressions in unique_key and pull out the
    # column names and then remove them from the unmanaged_columns
    if check_columns:
        # Handle both Star directly and [Star()] (which can happen during serialization/deserialization)
        if isinstance(seq_get(ensure_list(check_columns), 0), exp.Star):
            check_columns = [exp.column(col) for col in unmanaged_columns_to_types]
    execution_ts = (
        exp.cast(execution_time, time_data_type, dialect=self.dialect)
        if isinstance(execution_time, exp.Column)
        else to_time_column(execution_time, time_data_type, self.dialect, nullable=True)
    )
    if updated_at_as_valid_from:
        if not updated_at_col:
            raise SQLMeshError(
                "Cannot use `updated_at_as_valid_from` without `updated_at_name` for SCD Type 2"
            )
        update_valid_from_start: t.Union[str, exp.Expression] = updated_at_col
    # If using check_columns and the user doesn't always want execution_time for valid from
    # then we only use epoch 0 if we are truncating the table and loading rows for the first time.
    # All future new rows should have execution time.
1915 elif check_columns and (execution_time_as_valid_from or not truncate): 1916 update_valid_from_start = execution_ts 1917 else: 1918 update_valid_from_start = to_time_column( 1919 "1970-01-01 00:00:00+00:00", time_data_type, self.dialect, nullable=True 1920 ) 1921 insert_valid_from_start = execution_ts if check_columns else updated_at_col # type: ignore 1922 # joined._exists IS NULL is saying "if the row is deleted" 1923 delete_check = ( 1924 exp.column("_exists", "joined").is_(exp.Null()) if invalidate_hard_deletes else None 1925 ) 1926 prefixed_valid_to_col = valid_to_col.copy() 1927 prefixed_valid_to_col.this.set("this", f"t_{prefixed_valid_to_col.name}") 1928 prefixed_valid_from_col = valid_from_col.copy() 1929 prefixed_valid_from_col.this.set("this", f"t_{valid_from_col.name}") 1930 if check_columns: 1931 row_check_conditions = [] 1932 for col in check_columns: 1933 col_qualified = col.copy() 1934 col_qualified.set("table", exp.to_identifier("joined")) 1935 1936 t_col = col_qualified.copy() 1937 for column in t_col.find_all(exp.Column): 1938 column.this.set("this", f"t_{column.name}") 1939 1940 row_check_conditions.extend( 1941 [ 1942 col_qualified.neq(t_col), 1943 exp.and_(t_col.is_(exp.Null()), col_qualified.is_(exp.Null()).not_()), 1944 exp.and_(t_col.is_(exp.Null()).not_(), col_qualified.is_(exp.Null())), 1945 ] 1946 ) 1947 row_value_check = exp.or_(*row_check_conditions) 1948 unique_key_conditions = [] 1949 for key in unique_key: 1950 key_qualified = key.copy() 1951 key_qualified.set("table", exp.to_identifier("joined")) 1952 t_key = key_qualified.copy() 1953 for col in t_key.find_all(exp.Column): 1954 col.this.set("this", f"t_{col.name}") 1955 unique_key_conditions.extend( 1956 [t_key.is_(exp.Null()).not_(), key_qualified.is_(exp.Null()).not_()] 1957 ) 1958 unique_key_check = exp.and_(*unique_key_conditions) 1959 # unique_key_check is saying "if the row is updated" 1960 # row_value_check is saying "if the row has changed" 1961 updated_row_filter = 
exp.and_(unique_key_check, row_value_check) 1962 valid_to_case_stmt = ( 1963 exp.Case() 1964 .when( 1965 exp.and_( 1966 exp.or_( 1967 delete_check, 1968 updated_row_filter, 1969 ) 1970 ), 1971 execution_ts, 1972 ) 1973 .else_(prefixed_valid_to_col) 1974 .as_(valid_to_col.this) 1975 ) 1976 valid_from_case_stmt = exp.func( 1977 "COALESCE", 1978 prefixed_valid_from_col, 1979 update_valid_from_start, 1980 ).as_(valid_from_col.this) 1981 else: 1982 assert updated_at_col is not None 1983 updated_at_col_qualified = updated_at_col.copy() 1984 updated_at_col_qualified.set("table", exp.to_identifier("joined")) 1985 prefixed_updated_at_col = updated_at_col_qualified.copy() 1986 prefixed_updated_at_col.this.set("this", f"t_{updated_at_col_qualified.name}") 1987 updated_row_filter = updated_at_col_qualified > prefixed_updated_at_col 1988 1989 valid_to_case_stmt_builder = exp.Case().when( 1990 updated_row_filter, updated_at_col_qualified 1991 ) 1992 if delete_check: 1993 valid_to_case_stmt_builder = valid_to_case_stmt_builder.when( 1994 delete_check, execution_ts 1995 ) 1996 valid_to_case_stmt = valid_to_case_stmt_builder.else_(prefixed_valid_to_col).as_( 1997 valid_to_col.this 1998 ) 1999 2000 valid_from_case_stmt = ( 2001 exp.Case() 2002 .when( 2003 exp.and_( 2004 prefixed_valid_from_col.is_(exp.Null()), 2005 exp.column("_exists", "latest_deleted").is_(exp.Null()).not_(), 2006 ), 2007 exp.Case() 2008 .when( 2009 exp.column(valid_to_col.this, "latest_deleted") > updated_at_col, 2010 exp.column(valid_to_col.this, "latest_deleted"), 2011 ) 2012 .else_(updated_at_col), 2013 ) 2014 .when(prefixed_valid_from_col.is_(exp.Null()), update_valid_from_start) 2015 .else_(prefixed_valid_from_col) 2016 ).as_(valid_from_col.this) 2017 2018 existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_( 2019 target_table 2020 ) 2021 if truncate: 2022 existing_rows_query = existing_rows_query.limit(0) 2023 2024 with source_queries[0] as source_query: 2025 
prefixed_columns_to_types = [] 2026 for column in target_columns_to_types: 2027 prefixed_col = exp.column(column).copy() 2028 prefixed_col.this.set("this", f"t_{prefixed_col.name}") 2029 prefixed_columns_to_types.append(prefixed_col) 2030 prefixed_unmanaged_columns = [] 2031 for column in unmanaged_columns_to_types: 2032 prefixed_col = exp.column(column).copy() 2033 prefixed_col.this.set("this", f"t_{prefixed_col.name}") 2034 prefixed_unmanaged_columns.append(prefixed_col) 2035 query = ( 2036 exp.Select() # type: ignore 2037 .select(*table_columns) 2038 .from_("static") 2039 .union( 2040 exp.select(*table_columns).from_("updated_rows"), 2041 distinct=False, 2042 ) 2043 .union( 2044 exp.select(*table_columns).from_("inserted_rows"), 2045 distinct=False, 2046 ) 2047 .with_( 2048 "source", 2049 exp.select(exp.true().as_("_exists"), *select_source_columns) 2050 .distinct(*unique_key) 2051 .from_( 2052 self.use_server_nulls_for_unmatched_after_join(source_query).subquery( # type: ignore 2053 "raw_source" 2054 ) 2055 ), 2056 ) 2057 # Historical Records that Do Not Change 2058 .with_( 2059 "static", 2060 existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()), 2061 ) 2062 # Latest Records that can be updated 2063 .with_( 2064 "latest", 2065 existing_rows_query.where(valid_to_col.is_(exp.Null())), 2066 ) 2067 # Deleted records which can be used to determine `valid_from` for undeleted source records 2068 .with_( 2069 "deleted", 2070 exp.select(*[exp.column(col, "static") for col in target_columns_to_types]) 2071 .from_("static") 2072 .join( 2073 "latest", 2074 on=exp.and_( 2075 *[ 2076 add_table(key, "static").eq(add_table(key, "latest")) 2077 for key in unique_key 2078 ] 2079 ), 2080 join_type="left", 2081 ) 2082 .where(exp.column(valid_to_col.this, "latest").is_(exp.Null())), 2083 ) 2084 # Get the latest `valid_to` deleted record for each unique key 2085 .with_( 2086 "latest_deleted", 2087 exp.select( 2088 exp.true().as_("_exists"), 2089 *(part.as_(f"_key{i}") for 
i, part in enumerate(unique_key)), 2090 exp.Max(this=valid_to_col).as_(valid_to_col.this), 2091 ) 2092 .from_("deleted") 2093 .group_by(*unique_key), 2094 ) 2095 # Do a full join between latest records and source table in order to combine them together 2096 # MySQL doesn't support full join so going to do a left then right join and remove dups with union 2097 # We do a left/right and filter right on only matching to remove the need to do union distinct 2098 # which allows scd type 2 to be compatible with unhashable data types 2099 .with_( 2100 "joined", 2101 exp.select( 2102 exp.column("_exists", table="source").as_("_exists"), 2103 *( 2104 exp.column(col, table="latest").as_(prefixed_columns_to_types[i].this) 2105 for i, col in enumerate(target_columns_to_types) 2106 ), 2107 *( 2108 exp.column(col, table="source").as_(col) 2109 for col in unmanaged_columns_to_types 2110 ), 2111 ) 2112 .from_("latest") 2113 .join( 2114 "source", 2115 on=exp.and_( 2116 *[ 2117 add_table(key, "latest").eq(add_table(key, "source")) 2118 for key in unique_key 2119 ] 2120 ), 2121 join_type="left", 2122 ) 2123 .union( 2124 exp.select( 2125 exp.column("_exists", table="source").as_("_exists"), 2126 *( 2127 exp.column(col, table="latest").as_( 2128 prefixed_columns_to_types[i].this 2129 ) 2130 for i, col in enumerate(target_columns_to_types) 2131 ), 2132 *( 2133 exp.column(col, table="source").as_(col) 2134 for col in unmanaged_columns_to_types 2135 ), 2136 ) 2137 .from_("latest") 2138 .join( 2139 "source", 2140 on=exp.and_( 2141 *[ 2142 add_table(key, "latest").eq(add_table(key, "source")) 2143 for key in unique_key 2144 ] 2145 ), 2146 join_type="right", 2147 ) 2148 .where(exp.column("_exists", table="latest").is_(exp.Null())), 2149 distinct=False, 2150 ), 2151 ) 2152 # Get deleted, new, no longer current, or unchanged records 2153 .with_( 2154 "updated_rows", 2155 exp.select( 2156 *( 2157 exp.func( 2158 "COALESCE", 2159 exp.column(prefixed_unmanaged_columns[i].this, table="joined"), 2160 
exp.column(col, table="joined"), 2161 ).as_(col) 2162 for i, col in enumerate(unmanaged_columns_to_types) 2163 ), 2164 valid_from_case_stmt, 2165 valid_to_case_stmt, 2166 ) 2167 .from_("joined") 2168 .join( 2169 "latest_deleted", 2170 on=exp.and_( 2171 *[ 2172 add_table(part, "joined").eq( 2173 exp.column(f"_key{i}", "latest_deleted") 2174 ) 2175 for i, part in enumerate(unique_key) 2176 ] 2177 ), 2178 join_type="left", 2179 ), 2180 ) 2181 # Get records that have been "updated" which means inserting a new record with previous `valid_from` 2182 .with_( 2183 "inserted_rows", 2184 exp.select( 2185 *unmanaged_columns_to_types, 2186 insert_valid_from_start.as_(valid_from_col.this), # type: ignore 2187 to_time_column(exp.null(), time_data_type, self.dialect, nullable=True).as_( 2188 valid_to_col.this 2189 ), 2190 ) 2191 .from_("joined") 2192 .where(updated_row_filter), 2193 ) 2194 ) 2195 2196 self.replace_query( 2197 target_table, 2198 self.ensure_nulls_for_unmatched_after_join(query), 2199 target_columns_to_types=target_columns_to_types, 2200 table_description=table_description, 2201 column_descriptions=column_descriptions, 2202 **kwargs, 2203 ) 2204 2205 def merge( 2206 self, 2207 target_table: TableName, 2208 source_table: QueryOrDF, 2209 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 2210 unique_key: t.Sequence[exp.Expression], 2211 when_matched: t.Optional[exp.Whens] = None, 2212 merge_filter: t.Optional[exp.Expression] = None, 2213 source_columns: t.Optional[t.List[str]] = None, 2214 **kwargs: t.Any, 2215 ) -> None: 2216 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 2217 source_table, 2218 target_columns_to_types, 2219 target_table=target_table, 2220 source_columns=source_columns, 2221 ) 2222 target_columns_to_types = target_columns_to_types or self.columns(target_table) 2223 on = exp.and_( 2224 *( 2225 add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS)) 2226 for part in unique_key 
2227 ) 2228 ) 2229 if merge_filter: 2230 on = exp.and_(merge_filter, on) 2231 2232 if not when_matched: 2233 match_expressions = [ 2234 exp.When( 2235 matched=True, 2236 source=False, 2237 then=exp.Update( 2238 expressions=[ 2239 exp.column(col, MERGE_TARGET_ALIAS).eq( 2240 exp.column(col, MERGE_SOURCE_ALIAS) 2241 ) 2242 for col in target_columns_to_types 2243 ], 2244 ), 2245 ) 2246 ] 2247 else: 2248 match_expressions = when_matched.copy().expressions 2249 2250 match_expressions.append( 2251 exp.When( 2252 matched=False, 2253 source=False, 2254 then=exp.Insert( 2255 this=exp.Tuple( 2256 expressions=[exp.column(col) for col in target_columns_to_types] 2257 ), 2258 expression=exp.Tuple( 2259 expressions=[ 2260 exp.column(col, MERGE_SOURCE_ALIAS) for col in target_columns_to_types 2261 ] 2262 ), 2263 ), 2264 ) 2265 ) 2266 for source_query in source_queries: 2267 with source_query as query: 2268 self._merge( 2269 target_table=target_table, 2270 query=query, 2271 on=on, 2272 whens=exp.Whens(expressions=match_expressions), 2273 ) 2274 2275 def rename_table( 2276 self, 2277 old_table_name: TableName, 2278 new_table_name: TableName, 2279 ) -> None: 2280 new_table = exp.to_table(new_table_name) 2281 if new_table.catalog: 2282 old_table = exp.to_table(old_table_name) 2283 catalog = old_table.catalog or self.get_current_catalog() 2284 if catalog != new_table.catalog: 2285 raise UnsupportedCatalogOperationError( 2286 "Tried to rename table across catalogs which is not supported" 2287 ) 2288 self._rename_table(old_table_name, new_table_name) 2289 self._clear_data_object_cache(old_table_name) 2290 self._clear_data_object_cache(new_table_name) 2291 2292 def get_data_object( 2293 self, target_name: TableName, safe_to_cache: bool = False 2294 ) -> t.Optional[DataObject]: 2295 target_table = exp.to_table(target_name) 2296 existing_data_objects = self.get_data_objects( 2297 schema_(target_table.db, target_table.catalog), 2298 {target_table.name}, 2299 safe_to_cache=safe_to_cache, 
2300 ) 2301 if existing_data_objects: 2302 return existing_data_objects[0] 2303 return None 2304 2305 def get_data_objects( 2306 self, 2307 schema_name: SchemaName, 2308 object_names: t.Optional[t.Set[str]] = None, 2309 safe_to_cache: bool = False, 2310 ) -> t.List[DataObject]: 2311 """Lists all data objects in the target schema. 2312 2313 Args: 2314 schema_name: The name of the schema to list data objects from. 2315 object_names: If provided, only return data objects with these names. 2316 safe_to_cache: Whether it is safe to cache the results of this call. 2317 2318 Returns: 2319 A list of data objects in the target schema. 2320 """ 2321 if object_names is not None: 2322 if not object_names: 2323 return [] 2324 2325 # Check cache for each object name 2326 target_schema = to_schema(schema_name) 2327 cached_objects = [] 2328 missing_names = set() 2329 2330 for name in object_names: 2331 cache_key = _get_data_object_cache_key( 2332 target_schema.catalog, target_schema.db, name 2333 ) 2334 if cache_key in self._data_object_cache: 2335 logger.debug("Data object cache hit: %s", cache_key) 2336 data_object = self._data_object_cache[cache_key] 2337 # If the object is none, then the table was previously looked for but not found 2338 if data_object: 2339 cached_objects.append(data_object) 2340 else: 2341 logger.debug("Data object cache miss: %s", cache_key) 2342 missing_names.add(name) 2343 2344 # Fetch missing objects from database 2345 if missing_names: 2346 object_names_list = list(missing_names) 2347 batches = [ 2348 object_names_list[i : i + self.DATA_OBJECT_FILTER_BATCH_SIZE] 2349 for i in range(0, len(object_names_list), self.DATA_OBJECT_FILTER_BATCH_SIZE) 2350 ] 2351 2352 fetched_objects = [] 2353 fetched_object_names = set() 2354 for batch in batches: 2355 objects = self._get_data_objects(schema_name, set(batch)) 2356 for obj in objects: 2357 if safe_to_cache: 2358 cache_key = _get_data_object_cache_key( 2359 obj.catalog, obj.schema_name, obj.name 2360 ) 2361 
self._data_object_cache[cache_key] = obj 2362 fetched_objects.append(obj) 2363 fetched_object_names.add(obj.name) 2364 2365 if safe_to_cache: 2366 for missing_name in missing_names - fetched_object_names: 2367 cache_key = _get_data_object_cache_key( 2368 target_schema.catalog, target_schema.db, missing_name 2369 ) 2370 self._data_object_cache[cache_key] = None 2371 2372 return cached_objects + fetched_objects 2373 2374 return cached_objects 2375 2376 fetched_objects = self._get_data_objects(schema_name) 2377 if safe_to_cache: 2378 for obj in fetched_objects: 2379 cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name) 2380 self._data_object_cache[cache_key] = obj 2381 return fetched_objects 2382 2383 def fetchone( 2384 self, 2385 query: t.Union[exp.Expression, str], 2386 ignore_unsupported_errors: bool = False, 2387 quote_identifiers: bool = False, 2388 ) -> t.Optional[t.Tuple]: 2389 with self.transaction(): 2390 self.execute( 2391 query, 2392 ignore_unsupported_errors=ignore_unsupported_errors, 2393 quote_identifiers=quote_identifiers, 2394 ) 2395 return self.cursor.fetchone() 2396 2397 def fetchall( 2398 self, 2399 query: t.Union[exp.Expression, str], 2400 ignore_unsupported_errors: bool = False, 2401 quote_identifiers: bool = False, 2402 ) -> t.List[t.Tuple]: 2403 with self.transaction(): 2404 self.execute( 2405 query, 2406 ignore_unsupported_errors=ignore_unsupported_errors, 2407 quote_identifiers=quote_identifiers, 2408 ) 2409 return self.cursor.fetchall() 2410 2411 def _fetch_native_df( 2412 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 2413 ) -> DF: 2414 """Fetches a DataFrame that can be either Pandas or PySpark from the cursor""" 2415 with self.transaction(): 2416 self.execute(query, quote_identifiers=quote_identifiers) 2417 return self.cursor.fetchdf() 2418 2419 def _native_df_to_pandas_df( 2420 self, 2421 query_or_df: QueryOrDF, 2422 ) -> t.Union[Query, pd.DataFrame]: 2423 """ 2424 Take a "native" 
DataFrame (eg Pyspark, Bigframe, Snowpark etc) and convert it to Pandas 2425 """ 2426 import pandas as pd 2427 2428 if isinstance(query_or_df, (exp.Query, pd.DataFrame)): 2429 return query_or_df 2430 2431 # EngineAdapter subclasses that have native DataFrame types should override this 2432 raise NotImplementedError(f"Unable to convert {type(query_or_df)} to Pandas") 2433 2434 def fetchdf( 2435 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 2436 ) -> pd.DataFrame: 2437 """Fetches a Pandas DataFrame from the cursor""" 2438 import pandas as pd 2439 2440 df = self._fetch_native_df(query, quote_identifiers=quote_identifiers) 2441 if not isinstance(df, pd.DataFrame): 2442 raise NotImplementedError( 2443 "The cursor's `fetch_native_df` method is not returning a pandas DataFrame. Need to update `fetchdf` so a Pandas DataFrame is returned" 2444 ) 2445 return df 2446 2447 def fetch_pyspark_df( 2448 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 2449 ) -> PySparkDataFrame: 2450 """Fetches a PySpark DataFrame from the cursor""" 2451 raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}") 2452 2453 @property 2454 def wap_enabled(self) -> bool: 2455 """Returns whether WAP is enabled for this engine.""" 2456 return self._extra_config.get("wap_enabled", False) 2457 2458 def wap_supported(self, table_name: TableName) -> bool: 2459 """Returns whether WAP for the target table is supported.""" 2460 return False 2461 2462 def wap_table_name(self, table_name: TableName, wap_id: str) -> str: 2463 """Returns the updated table name for the given WAP ID. 2464 2465 Args: 2466 table_name: The name of the target table. 2467 wap_id: The WAP ID to prepare. 2468 2469 Returns: 2470 The updated table name that should be used for writing. 
2471 """ 2472 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 2473 2474 def wap_prepare(self, table_name: TableName, wap_id: str) -> str: 2475 """Prepares the target table for WAP and returns the updated table name. 2476 2477 Args: 2478 table_name: The name of the target table. 2479 wap_id: The WAP ID to prepare. 2480 2481 Returns: 2482 The updated table name that should be used for writing. 2483 """ 2484 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 2485 2486 def wap_publish(self, table_name: TableName, wap_id: str) -> None: 2487 """Publishes changes with the given WAP ID to the target table. 2488 2489 Args: 2490 table_name: The name of the target table. 2491 wap_id: The WAP ID to publish. 2492 """ 2493 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 2494 2495 def sync_grants_config( 2496 self, 2497 table: exp.Table, 2498 grants_config: GrantsConfig, 2499 table_type: DataObjectType = DataObjectType.TABLE, 2500 ) -> None: 2501 """Applies the grants_config to a table authoritatively. 2502 It first compares the specified grants against the current grants, and then 2503 applies the diffs to the table by revoking and granting privileges as needed. 2504 2505 Args: 2506 table: The table/view to apply grants to. 2507 grants_config: Dictionary mapping privileges to lists of grantees. 2508 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 
2509 """ 2510 if not self.SUPPORTS_GRANTS: 2511 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 2512 2513 current_grants = self._get_current_grants_config(table) 2514 new_grants, revoked_grants = self._diff_grants_configs(grants_config, current_grants) 2515 revoke_exprs = self._revoke_grants_config_expr(table, revoked_grants, table_type) 2516 grant_exprs = self._apply_grants_config_expr(table, new_grants, table_type) 2517 dcl_exprs = revoke_exprs + grant_exprs 2518 2519 if dcl_exprs: 2520 self.execute(dcl_exprs) 2521 2522 @contextlib.contextmanager 2523 def transaction( 2524 self, 2525 condition: t.Optional[bool] = None, 2526 ) -> t.Iterator[None]: 2527 """A transaction context manager.""" 2528 if ( 2529 self._connection_pool.is_transaction_active 2530 or not self.SUPPORTS_TRANSACTIONS 2531 or (condition is not None and not condition) 2532 ): 2533 yield 2534 return 2535 2536 if self._pre_ping: 2537 try: 2538 logger.debug("Pinging the database to check the connection") 2539 self.ping() 2540 except Exception: 2541 logger.info("Connection to the database was lost. 
Reconnecting...") 2542 self._connection_pool.close() 2543 2544 self._connection_pool.begin() 2545 try: 2546 yield 2547 except Exception as e: 2548 self._connection_pool.rollback() 2549 raise e 2550 else: 2551 self._connection_pool.commit() 2552 2553 @contextlib.contextmanager 2554 def session(self, properties: SessionProperties) -> t.Iterator[None]: 2555 """A session context manager.""" 2556 if self._is_session_active(): 2557 yield 2558 return 2559 2560 self._begin_session(properties) 2561 try: 2562 yield 2563 finally: 2564 self._end_session() 2565 2566 def _begin_session(self, properties: SessionProperties) -> t.Any: 2567 """Begin a new session.""" 2568 2569 def _end_session(self) -> None: 2570 """End the existing session.""" 2571 2572 def _is_session_active(self) -> bool: 2573 """Indicates whether or not a session is active.""" 2574 return False 2575 2576 def execute( 2577 self, 2578 expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]], 2579 ignore_unsupported_errors: bool = False, 2580 quote_identifiers: bool = True, 2581 track_rows_processed: bool = False, 2582 **kwargs: t.Any, 2583 ) -> None: 2584 """Execute a sql query.""" 2585 to_sql_kwargs = ( 2586 {"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {} 2587 ) 2588 with self.transaction(): 2589 for e in ensure_list(expressions): 2590 if isinstance(e, exp.Expression): 2591 self._check_identifier_length(e) 2592 sql = self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs) 2593 else: 2594 sql = t.cast(str, e) 2595 2596 sql = self._attach_correlation_id(sql) 2597 2598 self._log_sql( 2599 sql, 2600 expression=e if isinstance(e, exp.Expression) else None, 2601 quote_identifiers=quote_identifiers, 2602 ) 2603 self._execute(sql, track_rows_processed, **kwargs) 2604 2605 def _attach_correlation_id(self, sql: str) -> str: 2606 if self.ATTACH_CORRELATION_ID and self.correlation_id: 2607 return f"/* {self.correlation_id} */ {sql}" 2608 return sql 2609 2610 def _log_sql( 2611 
self, 2612 sql: str, 2613 expression: t.Optional[exp.Expression] = None, 2614 quote_identifiers: bool = True, 2615 ) -> None: 2616 if not logger.isEnabledFor(self._execute_log_level): 2617 return 2618 2619 sql_to_log = sql 2620 if expression is not None and not isinstance(expression, exp.Query): 2621 values = expression.find(exp.Values) 2622 if values: 2623 values.set("expressions", [exp.to_identifier("<REDACTED VALUES>")]) 2624 sql_to_log = self._to_sql(expression, quote=quote_identifiers) 2625 2626 logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log) 2627 2628 def _record_execution_stats( 2629 self, sql: str, rowcount: t.Optional[int] = None, bytes_processed: t.Optional[int] = None 2630 ) -> None: 2631 if self._query_execution_tracker: 2632 self._query_execution_tracker.record_execution(sql, rowcount, bytes_processed) 2633 2634 def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any) -> None: 2635 self.cursor.execute(sql, **kwargs) 2636 2637 if ( 2638 self.SUPPORTS_QUERY_EXECUTION_TRACKING 2639 and track_rows_processed 2640 and self._query_execution_tracker 2641 and self._query_execution_tracker.is_tracking() 2642 ): 2643 if ( 2644 rowcount := getattr(self.cursor, "rowcount", None) 2645 ) is not None and rowcount is not None: 2646 try: 2647 self._record_execution_stats(sql, int(rowcount)) 2648 except (TypeError, ValueError): 2649 return 2650 2651 @contextlib.contextmanager 2652 def temp_table( 2653 self, 2654 query_or_df: QueryOrDF, 2655 name: TableName = "diff", 2656 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2657 source_columns: t.Optional[t.List[str]] = None, 2658 **kwargs: t.Any, 2659 ) -> t.Iterator[exp.Table]: 2660 """A context manager for working a temp table. 2661 2662 The table will be created with a random guid and cleaned up after the block. 2663 2664 Args: 2665 query_or_df: The query or df to create a temp table for. 2666 name: The base name of the temp table. 
2667 target_columns_to_types: A mapping between the column name and its data type. 2668 2669 Yields: 2670 The table expression 2671 """ 2672 name = exp.to_table(name) 2673 # ensure that we use default catalog if none is not specified 2674 if isinstance(name, exp.Table) and not name.catalog and name.db and self.default_catalog: 2675 name.set("catalog", exp.parse_identifier(self.default_catalog)) 2676 2677 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 2678 query_or_df, 2679 target_columns_to_types=target_columns_to_types, 2680 target_table=name, 2681 source_columns=source_columns, 2682 ) 2683 2684 with self.transaction(): 2685 table = self._get_temp_table(name) 2686 if table.db: 2687 self.create_schema(schema_(table.args["db"], table.args.get("catalog"))) 2688 self._create_table_from_source_queries( 2689 table, 2690 source_queries, 2691 target_columns_to_types, 2692 exists=True, 2693 table_description=None, 2694 column_descriptions=None, 2695 track_rows_processed=False, 2696 **kwargs, 2697 ) 2698 2699 try: 2700 yield table 2701 finally: 2702 self.drop_table(table) 2703 2704 def _table_or_view_properties_to_expressions( 2705 self, table_or_view_properties: t.Optional[t.Dict[str, exp.Expression]] = None 2706 ) -> t.List[exp.Property]: 2707 """Converts model properties (either physical or virtual) to a list of property expressions.""" 2708 if not table_or_view_properties: 2709 return [] 2710 return [ 2711 exp.Property(this=key, value=value.copy()) 2712 for key, value in table_or_view_properties.items() 2713 ] 2714 2715 def _build_partitioned_by_exp( 2716 self, 2717 partitioned_by: t.List[exp.Expression], 2718 *, 2719 partition_interval_unit: t.Optional[IntervalUnit] = None, 2720 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2721 catalog_name: t.Optional[str] = None, 2722 **kwargs: t.Any, 2723 ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]: 2724 return None 2725 2726 def 
_build_clustered_by_exp( 2727 self, 2728 clustered_by: t.List[exp.Expression], 2729 **kwargs: t.Any, 2730 ) -> t.Optional[exp.Cluster]: 2731 return None 2732 2733 def _build_table_properties_exp( 2734 self, 2735 catalog_name: t.Optional[str] = None, 2736 table_format: t.Optional[str] = None, 2737 storage_format: t.Optional[str] = None, 2738 partitioned_by: t.Optional[t.List[exp.Expression]] = None, 2739 partition_interval_unit: t.Optional[IntervalUnit] = None, 2740 clustered_by: t.Optional[t.List[exp.Expression]] = None, 2741 table_properties: t.Optional[t.Dict[str, exp.Expression]] = None, 2742 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2743 table_description: t.Optional[str] = None, 2744 table_kind: t.Optional[str] = None, 2745 **kwargs: t.Any, 2746 ) -> t.Optional[exp.Properties]: 2747 """Creates a SQLGlot table properties expression for ddl.""" 2748 properties: t.List[exp.Expression] = [] 2749 2750 if table_description: 2751 properties.append( 2752 exp.SchemaCommentProperty( 2753 this=exp.Literal.string(self._truncate_table_comment(table_description)) 2754 ) 2755 ) 2756 2757 if table_properties: 2758 table_type = self._pop_creatable_type_from_properties(table_properties) 2759 properties.extend(ensure_list(table_type)) 2760 2761 if properties: 2762 return exp.Properties(expressions=properties) 2763 return None 2764 2765 def _build_view_properties_exp( 2766 self, 2767 view_properties: t.Optional[t.Dict[str, exp.Expression]] = None, 2768 table_description: t.Optional[str] = None, 2769 **kwargs: t.Any, 2770 ) -> t.Optional[exp.Properties]: 2771 """Creates a SQLGlot table properties expression for view""" 2772 properties: t.List[exp.Expression] = [] 2773 2774 if table_description: 2775 properties.append( 2776 exp.SchemaCommentProperty( 2777 this=exp.Literal.string(self._truncate_table_comment(table_description)) 2778 ) 2779 ) 2780 2781 if properties: 2782 return exp.Properties(expressions=properties) 2783 return None 2784 2785 def 
_truncate_comment(self, comment: str, length: t.Optional[int]) -> str: 2786 return comment[:length] if length else comment 2787 2788 def _truncate_table_comment(self, comment: str) -> str: 2789 return self._truncate_comment(comment, self.MAX_TABLE_COMMENT_LENGTH) 2790 2791 def _truncate_column_comment(self, comment: str) -> str: 2792 return self._truncate_comment(comment, self.MAX_COLUMN_COMMENT_LENGTH) 2793 2794 def _to_sql(self, expression: exp.Expression, quote: bool = True, **kwargs: t.Any) -> str: 2795 """ 2796 Converts an expression to a SQL string. Has a set of default kwargs to apply, and then default 2797 kwargs defined for the given dialect, and then kwargs provided by the user when defining the engine 2798 adapter, and then finally kwargs provided by the user when calling this method. 2799 """ 2800 sql_gen_kwargs = { 2801 "dialect": self.dialect, 2802 "pretty": self._pretty_sql, 2803 "comments": False, 2804 **self._sql_gen_kwargs, 2805 **kwargs, 2806 } 2807 2808 expression = expression.copy() 2809 2810 if quote: 2811 quote_identifiers(expression) 2812 2813 return expression.sql(**sql_gen_kwargs, copy=False) # type: ignore 2814 2815 def _clear_data_object_cache(self, table_name: t.Optional[TableName] = None) -> None: 2816 """Clears the cache entry for the given table name, or clears the entire cache if table_name is None.""" 2817 if table_name is None: 2818 logger.debug("Clearing entire data object cache") 2819 self._data_object_cache.clear() 2820 else: 2821 table = exp.to_table(table_name) 2822 cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name) 2823 logger.debug("Clearing data object cache key: %s", cache_key) 2824 self._data_object_cache.pop(cache_key, None) 2825 2826 def _get_data_objects( 2827 self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None 2828 ) -> t.List[DataObject]: 2829 """ 2830 Returns all the data objects that exist in the given schema and optionally catalog. 
2831 """ 2832 raise NotImplementedError() 2833 2834 def _get_temp_table( 2835 self, table: TableName, table_only: bool = False, quoted: bool = True 2836 ) -> exp.Table: 2837 """ 2838 Returns the name of the temp table that should be used for the given table name. 2839 """ 2840 table = t.cast(exp.Table, exp.to_table(table).copy()) 2841 table.set( 2842 "this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}", quoted=quoted) 2843 ) 2844 2845 if table_only: 2846 table.set("db", None) 2847 table.set("catalog", None) 2848 2849 return table 2850 2851 def _order_projections_and_filter( 2852 self, 2853 query: Query, 2854 target_columns_to_types: t.Dict[str, exp.DataType], 2855 where: t.Optional[exp.Expression] = None, 2856 coerce_types: bool = False, 2857 ) -> Query: 2858 if not isinstance(query, exp.Query) or ( 2859 not where and not coerce_types and query.named_selects == list(target_columns_to_types) 2860 ): 2861 return query 2862 2863 query = t.cast(exp.Query, query.copy()) 2864 with_ = query.args.pop("with_", None) 2865 2866 select_exprs: t.List[exp.Expression] = [ 2867 exp.column(c, quoted=True) for c in target_columns_to_types 2868 ] 2869 if coerce_types and columns_to_types_all_known(target_columns_to_types): 2870 select_exprs = [ 2871 exp.cast(select_exprs[i], col_tpe).as_(col, quoted=True) 2872 for i, (col, col_tpe) in enumerate(target_columns_to_types.items()) 2873 ] 2874 2875 query = exp.select(*select_exprs).from_(query.subquery("_subquery", copy=False), copy=False) 2876 if where: 2877 query = query.where(where, copy=False) 2878 2879 if with_: 2880 query.set("with_", with_) 2881 2882 return query 2883 2884 def _truncate_table(self, table_name: TableName) -> None: 2885 table = exp.to_table(table_name) 2886 self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}") 2887 2888 def drop_data_object_on_type_mismatch( 2889 self, data_object: t.Optional[DataObject], expected_type: DataObjectType 2890 ) -> bool: 2891 """Drops a data 
object if it exists and is not of the expected type. 2892 2893 Args: 2894 data_object: The data object to check. 2895 expected_type: The expected type of the data object. 2896 2897 Returns: 2898 True if the data object was dropped, False otherwise. 2899 """ 2900 if data_object is None or data_object.type == expected_type: 2901 return False 2902 2903 logger.warning( 2904 "Target data object '%s' is a %s and not a %s, dropping it", 2905 data_object.to_table().sql(dialect=self.dialect), 2906 data_object.type.value, 2907 expected_type.value, 2908 ) 2909 self.drop_data_object(data_object) 2910 return True 2911 2912 def _replace_by_key( 2913 self, 2914 target_table: TableName, 2915 source_table: QueryOrDF, 2916 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 2917 key: t.Sequence[exp.Expression], 2918 is_unique_key: bool, 2919 source_columns: t.Optional[t.List[str]] = None, 2920 ) -> None: 2921 if target_columns_to_types is None: 2922 target_columns_to_types = self.columns(target_table) 2923 2924 temp_table = self._get_temp_table(target_table) 2925 key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0] 2926 column_names = list(target_columns_to_types or []) 2927 2928 with self.transaction(): 2929 self.ctas( 2930 temp_table, 2931 source_table, 2932 target_columns_to_types=target_columns_to_types, 2933 exists=False, 2934 source_columns=source_columns, 2935 ) 2936 2937 try: 2938 delete_query = exp.select(key_exp).from_(temp_table) 2939 insert_query = self._select_columns(target_columns_to_types).from_(temp_table) 2940 if not is_unique_key: 2941 delete_query = delete_query.distinct() 2942 else: 2943 insert_query = insert_query.distinct(*key) 2944 2945 insert_statement = exp.insert( 2946 insert_query, 2947 target_table, 2948 columns=column_names, 2949 ) 2950 delete_filter = key_exp.isin(query=delete_query) 2951 2952 if not self.INSERT_OVERWRITE_STRATEGY.is_replace_where: 2953 self.delete_from(target_table, delete_filter) 2954 
else: 2955 insert_statement.set("where", delete_filter) 2956 insert_statement.set("this", exp.to_table(target_table)) 2957 2958 self.execute(insert_statement, track_rows_processed=True) 2959 finally: 2960 self.drop_table(temp_table) 2961 2962 def _build_create_comment_table_exp( 2963 self, table: exp.Table, table_comment: str, table_kind: str 2964 ) -> exp.Comment | str: 2965 return exp.Comment( 2966 this=table, 2967 kind=table_kind, 2968 expression=exp.Literal.string(self._truncate_table_comment(table_comment)), 2969 ) 2970 2971 def _create_table_comment( 2972 self, table_name: TableName, table_comment: str, table_kind: str = "TABLE" 2973 ) -> None: 2974 table = exp.to_table(table_name) 2975 2976 try: 2977 self.execute(self._build_create_comment_table_exp(table, table_comment, table_kind)) 2978 except Exception: 2979 logger.warning( 2980 f"Table comment for '{table.alias_or_name}' not registered - this may be due to limited permissions", 2981 exc_info=True, 2982 ) 2983 2984 def _build_create_comment_column_exp( 2985 self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" 2986 ) -> exp.Comment | str: 2987 return exp.Comment( 2988 this=exp.column(column_name, *reversed(table.parts)), # type: ignore 2989 kind="COLUMN", 2990 expression=exp.Literal.string(self._truncate_column_comment(column_comment)), 2991 ) 2992 2993 def _create_column_comments( 2994 self, 2995 table_name: TableName, 2996 column_comments: t.Dict[str, str], 2997 table_kind: str = "TABLE", 2998 materialized_view: bool = False, 2999 ) -> None: 3000 table = exp.to_table(table_name) 3001 3002 for col, comment in column_comments.items(): 3003 try: 3004 self.execute(self._build_create_comment_column_exp(table, col, comment, table_kind)) 3005 except Exception: 3006 logger.warning( 3007 f"Column comments for column '{col}' in table '{table.alias_or_name}' not registered - this may be due to limited permissions", 3008 exc_info=True, 3009 ) 3010 3011 def _create_table_like( 3012 
self, 3013 target_table_name: TableName, 3014 source_table_name: TableName, 3015 exists: bool, 3016 **kwargs: t.Any, 3017 ) -> None: 3018 self.create_table(target_table_name, self.columns(source_table_name), exists=exists) 3019 3020 def _rename_table( 3021 self, 3022 old_table_name: TableName, 3023 new_table_name: TableName, 3024 ) -> None: 3025 self.execute(exp.rename_table(old_table_name, new_table_name)) 3026 3027 def ensure_nulls_for_unmatched_after_join( 3028 self, 3029 query: Query, 3030 ) -> Query: 3031 return query 3032 3033 def use_server_nulls_for_unmatched_after_join( 3034 self, 3035 query: Query, 3036 ) -> Query: 3037 return query 3038 3039 def ping(self) -> None: 3040 try: 3041 self._execute(exp.select("1").sql(dialect=self.dialect)) 3042 finally: 3043 self._connection_pool.close_cursor() 3044 3045 @classmethod 3046 def _select_columns( 3047 cls, columns: t.Iterable[str], source_columns: t.Optional[t.List[str]] = None 3048 ) -> exp.Select: 3049 return exp.select( 3050 *( 3051 exp.column(c, quoted=True) 3052 if c in (source_columns or columns) 3053 else exp.alias_(exp.Null(), c, quoted=True) 3054 for c in columns 3055 ) 3056 ) 3057 3058 def _check_identifier_length(self, expression: exp.Expression) -> None: 3059 if self.MAX_IDENTIFIER_LENGTH is None or not isinstance(expression, exp.DDL): 3060 return 3061 3062 for identifier in expression.find_all(exp.Identifier): 3063 name = identifier.name 3064 name_length = len(name) 3065 if name_length > self.MAX_IDENTIFIER_LENGTH: 3066 raise SQLMeshError( 3067 f"Identifier name '{name}' (length {name_length}) exceeds {self.dialect.capitalize()}'s max identifier limit of {self.MAX_IDENTIFIER_LENGTH} characters" 3068 ) 3069 3070 def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]: 3071 raise NotImplementedError() 3072 3073 @classmethod 3074 def _diff_grants_configs( 3075 cls, new_config: GrantsConfig, old_config: GrantsConfig 3076 ) -> t.Tuple[GrantsConfig, GrantsConfig]: 3077 
"""Compute additions and removals between two grants configurations. 3078 3079 This method compares new (desired) and old (current) GrantsConfigs case-insensitively 3080 for both privilege keys and grantees, while preserving original casing 3081 in the output GrantsConfigs. 3082 3083 Args: 3084 new_config: Desired grants configuration (specified by the user). 3085 old_config: Current grants configuration (returned by the database). 3086 3087 Returns: 3088 A tuple of (additions, removals) GrantsConfig where: 3089 - additions contains privileges/grantees present in new_config but not in old_config 3090 - additions uses keys and grantee strings from new_config (user-specified casing) 3091 - removals contains privileges/grantees present in old_config but not in new_config 3092 - removals uses keys and grantee strings from old_config (database-returned casing) 3093 3094 Notes: 3095 - Comparison is case-insensitive using casefold(); original casing is preserved in results. 3096 - Overlapping grantees (case-insensitive) are excluded from the results. 3097 """ 3098 3099 def _diffs(config1: GrantsConfig, config2: GrantsConfig) -> GrantsConfig: 3100 diffs: GrantsConfig = {} 3101 cf_config2 = {k.casefold(): {g.casefold() for g in v} for k, v in config2.items()} 3102 for key, grantees in config1.items(): 3103 cf_key = key.casefold() 3104 3105 # Missing key (add all grantees) 3106 if cf_key not in cf_config2: 3107 diffs[key] = grantees.copy() 3108 continue 3109 3110 # Include only grantees not in config2 3111 cf_grantees2 = cf_config2[cf_key] 3112 diff_grantees = [] 3113 for grantee in grantees: 3114 if grantee.casefold() not in cf_grantees2: 3115 diff_grantees.append(grantee) 3116 if diff_grantees: 3117 diffs[key] = diff_grantees 3118 return diffs 3119 3120 return _diffs(new_config, old_config), _diffs(old_config, new_config) 3121 3122 def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig: 3123 """Returns current grants for a table as a dictionary. 
3124 3125 This method queries the database and returns the current grants/permissions 3126 for the given table, parsed into a dictionary format. The it handles 3127 case-insensitive comparison between these current grants and the desired 3128 grants from model configuration. 3129 3130 Args: 3131 table: The table/view to query grants for. 3132 3133 Returns: 3134 Dictionary mapping permissions to lists of grantees. Permission names 3135 should be returned as the database provides them (typically uppercase 3136 for standard SQL permissions, but engine-specific roles may vary). 3137 3138 Raises: 3139 NotImplementedError: If the engine does not support grants. 3140 """ 3141 if not self.SUPPORTS_GRANTS: 3142 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3143 raise NotImplementedError("Subclass must implement get_current_grants") 3144 3145 def _apply_grants_config_expr( 3146 self, 3147 table: exp.Table, 3148 grants_config: GrantsConfig, 3149 table_type: DataObjectType = DataObjectType.TABLE, 3150 ) -> t.List[exp.Expression]: 3151 """Returns SQLGlot Grant expressions to apply grants to a table. 3152 3153 Args: 3154 table: The table/view to grant permissions on. 3155 grants_config: Dictionary mapping permissions to lists of grantees. 3156 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 3157 3158 Returns: 3159 List of SQLGlot expressions for grant operations. 3160 3161 Raises: 3162 NotImplementedError: If the engine does not support grants. 3163 """ 3164 if not self.SUPPORTS_GRANTS: 3165 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3166 raise NotImplementedError("Subclass must implement _apply_grants_config_expr") 3167 3168 def _revoke_grants_config_expr( 3169 self, 3170 table: exp.Table, 3171 grants_config: GrantsConfig, 3172 table_type: DataObjectType = DataObjectType.TABLE, 3173 ) -> t.List[exp.Expression]: 3174 """Returns SQLGlot expressions to revoke grants from a table. 
3175 3176 Args: 3177 table: The table/view to revoke permissions from. 3178 grants_config: Dictionary mapping permissions to lists of grantees. 3179 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 3180 3181 Returns: 3182 List of SQLGlot expressions for revoke operations. 3183 3184 Raises: 3185 NotImplementedError: If the engine does not support grants. 3186 """ 3187 if not self.SUPPORTS_GRANTS: 3188 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3189 raise NotImplementedError("Subclass must implement _revoke_grants_config_expr") 3190 3191 3192class EngineAdapterWithIndexSupport(EngineAdapter): 3193 SUPPORTS_INDEXES = True 3194 3195 3196def _decoded_str(value: t.Union[str, bytes]) -> str: 3197 if isinstance(value, bytes): 3198 return value.decode("utf-8") 3199 return value 3200 3201 3202def _get_data_object_cache_key(catalog: t.Optional[str], schema_name: str, object_name: str) -> str: 3203 """Returns a cache key for a data object based on its fully qualified name.""" 3204 catalog = f"{catalog}." if catalog else "" 3205 return f"{catalog}{schema_name}.{object_name}"
@set_catalog()
class EngineAdapter:
    """Base class wrapping a Database API compliant connection.

    The EngineAdapter is an easily-subclassable interface that interacts
    with the underlying engine and data store.

    Args:
        connection_factory_or_pool: Either a callable which produces a new Database API-compliant
            connection on every call, or an existing ConnectionPool, which is reused as-is.
        dialect: The dialect with which this adapter is associated. It is lower-cased; when empty,
            the class-level DIALECT is used.
        sql_gen_kwargs: Extra keyword arguments merged into every SQL generation call.
        multithreaded: Indicates whether this adapter will be used by more than one thread.
        cursor_init: Forwarded to the connection pool when one is created from a factory.
        default_catalog: Default catalog. When unset, `default_catalog` falls back to
            `get_current_catalog()` (if catalogs are supported).
        execute_log_level: Log level stored for executed statements.
        register_comments: Whether table/column comments should be registered (see `comments_enabled`).
        pre_ping: Stored pre-ping flag.
        pretty_sql: Whether generated SQL is pretty-printed.
        shared_connection: Forwarded to the connection pool when one is created from a factory.
        correlation_id: Optional correlation id stored on the adapter.
        schema_differ_overrides: Overrides merged on top of SCHEMA_DIFFER_KWARGS for `schema_differ`.
        query_execution_tracker: Optional tracker stored on the adapter.
        kwargs: Extra engine-specific config; e.g. `catalog_type_overrides` is read from here.
    """

    DIALECT = ""
    DEFAULT_BATCH_SIZE = 10000
    DATA_OBJECT_FILTER_BATCH_SIZE = 4000
    SUPPORTS_TRANSACTIONS = True
    # When False, `create_index` is a no-op.
    SUPPORTS_INDEXES = False
    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_CTAS
    COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS
    # None means comments are not truncated.
    MAX_TABLE_COMMENT_LENGTH: t.Optional[int] = None
    MAX_COLUMN_COMMENT_LENGTH: t.Optional[int] = None
    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
    SUPPORTS_MATERIALIZED_VIEWS = False
    SUPPORTS_MATERIALIZED_VIEW_SCHEMA = False
    SUPPORTS_VIEW_SCHEMA = True
    SUPPORTS_CLONING = False
    SUPPORTS_MANAGED_MODELS = False
    SUPPORTS_CREATE_DROP_CATALOG = False
    SUPPORTED_DROP_CASCADE_OBJECT_KINDS: t.List[str] = []
    SCHEMA_DIFFER_KWARGS: t.Dict[str, t.Any] = {}
    SUPPORTS_TUPLE_IN = True
    HAS_VIEW_BINDING = False
    # Default used by `replace_query` unless `supports_replace_table_override` is given.
    SUPPORTS_REPLACE_TABLE = True
    SUPPORTS_GRANTS = False
    # Bound once at class-creation time: a subclass that overrides DIALECT does not
    # automatically get a different DEFAULT_CATALOG_TYPE unless it sets this too.
    DEFAULT_CATALOG_TYPE = DIALECT
    QUOTE_IDENTIFIERS_IN_VIEWS = True
    # None disables `_check_identifier_length`.
    MAX_IDENTIFIER_LENGTH: t.Optional[int] = None
    ATTACH_CORRELATION_ID = True
    SUPPORTS_QUERY_EXECUTION_TRACKING = False
    SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS = False

    def __init__(
        self,
        connection_factory_or_pool: t.Union[t.Callable[[], t.Any], ConnectionPool],
        dialect: str = "",
        sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
        multithreaded: bool = False,
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
        default_catalog: t.Optional[str] = None,
        execute_log_level: int = logging.DEBUG,
        register_comments: bool = True,
        pre_ping: bool = False,
        pretty_sql: bool = False,
        shared_connection: bool = False,
        correlation_id: t.Optional[CorrelationId] = None,
        schema_differ_overrides: t.Optional[t.Dict[str, t.Any]] = None,
        query_execution_tracker: t.Optional[QueryExecutionTracker] = None,
        **kwargs: t.Any,
    ):
        self.dialect = dialect.lower() or self.DIALECT
        # Reuse an existing pool as-is; otherwise wrap the factory in a new pool.
        self._connection_pool = (
            connection_factory_or_pool
            if isinstance(connection_factory_or_pool, ConnectionPool)
            else create_connection_pool(
                connection_factory_or_pool,
                multithreaded,
                shared_connection=shared_connection,
                cursor_init=cursor_init,
            )
        )
        self._sql_gen_kwargs = sql_gen_kwargs or {}
        self._default_catalog = default_catalog
        self._execute_log_level = execute_log_level
        self._extra_config = kwargs
        self._register_comments = register_comments
        self._pre_ping = pre_ping
        self._pretty_sql = pretty_sql
        self._multithreaded = multithreaded
        self.correlation_id = correlation_id
        self._schema_differ_overrides = schema_differ_overrides
        self._query_execution_tracker = query_execution_tracker
        # Keyed by `_get_data_object_cache_key`; a None value presumably caches "object
        # not found" - TODO confirm against get_data_object.
        self._data_object_cache: t.Dict[str, t.Optional[DataObject]] = {}

    def with_settings(self, **kwargs: t.Any) -> EngineAdapter:
        """Returns a new adapter of the same class that shares this adapter's connection pool.

        `execute_log_level`, `correlation_id` and `query_execution_tracker` default to this
        adapter's values unless overridden in `kwargs`. The extra config is carried over
        together with `null_connection=True`. `pre_ping` and `schema_differ_overrides` are
        not forwarded, so they fall back to constructor defaults unless given in `kwargs`.
        """
        extra_kwargs = {
            "null_connection": True,
            "execute_log_level": kwargs.pop("execute_log_level", self._execute_log_level),
            "correlation_id": kwargs.pop("correlation_id", self.correlation_id),
            "query_execution_tracker": kwargs.pop(
                "query_execution_tracker", self._query_execution_tracker
            ),
            **self._extra_config,
            **kwargs,
        }

        adapter = self.__class__(
            self._connection_pool,
            dialect=self.dialect,
            sql_gen_kwargs=self._sql_gen_kwargs,
            default_catalog=self._default_catalog,
            register_comments=self._register_comments,
            multithreaded=self._multithreaded,
            pretty_sql=self._pretty_sql,
            **extra_kwargs,
        )

        return adapter

    @property
    def cursor(self) -> t.Any:
        return self._connection_pool.get_cursor()

    @property
    def connection(self) -> t.Any:
        return self._connection_pool.get()

    @property
    def spark(self) -> t.Optional[PySparkSession]:
        return None

    @property
    def snowpark(self) -> t.Optional[SnowparkSession]:
        return None

    @property
    def bigframe(self) -> t.Optional[BigframeSession]:
        return None

    @property
    def comments_enabled(self) -> bool:
        # Both the user setting and the engine's table-comment support must allow it.
        return self._register_comments and self.COMMENT_CREATION_TABLE.is_supported

    @property
    def catalog_support(self) -> CatalogSupport:
        return CatalogSupport.UNSUPPORTED

    @cached_property
    def schema_differ(self) -> SchemaDiffer:
        # User overrides take precedence over the engine's class-level defaults.
        return SchemaDiffer(
            **{
                **self.SCHEMA_DIFFER_KWARGS,
                **(self._schema_differ_overrides or {}),
            }
        )

    @property
    def _catalog_type_overrides(self) -> t.Dict[str, str]:
        return self._extra_config.get("catalog_type_overrides") or {}

    @classmethod
    def _casted_columns(
        cls,
        target_columns_to_types: t.Dict[str, exp.DataType],
        source_columns: t.Optional[t.List[str]] = None,
    ) -> t.List[exp.Alias]:
        """Returns `CAST(<col> AS <type>) AS <col>` for each target column.

        Columns absent from `source_columns` are cast from NULL instead. When
        `source_columns` is empty or None, all target columns are treated as present.
        """
        source_columns_lookup = set(source_columns or target_columns_to_types)
        return [
            exp.alias_(
                exp.cast(
                    exp.column(column, quoted=True)
                    if column in source_columns_lookup
                    else exp.Null(),
                    to=kind,
                ),
                column,
                copy=False,
                quoted=True,
            )
            for column, kind in target_columns_to_types.items()
        ]

    @property
    def default_catalog(self) -> t.Optional[str]:
        """The configured default catalog, else the connection's current catalog.

        Returns None when catalogs are unsupported. Raises MissingDefaultCatalogError
        when catalogs are supported but neither source yields one.
        """
        if self.catalog_support.is_unsupported:
            return None
        default_catalog = self._default_catalog or self.get_current_catalog()
        if not default_catalog:
            raise MissingDefaultCatalogError(
                "Could not determine a default catalog despite it being supported."
            )
        return default_catalog

    @property
    def engine_run_mode(self) -> EngineRunMode:
        return EngineRunMode.SINGLE_MODE_ENGINE

    def _get_source_queries(
        self,
        query_or_df: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        target_table: TableName,
        *,
        batch_size: t.Optional[int] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> t.List[SourceQuery]:
        """Turns a query or DataFrame into one or more SourceQuery objects.

        A SQL query always yields a single SourceQuery. If `source_columns` does not cover
        every target column, the query is wrapped so that the missing columns are projected
        as typed NULLs. A DataFrame is delegated to `_df_to_source_queries`, which batches
        it by `batch_size` (DEFAULT_BATCH_SIZE when None).

        Raises:
            SQLMeshError: If `source_columns` is given for a query without target column
                types, if a DataFrame is given without target column types, or if the
                DataFrame is empty.
        """
        import pandas as pd

        batch_size = self.DEFAULT_BATCH_SIZE if batch_size is None else batch_size
        if isinstance(query_or_df, exp.Query):
            query_factory = lambda: query_or_df
            if source_columns:
                source_columns_lookup = set(source_columns)
                if not target_columns_to_types:
                    raise SQLMeshError("columns_to_types must be set if source_columns is set")
                if not set(target_columns_to_types).issubset(source_columns_lookup):
                    select_columns = [
                        exp.column(c, quoted=True)
                        if c in source_columns_lookup
                        else exp.cast(exp.Null(), target_columns_to_types[c], copy=False).as_(
                            c, copy=False, quoted=True
                        )
                        for c in target_columns_to_types
                    ]
                    query_factory = (
                        lambda: exp.Select()
                        .select(*select_columns)
                        .from_(query_or_df.subquery("select_source_columns"))
                    )
            return [SourceQuery(query_factory=query_factory)]  # type: ignore

        if not target_columns_to_types:
            raise SQLMeshError(
                "It is expected that if a DataFrame is passed in then columns_to_types is set"
            )

        if isinstance(query_or_df, pd.DataFrame) and query_or_df.empty:
            raise SQLMeshError(
                "Cannot construct source query from an empty DataFrame. This error is commonly "
                "related to Python models that produce no data. For such models, consider yielding "
                "from an empty generator if the resulting set is empty, i.e. use `yield from ()`."
            )

        return self._df_to_source_queries(
            query_or_df,
            target_columns_to_types,
            batch_size,
            target_table=target_table,
            source_columns=source_columns,
        )

    def _df_to_source_queries(
        self,
        df: DF,
        target_columns_to_types: t.Dict[str, exp.DataType],
        batch_size: int,
        target_table: TableName,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> t.List[SourceQuery]:
        """Splits a pandas DataFrame into SourceQuery batches of at most `batch_size` rows.

        A `batch_size` of 0 means a single batch. Each batch's query is produced lazily
        by `_values_to_sql`.
        """
        import pandas as pd

        assert isinstance(df, pd.DataFrame)
        num_rows = len(df.index)
        batch_size = sys.maxsize if batch_size == 0 else batch_size

        # we need to ensure that the order of the columns in columns_to_types columns matches the order of the values
        # they can differ if a user specifies columns() on a python model in a different order than what's in the DataFrame's emitted by that model
        df = df[list(source_columns or target_columns_to_types)]
        values = list(df.itertuples(index=False, name=None))

        return [
            SourceQuery(
                query_factory=partial(
                    self._values_to_sql,
                    values=values,  # type: ignore
                    target_columns_to_types=target_columns_to_types,
                    batch_start=i,
                    batch_end=min(i + batch_size, num_rows),
                    source_columns=source_columns,
                ),
            )
            for i in range(0, num_rows, batch_size)
        ]

    def _get_source_queries_and_columns_to_types(
        self,
        query_or_df: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        target_table: TableName,
        *,
        batch_size: t.Optional[int] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> t.Tuple[t.List[SourceQuery], t.Optional[t.Dict[str, exp.DataType]]]:
        """Resolves target column types (see `_columns_to_types`), then builds the source queries."""
        target_columns_to_types, source_columns = self._columns_to_types(
            query_or_df, target_columns_to_types, source_columns
        )
        source_queries = self._get_source_queries(
            query_or_df,
            target_columns_to_types,
            target_table=target_table,
            batch_size=batch_size,
            source_columns=source_columns,
        )
        return source_queries, target_columns_to_types

    @t.overload
    def _columns_to_types(
        self,
        query_or_df: DF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> t.Tuple[t.Dict[str, exp.DataType], t.List[str]]: ...

    @t.overload
    def _columns_to_types(
        self,
        query_or_df: Query,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: ...

    def _columns_to_types(
        self,
        query_or_df: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]:
        """Normalizes the (target column types, source columns) pair.

        For a pandas DataFrame without explicit types, types are inferred from the frame.
        Source columns default to the target columns and are filtered to those present in
        the target; they become None when either side is unknown.
        """
        import pandas as pd

        if not target_columns_to_types and isinstance(query_or_df, pd.DataFrame):
            target_columns_to_types = columns_to_types_from_df(t.cast(pd.DataFrame, query_or_df))
        if not source_columns and target_columns_to_types:
            source_columns = list(target_columns_to_types)
        # source columns should only contain columns that are defined in the target. If there are extras then
        # that means they are intended to be ignored and will be excluded
        source_columns = (
            [x for x in source_columns if x in target_columns_to_types]
            if source_columns and target_columns_to_types
            else None
        )
        return target_columns_to_types, source_columns

    def recycle(self) -> None:
        """Closes all open connections and releases all allocated resources associated with any thread
        except the calling one."""
        self._connection_pool.close_all(exclude_calling_thread=True)

    def close(self) -> t.Any:
        """Closes all open connections and releases all allocated resources."""
        self._connection_pool.close_all()

    def get_current_catalog(self) -> t.Optional[str]:
        """Returns the catalog name of the current connection."""
        raise NotImplementedError()

    def set_current_catalog(self, catalog: str) -> None:
        """Sets the catalog name of the current connection."""
        raise NotImplementedError()

    def get_catalog_type(self, catalog: t.Optional[str]) -> str:
        """Intended to be overridden for data virtualization systems like Trino that,
        depending on the target catalog, require slightly different properties to be set when creating / updating tables

        Per-catalog values come from the `catalog_type_overrides` extra config, falling back
        to DEFAULT_CATALOG_TYPE. Raises UnsupportedCatalogOperationError if catalogs are
        unsupported by this engine.
        """
        if self.catalog_support.is_unsupported:
            raise UnsupportedCatalogOperationError(
                f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}"
            )
        return (
            self._catalog_type_overrides.get(catalog, self.DEFAULT_CATALOG_TYPE)
            if catalog
            else self.DEFAULT_CATALOG_TYPE
        )

    def get_catalog_type_from_table(self, table: TableName) -> str:
        """Get the catalog type from a table name if it has a catalog specified, otherwise return the current catalog type"""
        catalog = exp.to_table(table).catalog or self.get_current_catalog()
        return self.get_catalog_type(catalog)

    @property
    def current_catalog_type(self) -> str:
        # `get_catalog_type_from_table` should be used over this property. Reason is that the table that is the target
        # of the operation is what matters and not the catalog type of the connection.
        # This still remains for legacy reasons and should be refactored out.
        return self.get_catalog_type(self.get_current_catalog())

    def replace_query(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        source_columns: t.Optional[t.List[str]] = None,
        supports_replace_table_override: t.Optional[bool] = None,
        **kwargs: t.Any,
    ) -> None:
        """Replaces an existing table with a query.

        For partition based engines (hive, spark), insert override is used. For other systems, create or replace is used.

        An existing object of a different type (e.g. a view) is dropped first. If the query
        reads from the target table itself, the table is created first and, on the
        insert-overwrite path, its current contents are read through a temp-table copy.

        Args:
            table_name: The name of the table (eg. prod.table)
            query_or_df: The SQL query to run or a dataframe.
            target_columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type.
                Expected to be ordered to match the order of values in the dataframe.
            table_description: Optional table description.
            column_descriptions: Optional column descriptions.
            source_columns: Columns actually present in `query_or_df`; for queries, target columns
                missing from it are projected as typed NULLs.
            supports_replace_table_override: If not None, used instead of SUPPORTS_REPLACE_TABLE.
            kwargs: Optional create table properties.

        Raises:
            SQLMeshError: If the query references the target table and column types cannot be determined.
        """
        target_table = exp.to_table(table_name)

        target_data_object = self.get_data_object(target_table)
        table_exists = target_data_object is not None
        if self.drop_data_object_on_type_mismatch(target_data_object, DataObjectType.TABLE):
            table_exists = False

        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
            target_table=target_table,
            source_columns=source_columns,
        )
        if not target_columns_to_types and table_exists:
            target_columns_to_types = self.columns(target_table)
        # Only the first batch is inspected for self-references.
        query = source_queries[0].query_factory()
        self_referencing = any(
            quote_identifiers(table) == quote_identifiers(target_table)
            for table in query.find_all(exp.Table)
        )
        # If a query references itself then it must have a table created regardless of approach used.
        if self_referencing:
            if not target_columns_to_types:
                raise SQLMeshError(
                    f"Cannot create a self-referencing table {target_table.sql(dialect=self.dialect)} without knowing the column types. "
                    "Try casting the columns to an expected type or defining the columns in the model metadata. "
                )
            self._create_table_from_columns(
                target_table,
                target_columns_to_types,
                exists=True,
                table_description=table_description,
                column_descriptions=column_descriptions,
                **kwargs,
            )
        # All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we
        # use `CREATE OR REPLACE TABLE AS` if the engine supports it
        supports_replace_table = (
            self.SUPPORTS_REPLACE_TABLE
            if supports_replace_table_override is None
            else supports_replace_table_override
        )
        if supports_replace_table or not table_exists:
            return self._create_table_from_source_queries(
                target_table,
                source_queries,
                target_columns_to_types,
                replace=supports_replace_table,
                table_description=table_description,
                column_descriptions=column_descriptions,
                **kwargs,
            )
        if self_referencing:
            assert target_columns_to_types is not None
            # Snapshot the current contents into a temp table and point every reference
            # to the target at that snapshot, so overwriting does not read its own output.
            with self.temp_table(
                self._select_columns(target_columns_to_types).from_(target_table),
                name=target_table,
                target_columns_to_types=target_columns_to_types,
                **kwargs,
            ) as temp_table:
                for source_query in source_queries:
                    source_query.add_transform(
                        lambda node: (  # type: ignore
                            temp_table  # type: ignore
                            if isinstance(node, exp.Table)
                            and quote_identifiers(node) == quote_identifiers(target_table)
                            else node
                        )
                    )
                return self._insert_overwrite_by_condition(
                    target_table,
                    source_queries,
                    target_columns_to_types,
                    **kwargs,
                )
        return self._insert_overwrite_by_condition(
            target_table,
            source_queries,
            target_columns_to_types,
            **kwargs,
        )

    def create_index(
        self,
        table_name: TableName,
        index_name: str,
        columns: t.Tuple[str, ...],
        exists: bool = True,
    ) -> None:
        """Creates a new index for the given table if supported

        Does nothing when SUPPORTS_INDEXES is False.

        Args:
            table_name: The name of the target table.
            index_name: The name of the index.
            columns: The list of columns that constitute the index.
            exists: Indicates whether to include the IF NOT EXISTS check.
        """
        if not self.SUPPORTS_INDEXES:
            return

        expression = exp.Create(
            this=exp.Index(
                this=exp.to_identifier(index_name),
                table=exp.to_table(table_name),
                params=exp.IndexParameters(columns=[exp.to_column(c) for c in columns]),
            ),
            kind="INDEX",
            exists=exists,
        )
        self.execute(expression)

    def _pop_creatable_type_from_properties(
        self,
        properties: t.Dict[str, exp.Expression],
    ) -> t.Optional[exp.Property]:
        """Pop out the creatable_type from the properties dictionary (if exists (return it/remove it) else return none).
        It also checks that none of the expressions are MATERIALIZE as that conflicts with the `materialize` parameter.

        The key is matched case-insensitively against KEY_FOR_CREATABLE_TYPE and its value is
        parsed as properties in this adapter's dialect. Note that `properties` is mutated.

        Raises:
            SQLMeshError: If the value parses to more than one property, or to a materialized property.
        """
        # Iterate over a snapshot of the keys because the dict is mutated inside the loop.
        for key in list(properties.keys()):
            upper_key = key.upper()
            if upper_key == KEY_FOR_CREATABLE_TYPE:
                value = properties.pop(key).name
                parsed_properties = exp.maybe_parse(
                    value, into=exp.Properties, dialect=self.dialect
                )
                # NOTE(review): a value that parses to zero properties would raise ValueError
                # on this unpacking rather than SQLMeshError - confirm whether that can happen.
                property, *others = parsed_properties.expressions
                if others:
                    # Multiple properties are unsupported today, can look into it in the future if needed
                    raise SQLMeshError(
                        f"Invalid creatable_type value with multiple properties: {value}"
                    )
                if isinstance(property, exp.MaterializedProperty):
                    raise SQLMeshError(
                        f"Cannot use {value} as a creatable_type as it conflicts with the `materialize` parameter."
                    )
                return property
        return None

    def create_table(
        self,
        table_name: TableName,
        target_columns_to_types: t.Dict[str, exp.DataType],
        primary_key: t.Optional[t.Tuple[str, ...]] = None,
        exists: bool = True,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Create a table using a DDL statement

        Args:
            table_name: The name of the table to create. Can be fully qualified or just table name.
            target_columns_to_types: A mapping between the column name and its data type.
            primary_key: Determines the table primary key.
            exists: Indicates whether to include the IF NOT EXISTS check.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            kwargs: Optional create table properties.
        """
        self._create_table_from_columns(
            table_name,
            target_columns_to_types,
            primary_key,
            exists,
            table_description,
            column_descriptions,
            **kwargs,
        )

    def create_managed_table(
        self,
        table_name: TableName,
        query: Query,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
        clustered_by: t.Optional[t.List[exp.Expression]] = None,
        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Create a managed table using a query.

        "Managed" means that once the table is created, the data is kept up to date by the underlying database engine and not SQLMesh.

        The base implementation always raises NotImplementedError.

        Args:
            table_name: The name of the table to create. Can be fully qualified or just table name.
            query: The SQL query for the engine to base the managed table on
            target_columns_to_types: A mapping between the column name and its data type.
            partitioned_by: The partition columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour))
            clustered_by: The cluster columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour))
            table_properties: Optional mapping of engine-specific properties to be set on the managed table
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            source_columns: Optional list of the columns actually present in `query`.
            kwargs: Optional create table properties.
        """
        raise NotImplementedError(f"Engine does not support managed tables: {type(self)}")

    def ctas(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        exists: bool = True,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Create a table using a CTAS statement

        Args:
            table_name: The name of the table to create. Can be fully qualified or just table name.
            query_or_df: The SQL query to run or a dataframe for the CTAS.
            target_columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame.
            exists: Indicates whether to include the IF NOT EXISTS check.
            table_description: Optional table description from MODEL DDL.
            column_descriptions: Optional column descriptions from model query.
            source_columns: Optional list of the columns actually present in `query_or_df`.
            kwargs: Optional create table properties.
702 """ 703 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 704 query_or_df, 705 target_columns_to_types, 706 target_table=table_name, 707 source_columns=source_columns, 708 ) 709 return self._create_table_from_source_queries( 710 table_name, 711 source_queries, 712 target_columns_to_types, 713 exists, 714 table_description=table_description, 715 column_descriptions=column_descriptions, 716 **kwargs, 717 ) 718 719 def create_state_table( 720 self, 721 table_name: str, 722 target_columns_to_types: t.Dict[str, exp.DataType], 723 primary_key: t.Optional[t.Tuple[str, ...]] = None, 724 ) -> None: 725 """Create a table to store SQLMesh internal state. 726 727 Args: 728 table_name: The name of the table to create. Can be fully qualified or just table name. 729 target_columns_to_types: A mapping between the column name and its data type. 730 primary_key: Determines the table primary key. 731 """ 732 self.create_table( 733 table_name, 734 target_columns_to_types, 735 primary_key=primary_key, 736 ) 737 738 def _create_table_from_columns( 739 self, 740 table_name: TableName, 741 target_columns_to_types: t.Dict[str, exp.DataType], 742 primary_key: t.Optional[t.Tuple[str, ...]] = None, 743 exists: bool = True, 744 table_description: t.Optional[str] = None, 745 column_descriptions: t.Optional[t.Dict[str, str]] = None, 746 **kwargs: t.Any, 747 ) -> None: 748 """ 749 Create a table using a DDL statement. 750 751 Args: 752 table_name: The name of the table to create. Can be fully qualified or just table name. 753 target_columns_to_types: Mapping between the column name and its data type. 754 primary_key: Determines the table primary key. 755 exists: Indicates whether to include the IF NOT EXISTS check. 756 table_description: Optional table description from MODEL DDL. 757 column_descriptions: Optional column descriptions from model query. 758 kwargs: Optional create table properties. 
759 """ 760 table = exp.to_table(table_name) 761 762 if not columns_to_types_all_known(target_columns_to_types): 763 # It is ok if the columns types are not known if the table already exists and IF NOT EXISTS is set 764 if exists and self.table_exists(table_name): 765 return 766 raise SQLMeshError( 767 "Cannot create a table without knowing the column types. " 768 "Try casting the columns to an expected type or defining the columns in the model metadata. " 769 f"Columns to types: {target_columns_to_types}" 770 ) 771 772 primary_key_expression = ( 773 [exp.PrimaryKey(expressions=[exp.to_column(k) for k in primary_key])] 774 if primary_key and self.SUPPORTS_INDEXES 775 else [] 776 ) 777 778 schema = self._build_schema_exp( 779 table, 780 target_columns_to_types, 781 column_descriptions, 782 primary_key_expression, 783 ) 784 785 self._create_table( 786 schema, 787 None, 788 exists=exists, 789 target_columns_to_types=target_columns_to_types, 790 table_description=table_description, 791 **kwargs, 792 ) 793 794 # Register comments with commands if the engine doesn't support comments in the schema or CREATE 795 if ( 796 table_description 797 and self.COMMENT_CREATION_TABLE.is_comment_command_only 798 and self.comments_enabled 799 ): 800 self._create_table_comment(table_name, table_description) 801 if ( 802 column_descriptions 803 and self.COMMENT_CREATION_TABLE.is_comment_command_only 804 and self.comments_enabled 805 ): 806 self._create_column_comments(table_name, column_descriptions) 807 808 def _build_schema_exp( 809 self, 810 table: exp.Table, 811 target_columns_to_types: t.Dict[str, exp.DataType], 812 column_descriptions: t.Optional[t.Dict[str, str]] = None, 813 expressions: t.Optional[t.List[exp.PrimaryKey]] = None, 814 is_view: bool = False, 815 materialized: bool = False, 816 ) -> exp.Schema: 817 """ 818 Build a schema expression for a table, columns, column comments, and additional schema properties. 
819 """ 820 expressions = expressions or [] 821 822 return exp.Schema( 823 this=table, 824 expressions=self._build_column_defs( 825 target_columns_to_types=target_columns_to_types, 826 column_descriptions=column_descriptions, 827 is_view=is_view, 828 materialized=materialized, 829 ) 830 + expressions, 831 ) 832 833 def _build_column_defs( 834 self, 835 target_columns_to_types: t.Dict[str, exp.DataType], 836 column_descriptions: t.Optional[t.Dict[str, str]] = None, 837 is_view: bool = False, 838 materialized: bool = False, 839 ) -> t.List[exp.ColumnDef]: 840 engine_supports_schema_comments = ( 841 self.COMMENT_CREATION_VIEW.supports_schema_def 842 if is_view 843 else self.COMMENT_CREATION_TABLE.supports_schema_def 844 ) 845 return [ 846 self._build_column_def( 847 column, 848 column_descriptions=column_descriptions, 849 engine_supports_schema_comments=engine_supports_schema_comments, 850 col_type=None if is_view else kind, # don't include column data type for views 851 ) 852 for column, kind in target_columns_to_types.items() 853 ] 854 855 def _build_column_def( 856 self, 857 col_name: str, 858 column_descriptions: t.Optional[t.Dict[str, str]] = None, 859 engine_supports_schema_comments: bool = False, 860 col_type: t.Optional[exp.DATA_TYPE] = None, 861 nested_names: t.List[str] = [], 862 ) -> exp.ColumnDef: 863 return exp.ColumnDef( 864 this=exp.to_identifier(col_name), 865 kind=col_type, 866 constraints=( 867 self._build_col_comment_exp(col_name, column_descriptions) 868 if engine_supports_schema_comments and self.comments_enabled and column_descriptions 869 else None 870 ), 871 ) 872 873 def _build_col_comment_exp( 874 self, col_name: str, column_descriptions: t.Dict[str, str] 875 ) -> t.List[exp.ColumnConstraint]: 876 comment = column_descriptions.get(col_name, None) 877 if comment: 878 return [ 879 exp.ColumnConstraint( 880 kind=exp.CommentColumnConstraint( 881 this=exp.Literal.string(self._truncate_column_comment(comment)) 882 ) 883 ) 884 ] 885 return [] 886 
887 def _create_table_from_source_queries( 888 self, 889 table_name: TableName, 890 source_queries: t.List[SourceQuery], 891 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 892 exists: bool = True, 893 replace: bool = False, 894 table_description: t.Optional[str] = None, 895 column_descriptions: t.Optional[t.Dict[str, str]] = None, 896 table_kind: t.Optional[str] = None, 897 track_rows_processed: bool = True, 898 **kwargs: t.Any, 899 ) -> None: 900 table = exp.to_table(table_name) 901 902 # CTAS calls do not usually include a schema expression. However, most engines 903 # permit them in CTAS expressions, and they allow us to register all column comments 904 # in a single call rather than in a separate comment command call for each column. 905 # 906 # This block conditionally builds a schema expression with column comments if the engine 907 # supports it and we have columns_to_types. column_to_types is required because the 908 # schema expression must include at least column name, data type, and the comment - 909 # for example, `(colname INTEGER COMMENT 'comment')`. 910 # 911 # column_to_types will be available when loading from a DataFrame (by converting from 912 # pandas to SQL types), when a model is "annotated" by explicitly specifying column 913 # types, and for evaluation methods like `LogicalReplaceQueryMixin.replace_query()` 914 # calls and SCD Type 2 model calls. 
915 schema = None 916 target_columns_to_types_known = target_columns_to_types and columns_to_types_all_known( 917 target_columns_to_types 918 ) 919 if ( 920 column_descriptions 921 and target_columns_to_types_known 922 and self.COMMENT_CREATION_TABLE.is_in_schema_def_ctas 923 and self.comments_enabled 924 ): 925 schema = self._build_schema_exp(table, target_columns_to_types, column_descriptions) # type: ignore 926 927 with self.transaction(condition=len(source_queries) > 1): 928 for i, source_query in enumerate(source_queries): 929 with source_query as query: 930 if target_columns_to_types and target_columns_to_types_known: 931 query = self._order_projections_and_filter( 932 query, target_columns_to_types, coerce_types=True 933 ) 934 if i == 0: 935 self._create_table( 936 schema if schema else table, 937 query, 938 target_columns_to_types=target_columns_to_types, 939 exists=exists, 940 replace=replace, 941 table_description=table_description, 942 table_kind=table_kind, 943 track_rows_processed=track_rows_processed, 944 **kwargs, 945 ) 946 else: 947 self._insert_append_query( 948 table_name, 949 query, 950 target_columns_to_types or self.columns(table), 951 track_rows_processed=track_rows_processed, 952 ) 953 954 # Register comments with commands if the engine supports comments and we weren't able to 955 # register them with the CTAS call's schema expression. 
956 if ( 957 table_description 958 and self.COMMENT_CREATION_TABLE.is_comment_command_only 959 and self.comments_enabled 960 ): 961 self._create_table_comment(table_name, table_description) 962 if column_descriptions and schema is None and self.comments_enabled: 963 self._create_column_comments(table_name, column_descriptions) 964 965 def _create_table( 966 self, 967 table_name_or_schema: t.Union[exp.Schema, TableName], 968 expression: t.Optional[exp.Expression], 969 exists: bool = True, 970 replace: bool = False, 971 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 972 table_description: t.Optional[str] = None, 973 column_descriptions: t.Optional[t.Dict[str, str]] = None, 974 table_kind: t.Optional[str] = None, 975 track_rows_processed: bool = True, 976 **kwargs: t.Any, 977 ) -> None: 978 self.execute( 979 self._build_create_table_exp( 980 table_name_or_schema, 981 expression=expression, 982 exists=exists, 983 replace=replace, 984 target_columns_to_types=target_columns_to_types, 985 table_description=( 986 table_description 987 if self.COMMENT_CREATION_TABLE.supports_schema_def and self.comments_enabled 988 else None 989 ), 990 table_kind=table_kind, 991 **kwargs, 992 ), 993 track_rows_processed=track_rows_processed, 994 ) 995 # Extract table name to clear cache 996 table_name = ( 997 table_name_or_schema.this 998 if isinstance(table_name_or_schema, exp.Schema) 999 else table_name_or_schema 1000 ) 1001 self._clear_data_object_cache(table_name) 1002 1003 def _build_create_table_exp( 1004 self, 1005 table_name_or_schema: t.Union[exp.Schema, TableName], 1006 expression: t.Optional[exp.Expression], 1007 exists: bool = True, 1008 replace: bool = False, 1009 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1010 table_description: t.Optional[str] = None, 1011 table_kind: t.Optional[str] = None, 1012 **kwargs: t.Any, 1013 ) -> exp.Create: 1014 exists = False if replace else exists 1015 catalog_name = None 1016 if not 
isinstance(table_name_or_schema, exp.Schema): 1017 table_name_or_schema = exp.to_table(table_name_or_schema) 1018 catalog_name = table_name_or_schema.catalog 1019 else: 1020 if isinstance(table_name_or_schema.this, exp.Table): 1021 catalog_name = table_name_or_schema.this.catalog 1022 1023 properties = ( 1024 self._build_table_properties_exp( 1025 **kwargs, 1026 catalog_name=catalog_name, 1027 target_columns_to_types=target_columns_to_types, 1028 table_description=table_description, 1029 table_kind=table_kind, 1030 ) 1031 if kwargs or table_description 1032 else None 1033 ) 1034 return exp.Create( 1035 this=table_name_or_schema, 1036 kind=table_kind or "TABLE", 1037 replace=replace, 1038 exists=exists, 1039 expression=expression, 1040 properties=properties, 1041 ) 1042 1043 def create_table_like( 1044 self, 1045 target_table_name: TableName, 1046 source_table_name: TableName, 1047 exists: bool = True, 1048 **kwargs: t.Any, 1049 ) -> None: 1050 """Create a table to store SQLMesh internal state based on the definition of another table, including any 1051 column attributes and indexes defined in the original table. 1052 1053 Args: 1054 target_table_name: The name of the table to create. Can be fully qualified or just table name. 1055 source_table_name: The name of the table to base the new table on. 1056 """ 1057 self._create_table_like(target_table_name, source_table_name, exists=exists, **kwargs) 1058 self._clear_data_object_cache(target_table_name) 1059 1060 def clone_table( 1061 self, 1062 target_table_name: TableName, 1063 source_table_name: TableName, 1064 replace: bool = False, 1065 exists: bool = True, 1066 clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 1067 **kwargs: t.Any, 1068 ) -> None: 1069 """Creates a table with the target name by cloning the source table. 1070 1071 Args: 1072 target_table_name: The name of the table that should be created. 1073 source_table_name: The name of the source table that should be cloned. 
1074 replace: Whether or not to replace an existing table. 1075 exists: Indicates whether to include the IF NOT EXISTS check. 1076 """ 1077 if not self.SUPPORTS_CLONING: 1078 raise NotImplementedError(f"Engine does not support cloning: {type(self)}") 1079 1080 kwargs.pop("rendered_physical_properties", None) 1081 self.execute( 1082 exp.Create( 1083 this=exp.to_table(target_table_name), 1084 kind="TABLE", 1085 replace=replace, 1086 exists=exists, 1087 clone=exp.Clone( 1088 this=exp.to_table(source_table_name), 1089 **(clone_kwargs or {}), 1090 ), 1091 **kwargs, 1092 ) 1093 ) 1094 self._clear_data_object_cache(target_table_name) 1095 1096 def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool = True) -> None: 1097 """Drops a data object of arbitrary type. 1098 1099 Args: 1100 data_object: The data object to drop. 1101 ignore_if_not_exists: If True, no error will be raised if the data object does not exist. 1102 """ 1103 if data_object.type.is_view: 1104 self.drop_view(data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists) 1105 elif data_object.type.is_materialized_view: 1106 self.drop_view( 1107 data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists, materialized=True 1108 ) 1109 elif data_object.type.is_table: 1110 self.drop_table(data_object.to_table(), exists=ignore_if_not_exists) 1111 elif data_object.type.is_managed_table: 1112 self.drop_managed_table(data_object.to_table(), exists=ignore_if_not_exists) 1113 else: 1114 raise SQLMeshError( 1115 f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'" 1116 ) 1117 1118 def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None: 1119 """Drops a table. 1120 1121 Args: 1122 table_name: The name of the table to drop. 1123 exists: If exists, defaults to True. 
1124 """ 1125 self._drop_object(name=table_name, exists=exists, **kwargs) 1126 1127 def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None: 1128 """Drops a managed table. 1129 1130 Args: 1131 table_name: The name of the table to drop. 1132 exists: If exists, defaults to True. 1133 """ 1134 raise NotImplementedError(f"Engine does not support managed tables: {type(self)}") 1135 1136 def _drop_object( 1137 self, 1138 name: TableName | SchemaName, 1139 exists: bool = True, 1140 kind: str = "TABLE", 1141 cascade: bool = False, 1142 **drop_args: t.Any, 1143 ) -> None: 1144 """Drops an object. 1145 1146 An object could be a DATABASE, SCHEMA, VIEW, TABLE, DYNAMIC TABLE, TEMPORARY TABLE etc depending on the :kind. 1147 1148 Args: 1149 name: The name of the table to drop. 1150 exists: If exists, defaults to True. 1151 kind: What kind of object to drop. Defaults to TABLE 1152 cascade: Whether or not to DROP ... CASCADE. 1153 Note that this is ignored for :kind's that are not present in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS 1154 **drop_args: Any extra arguments to set on the Drop expression 1155 """ 1156 if cascade and kind.upper() in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS: 1157 drop_args["cascade"] = cascade 1158 1159 self.execute(exp.Drop(this=exp.to_table(name), kind=kind, exists=exists, **drop_args)) 1160 self._clear_data_object_cache(name) 1161 1162 def get_alter_operations( 1163 self, 1164 current_table_name: TableName, 1165 target_table_name: TableName, 1166 *, 1167 ignore_destructive: bool = False, 1168 ignore_additive: bool = False, 1169 ) -> t.List[TableAlterOperation]: 1170 """ 1171 Determines the alter statements needed to change the current table into the structure of the target table. 
1172 """ 1173 return t.cast( 1174 t.List[TableAlterOperation], 1175 self.schema_differ.compare_columns( 1176 current_table_name, 1177 self.columns(current_table_name), 1178 self.columns(target_table_name), 1179 ignore_destructive=ignore_destructive, 1180 ignore_additive=ignore_additive, 1181 ), 1182 ) 1183 1184 def alter_table( 1185 self, 1186 alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]], 1187 ) -> None: 1188 """ 1189 Performs the alter statements to change the current table into the structure of the target table. 1190 """ 1191 with self.transaction(): 1192 for alter_expression in [ 1193 x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions 1194 ]: 1195 self.execute(alter_expression) 1196 1197 def create_view( 1198 self, 1199 view_name: TableName, 1200 query_or_df: QueryOrDF, 1201 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1202 replace: bool = True, 1203 materialized: bool = False, 1204 materialized_properties: t.Optional[t.Dict[str, t.Any]] = None, 1205 table_description: t.Optional[str] = None, 1206 column_descriptions: t.Optional[t.Dict[str, str]] = None, 1207 view_properties: t.Optional[t.Dict[str, exp.Expression]] = None, 1208 source_columns: t.Optional[t.List[str]] = None, 1209 **create_kwargs: t.Any, 1210 ) -> None: 1211 """Create a view with a query or dataframe. 1212 1213 If a dataframe is passed in, it will be converted into a literal values statement. 1214 This should only be done if the dataframe is very small! 1215 1216 Args: 1217 view_name: The view name. 1218 query_or_df: A query or dataframe. 1219 target_columns_to_types: Columns to use in the view statement. 1220 replace: Whether or not to replace an existing view defaults to True. 1221 materialized: Whether to create a a materialized view. Only used for engines that support this feature. 1222 materialized_properties: Optional materialized view properties to add to the view. 
1223 table_description: Optional table description from MODEL DDL. 1224 column_descriptions: Optional column descriptions from model query. 1225 view_properties: Optional view properties to add to the view. 1226 create_kwargs: Additional kwargs to pass into the Create expression 1227 """ 1228 import pandas as pd 1229 1230 if materialized_properties and not materialized: 1231 raise SQLMeshError("Materialized properties are only supported for materialized views") 1232 1233 query_or_df = self._native_df_to_pandas_df(query_or_df) 1234 1235 if isinstance(query_or_df, pd.DataFrame): 1236 values: t.List[t.Tuple[t.Any, ...]] = list( 1237 query_or_df.itertuples(index=False, name=None) 1238 ) 1239 target_columns_to_types, source_columns = self._columns_to_types( 1240 query_or_df, target_columns_to_types, source_columns 1241 ) 1242 if not target_columns_to_types: 1243 raise SQLMeshError("columns_to_types must be provided for dataframes") 1244 source_columns_to_types = get_source_columns_to_types( 1245 target_columns_to_types, source_columns 1246 ) 1247 query_or_df = self._values_to_sql( 1248 values, 1249 source_columns_to_types, 1250 batch_start=0, 1251 batch_end=len(values), 1252 ) 1253 1254 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 1255 query_or_df, 1256 target_columns_to_types, 1257 batch_size=0, 1258 target_table=view_name, 1259 source_columns=source_columns, 1260 ) 1261 if len(source_queries) != 1: 1262 raise SQLMeshError("Only one source query is supported for creating views") 1263 1264 schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name) 1265 if target_columns_to_types: 1266 schema = self._build_schema_exp( 1267 exp.to_table(view_name), 1268 target_columns_to_types, 1269 column_descriptions, 1270 is_view=True, 1271 materialized=materialized, 1272 ) 1273 1274 properties = create_kwargs.pop("properties", None) 1275 if not properties: 1276 properties = exp.Properties(expressions=[]) 1277 1278 if view_properties: 
1279 table_type = self._pop_creatable_type_from_properties(view_properties) 1280 if table_type: 1281 properties.append("expressions", table_type) 1282 1283 if materialized and self.SUPPORTS_MATERIALIZED_VIEWS: 1284 properties.append("expressions", exp.MaterializedProperty()) 1285 1286 if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema): 1287 schema = schema.this 1288 1289 if not self.SUPPORTS_VIEW_SCHEMA and isinstance(schema, exp.Schema): 1290 schema = schema.this 1291 1292 if materialized_properties: 1293 partitioned_by = materialized_properties.pop("partitioned_by", None) 1294 clustered_by = materialized_properties.pop("clustered_by", None) 1295 if ( 1296 partitioned_by 1297 and ( 1298 partitioned_by_prop := self._build_partitioned_by_exp( 1299 partitioned_by, **materialized_properties 1300 ) 1301 ) 1302 is not None 1303 ): 1304 materialized_properties["catalog_name"] = exp.to_table(view_name).catalog 1305 properties.append("expressions", partitioned_by_prop) 1306 if ( 1307 clustered_by 1308 and ( 1309 clustered_by_prop := self._build_clustered_by_exp( 1310 clustered_by, **materialized_properties 1311 ) 1312 ) 1313 is not None 1314 ): 1315 properties.append("expressions", clustered_by_prop) 1316 1317 create_view_properties = self._build_view_properties_exp( 1318 view_properties, 1319 ( 1320 table_description 1321 if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled 1322 else None 1323 ), 1324 physical_cluster=create_kwargs.pop("physical_cluster", None), 1325 ) 1326 if create_view_properties: 1327 for view_property in create_view_properties.expressions: 1328 # Small hack to make sure SECURE goes at the beginning before materialized as required by Snowflake 1329 if isinstance(view_property, exp.SecureProperty): 1330 properties.set("expressions", view_property, index=0, overwrite=False) 1331 else: 1332 properties.append("expressions", view_property) 1333 1334 if properties.expressions: 1335 
create_kwargs["properties"] = properties 1336 1337 if replace: 1338 self.drop_data_object_on_type_mismatch( 1339 self.get_data_object(view_name), 1340 DataObjectType.VIEW if not materialized else DataObjectType.MATERIALIZED_VIEW, 1341 ) 1342 1343 with source_queries[0] as query: 1344 self.execute( 1345 exp.Create( 1346 this=schema, 1347 kind="VIEW", 1348 replace=replace, 1349 expression=query, 1350 **create_kwargs, 1351 ), 1352 quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS, 1353 ) 1354 1355 self._clear_data_object_cache(view_name) 1356 1357 # Register table comment with commands if the engine doesn't support doing it in CREATE 1358 if ( 1359 table_description 1360 and self.COMMENT_CREATION_VIEW.is_comment_command_only 1361 and self.comments_enabled 1362 ): 1363 self._create_table_comment(view_name, table_description, "VIEW") 1364 # Register column comments with commands if the engine doesn't support doing it in 1365 # CREATE or we couldn't do it in the CREATE schema definition because we don't have 1366 # columns_to_types 1367 if ( 1368 column_descriptions 1369 and ( 1370 self.COMMENT_CREATION_VIEW.is_comment_command_only 1371 or ( 1372 self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands 1373 and not target_columns_to_types 1374 ) 1375 ) 1376 and self.comments_enabled 1377 ): 1378 self._create_column_comments(view_name, column_descriptions, "VIEW", materialized) 1379 1380 @set_catalog() 1381 def create_schema( 1382 self, 1383 schema_name: SchemaName, 1384 ignore_if_exists: bool = True, 1385 warn_on_error: bool = True, 1386 properties: t.Optional[t.List[exp.Expression]] = None, 1387 ) -> None: 1388 properties = properties or [] 1389 return self._create_schema( 1390 schema_name=schema_name, 1391 ignore_if_exists=ignore_if_exists, 1392 warn_on_error=warn_on_error, 1393 properties=properties, 1394 kind="SCHEMA", 1395 ) 1396 1397 def _create_schema( 1398 self, 1399 schema_name: SchemaName, 1400 ignore_if_exists: bool, 1401 warn_on_error: bool, 1402 properties: 
t.List[exp.Expression], 1403 kind: str, 1404 ) -> None: 1405 """Create a schema from a name or qualified table name.""" 1406 try: 1407 self.execute( 1408 exp.Create( 1409 this=to_schema(schema_name), 1410 kind=kind, 1411 exists=ignore_if_exists, 1412 properties=exp.Properties( # this renders as '' (empty string) if expressions is empty 1413 expressions=properties 1414 ), 1415 ) 1416 ) 1417 except Exception as e: 1418 if not warn_on_error: 1419 raise 1420 logger.warning("Failed to create %s '%s': %s", kind.lower(), schema_name, e) 1421 1422 def drop_schema( 1423 self, 1424 schema_name: SchemaName, 1425 ignore_if_not_exists: bool = True, 1426 cascade: bool = False, 1427 **drop_args: t.Dict[str, exp.Expression], 1428 ) -> None: 1429 return self._drop_object( 1430 name=schema_name, 1431 exists=ignore_if_not_exists, 1432 kind="SCHEMA", 1433 cascade=cascade, 1434 **drop_args, 1435 ) 1436 1437 def drop_view( 1438 self, 1439 view_name: TableName, 1440 ignore_if_not_exists: bool = True, 1441 materialized: bool = False, 1442 **kwargs: t.Any, 1443 ) -> None: 1444 """Drop a view.""" 1445 self._drop_object( 1446 name=view_name, 1447 exists=ignore_if_not_exists, 1448 kind="VIEW", 1449 materialized=materialized and self.SUPPORTS_MATERIALIZED_VIEWS, 1450 **kwargs, 1451 ) 1452 1453 def create_catalog(self, catalog_name: str | exp.Identifier) -> None: 1454 return self._create_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect)) 1455 1456 def _create_catalog(self, catalog_name: exp.Identifier) -> None: 1457 raise SQLMeshError( 1458 f"Unable to create catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine." 
1459 ) 1460 1461 def drop_catalog(self, catalog_name: str | exp.Identifier) -> None: 1462 return self._drop_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect)) 1463 1464 def _drop_catalog(self, catalog_name: exp.Identifier) -> None: 1465 raise SQLMeshError( 1466 f"Unable to drop catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine." 1467 ) 1468 1469 def columns( 1470 self, table_name: TableName, include_pseudo_columns: bool = False 1471 ) -> t.Dict[str, exp.DataType]: 1472 """Fetches column names and types for the target table.""" 1473 self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE")) 1474 describe_output = self.cursor.fetchall() 1475 return { 1476 # Note: MySQL returns the column type as bytes. 1477 column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect) 1478 for column_name, column_type, *_ in itertools.takewhile( 1479 lambda t: not t[0].startswith("#"), 1480 describe_output, 1481 ) 1482 if column_name and column_name.strip() and column_type and column_type.strip() 1483 } 1484 1485 def table_exists(self, table_name: TableName) -> bool: 1486 table = exp.to_table(table_name) 1487 data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name) 1488 if data_object_cache_key in self._data_object_cache: 1489 logger.debug("Table existence cache hit: %s", data_object_cache_key) 1490 return self._data_object_cache[data_object_cache_key] is not None 1491 1492 try: 1493 self.execute(exp.Describe(this=table, kind="TABLE")) 1494 return True 1495 except Exception: 1496 return False 1497 1498 def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None: 1499 self.execute(exp.delete(table_name, where)) 1500 1501 def insert_append( 1502 self, 1503 table_name: TableName, 1504 query_or_df: QueryOrDF, 1505 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1506 
track_rows_processed: bool = True, 1507 source_columns: t.Optional[t.List[str]] = None, 1508 ) -> None: 1509 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 1510 query_or_df, 1511 target_columns_to_types, 1512 target_table=table_name, 1513 source_columns=source_columns, 1514 ) 1515 self._insert_append_source_queries( 1516 table_name, source_queries, target_columns_to_types, track_rows_processed 1517 ) 1518 1519 def _insert_append_source_queries( 1520 self, 1521 table_name: TableName, 1522 source_queries: t.List[SourceQuery], 1523 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1524 track_rows_processed: bool = True, 1525 ) -> None: 1526 with self.transaction(condition=len(source_queries) > 0): 1527 target_columns_to_types = target_columns_to_types or self.columns(table_name) 1528 for source_query in source_queries: 1529 with source_query as query: 1530 self._insert_append_query( 1531 table_name, 1532 query, 1533 target_columns_to_types, 1534 track_rows_processed=track_rows_processed, 1535 ) 1536 1537 def _insert_append_query( 1538 self, 1539 table_name: TableName, 1540 query: Query, 1541 target_columns_to_types: t.Dict[str, exp.DataType], 1542 order_projections: bool = True, 1543 track_rows_processed: bool = True, 1544 ) -> None: 1545 if order_projections: 1546 query = self._order_projections_and_filter(query, target_columns_to_types) 1547 self.execute( 1548 exp.insert(query, table_name, columns=list(target_columns_to_types)), 1549 track_rows_processed=track_rows_processed, 1550 ) 1551 1552 def insert_overwrite_by_partition( 1553 self, 1554 table_name: TableName, 1555 query_or_df: QueryOrDF, 1556 partitioned_by: t.List[exp.Expression], 1557 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1558 source_columns: t.Optional[t.List[str]] = None, 1559 ) -> None: 1560 if self.INSERT_OVERWRITE_STRATEGY.is_insert_overwrite: 1561 target_table = exp.to_table(table_name) 1562 source_queries, 
target_columns_to_types = self._get_source_queries_and_columns_to_types( 1563 query_or_df, 1564 target_columns_to_types, 1565 target_table=target_table, 1566 source_columns=source_columns, 1567 ) 1568 self._insert_overwrite_by_condition( 1569 table_name, source_queries, target_columns_to_types=target_columns_to_types 1570 ) 1571 else: 1572 self._replace_by_key( 1573 table_name, 1574 query_or_df, 1575 target_columns_to_types, 1576 partitioned_by, 1577 is_unique_key=False, 1578 source_columns=source_columns, 1579 ) 1580 1581 def insert_overwrite_by_time_partition( 1582 self, 1583 table_name: TableName, 1584 query_or_df: QueryOrDF, 1585 start: TimeLike, 1586 end: TimeLike, 1587 time_formatter: t.Callable[ 1588 [TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expression 1589 ], 1590 time_column: TimeColumn | exp.Expression | str, 1591 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1592 source_columns: t.Optional[t.List[str]] = None, 1593 **kwargs: t.Any, 1594 ) -> None: 1595 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 1596 query_or_df, 1597 target_columns_to_types, 1598 target_table=table_name, 1599 source_columns=source_columns, 1600 ) 1601 if not target_columns_to_types or not columns_to_types_all_known(target_columns_to_types): 1602 target_columns_to_types = self.columns(table_name) 1603 low, high = [ 1604 time_formatter(dt, target_columns_to_types) 1605 for dt in make_inclusive(start, end, self.dialect) 1606 ] 1607 if isinstance(time_column, TimeColumn): 1608 time_column = time_column.column 1609 where = exp.Between( 1610 this=exp.to_column(time_column) if isinstance(time_column, str) else time_column, 1611 low=low, 1612 high=high, 1613 ) 1614 return self._insert_overwrite_by_time_partition( 1615 table_name, source_queries, target_columns_to_types, where, **kwargs 1616 ) 1617 1618 def _insert_overwrite_by_time_partition( 1619 self, 1620 table_name: TableName, 1621 source_queries: 
t.List[SourceQuery], 1622 target_columns_to_types: t.Dict[str, exp.DataType], 1623 where: exp.Condition, 1624 **kwargs: t.Any, 1625 ) -> None: 1626 return self._insert_overwrite_by_condition( 1627 table_name, source_queries, target_columns_to_types, where, **kwargs 1628 ) 1629 1630 def _values_to_sql( 1631 self, 1632 values: t.List[t.Tuple[t.Any, ...]], 1633 target_columns_to_types: t.Dict[str, exp.DataType], 1634 batch_start: int, 1635 batch_end: int, 1636 alias: str = "t", 1637 source_columns: t.Optional[t.List[str]] = None, 1638 ) -> Query: 1639 return select_from_values_for_batch_range( 1640 values=values, 1641 target_columns_to_types=target_columns_to_types, 1642 batch_start=batch_start, 1643 batch_end=batch_end, 1644 alias=alias, 1645 source_columns=source_columns, 1646 ) 1647 1648 def _insert_overwrite_by_condition( 1649 self, 1650 table_name: TableName, 1651 source_queries: t.List[SourceQuery], 1652 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1653 where: t.Optional[exp.Condition] = None, 1654 insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None, 1655 **kwargs: t.Any, 1656 ) -> None: 1657 table = exp.to_table(table_name) 1658 insert_overwrite_strategy = ( 1659 insert_overwrite_strategy_override or self.INSERT_OVERWRITE_STRATEGY 1660 ) 1661 with self.transaction( 1662 condition=len(source_queries) > 0 or insert_overwrite_strategy.is_delete_insert 1663 ): 1664 target_columns_to_types = target_columns_to_types or self.columns(table_name) 1665 for i, source_query in enumerate(source_queries): 1666 with source_query as query: 1667 query = self._order_projections_and_filter( 1668 query, target_columns_to_types, where=where 1669 ) 1670 if i > 0 or insert_overwrite_strategy.is_delete_insert: 1671 if i == 0: 1672 self.delete_from(table_name, where=where or exp.true()) 1673 self._insert_append_query( 1674 table_name, 1675 query, 1676 target_columns_to_types=target_columns_to_types, 1677 order_projections=False, 1678 
) 1679 elif insert_overwrite_strategy.is_merge: 1680 columns = [exp.column(col) for col in target_columns_to_types] 1681 when_not_matched_by_source = exp.When( 1682 matched=False, 1683 source=True, 1684 condition=where, 1685 then=exp.Delete(), 1686 ) 1687 when_not_matched_by_target = exp.When( 1688 matched=False, 1689 source=False, 1690 then=exp.Insert( 1691 this=exp.Tuple(expressions=columns), 1692 expression=exp.Tuple(expressions=columns), 1693 ), 1694 ) 1695 self._merge( 1696 target_table=table_name, 1697 query=query, 1698 on=exp.false(), 1699 whens=exp.Whens( 1700 expressions=[when_not_matched_by_source, when_not_matched_by_target] 1701 ), 1702 ) 1703 else: 1704 insert_exp = exp.insert( 1705 query, 1706 table, 1707 columns=( 1708 list(target_columns_to_types) 1709 if not insert_overwrite_strategy.is_replace_where 1710 else None 1711 ), 1712 overwrite=insert_overwrite_strategy.is_insert_overwrite, 1713 ) 1714 if insert_overwrite_strategy.is_replace_where: 1715 insert_exp.set("where", where or exp.true()) 1716 self.execute(insert_exp, track_rows_processed=True) 1717 1718 def update_table( 1719 self, 1720 table_name: TableName, 1721 properties: t.Dict[str, t.Any], 1722 where: t.Optional[str | exp.Condition] = None, 1723 ) -> None: 1724 self.execute(exp.update(table_name, properties, where=where)) 1725 1726 def _merge( 1727 self, 1728 target_table: TableName, 1729 query: Query, 1730 on: exp.Expression, 1731 whens: exp.Whens, 1732 ) -> None: 1733 this = exp.alias_(exp.to_table(target_table), alias=MERGE_TARGET_ALIAS, table=True) 1734 using = exp.alias_( 1735 exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True 1736 ) 1737 self.execute( 1738 exp.Merge(this=this, using=using, on=on, whens=whens), track_rows_processed=True 1739 ) 1740 1741 def scd_type_2_by_time( 1742 self, 1743 target_table: TableName, 1744 source_table: QueryOrDF, 1745 unique_key: t.Sequence[exp.Expression], 1746 valid_from_col: exp.Column, 1747 valid_to_col: exp.Column, 1748 
execution_time: t.Union[TimeLike, exp.Column], 1749 updated_at_col: exp.Column, 1750 invalidate_hard_deletes: bool = True, 1751 updated_at_as_valid_from: bool = False, 1752 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1753 table_description: t.Optional[str] = None, 1754 column_descriptions: t.Optional[t.Dict[str, str]] = None, 1755 truncate: bool = False, 1756 source_columns: t.Optional[t.List[str]] = None, 1757 **kwargs: t.Any, 1758 ) -> None: 1759 self._scd_type_2( 1760 target_table=target_table, 1761 source_table=source_table, 1762 unique_key=unique_key, 1763 valid_from_col=valid_from_col, 1764 valid_to_col=valid_to_col, 1765 execution_time=execution_time, 1766 updated_at_col=updated_at_col, 1767 invalidate_hard_deletes=invalidate_hard_deletes, 1768 updated_at_as_valid_from=updated_at_as_valid_from, 1769 target_columns_to_types=target_columns_to_types, 1770 table_description=table_description, 1771 column_descriptions=column_descriptions, 1772 truncate=truncate, 1773 source_columns=source_columns, 1774 **kwargs, 1775 ) 1776 1777 def scd_type_2_by_column( 1778 self, 1779 target_table: TableName, 1780 source_table: QueryOrDF, 1781 unique_key: t.Sequence[exp.Expression], 1782 valid_from_col: exp.Column, 1783 valid_to_col: exp.Column, 1784 execution_time: t.Union[TimeLike, exp.Column], 1785 check_columns: t.Union[exp.Star, t.Sequence[exp.Expression]], 1786 invalidate_hard_deletes: bool = True, 1787 execution_time_as_valid_from: bool = False, 1788 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1789 table_description: t.Optional[str] = None, 1790 column_descriptions: t.Optional[t.Dict[str, str]] = None, 1791 truncate: bool = False, 1792 source_columns: t.Optional[t.List[str]] = None, 1793 **kwargs: t.Any, 1794 ) -> None: 1795 self._scd_type_2( 1796 target_table=target_table, 1797 source_table=source_table, 1798 unique_key=unique_key, 1799 valid_from_col=valid_from_col, 1800 valid_to_col=valid_to_col, 1801 
execution_time=execution_time, 1802 check_columns=check_columns, 1803 target_columns_to_types=target_columns_to_types, 1804 invalidate_hard_deletes=invalidate_hard_deletes, 1805 execution_time_as_valid_from=execution_time_as_valid_from, 1806 table_description=table_description, 1807 column_descriptions=column_descriptions, 1808 truncate=truncate, 1809 source_columns=source_columns, 1810 **kwargs, 1811 ) 1812 1813 def _scd_type_2( 1814 self, 1815 target_table: TableName, 1816 source_table: QueryOrDF, 1817 unique_key: t.Sequence[exp.Expression], 1818 valid_from_col: exp.Column, 1819 valid_to_col: exp.Column, 1820 execution_time: t.Union[TimeLike, exp.Column], 1821 invalidate_hard_deletes: bool = True, 1822 updated_at_col: t.Optional[exp.Column] = None, 1823 check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Expression]]] = None, 1824 updated_at_as_valid_from: bool = False, 1825 execution_time_as_valid_from: bool = False, 1826 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1827 table_description: t.Optional[str] = None, 1828 column_descriptions: t.Optional[t.Dict[str, str]] = None, 1829 truncate: bool = False, 1830 source_columns: t.Optional[t.List[str]] = None, 1831 **kwargs: t.Any, 1832 ) -> None: 1833 def remove_managed_columns( 1834 cols_to_types: t.Dict[str, exp.DataType], 1835 ) -> t.Dict[str, exp.DataType]: 1836 return { 1837 k: v for k, v in cols_to_types.items() if k not in {valid_from_name, valid_to_name} 1838 } 1839 1840 valid_from_name = valid_from_col.name 1841 valid_to_name = valid_to_col.name 1842 target_columns_to_types = target_columns_to_types or self.columns(target_table) 1843 if ( 1844 valid_from_name not in target_columns_to_types 1845 or valid_to_name not in target_columns_to_types 1846 or not columns_to_types_all_known(target_columns_to_types) 1847 ): 1848 target_columns_to_types = self.columns(target_table) 1849 unmanaged_columns_to_types = ( 1850 remove_managed_columns(target_columns_to_types) if 
target_columns_to_types else None 1851 ) 1852 source_queries, unmanaged_columns_to_types = self._get_source_queries_and_columns_to_types( 1853 source_table, 1854 unmanaged_columns_to_types, 1855 target_table=target_table, 1856 batch_size=0, 1857 source_columns=source_columns, 1858 ) 1859 updated_at_name = updated_at_col.name if updated_at_col else None 1860 if not target_columns_to_types: 1861 raise SQLMeshError(f"Could not get columns_to_types. Does {target_table} exist?") 1862 unmanaged_columns_to_types = unmanaged_columns_to_types or remove_managed_columns( 1863 target_columns_to_types 1864 ) 1865 if not unique_key: 1866 raise SQLMeshError("unique_key must be provided for SCD Type 2") 1867 if check_columns and updated_at_col: 1868 raise SQLMeshError( 1869 "Cannot use both `check_columns` and `updated_at_name` for SCD Type 2" 1870 ) 1871 if check_columns and updated_at_as_valid_from: 1872 raise SQLMeshError( 1873 "Cannot use both `check_columns` and `updated_at_as_valid_from` for SCD Type 2" 1874 ) 1875 if execution_time_as_valid_from and not check_columns: 1876 raise SQLMeshError( 1877 "Cannot use `execution_time_as_valid_from` without `check_columns` for SCD Type 2" 1878 ) 1879 if updated_at_name and updated_at_name not in target_columns_to_types: 1880 raise SQLMeshError( 1881 f"Column {updated_at_name} not found in {target_table}. Table must contain an `updated_at` timestamp for SCD Type 2" 1882 ) 1883 time_data_type = target_columns_to_types[valid_from_name] 1884 select_source_columns: t.List[t.Union[str, exp.Alias]] = [ 1885 col for col in unmanaged_columns_to_types if col != updated_at_name 1886 ] 1887 table_columns = [exp.column(c, quoted=True) for c in target_columns_to_types] 1888 if updated_at_name: 1889 select_source_columns.append( 1890 exp.cast(updated_at_col, time_data_type).as_(updated_at_col.this) # type: ignore 1891 ) 1892 1893 # If a star is provided, we include all unmanaged columns in the check. 
1894 # This unnecessarily includes unique key columns but since they are used in the join, and therefore we know 1895 # they are equal or not, the extra check is not a problem and we gain simplified logic here. 1896 # If we want to change this, then we just need to check the expressions in unique_key and pull out the 1897 # column names and then remove them from the unmanaged_columns 1898 if check_columns: 1899 # Handle both Star directly and [Star()] (which can happen during serialization/deserialization) 1900 if isinstance(seq_get(ensure_list(check_columns), 0), exp.Star): 1901 check_columns = [exp.column(col) for col in unmanaged_columns_to_types] 1902 execution_ts = ( 1903 exp.cast(execution_time, time_data_type, dialect=self.dialect) 1904 if isinstance(execution_time, exp.Column) 1905 else to_time_column(execution_time, time_data_type, self.dialect, nullable=True) 1906 ) 1907 if updated_at_as_valid_from: 1908 if not updated_at_col: 1909 raise SQLMeshError( 1910 "Cannot use `updated_at_as_valid_from` without `updated_at_name` for SCD Type 2" 1911 ) 1912 update_valid_from_start: t.Union[str, exp.Expression] = updated_at_col 1913 # If using check_columns and the user doesn't always want execution_time for valid from 1914 # then we only use epoch 0 if we are truncating the table and loading rows for the first time. 1915 # All future new rows should have execution time. 
1916 elif check_columns and (execution_time_as_valid_from or not truncate): 1917 update_valid_from_start = execution_ts 1918 else: 1919 update_valid_from_start = to_time_column( 1920 "1970-01-01 00:00:00+00:00", time_data_type, self.dialect, nullable=True 1921 ) 1922 insert_valid_from_start = execution_ts if check_columns else updated_at_col # type: ignore 1923 # joined._exists IS NULL is saying "if the row is deleted" 1924 delete_check = ( 1925 exp.column("_exists", "joined").is_(exp.Null()) if invalidate_hard_deletes else None 1926 ) 1927 prefixed_valid_to_col = valid_to_col.copy() 1928 prefixed_valid_to_col.this.set("this", f"t_{prefixed_valid_to_col.name}") 1929 prefixed_valid_from_col = valid_from_col.copy() 1930 prefixed_valid_from_col.this.set("this", f"t_{valid_from_col.name}") 1931 if check_columns: 1932 row_check_conditions = [] 1933 for col in check_columns: 1934 col_qualified = col.copy() 1935 col_qualified.set("table", exp.to_identifier("joined")) 1936 1937 t_col = col_qualified.copy() 1938 for column in t_col.find_all(exp.Column): 1939 column.this.set("this", f"t_{column.name}") 1940 1941 row_check_conditions.extend( 1942 [ 1943 col_qualified.neq(t_col), 1944 exp.and_(t_col.is_(exp.Null()), col_qualified.is_(exp.Null()).not_()), 1945 exp.and_(t_col.is_(exp.Null()).not_(), col_qualified.is_(exp.Null())), 1946 ] 1947 ) 1948 row_value_check = exp.or_(*row_check_conditions) 1949 unique_key_conditions = [] 1950 for key in unique_key: 1951 key_qualified = key.copy() 1952 key_qualified.set("table", exp.to_identifier("joined")) 1953 t_key = key_qualified.copy() 1954 for col in t_key.find_all(exp.Column): 1955 col.this.set("this", f"t_{col.name}") 1956 unique_key_conditions.extend( 1957 [t_key.is_(exp.Null()).not_(), key_qualified.is_(exp.Null()).not_()] 1958 ) 1959 unique_key_check = exp.and_(*unique_key_conditions) 1960 # unique_key_check is saying "if the row is updated" 1961 # row_value_check is saying "if the row has changed" 1962 updated_row_filter = 
exp.and_(unique_key_check, row_value_check) 1963 valid_to_case_stmt = ( 1964 exp.Case() 1965 .when( 1966 exp.and_( 1967 exp.or_( 1968 delete_check, 1969 updated_row_filter, 1970 ) 1971 ), 1972 execution_ts, 1973 ) 1974 .else_(prefixed_valid_to_col) 1975 .as_(valid_to_col.this) 1976 ) 1977 valid_from_case_stmt = exp.func( 1978 "COALESCE", 1979 prefixed_valid_from_col, 1980 update_valid_from_start, 1981 ).as_(valid_from_col.this) 1982 else: 1983 assert updated_at_col is not None 1984 updated_at_col_qualified = updated_at_col.copy() 1985 updated_at_col_qualified.set("table", exp.to_identifier("joined")) 1986 prefixed_updated_at_col = updated_at_col_qualified.copy() 1987 prefixed_updated_at_col.this.set("this", f"t_{updated_at_col_qualified.name}") 1988 updated_row_filter = updated_at_col_qualified > prefixed_updated_at_col 1989 1990 valid_to_case_stmt_builder = exp.Case().when( 1991 updated_row_filter, updated_at_col_qualified 1992 ) 1993 if delete_check: 1994 valid_to_case_stmt_builder = valid_to_case_stmt_builder.when( 1995 delete_check, execution_ts 1996 ) 1997 valid_to_case_stmt = valid_to_case_stmt_builder.else_(prefixed_valid_to_col).as_( 1998 valid_to_col.this 1999 ) 2000 2001 valid_from_case_stmt = ( 2002 exp.Case() 2003 .when( 2004 exp.and_( 2005 prefixed_valid_from_col.is_(exp.Null()), 2006 exp.column("_exists", "latest_deleted").is_(exp.Null()).not_(), 2007 ), 2008 exp.Case() 2009 .when( 2010 exp.column(valid_to_col.this, "latest_deleted") > updated_at_col, 2011 exp.column(valid_to_col.this, "latest_deleted"), 2012 ) 2013 .else_(updated_at_col), 2014 ) 2015 .when(prefixed_valid_from_col.is_(exp.Null()), update_valid_from_start) 2016 .else_(prefixed_valid_from_col) 2017 ).as_(valid_from_col.this) 2018 2019 existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_( 2020 target_table 2021 ) 2022 if truncate: 2023 existing_rows_query = existing_rows_query.limit(0) 2024 2025 with source_queries[0] as source_query: 2026 
prefixed_columns_to_types = [] 2027 for column in target_columns_to_types: 2028 prefixed_col = exp.column(column).copy() 2029 prefixed_col.this.set("this", f"t_{prefixed_col.name}") 2030 prefixed_columns_to_types.append(prefixed_col) 2031 prefixed_unmanaged_columns = [] 2032 for column in unmanaged_columns_to_types: 2033 prefixed_col = exp.column(column).copy() 2034 prefixed_col.this.set("this", f"t_{prefixed_col.name}") 2035 prefixed_unmanaged_columns.append(prefixed_col) 2036 query = ( 2037 exp.Select() # type: ignore 2038 .select(*table_columns) 2039 .from_("static") 2040 .union( 2041 exp.select(*table_columns).from_("updated_rows"), 2042 distinct=False, 2043 ) 2044 .union( 2045 exp.select(*table_columns).from_("inserted_rows"), 2046 distinct=False, 2047 ) 2048 .with_( 2049 "source", 2050 exp.select(exp.true().as_("_exists"), *select_source_columns) 2051 .distinct(*unique_key) 2052 .from_( 2053 self.use_server_nulls_for_unmatched_after_join(source_query).subquery( # type: ignore 2054 "raw_source" 2055 ) 2056 ), 2057 ) 2058 # Historical Records that Do Not Change 2059 .with_( 2060 "static", 2061 existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()), 2062 ) 2063 # Latest Records that can be updated 2064 .with_( 2065 "latest", 2066 existing_rows_query.where(valid_to_col.is_(exp.Null())), 2067 ) 2068 # Deleted records which can be used to determine `valid_from` for undeleted source records 2069 .with_( 2070 "deleted", 2071 exp.select(*[exp.column(col, "static") for col in target_columns_to_types]) 2072 .from_("static") 2073 .join( 2074 "latest", 2075 on=exp.and_( 2076 *[ 2077 add_table(key, "static").eq(add_table(key, "latest")) 2078 for key in unique_key 2079 ] 2080 ), 2081 join_type="left", 2082 ) 2083 .where(exp.column(valid_to_col.this, "latest").is_(exp.Null())), 2084 ) 2085 # Get the latest `valid_to` deleted record for each unique key 2086 .with_( 2087 "latest_deleted", 2088 exp.select( 2089 exp.true().as_("_exists"), 2090 *(part.as_(f"_key{i}") for 
i, part in enumerate(unique_key)), 2091 exp.Max(this=valid_to_col).as_(valid_to_col.this), 2092 ) 2093 .from_("deleted") 2094 .group_by(*unique_key), 2095 ) 2096 # Do a full join between latest records and source table in order to combine them together 2097 # MySQL doesn't support full join so going to do a left then right join and remove dups with union 2098 # We do a left/right and filter right on only matching to remove the need to do union distinct 2099 # which allows scd type 2 to be compatible with unhashable data types 2100 .with_( 2101 "joined", 2102 exp.select( 2103 exp.column("_exists", table="source").as_("_exists"), 2104 *( 2105 exp.column(col, table="latest").as_(prefixed_columns_to_types[i].this) 2106 for i, col in enumerate(target_columns_to_types) 2107 ), 2108 *( 2109 exp.column(col, table="source").as_(col) 2110 for col in unmanaged_columns_to_types 2111 ), 2112 ) 2113 .from_("latest") 2114 .join( 2115 "source", 2116 on=exp.and_( 2117 *[ 2118 add_table(key, "latest").eq(add_table(key, "source")) 2119 for key in unique_key 2120 ] 2121 ), 2122 join_type="left", 2123 ) 2124 .union( 2125 exp.select( 2126 exp.column("_exists", table="source").as_("_exists"), 2127 *( 2128 exp.column(col, table="latest").as_( 2129 prefixed_columns_to_types[i].this 2130 ) 2131 for i, col in enumerate(target_columns_to_types) 2132 ), 2133 *( 2134 exp.column(col, table="source").as_(col) 2135 for col in unmanaged_columns_to_types 2136 ), 2137 ) 2138 .from_("latest") 2139 .join( 2140 "source", 2141 on=exp.and_( 2142 *[ 2143 add_table(key, "latest").eq(add_table(key, "source")) 2144 for key in unique_key 2145 ] 2146 ), 2147 join_type="right", 2148 ) 2149 .where(exp.column("_exists", table="latest").is_(exp.Null())), 2150 distinct=False, 2151 ), 2152 ) 2153 # Get deleted, new, no longer current, or unchanged records 2154 .with_( 2155 "updated_rows", 2156 exp.select( 2157 *( 2158 exp.func( 2159 "COALESCE", 2160 exp.column(prefixed_unmanaged_columns[i].this, table="joined"), 2161 
exp.column(col, table="joined"), 2162 ).as_(col) 2163 for i, col in enumerate(unmanaged_columns_to_types) 2164 ), 2165 valid_from_case_stmt, 2166 valid_to_case_stmt, 2167 ) 2168 .from_("joined") 2169 .join( 2170 "latest_deleted", 2171 on=exp.and_( 2172 *[ 2173 add_table(part, "joined").eq( 2174 exp.column(f"_key{i}", "latest_deleted") 2175 ) 2176 for i, part in enumerate(unique_key) 2177 ] 2178 ), 2179 join_type="left", 2180 ), 2181 ) 2182 # Get records that have been "updated" which means inserting a new record with previous `valid_from` 2183 .with_( 2184 "inserted_rows", 2185 exp.select( 2186 *unmanaged_columns_to_types, 2187 insert_valid_from_start.as_(valid_from_col.this), # type: ignore 2188 to_time_column(exp.null(), time_data_type, self.dialect, nullable=True).as_( 2189 valid_to_col.this 2190 ), 2191 ) 2192 .from_("joined") 2193 .where(updated_row_filter), 2194 ) 2195 ) 2196 2197 self.replace_query( 2198 target_table, 2199 self.ensure_nulls_for_unmatched_after_join(query), 2200 target_columns_to_types=target_columns_to_types, 2201 table_description=table_description, 2202 column_descriptions=column_descriptions, 2203 **kwargs, 2204 ) 2205 2206 def merge( 2207 self, 2208 target_table: TableName, 2209 source_table: QueryOrDF, 2210 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 2211 unique_key: t.Sequence[exp.Expression], 2212 when_matched: t.Optional[exp.Whens] = None, 2213 merge_filter: t.Optional[exp.Expression] = None, 2214 source_columns: t.Optional[t.List[str]] = None, 2215 **kwargs: t.Any, 2216 ) -> None: 2217 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 2218 source_table, 2219 target_columns_to_types, 2220 target_table=target_table, 2221 source_columns=source_columns, 2222 ) 2223 target_columns_to_types = target_columns_to_types or self.columns(target_table) 2224 on = exp.and_( 2225 *( 2226 add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS)) 2227 for part in unique_key 
2228 ) 2229 ) 2230 if merge_filter: 2231 on = exp.and_(merge_filter, on) 2232 2233 if not when_matched: 2234 match_expressions = [ 2235 exp.When( 2236 matched=True, 2237 source=False, 2238 then=exp.Update( 2239 expressions=[ 2240 exp.column(col, MERGE_TARGET_ALIAS).eq( 2241 exp.column(col, MERGE_SOURCE_ALIAS) 2242 ) 2243 for col in target_columns_to_types 2244 ], 2245 ), 2246 ) 2247 ] 2248 else: 2249 match_expressions = when_matched.copy().expressions 2250 2251 match_expressions.append( 2252 exp.When( 2253 matched=False, 2254 source=False, 2255 then=exp.Insert( 2256 this=exp.Tuple( 2257 expressions=[exp.column(col) for col in target_columns_to_types] 2258 ), 2259 expression=exp.Tuple( 2260 expressions=[ 2261 exp.column(col, MERGE_SOURCE_ALIAS) for col in target_columns_to_types 2262 ] 2263 ), 2264 ), 2265 ) 2266 ) 2267 for source_query in source_queries: 2268 with source_query as query: 2269 self._merge( 2270 target_table=target_table, 2271 query=query, 2272 on=on, 2273 whens=exp.Whens(expressions=match_expressions), 2274 ) 2275 2276 def rename_table( 2277 self, 2278 old_table_name: TableName, 2279 new_table_name: TableName, 2280 ) -> None: 2281 new_table = exp.to_table(new_table_name) 2282 if new_table.catalog: 2283 old_table = exp.to_table(old_table_name) 2284 catalog = old_table.catalog or self.get_current_catalog() 2285 if catalog != new_table.catalog: 2286 raise UnsupportedCatalogOperationError( 2287 "Tried to rename table across catalogs which is not supported" 2288 ) 2289 self._rename_table(old_table_name, new_table_name) 2290 self._clear_data_object_cache(old_table_name) 2291 self._clear_data_object_cache(new_table_name) 2292 2293 def get_data_object( 2294 self, target_name: TableName, safe_to_cache: bool = False 2295 ) -> t.Optional[DataObject]: 2296 target_table = exp.to_table(target_name) 2297 existing_data_objects = self.get_data_objects( 2298 schema_(target_table.db, target_table.catalog), 2299 {target_table.name}, 2300 safe_to_cache=safe_to_cache, 
2301 ) 2302 if existing_data_objects: 2303 return existing_data_objects[0] 2304 return None 2305 2306 def get_data_objects( 2307 self, 2308 schema_name: SchemaName, 2309 object_names: t.Optional[t.Set[str]] = None, 2310 safe_to_cache: bool = False, 2311 ) -> t.List[DataObject]: 2312 """Lists all data objects in the target schema. 2313 2314 Args: 2315 schema_name: The name of the schema to list data objects from. 2316 object_names: If provided, only return data objects with these names. 2317 safe_to_cache: Whether it is safe to cache the results of this call. 2318 2319 Returns: 2320 A list of data objects in the target schema. 2321 """ 2322 if object_names is not None: 2323 if not object_names: 2324 return [] 2325 2326 # Check cache for each object name 2327 target_schema = to_schema(schema_name) 2328 cached_objects = [] 2329 missing_names = set() 2330 2331 for name in object_names: 2332 cache_key = _get_data_object_cache_key( 2333 target_schema.catalog, target_schema.db, name 2334 ) 2335 if cache_key in self._data_object_cache: 2336 logger.debug("Data object cache hit: %s", cache_key) 2337 data_object = self._data_object_cache[cache_key] 2338 # If the object is none, then the table was previously looked for but not found 2339 if data_object: 2340 cached_objects.append(data_object) 2341 else: 2342 logger.debug("Data object cache miss: %s", cache_key) 2343 missing_names.add(name) 2344 2345 # Fetch missing objects from database 2346 if missing_names: 2347 object_names_list = list(missing_names) 2348 batches = [ 2349 object_names_list[i : i + self.DATA_OBJECT_FILTER_BATCH_SIZE] 2350 for i in range(0, len(object_names_list), self.DATA_OBJECT_FILTER_BATCH_SIZE) 2351 ] 2352 2353 fetched_objects = [] 2354 fetched_object_names = set() 2355 for batch in batches: 2356 objects = self._get_data_objects(schema_name, set(batch)) 2357 for obj in objects: 2358 if safe_to_cache: 2359 cache_key = _get_data_object_cache_key( 2360 obj.catalog, obj.schema_name, obj.name 2361 ) 2362 
self._data_object_cache[cache_key] = obj 2363 fetched_objects.append(obj) 2364 fetched_object_names.add(obj.name) 2365 2366 if safe_to_cache: 2367 for missing_name in missing_names - fetched_object_names: 2368 cache_key = _get_data_object_cache_key( 2369 target_schema.catalog, target_schema.db, missing_name 2370 ) 2371 self._data_object_cache[cache_key] = None 2372 2373 return cached_objects + fetched_objects 2374 2375 return cached_objects 2376 2377 fetched_objects = self._get_data_objects(schema_name) 2378 if safe_to_cache: 2379 for obj in fetched_objects: 2380 cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name) 2381 self._data_object_cache[cache_key] = obj 2382 return fetched_objects 2383 2384 def fetchone( 2385 self, 2386 query: t.Union[exp.Expression, str], 2387 ignore_unsupported_errors: bool = False, 2388 quote_identifiers: bool = False, 2389 ) -> t.Optional[t.Tuple]: 2390 with self.transaction(): 2391 self.execute( 2392 query, 2393 ignore_unsupported_errors=ignore_unsupported_errors, 2394 quote_identifiers=quote_identifiers, 2395 ) 2396 return self.cursor.fetchone() 2397 2398 def fetchall( 2399 self, 2400 query: t.Union[exp.Expression, str], 2401 ignore_unsupported_errors: bool = False, 2402 quote_identifiers: bool = False, 2403 ) -> t.List[t.Tuple]: 2404 with self.transaction(): 2405 self.execute( 2406 query, 2407 ignore_unsupported_errors=ignore_unsupported_errors, 2408 quote_identifiers=quote_identifiers, 2409 ) 2410 return self.cursor.fetchall() 2411 2412 def _fetch_native_df( 2413 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 2414 ) -> DF: 2415 """Fetches a DataFrame that can be either Pandas or PySpark from the cursor""" 2416 with self.transaction(): 2417 self.execute(query, quote_identifiers=quote_identifiers) 2418 return self.cursor.fetchdf() 2419 2420 def _native_df_to_pandas_df( 2421 self, 2422 query_or_df: QueryOrDF, 2423 ) -> t.Union[Query, pd.DataFrame]: 2424 """ 2425 Take a "native" 
DataFrame (eg Pyspark, Bigframe, Snowpark etc) and convert it to Pandas 2426 """ 2427 import pandas as pd 2428 2429 if isinstance(query_or_df, (exp.Query, pd.DataFrame)): 2430 return query_or_df 2431 2432 # EngineAdapter subclasses that have native DataFrame types should override this 2433 raise NotImplementedError(f"Unable to convert {type(query_or_df)} to Pandas") 2434 2435 def fetchdf( 2436 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 2437 ) -> pd.DataFrame: 2438 """Fetches a Pandas DataFrame from the cursor""" 2439 import pandas as pd 2440 2441 df = self._fetch_native_df(query, quote_identifiers=quote_identifiers) 2442 if not isinstance(df, pd.DataFrame): 2443 raise NotImplementedError( 2444 "The cursor's `fetch_native_df` method is not returning a pandas DataFrame. Need to update `fetchdf` so a Pandas DataFrame is returned" 2445 ) 2446 return df 2447 2448 def fetch_pyspark_df( 2449 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 2450 ) -> PySparkDataFrame: 2451 """Fetches a PySpark DataFrame from the cursor""" 2452 raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}") 2453 2454 @property 2455 def wap_enabled(self) -> bool: 2456 """Returns whether WAP is enabled for this engine.""" 2457 return self._extra_config.get("wap_enabled", False) 2458 2459 def wap_supported(self, table_name: TableName) -> bool: 2460 """Returns whether WAP for the target table is supported.""" 2461 return False 2462 2463 def wap_table_name(self, table_name: TableName, wap_id: str) -> str: 2464 """Returns the updated table name for the given WAP ID. 2465 2466 Args: 2467 table_name: The name of the target table. 2468 wap_id: The WAP ID to prepare. 2469 2470 Returns: 2471 The updated table name that should be used for writing. 
2472 """ 2473 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 2474 2475 def wap_prepare(self, table_name: TableName, wap_id: str) -> str: 2476 """Prepares the target table for WAP and returns the updated table name. 2477 2478 Args: 2479 table_name: The name of the target table. 2480 wap_id: The WAP ID to prepare. 2481 2482 Returns: 2483 The updated table name that should be used for writing. 2484 """ 2485 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 2486 2487 def wap_publish(self, table_name: TableName, wap_id: str) -> None: 2488 """Publishes changes with the given WAP ID to the target table. 2489 2490 Args: 2491 table_name: The name of the target table. 2492 wap_id: The WAP ID to publish. 2493 """ 2494 raise NotImplementedError(f"Engine does not support WAP: {type(self)}") 2495 2496 def sync_grants_config( 2497 self, 2498 table: exp.Table, 2499 grants_config: GrantsConfig, 2500 table_type: DataObjectType = DataObjectType.TABLE, 2501 ) -> None: 2502 """Applies the grants_config to a table authoritatively. 2503 It first compares the specified grants against the current grants, and then 2504 applies the diffs to the table by revoking and granting privileges as needed. 2505 2506 Args: 2507 table: The table/view to apply grants to. 2508 grants_config: Dictionary mapping privileges to lists of grantees. 2509 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 
2510 """ 2511 if not self.SUPPORTS_GRANTS: 2512 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 2513 2514 current_grants = self._get_current_grants_config(table) 2515 new_grants, revoked_grants = self._diff_grants_configs(grants_config, current_grants) 2516 revoke_exprs = self._revoke_grants_config_expr(table, revoked_grants, table_type) 2517 grant_exprs = self._apply_grants_config_expr(table, new_grants, table_type) 2518 dcl_exprs = revoke_exprs + grant_exprs 2519 2520 if dcl_exprs: 2521 self.execute(dcl_exprs) 2522 2523 @contextlib.contextmanager 2524 def transaction( 2525 self, 2526 condition: t.Optional[bool] = None, 2527 ) -> t.Iterator[None]: 2528 """A transaction context manager.""" 2529 if ( 2530 self._connection_pool.is_transaction_active 2531 or not self.SUPPORTS_TRANSACTIONS 2532 or (condition is not None and not condition) 2533 ): 2534 yield 2535 return 2536 2537 if self._pre_ping: 2538 try: 2539 logger.debug("Pinging the database to check the connection") 2540 self.ping() 2541 except Exception: 2542 logger.info("Connection to the database was lost. 
Reconnecting...") 2543 self._connection_pool.close() 2544 2545 self._connection_pool.begin() 2546 try: 2547 yield 2548 except Exception as e: 2549 self._connection_pool.rollback() 2550 raise e 2551 else: 2552 self._connection_pool.commit() 2553 2554 @contextlib.contextmanager 2555 def session(self, properties: SessionProperties) -> t.Iterator[None]: 2556 """A session context manager.""" 2557 if self._is_session_active(): 2558 yield 2559 return 2560 2561 self._begin_session(properties) 2562 try: 2563 yield 2564 finally: 2565 self._end_session() 2566 2567 def _begin_session(self, properties: SessionProperties) -> t.Any: 2568 """Begin a new session.""" 2569 2570 def _end_session(self) -> None: 2571 """End the existing session.""" 2572 2573 def _is_session_active(self) -> bool: 2574 """Indicates whether or not a session is active.""" 2575 return False 2576 2577 def execute( 2578 self, 2579 expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]], 2580 ignore_unsupported_errors: bool = False, 2581 quote_identifiers: bool = True, 2582 track_rows_processed: bool = False, 2583 **kwargs: t.Any, 2584 ) -> None: 2585 """Execute a sql query.""" 2586 to_sql_kwargs = ( 2587 {"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {} 2588 ) 2589 with self.transaction(): 2590 for e in ensure_list(expressions): 2591 if isinstance(e, exp.Expression): 2592 self._check_identifier_length(e) 2593 sql = self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs) 2594 else: 2595 sql = t.cast(str, e) 2596 2597 sql = self._attach_correlation_id(sql) 2598 2599 self._log_sql( 2600 sql, 2601 expression=e if isinstance(e, exp.Expression) else None, 2602 quote_identifiers=quote_identifiers, 2603 ) 2604 self._execute(sql, track_rows_processed, **kwargs) 2605 2606 def _attach_correlation_id(self, sql: str) -> str: 2607 if self.ATTACH_CORRELATION_ID and self.correlation_id: 2608 return f"/* {self.correlation_id} */ {sql}" 2609 return sql 2610 2611 def _log_sql( 2612 
self, 2613 sql: str, 2614 expression: t.Optional[exp.Expression] = None, 2615 quote_identifiers: bool = True, 2616 ) -> None: 2617 if not logger.isEnabledFor(self._execute_log_level): 2618 return 2619 2620 sql_to_log = sql 2621 if expression is not None and not isinstance(expression, exp.Query): 2622 values = expression.find(exp.Values) 2623 if values: 2624 values.set("expressions", [exp.to_identifier("<REDACTED VALUES>")]) 2625 sql_to_log = self._to_sql(expression, quote=quote_identifiers) 2626 2627 logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log) 2628 2629 def _record_execution_stats( 2630 self, sql: str, rowcount: t.Optional[int] = None, bytes_processed: t.Optional[int] = None 2631 ) -> None: 2632 if self._query_execution_tracker: 2633 self._query_execution_tracker.record_execution(sql, rowcount, bytes_processed) 2634 2635 def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any) -> None: 2636 self.cursor.execute(sql, **kwargs) 2637 2638 if ( 2639 self.SUPPORTS_QUERY_EXECUTION_TRACKING 2640 and track_rows_processed 2641 and self._query_execution_tracker 2642 and self._query_execution_tracker.is_tracking() 2643 ): 2644 if ( 2645 rowcount := getattr(self.cursor, "rowcount", None) 2646 ) is not None and rowcount is not None: 2647 try: 2648 self._record_execution_stats(sql, int(rowcount)) 2649 except (TypeError, ValueError): 2650 return 2651 2652 @contextlib.contextmanager 2653 def temp_table( 2654 self, 2655 query_or_df: QueryOrDF, 2656 name: TableName = "diff", 2657 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2658 source_columns: t.Optional[t.List[str]] = None, 2659 **kwargs: t.Any, 2660 ) -> t.Iterator[exp.Table]: 2661 """A context manager for working a temp table. 2662 2663 The table will be created with a random guid and cleaned up after the block. 2664 2665 Args: 2666 query_or_df: The query or df to create a temp table for. 2667 name: The base name of the temp table. 
2668 target_columns_to_types: A mapping between the column name and its data type. 2669 2670 Yields: 2671 The table expression 2672 """ 2673 name = exp.to_table(name) 2674 # ensure that we use default catalog if none is not specified 2675 if isinstance(name, exp.Table) and not name.catalog and name.db and self.default_catalog: 2676 name.set("catalog", exp.parse_identifier(self.default_catalog)) 2677 2678 source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( 2679 query_or_df, 2680 target_columns_to_types=target_columns_to_types, 2681 target_table=name, 2682 source_columns=source_columns, 2683 ) 2684 2685 with self.transaction(): 2686 table = self._get_temp_table(name) 2687 if table.db: 2688 self.create_schema(schema_(table.args["db"], table.args.get("catalog"))) 2689 self._create_table_from_source_queries( 2690 table, 2691 source_queries, 2692 target_columns_to_types, 2693 exists=True, 2694 table_description=None, 2695 column_descriptions=None, 2696 track_rows_processed=False, 2697 **kwargs, 2698 ) 2699 2700 try: 2701 yield table 2702 finally: 2703 self.drop_table(table) 2704 2705 def _table_or_view_properties_to_expressions( 2706 self, table_or_view_properties: t.Optional[t.Dict[str, exp.Expression]] = None 2707 ) -> t.List[exp.Property]: 2708 """Converts model properties (either physical or virtual) to a list of property expressions.""" 2709 if not table_or_view_properties: 2710 return [] 2711 return [ 2712 exp.Property(this=key, value=value.copy()) 2713 for key, value in table_or_view_properties.items() 2714 ] 2715 2716 def _build_partitioned_by_exp( 2717 self, 2718 partitioned_by: t.List[exp.Expression], 2719 *, 2720 partition_interval_unit: t.Optional[IntervalUnit] = None, 2721 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2722 catalog_name: t.Optional[str] = None, 2723 **kwargs: t.Any, 2724 ) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.Property]]: 2725 return None 2726 2727 def 
_build_clustered_by_exp( 2728 self, 2729 clustered_by: t.List[exp.Expression], 2730 **kwargs: t.Any, 2731 ) -> t.Optional[exp.Cluster]: 2732 return None 2733 2734 def _build_table_properties_exp( 2735 self, 2736 catalog_name: t.Optional[str] = None, 2737 table_format: t.Optional[str] = None, 2738 storage_format: t.Optional[str] = None, 2739 partitioned_by: t.Optional[t.List[exp.Expression]] = None, 2740 partition_interval_unit: t.Optional[IntervalUnit] = None, 2741 clustered_by: t.Optional[t.List[exp.Expression]] = None, 2742 table_properties: t.Optional[t.Dict[str, exp.Expression]] = None, 2743 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 2744 table_description: t.Optional[str] = None, 2745 table_kind: t.Optional[str] = None, 2746 **kwargs: t.Any, 2747 ) -> t.Optional[exp.Properties]: 2748 """Creates a SQLGlot table properties expression for ddl.""" 2749 properties: t.List[exp.Expression] = [] 2750 2751 if table_description: 2752 properties.append( 2753 exp.SchemaCommentProperty( 2754 this=exp.Literal.string(self._truncate_table_comment(table_description)) 2755 ) 2756 ) 2757 2758 if table_properties: 2759 table_type = self._pop_creatable_type_from_properties(table_properties) 2760 properties.extend(ensure_list(table_type)) 2761 2762 if properties: 2763 return exp.Properties(expressions=properties) 2764 return None 2765 2766 def _build_view_properties_exp( 2767 self, 2768 view_properties: t.Optional[t.Dict[str, exp.Expression]] = None, 2769 table_description: t.Optional[str] = None, 2770 **kwargs: t.Any, 2771 ) -> t.Optional[exp.Properties]: 2772 """Creates a SQLGlot table properties expression for view""" 2773 properties: t.List[exp.Expression] = [] 2774 2775 if table_description: 2776 properties.append( 2777 exp.SchemaCommentProperty( 2778 this=exp.Literal.string(self._truncate_table_comment(table_description)) 2779 ) 2780 ) 2781 2782 if properties: 2783 return exp.Properties(expressions=properties) 2784 return None 2785 2786 def 
_truncate_comment(self, comment: str, length: t.Optional[int]) -> str: 2787 return comment[:length] if length else comment 2788 2789 def _truncate_table_comment(self, comment: str) -> str: 2790 return self._truncate_comment(comment, self.MAX_TABLE_COMMENT_LENGTH) 2791 2792 def _truncate_column_comment(self, comment: str) -> str: 2793 return self._truncate_comment(comment, self.MAX_COLUMN_COMMENT_LENGTH) 2794 2795 def _to_sql(self, expression: exp.Expression, quote: bool = True, **kwargs: t.Any) -> str: 2796 """ 2797 Converts an expression to a SQL string. Has a set of default kwargs to apply, and then default 2798 kwargs defined for the given dialect, and then kwargs provided by the user when defining the engine 2799 adapter, and then finally kwargs provided by the user when calling this method. 2800 """ 2801 sql_gen_kwargs = { 2802 "dialect": self.dialect, 2803 "pretty": self._pretty_sql, 2804 "comments": False, 2805 **self._sql_gen_kwargs, 2806 **kwargs, 2807 } 2808 2809 expression = expression.copy() 2810 2811 if quote: 2812 quote_identifiers(expression) 2813 2814 return expression.sql(**sql_gen_kwargs, copy=False) # type: ignore 2815 2816 def _clear_data_object_cache(self, table_name: t.Optional[TableName] = None) -> None: 2817 """Clears the cache entry for the given table name, or clears the entire cache if table_name is None.""" 2818 if table_name is None: 2819 logger.debug("Clearing entire data object cache") 2820 self._data_object_cache.clear() 2821 else: 2822 table = exp.to_table(table_name) 2823 cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name) 2824 logger.debug("Clearing data object cache key: %s", cache_key) 2825 self._data_object_cache.pop(cache_key, None) 2826 2827 def _get_data_objects( 2828 self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None 2829 ) -> t.List[DataObject]: 2830 """ 2831 Returns all the data objects that exist in the given schema and optionally catalog. 
2832 """ 2833 raise NotImplementedError() 2834 2835 def _get_temp_table( 2836 self, table: TableName, table_only: bool = False, quoted: bool = True 2837 ) -> exp.Table: 2838 """ 2839 Returns the name of the temp table that should be used for the given table name. 2840 """ 2841 table = t.cast(exp.Table, exp.to_table(table).copy()) 2842 table.set( 2843 "this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}", quoted=quoted) 2844 ) 2845 2846 if table_only: 2847 table.set("db", None) 2848 table.set("catalog", None) 2849 2850 return table 2851 2852 def _order_projections_and_filter( 2853 self, 2854 query: Query, 2855 target_columns_to_types: t.Dict[str, exp.DataType], 2856 where: t.Optional[exp.Expression] = None, 2857 coerce_types: bool = False, 2858 ) -> Query: 2859 if not isinstance(query, exp.Query) or ( 2860 not where and not coerce_types and query.named_selects == list(target_columns_to_types) 2861 ): 2862 return query 2863 2864 query = t.cast(exp.Query, query.copy()) 2865 with_ = query.args.pop("with_", None) 2866 2867 select_exprs: t.List[exp.Expression] = [ 2868 exp.column(c, quoted=True) for c in target_columns_to_types 2869 ] 2870 if coerce_types and columns_to_types_all_known(target_columns_to_types): 2871 select_exprs = [ 2872 exp.cast(select_exprs[i], col_tpe).as_(col, quoted=True) 2873 for i, (col, col_tpe) in enumerate(target_columns_to_types.items()) 2874 ] 2875 2876 query = exp.select(*select_exprs).from_(query.subquery("_subquery", copy=False), copy=False) 2877 if where: 2878 query = query.where(where, copy=False) 2879 2880 if with_: 2881 query.set("with_", with_) 2882 2883 return query 2884 2885 def _truncate_table(self, table_name: TableName) -> None: 2886 table = exp.to_table(table_name) 2887 self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}") 2888 2889 def drop_data_object_on_type_mismatch( 2890 self, data_object: t.Optional[DataObject], expected_type: DataObjectType 2891 ) -> bool: 2892 """Drops a data 
object if it exists and is not of the expected type. 2893 2894 Args: 2895 data_object: The data object to check. 2896 expected_type: The expected type of the data object. 2897 2898 Returns: 2899 True if the data object was dropped, False otherwise. 2900 """ 2901 if data_object is None or data_object.type == expected_type: 2902 return False 2903 2904 logger.warning( 2905 "Target data object '%s' is a %s and not a %s, dropping it", 2906 data_object.to_table().sql(dialect=self.dialect), 2907 data_object.type.value, 2908 expected_type.value, 2909 ) 2910 self.drop_data_object(data_object) 2911 return True 2912 2913 def _replace_by_key( 2914 self, 2915 target_table: TableName, 2916 source_table: QueryOrDF, 2917 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]], 2918 key: t.Sequence[exp.Expression], 2919 is_unique_key: bool, 2920 source_columns: t.Optional[t.List[str]] = None, 2921 ) -> None: 2922 if target_columns_to_types is None: 2923 target_columns_to_types = self.columns(target_table) 2924 2925 temp_table = self._get_temp_table(target_table) 2926 key_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *key) if len(key) > 1 else key[0] 2927 column_names = list(target_columns_to_types or []) 2928 2929 with self.transaction(): 2930 self.ctas( 2931 temp_table, 2932 source_table, 2933 target_columns_to_types=target_columns_to_types, 2934 exists=False, 2935 source_columns=source_columns, 2936 ) 2937 2938 try: 2939 delete_query = exp.select(key_exp).from_(temp_table) 2940 insert_query = self._select_columns(target_columns_to_types).from_(temp_table) 2941 if not is_unique_key: 2942 delete_query = delete_query.distinct() 2943 else: 2944 insert_query = insert_query.distinct(*key) 2945 2946 insert_statement = exp.insert( 2947 insert_query, 2948 target_table, 2949 columns=column_names, 2950 ) 2951 delete_filter = key_exp.isin(query=delete_query) 2952 2953 if not self.INSERT_OVERWRITE_STRATEGY.is_replace_where: 2954 self.delete_from(target_table, delete_filter) 2955 
else: 2956 insert_statement.set("where", delete_filter) 2957 insert_statement.set("this", exp.to_table(target_table)) 2958 2959 self.execute(insert_statement, track_rows_processed=True) 2960 finally: 2961 self.drop_table(temp_table) 2962 2963 def _build_create_comment_table_exp( 2964 self, table: exp.Table, table_comment: str, table_kind: str 2965 ) -> exp.Comment | str: 2966 return exp.Comment( 2967 this=table, 2968 kind=table_kind, 2969 expression=exp.Literal.string(self._truncate_table_comment(table_comment)), 2970 ) 2971 2972 def _create_table_comment( 2973 self, table_name: TableName, table_comment: str, table_kind: str = "TABLE" 2974 ) -> None: 2975 table = exp.to_table(table_name) 2976 2977 try: 2978 self.execute(self._build_create_comment_table_exp(table, table_comment, table_kind)) 2979 except Exception: 2980 logger.warning( 2981 f"Table comment for '{table.alias_or_name}' not registered - this may be due to limited permissions", 2982 exc_info=True, 2983 ) 2984 2985 def _build_create_comment_column_exp( 2986 self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" 2987 ) -> exp.Comment | str: 2988 return exp.Comment( 2989 this=exp.column(column_name, *reversed(table.parts)), # type: ignore 2990 kind="COLUMN", 2991 expression=exp.Literal.string(self._truncate_column_comment(column_comment)), 2992 ) 2993 2994 def _create_column_comments( 2995 self, 2996 table_name: TableName, 2997 column_comments: t.Dict[str, str], 2998 table_kind: str = "TABLE", 2999 materialized_view: bool = False, 3000 ) -> None: 3001 table = exp.to_table(table_name) 3002 3003 for col, comment in column_comments.items(): 3004 try: 3005 self.execute(self._build_create_comment_column_exp(table, col, comment, table_kind)) 3006 except Exception: 3007 logger.warning( 3008 f"Column comments for column '{col}' in table '{table.alias_or_name}' not registered - this may be due to limited permissions", 3009 exc_info=True, 3010 ) 3011 3012 def _create_table_like( 3013 
self, 3014 target_table_name: TableName, 3015 source_table_name: TableName, 3016 exists: bool, 3017 **kwargs: t.Any, 3018 ) -> None: 3019 self.create_table(target_table_name, self.columns(source_table_name), exists=exists) 3020 3021 def _rename_table( 3022 self, 3023 old_table_name: TableName, 3024 new_table_name: TableName, 3025 ) -> None: 3026 self.execute(exp.rename_table(old_table_name, new_table_name)) 3027 3028 def ensure_nulls_for_unmatched_after_join( 3029 self, 3030 query: Query, 3031 ) -> Query: 3032 return query 3033 3034 def use_server_nulls_for_unmatched_after_join( 3035 self, 3036 query: Query, 3037 ) -> Query: 3038 return query 3039 3040 def ping(self) -> None: 3041 try: 3042 self._execute(exp.select("1").sql(dialect=self.dialect)) 3043 finally: 3044 self._connection_pool.close_cursor() 3045 3046 @classmethod 3047 def _select_columns( 3048 cls, columns: t.Iterable[str], source_columns: t.Optional[t.List[str]] = None 3049 ) -> exp.Select: 3050 return exp.select( 3051 *( 3052 exp.column(c, quoted=True) 3053 if c in (source_columns or columns) 3054 else exp.alias_(exp.Null(), c, quoted=True) 3055 for c in columns 3056 ) 3057 ) 3058 3059 def _check_identifier_length(self, expression: exp.Expression) -> None: 3060 if self.MAX_IDENTIFIER_LENGTH is None or not isinstance(expression, exp.DDL): 3061 return 3062 3063 for identifier in expression.find_all(exp.Identifier): 3064 name = identifier.name 3065 name_length = len(name) 3066 if name_length > self.MAX_IDENTIFIER_LENGTH: 3067 raise SQLMeshError( 3068 f"Identifier name '{name}' (length {name_length}) exceeds {self.dialect.capitalize()}'s max identifier limit of {self.MAX_IDENTIFIER_LENGTH} characters" 3069 ) 3070 3071 def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]: 3072 raise NotImplementedError() 3073 3074 @classmethod 3075 def _diff_grants_configs( 3076 cls, new_config: GrantsConfig, old_config: GrantsConfig 3077 ) -> t.Tuple[GrantsConfig, GrantsConfig]: 3078 
"""Compute additions and removals between two grants configurations. 3079 3080 This method compares new (desired) and old (current) GrantsConfigs case-insensitively 3081 for both privilege keys and grantees, while preserving original casing 3082 in the output GrantsConfigs. 3083 3084 Args: 3085 new_config: Desired grants configuration (specified by the user). 3086 old_config: Current grants configuration (returned by the database). 3087 3088 Returns: 3089 A tuple of (additions, removals) GrantsConfig where: 3090 - additions contains privileges/grantees present in new_config but not in old_config 3091 - additions uses keys and grantee strings from new_config (user-specified casing) 3092 - removals contains privileges/grantees present in old_config but not in new_config 3093 - removals uses keys and grantee strings from old_config (database-returned casing) 3094 3095 Notes: 3096 - Comparison is case-insensitive using casefold(); original casing is preserved in results. 3097 - Overlapping grantees (case-insensitive) are excluded from the results. 3098 """ 3099 3100 def _diffs(config1: GrantsConfig, config2: GrantsConfig) -> GrantsConfig: 3101 diffs: GrantsConfig = {} 3102 cf_config2 = {k.casefold(): {g.casefold() for g in v} for k, v in config2.items()} 3103 for key, grantees in config1.items(): 3104 cf_key = key.casefold() 3105 3106 # Missing key (add all grantees) 3107 if cf_key not in cf_config2: 3108 diffs[key] = grantees.copy() 3109 continue 3110 3111 # Include only grantees not in config2 3112 cf_grantees2 = cf_config2[cf_key] 3113 diff_grantees = [] 3114 for grantee in grantees: 3115 if grantee.casefold() not in cf_grantees2: 3116 diff_grantees.append(grantee) 3117 if diff_grantees: 3118 diffs[key] = diff_grantees 3119 return diffs 3120 3121 return _diffs(new_config, old_config), _diffs(old_config, new_config) 3122 3123 def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig: 3124 """Returns current grants for a table as a dictionary. 
3125 3126 This method queries the database and returns the current grants/permissions 3127 for the given table, parsed into a dictionary format. The it handles 3128 case-insensitive comparison between these current grants and the desired 3129 grants from model configuration. 3130 3131 Args: 3132 table: The table/view to query grants for. 3133 3134 Returns: 3135 Dictionary mapping permissions to lists of grantees. Permission names 3136 should be returned as the database provides them (typically uppercase 3137 for standard SQL permissions, but engine-specific roles may vary). 3138 3139 Raises: 3140 NotImplementedError: If the engine does not support grants. 3141 """ 3142 if not self.SUPPORTS_GRANTS: 3143 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3144 raise NotImplementedError("Subclass must implement get_current_grants") 3145 3146 def _apply_grants_config_expr( 3147 self, 3148 table: exp.Table, 3149 grants_config: GrantsConfig, 3150 table_type: DataObjectType = DataObjectType.TABLE, 3151 ) -> t.List[exp.Expression]: 3152 """Returns SQLGlot Grant expressions to apply grants to a table. 3153 3154 Args: 3155 table: The table/view to grant permissions on. 3156 grants_config: Dictionary mapping permissions to lists of grantees. 3157 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 3158 3159 Returns: 3160 List of SQLGlot expressions for grant operations. 3161 3162 Raises: 3163 NotImplementedError: If the engine does not support grants. 3164 """ 3165 if not self.SUPPORTS_GRANTS: 3166 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3167 raise NotImplementedError("Subclass must implement _apply_grants_config_expr") 3168 3169 def _revoke_grants_config_expr( 3170 self, 3171 table: exp.Table, 3172 grants_config: GrantsConfig, 3173 table_type: DataObjectType = DataObjectType.TABLE, 3174 ) -> t.List[exp.Expression]: 3175 """Returns SQLGlot expressions to revoke grants from a table. 
3176 3177 Args: 3178 table: The table/view to revoke permissions from. 3179 grants_config: Dictionary mapping permissions to lists of grantees. 3180 table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW). 3181 3182 Returns: 3183 List of SQLGlot expressions for revoke operations. 3184 3185 Raises: 3186 NotImplementedError: If the engine does not support grants. 3187 """ 3188 if not self.SUPPORTS_GRANTS: 3189 raise NotImplementedError(f"Engine does not support grants: {type(self)}") 3190 raise NotImplementedError("Subclass must implement _revoke_grants_config_expr")
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: either a callable that produces a new Database API-compliant connection on every call, or an existing `ConnectionPool` to reuse as-is.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def __init__(
    self,
    connection_factory_or_pool: t.Union[t.Callable[[], t.Any], ConnectionPool],
    dialect: str = "",
    sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
    multithreaded: bool = False,
    cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    default_catalog: t.Optional[str] = None,
    execute_log_level: int = logging.DEBUG,
    register_comments: bool = True,
    pre_ping: bool = False,
    pretty_sql: bool = False,
    shared_connection: bool = False,
    correlation_id: t.Optional[CorrelationId] = None,
    schema_differ_overrides: t.Optional[t.Dict[str, t.Any]] = None,
    query_execution_tracker: t.Optional[QueryExecutionTracker] = None,
    **kwargs: t.Any,
):
    """Set up the adapter around a connection factory or an existing connection pool.

    Args:
        connection_factory_or_pool: Either a zero-argument callable returning a new DB-API
            connection, or a ``ConnectionPool`` that is reused without modification.
        dialect: SQL dialect name; lower-cased, and the class-level ``DIALECT`` is used when empty.
        sql_gen_kwargs: Extra keyword arguments applied when generating SQL.
        multithreaded: Forwarded to ``create_connection_pool`` when a factory is given.
        cursor_init: Optional cursor initializer forwarded to ``create_connection_pool``.
        default_catalog: Explicit default catalog for this adapter.
        execute_log_level: Logging level used when logging executed SQL.
        register_comments: Whether table/column comments should be registered.
        pre_ping: Whether to ping the database before starting a transaction.
        pretty_sql: Whether generated SQL is pretty-printed.
        shared_connection: Forwarded to ``create_connection_pool`` when a factory is given.
        correlation_id: Optional correlation id attached to this adapter.
        schema_differ_overrides: Optional overrides stored for schema diffing.
        query_execution_tracker: Optional tracker used to record execution statistics.
        **kwargs: Any other engine-specific settings; kept as the adapter's extra config.
    """
    # An explicit dialect takes precedence over the class default.
    self.dialect = dialect.lower() or self.DIALECT

    if isinstance(connection_factory_or_pool, ConnectionPool):
        # An existing pool (e.g. the one handed over by `with_settings`) is reused directly.
        pool = connection_factory_or_pool
    else:
        pool = create_connection_pool(
            connection_factory_or_pool,
            multithreaded,
            shared_connection=shared_connection,
            cursor_init=cursor_init,
        )
    self._connection_pool = pool

    self._sql_gen_kwargs = sql_gen_kwargs or {}
    self._default_catalog = default_catalog
    self._execute_log_level = execute_log_level
    self._extra_config = kwargs
    self._register_comments = register_comments
    self._pre_ping = pre_ping
    self._pretty_sql = pretty_sql
    self._multithreaded = multithreaded
    self.correlation_id = correlation_id
    self._schema_differ_overrides = schema_differ_overrides
    self._query_execution_tracker = query_execution_tracker
    # Cache of looked-up data objects; None entries record a missing object.
    self._data_object_cache: t.Dict[str, t.Optional[DataObject]] = {}
def with_settings(self, **kwargs: t.Any) -> EngineAdapter:
    """Create a new adapter of the same class that shares this adapter's connection pool.

    Every setting not overridden through ``kwargs`` is carried over from this adapter,
    including ``pre_ping`` and ``schema_differ_overrides``. Previously these two were
    silently dropped, so derived adapters stopped pinging and lost schema-differ overrides.

    Args:
        **kwargs: Settings to override on the new adapter, e.g. ``execute_log_level``,
            ``correlation_id``, ``query_execution_tracker``, ``pre_ping`` or
            ``schema_differ_overrides``. Any other keys are merged over this adapter's
            extra config and passed to the constructor.

    Returns:
        The newly constructed adapter.
    """
    # The pops run left-to-right before `**kwargs` is unpacked, so overridden keys
    # are not passed to the constructor twice.
    extra_kwargs = {
        "null_connection": True,
        "execute_log_level": kwargs.pop("execute_log_level", self._execute_log_level),
        "correlation_id": kwargs.pop("correlation_id", self.correlation_id),
        "query_execution_tracker": kwargs.pop(
            "query_execution_tracker", self._query_execution_tracker
        ),
        "pre_ping": kwargs.pop("pre_ping", self._pre_ping),
        "schema_differ_overrides": kwargs.pop(
            "schema_differ_overrides", self._schema_differ_overrides
        ),
        **self._extra_config,
        **kwargs,
    }

    adapter = self.__class__(
        self._connection_pool,
        dialect=self.dialect,
        sql_gen_kwargs=self._sql_gen_kwargs,
        default_catalog=self._default_catalog,
        register_comments=self._register_comments,
        multithreaded=self._multithreaded,
        pretty_sql=self._pretty_sql,
        **extra_kwargs,
    )

    return adapter
@property
def default_catalog(self) -> t.Optional[str]:
    """The catalog used when a table name has none, or None if catalogs are unsupported.

    The explicitly configured catalog wins; otherwise the connection's current catalog is used.

    Raises:
        MissingDefaultCatalogError: If catalogs are supported but neither source yields a name.
    """
    if not self.catalog_support.is_unsupported:
        resolved = self._default_catalog or self.get_current_catalog()
        if resolved:
            return resolved
        raise MissingDefaultCatalogError(
            "Could not determine a default catalog despite it being supported."
        )
    return None
def recycle(self) -> None:
    """Close every pooled connection owned by threads other than the caller.

    Connections and resources belonging to the calling thread are left open.
    """
    pool = self._connection_pool
    pool.close_all(exclude_calling_thread=True)
Closes all open connections and releases all allocated resources associated with any thread except the calling one.
def close(self) -> t.Any:
    """Close every pooled connection, across all threads, and release their resources."""
    pool = self._connection_pool
    pool.close_all()
Closes all open connections and releases all allocated resources.
def get_current_catalog(self) -> t.Optional[str]:
    """Return the name of the catalog the current connection is using.

    The base adapter has no implementation; subclasses are expected to override this.

    Raises:
        NotImplementedError: Always, in the base implementation.
    """
    raise NotImplementedError
Returns the catalog name of the current connection.
def set_current_catalog(self, catalog: str) -> None:
    """Switch the current connection to the given catalog.

    The base adapter has no implementation; subclasses are expected to override this.

    Raises:
        NotImplementedError: Always, in the base implementation.
    """
    raise NotImplementedError
Sets the catalog name of the current connection.
def get_catalog_type(self, catalog: t.Optional[str]) -> str:
    """Return the catalog type for ``catalog``.

    Intended to be overridden for data virtualization systems such as Trino that, depending
    on the target catalog, need slightly different properties when creating or updating
    tables. The base version looks the catalog up in ``_catalog_type_overrides`` and falls
    back to ``DEFAULT_CATALOG_TYPE`` when there is no override or no catalog is given.

    Raises:
        UnsupportedCatalogOperationError: If this engine does not support catalogs.
    """
    if self.catalog_support.is_unsupported:
        raise UnsupportedCatalogOperationError(
            f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}"
        )
    fallback = self.DEFAULT_CATALOG_TYPE
    if not catalog:
        return fallback
    return self._catalog_type_overrides.get(catalog, fallback)
Intended to be overridden for data virtualization systems such as Trino that, depending on the target catalog, require slightly different properties to be set when creating or updating tables.
def get_catalog_type_from_table(self, table: TableName) -> str:
    """Return the catalog type for the table's own catalog.

    Falls back to the connection's current catalog when the table name has no catalog part.
    """
    catalog = exp.to_table(table).catalog
    if not catalog:
        catalog = self.get_current_catalog()
    return self.get_catalog_type(catalog)
Get the catalog type from a table name if it has a catalog specified, otherwise return the current catalog type
@property
def current_catalog_type(self) -> str:
    """Catalog type of the connection's current catalog."""
    # Prefer `get_catalog_type_from_table`: the catalog of the table being operated on is
    # what matters, not the connection's catalog. Kept for legacy reasons; should be
    # refactored out.
    current = self.get_current_catalog()
    return self.get_catalog_type(current)
def replace_query(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    supports_replace_table_override: t.Optional[bool] = None,
    **kwargs: t.Any,
) -> None:
    """Replaces an existing table with the result of a query or DataFrame.

    If the engine supports ``CREATE OR REPLACE TABLE`` (or ``supports_replace_table_override``
    is True), or the target table does not exist yet, the table is (re)created from the source
    query. Otherwise the table's contents are replaced with an insert overwrite, for example
    on partition-based engines such as Hive and Spark.

    If the query reads from the target table itself (with or without a table alias):
    - the table is created up front when it is missing;
    - on the insert-overwrite path, the references are redirected to a temporary copy of the
      table. This is presumably so the query does not read rows while they are being overwritten.

    Args:
        table_name: The name of the table (eg. prod.table).
        query_or_df: The SQL query to run or a dataframe.
        target_columns_to_types: A mapping between the column name and its data type. Expected
            to be ordered to match the order of values in a dataframe. When missing and the table
            exists, the columns are read from the table.
        table_description: Optional table description.
        column_descriptions: Optional column descriptions.
        source_columns: Optional source column names; forwarded to
            ``_get_source_queries_and_columns_to_types``.
        supports_replace_table_override: Overrides ``SUPPORTS_REPLACE_TABLE`` when not None.
        kwargs: Optional create table properties, forwarded to the create/insert helpers.

    Raises:
        SQLMeshError: If the query is self-referencing and the column types are unknown.
    """
    target_table = exp.to_table(table_name)

    target_data_object = self.get_data_object(target_table)
    table_exists = target_data_object is not None
    if self.drop_data_object_on_type_mismatch(target_data_object, DataObjectType.TABLE):
        table_exists = False

    source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=target_table,
        source_columns=source_columns,
    )
    if not target_columns_to_types and table_exists:
        target_columns_to_types = self.columns(target_table)

    def _table_identity(table: exp.Table) -> exp.Table:
        # sqlglot node equality compares every arg, including the alias, so `db.tbl AS t`
        # would never equal `db.tbl`. Compare a quoted, alias-free copy instead. Copying
        # also avoids quoting the caller's / query's nodes in place.
        normalized = table.copy()
        normalized.set("alias", None)
        return quote_identifiers(normalized)

    target_identity = _table_identity(target_table)

    def _references_target(node: exp.Expression) -> bool:
        return isinstance(node, exp.Table) and _table_identity(node) == target_identity

    query = source_queries[0].query_factory()
    self_referencing = any(_references_target(table) for table in query.find_all(exp.Table))

    # If a query references itself then it must have a table created regardless of approach used.
    if self_referencing:
        if not target_columns_to_types:
            raise SQLMeshError(
                f"Cannot create a self-referencing table {target_identity.sql(dialect=self.dialect)} without knowing the column types. "
                "Try casting the columns to an expected type or defining the columns in the model metadata. "
            )
        self._create_table_from_columns(
            target_table,
            target_columns_to_types,
            exists=True,
            table_description=table_description,
            column_descriptions=column_descriptions,
            **kwargs,
        )

    # All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we
    # use `CREATE OR REPLACE TABLE AS` if the engine supports it
    supports_replace_table = (
        self.SUPPORTS_REPLACE_TABLE
        if supports_replace_table_override is None
        else supports_replace_table_override
    )
    if supports_replace_table or not table_exists:
        return self._create_table_from_source_queries(
            target_table,
            source_queries,
            target_columns_to_types,
            replace=supports_replace_table,
            table_description=table_description,
            column_descriptions=column_descriptions,
            **kwargs,
        )

    if self_referencing:
        assert target_columns_to_types is not None
        with self.temp_table(
            self._select_columns(target_columns_to_types).from_(target_table),
            name=target_table,
            target_columns_to_types=target_columns_to_types,
            **kwargs,
        ) as temp_table:

            def _redirect_to_temp(node: exp.Expression) -> exp.Expression:
                if not _references_target(node):
                    return node
                replacement = temp_table.copy()
                # Keep the original alias so alias-qualified column references still resolve.
                alias = node.args.get("alias")
                if alias:
                    replacement.set("alias", alias.copy())
                return replacement

            for source_query in source_queries:
                source_query.add_transform(_redirect_to_temp)  # type: ignore
            return self._insert_overwrite_by_condition(
                target_table,
                source_queries,
                target_columns_to_types,
                **kwargs,
            )

    return self._insert_overwrite_by_condition(
        target_table,
        source_queries,
        target_columns_to_types,
        **kwargs,
    )
Replaces an existing table with a query.
For partition-based engines (Hive, Spark), insert overwrite is used. For other systems, create or replace is used.
Arguments:
- table_name: The name of the table (eg. prod.table)
- query_or_df: The SQL query to run or a dataframe.
- target_columns_to_types: Only used if a dataframe is provided. A mapping between the column name and its data type. Expected to be ordered to match the order of values in the dataframe.
- kwargs: Optional create table properties.
def create_index(
    self,
    table_name: TableName,
    index_name: str,
    columns: t.Tuple[str, ...],
    exists: bool = True,
) -> None:
    """Create an index on a table, provided the engine supports indexes.

    Nothing is executed when ``SUPPORTS_INDEXES`` is falsy.

    Args:
        table_name: Table the index is created on.
        index_name: Name given to the new index.
        columns: Names of the columns that make up the index.
        exists: Whether the statement carries an IF NOT EXISTS check.
    """
    if self.SUPPORTS_INDEXES:
        index_columns = [exp.to_column(column) for column in columns]
        index = exp.Index(
            this=exp.to_identifier(index_name),
            table=exp.to_table(table_name),
            params=exp.IndexParameters(columns=index_columns),
        )
        self.execute(exp.Create(this=index, kind="INDEX", exists=exists))
Creates a new index for the given table, if the engine supports indexes.
Arguments:
- table_name: The name of the target table.
- index_name: The name of the index.
- columns: The list of columns that constitute the index.
- exists: Indicates whether to include the IF NOT EXISTS check.
def create_table(
    self,
    table_name: TableName,
    target_columns_to_types: t.Dict[str, exp.DataType],
    primary_key: t.Optional[t.Tuple[str, ...]] = None,
    exists: bool = True,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    **kwargs: t.Any,
) -> None:
    """Create a table from an explicit column-to-type mapping using a DDL statement.

    Args:
        table_name: Table to create; either fully qualified or a bare name.
        target_columns_to_types: Column names mapped to their data types.
        primary_key: Columns forming the table's primary key, if any.
        exists: Whether the statement carries an IF NOT EXISTS check.
        table_description: Optional table description from MODEL DDL.
        column_descriptions: Optional per-column descriptions from the model query.
        kwargs: Extra create-table properties forwarded unchanged.
    """
    positional_args = (
        table_name,
        target_columns_to_types,
        primary_key,
        exists,
        table_description,
        column_descriptions,
    )
    self._create_table_from_columns(*positional_args, **kwargs)
Create a table using a DDL statement
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- target_columns_to_types: A mapping between the column name and its data type.
- primary_key: Determines the table primary key.
- exists: Indicates whether to include the IF NOT EXISTS check.
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- kwargs: Optional create table properties.
def create_managed_table(
    self,
    table_name: TableName,
    query: Query,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    partitioned_by: t.Optional[t.List[exp.Expression]] = None,
    clustered_by: t.Optional[t.List[exp.Expression]] = None,
    table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Create a managed table backed by a query.

    A "managed" table is kept up to date by the database engine itself rather than by SQLMesh.
    This base implementation always refuses; engines with managed-table support override it.

    Args:
        table_name: Table to create; either fully qualified or a bare name.
        query: The SQL query the managed table is based on.
        target_columns_to_types: Column names mapped to their data types.
        partitioned_by: Partition columns or engine-specific expressions (only some engines).
        clustered_by: Cluster columns or engine-specific expressions (only some engines).
        table_properties: Optional engine-specific properties for the managed table.
        table_description: Optional table description from MODEL DDL.
        column_descriptions: Optional per-column descriptions from the model query.
        source_columns: Optional list of source column names.
        kwargs: Extra create-table properties.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    message = f"Engine does not support managed tables: {type(self)}"
    raise NotImplementedError(message)
Create a managed table using a query.
"Managed" means that once the table is created, the data is kept up to date by the underlying database engine and not SQLMesh.
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- query: The SQL query for the engine to base the managed table on
- target_columns_to_types: A mapping between the column name and its data type.
- partitioned_by: The partition columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour))
- clustered_by: The cluster columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour))
- table_properties: Optional mapping of engine-specific properties to be set on the managed table
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- kwargs: Optional create table properties.
def ctas(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    exists: bool = True,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Create a table with a ``CREATE TABLE ... AS`` statement.

    Args:
        table_name: Table to create; either fully qualified or a bare name.
        query_or_df: Query or dataframe that populates the new table.
        target_columns_to_types: Column names mapped to their data types; required for dataframes.
        exists: Whether the statement carries an IF NOT EXISTS check.
        table_description: Optional table description from MODEL DDL.
        column_descriptions: Optional per-column descriptions from the model query.
        source_columns: Optional list of source column names.
        kwargs: Extra create-table properties forwarded unchanged.
    """
    queries, resolved_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=table_name,
        source_columns=source_columns,
    )
    return self._create_table_from_source_queries(
        table_name,
        queries,
        resolved_types,
        exists,
        table_description=table_description,
        column_descriptions=column_descriptions,
        **kwargs,
    )
Create a table using a CTAS statement
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- query_or_df: The SQL query to run or a dataframe for the CTAS.
- target_columns_to_types: A mapping between the column name and its data type. Required if using a DataFrame.
- exists: Indicates whether to include the IF NOT EXISTS check.
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- kwargs: Optional create table properties.
def create_state_table(
    self,
    table_name: str,
    target_columns_to_types: t.Dict[str, exp.DataType],
    primary_key: t.Optional[t.Tuple[str, ...]] = None,
) -> None:
    """Create a table that holds SQLMesh's internal state.

    Delegates to ``create_table`` with the default create options.

    Args:
        table_name: Table to create; either fully qualified or a bare name.
        target_columns_to_types: Column names mapped to their data types.
        primary_key: Columns forming the table's primary key, if any.
    """
    options = {"primary_key": primary_key}
    self.create_table(table_name, target_columns_to_types, **options)
Create a table to store SQLMesh internal state.
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- target_columns_to_types: A mapping between the column name and its data type.
- primary_key: Determines the table primary key.
def create_table_like(
    self,
    target_table_name: TableName,
    source_table_name: TableName,
    exists: bool = True,
    **kwargs: t.Any,
) -> None:
    """Create a table based on the definition of another table, including any
    column attributes and indexes defined in the original table.

    Args:
        target_table_name: The name of the table to create. Can be fully qualified or just table name.
        source_table_name: The name of the table to base the new table on.
        exists: Forwarded to ``_create_table_like``; presumably controls the IF NOT EXISTS
            check, as in the other create methods of this adapter (confirm per engine).
        kwargs: Additional options forwarded unchanged to ``_create_table_like``.
    """
    self._create_table_like(target_table_name, source_table_name, exists=exists, **kwargs)
    # The data-object cache may hold a "not found" entry for this name; invalidate it.
    self._clear_data_object_cache(target_table_name)
Create a table based on the definition of another table, including any column attributes and indexes defined in the original table.
Arguments:
- target_table_name: The name of the table to create. Can be fully qualified or just table name.
- source_table_name: The name of the table to base the new table on.
def clone_table(
    self,
    target_table_name: TableName,
    source_table_name: TableName,
    replace: bool = False,
    exists: bool = True,
    clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> None:
    """Create ``target_table_name`` as a clone of ``source_table_name``.

    Args:
        target_table_name: Name of the table to create.
        source_table_name: Name of the table being cloned.
        replace: Whether an existing target table may be replaced.
        exists: Whether the statement carries an IF NOT EXISTS check.
        clone_kwargs: Extra arguments for the CLONE clause expression.
        kwargs: Extra arguments for the CREATE expression.

    Raises:
        NotImplementedError: If the engine does not support cloning.
    """
    if not self.SUPPORTS_CLONING:
        raise NotImplementedError(f"Engine does not support cloning: {type(self)}")

    # This key is discarded so it is never forwarded into the CREATE expression.
    kwargs.pop("rendered_physical_properties", None)
    clone_clause = exp.Clone(this=exp.to_table(source_table_name), **(clone_kwargs or {}))
    create_statement = exp.Create(
        this=exp.to_table(target_table_name),
        kind="TABLE",
        replace=replace,
        exists=exists,
        clone=clone_clause,
        **kwargs,
    )
    self.execute(create_statement)
    self._clear_data_object_cache(target_table_name)
Creates a table with the target name by cloning the source table.
Arguments:
- target_table_name: The name of the table that should be created.
- source_table_name: The name of the source table that should be cloned.
- replace: Whether or not to replace an existing table.
- exists: Indicates whether to include the IF NOT EXISTS check.
def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool = True) -> None:
    """Drop a data object, choosing the drop statement from its type.

    Views, materialized views, tables and managed tables are supported.

    Args:
        data_object: The object to drop.
        ignore_if_not_exists: When True, a missing object does not raise.

    Raises:
        SQLMeshError: If the object's type is none of the supported kinds.
    """
    table = data_object.to_table()
    object_type = data_object.type
    if object_type.is_view:
        self.drop_view(table, ignore_if_not_exists=ignore_if_not_exists)
    elif object_type.is_materialized_view:
        self.drop_view(table, ignore_if_not_exists=ignore_if_not_exists, materialized=True)
    elif object_type.is_table:
        self.drop_table(table, exists=ignore_if_not_exists)
    elif object_type.is_managed_table:
        self.drop_managed_table(table, exists=ignore_if_not_exists)
    else:
        rendered_name = table.sql(dialect=self.dialect)
        raise SQLMeshError(
            f"Can't drop data object '{rendered_name}' of type '{object_type.value}'"
        )
Drops a data object of arbitrary type.
Arguments:
- data_object: The data object to drop.
- ignore_if_not_exists: If True, no error will be raised if the data object does not exist.
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
    """Drop a table.

    Args:
        table_name: Table to drop.
        exists: Whether the drop carries an IF EXISTS check; defaults to True.
        kwargs: Extra options forwarded unchanged to ``_drop_object``.
    """
    self._drop_object(exists=exists, name=table_name, **kwargs)
Drops a table.
Arguments:
- table_name: The name of the table to drop.
- exists: Whether to include the IF EXISTS check. Defaults to True.
def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None:
    """Drop a managed table.

    This base implementation always refuses; engines with managed-table support override it.

    Args:
        table_name: Table to drop.
        exists: Whether the drop carries an IF EXISTS check; defaults to True.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    adapter_type = type(self)
    raise NotImplementedError(f"Engine does not support managed tables: {adapter_type}")
Drops a managed table.
Arguments:
- table_name: The name of the table to drop.
- exists: Whether to include the IF EXISTS check. Defaults to True.
def get_alter_operations(
    self,
    current_table_name: TableName,
    target_table_name: TableName,
    *,
    ignore_destructive: bool = False,
    ignore_additive: bool = False,
) -> t.List[TableAlterOperation]:
    """
    Compute the alter operations that turn the current table's columns into the target table's.

    Both tables' columns are fetched with ``columns`` and compared by ``schema_differ``.
    The two ``ignore_*`` flags are passed through to the comparison.
    """
    current_columns = self.columns(current_table_name)
    target_columns = self.columns(target_table_name)
    operations = self.schema_differ.compare_columns(
        current_table_name,
        current_columns,
        target_columns,
        ignore_destructive=ignore_destructive,
        ignore_additive=ignore_additive,
    )
    return t.cast(t.List[TableAlterOperation], operations)
Determines the alter statements needed to change the current table into the structure of the target table.
def alter_table(
    self,
    alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
) -> None:
    """
    Execute alter statements, within one ``transaction()`` block, to change the current table
    into the structure of the target table.

    Items may be raw ``exp.Alter`` expressions or ``TableAlterOperation`` wrappers, whose
    ``expression`` attribute is executed.
    """
    with self.transaction():
        statements = [
            operation.expression if isinstance(operation, TableAlterOperation) else operation
            for operation in alter_expressions
        ]
        for statement in statements:
            self.execute(statement)
Performs the alter statements to change the current table into the structure of the target table.
def create_view(
    self,
    view_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    replace: bool = True,
    materialized: bool = False,
    materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **create_kwargs: t.Any,
) -> None:
    """Create a view with a query or dataframe.

    If a dataframe is passed in, it will be converted into a literal values statement.
    This should only be done if the dataframe is very small!

    Args:
        view_name: The view name.
        query_or_df: A query or dataframe.
        target_columns_to_types: Columns to use in the view statement.
        replace: Whether or not to replace an existing view. Defaults to True.
        materialized: Whether to create a materialized view. Only used for engines that support this feature.
        materialized_properties: Optional materialized view properties to add to the view.
            The caller's mapping is not modified.
        table_description: Optional table description from MODEL DDL.
        column_descriptions: Optional column descriptions from model query.
        view_properties: Optional view properties to add to the view.
        source_columns: Optional list of source column names, forwarded when resolving columns.
        create_kwargs: Additional kwargs to pass into the Create expression

    Raises:
        SQLMeshError: If materialized properties are given for a non-materialized view, if a
            dataframe's column types cannot be determined, or if the input does not produce
            exactly one source query.
    """
    import pandas as pd

    if materialized_properties and not materialized:
        raise SQLMeshError("Materialized properties are only supported for materialized views")

    query_or_df = self._native_df_to_pandas_df(query_or_df)

    # Dataframes are inlined as a literal VALUES query.
    if isinstance(query_or_df, pd.DataFrame):
        values: t.List[t.Tuple[t.Any, ...]] = list(
            query_or_df.itertuples(index=False, name=None)
        )
        target_columns_to_types, source_columns = self._columns_to_types(
            query_or_df, target_columns_to_types, source_columns
        )
        if not target_columns_to_types:
            raise SQLMeshError("columns_to_types must be provided for dataframes")
        source_columns_to_types = get_source_columns_to_types(
            target_columns_to_types, source_columns
        )
        query_or_df = self._values_to_sql(
            values,
            source_columns_to_types,
            batch_start=0,
            batch_end=len(values),
        )

    source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        batch_size=0,
        target_table=view_name,
        source_columns=source_columns,
    )
    if len(source_queries) != 1:
        raise SQLMeshError("Only one source query is supported for creating views")

    schema: t.Union[exp.Table, exp.Schema] = exp.to_table(view_name)
    if target_columns_to_types:
        schema = self._build_schema_exp(
            exp.to_table(view_name),
            target_columns_to_types,
            column_descriptions,
            is_view=True,
            materialized=materialized,
        )

    properties = create_kwargs.pop("properties", None)
    if not properties:
        properties = exp.Properties(expressions=[])

    if view_properties:
        table_type = self._pop_creatable_type_from_properties(view_properties)
        if table_type:
            properties.append("expressions", table_type)

    if materialized and self.SUPPORTS_MATERIALIZED_VIEWS:
        properties.append("expressions", exp.MaterializedProperty())

        if not self.SUPPORTS_MATERIALIZED_VIEW_SCHEMA and isinstance(schema, exp.Schema):
            schema = schema.this

    if not self.SUPPORTS_VIEW_SCHEMA and isinstance(schema, exp.Schema):
        schema = schema.this

    if materialized_properties:
        # Work on a copy: the keys below are popped/added and must not leak back to the caller.
        materialized_properties = dict(materialized_properties)
        partitioned_by = materialized_properties.pop("partitioned_by", None)
        clustered_by = materialized_properties.pop("clustered_by", None)
        if (
            partitioned_by
            and (
                partitioned_by_prop := self._build_partitioned_by_exp(
                    partitioned_by, **materialized_properties
                )
            )
            is not None
        ):
            # NOTE(review): catalog_name is added after the partition expression is built, so it
            # only reaches `_build_clustered_by_exp`; presumably intentional — confirm.
            materialized_properties["catalog_name"] = exp.to_table(view_name).catalog
            properties.append("expressions", partitioned_by_prop)
        if (
            clustered_by
            and (
                clustered_by_prop := self._build_clustered_by_exp(
                    clustered_by, **materialized_properties
                )
            )
            is not None
        ):
            properties.append("expressions", clustered_by_prop)

    create_view_properties = self._build_view_properties_exp(
        view_properties,
        (
            table_description
            if self.COMMENT_CREATION_VIEW.supports_schema_def and self.comments_enabled
            else None
        ),
        physical_cluster=create_kwargs.pop("physical_cluster", None),
    )
    if create_view_properties:
        for view_property in create_view_properties.expressions:
            # Small hack to make sure SECURE goes at the beginning before materialized as required by Snowflake
            if isinstance(view_property, exp.SecureProperty):
                properties.set("expressions", view_property, index=0, overwrite=False)
            else:
                properties.append("expressions", view_property)

    if properties.expressions:
        create_kwargs["properties"] = properties

    if replace:
        self.drop_data_object_on_type_mismatch(
            self.get_data_object(view_name),
            DataObjectType.VIEW if not materialized else DataObjectType.MATERIALIZED_VIEW,
        )

    with source_queries[0] as query:
        self.execute(
            exp.Create(
                this=schema,
                kind="VIEW",
                replace=replace,
                expression=query,
                **create_kwargs,
            ),
            quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
        )

    self._clear_data_object_cache(view_name)

    # Register table comment with commands if the engine doesn't support doing it in CREATE
    if (
        table_description
        and self.COMMENT_CREATION_VIEW.is_comment_command_only
        and self.comments_enabled
    ):
        self._create_table_comment(view_name, table_description, "VIEW")
    # Register column comments with commands if the engine doesn't support doing it in
    # CREATE or we couldn't do it in the CREATE schema definition because we don't have
    # columns_to_types
    if (
        column_descriptions
        and (
            self.COMMENT_CREATION_VIEW.is_comment_command_only
            or (
                self.COMMENT_CREATION_VIEW.is_in_schema_def_and_commands
                and not target_columns_to_types
            )
        )
        and self.comments_enabled
    ):
        self._create_column_comments(view_name, column_descriptions, "VIEW", materialized)
Create a view with a query or dataframe.
If a dataframe is passed in, it will be converted into a literal values statement. This should only be done if the dataframe is very small!
Arguments:
- view_name: The view name.
- query_or_df: A query or dataframe.
- target_columns_to_types: Columns to use in the view statement.
- replace: Whether or not to replace an existing view. Defaults to True.
- materialized: Whether to create a materialized view. Only used for engines that support this feature.
- materialized_properties: Optional materialized view properties to add to the view.
- table_description: Optional table description from MODEL DDL.
- column_descriptions: Optional column descriptions from model query.
- view_properties: Optional view properties to add to the view.
- create_kwargs: Additional kwargs to pass into the Create expression
@set_catalog()
def create_schema(
    self,
    schema_name: SchemaName,
    ignore_if_exists: bool = True,
    warn_on_error: bool = True,
    properties: t.Optional[t.List[exp.Expression]] = None,
) -> None:
    """Create a schema via ``_create_schema`` with ``kind="SCHEMA"``.

    Args:
        schema_name: Schema to create.
        ignore_if_exists: Forwarded to ``_create_schema`` unchanged.
        warn_on_error: Forwarded to ``_create_schema`` unchanged.
        properties: Optional schema properties; a falsy value becomes an empty list.
    """
    if not properties:
        properties = []
    return self._create_schema(
        schema_name=schema_name,
        ignore_if_exists=ignore_if_exists,
        warn_on_error=warn_on_error,
        properties=properties,
        kind="SCHEMA",
    )
def drop_schema(
    self,
    schema_name: SchemaName,
    ignore_if_not_exists: bool = True,
    cascade: bool = False,
    **drop_args: t.Any,
) -> None:
    """Drop a schema.

    Args:
        schema_name: The schema to drop.
        ignore_if_not_exists: Passed to ``_drop_object`` as ``exists``; when True, a missing
            schema does not raise.
        cascade: Whether to also drop the objects contained in the schema.
        drop_args: Extra options forwarded unchanged to ``_drop_object``. (Previously annotated
            as ``t.Dict[str, exp.Expression]``, which typed each keyword *value* as a dict.)
    """
    return self._drop_object(
        name=schema_name,
        exists=ignore_if_not_exists,
        kind="SCHEMA",
        cascade=cascade,
        **drop_args,
    )
def drop_view(
    self,
    view_name: TableName,
    ignore_if_not_exists: bool = True,
    materialized: bool = False,
    **kwargs: t.Any,
) -> None:
    """Drop a view.

    ``materialized`` only takes effect when the engine reports ``SUPPORTS_MATERIALIZED_VIEWS``.
    """
    drop_materialized = materialized and self.SUPPORTS_MATERIALIZED_VIEWS
    self._drop_object(
        name=view_name,
        exists=ignore_if_not_exists,
        kind="VIEW",
        materialized=drop_materialized,
        **kwargs,
    )
Drop a view.
def columns(
    self, table_name: TableName, include_pseudo_columns: bool = False
) -> t.Dict[str, exp.DataType]:
    """Fetches column names and types for the target table.

    Runs ``DESCRIBE TABLE`` and reads result rows until the first row whose first field
    starts with ``#``. Rows whose name or type is empty are skipped.

    Args:
        table_name: The table to describe.
        include_pseudo_columns: Not used by this base implementation (presumably honored by
            engine-specific overrides — confirm).

    Returns:
        A mapping from column name to its parsed data type, in ``DESCRIBE`` output order.
    """
    self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
    describe_output = self.cursor.fetchall()
    column_rows = itertools.takewhile(
        # The name filter below runs after this predicate, so a NULL name must not crash here.
        lambda row: not (row[0] or "").startswith("#"),
        describe_output,
    )
    return {
        # Note: MySQL returns the column type as bytes.
        column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect)
        for column_name, column_type, *_ in column_rows
        if column_name and column_name.strip() and column_type and column_type.strip()
    }
Fetches column names and types for the target table.
def table_exists(self, table_name: TableName) -> bool:
    """Report whether ``table_name`` exists.

    A cached data-object lookup is used when present; otherwise the table is probed with
    ``DESCRIBE TABLE``. Any exception raised by that probe is treated as "does not exist".
    """
    table = exp.to_table(table_name)
    cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
    if cache_key in self._data_object_cache:
        logger.debug("Table existence cache hit: %s", cache_key)
        return self._data_object_cache[cache_key] is not None

    try:
        self.execute(exp.Describe(this=table, kind="TABLE"))
    except Exception:
        return False
    return True
def insert_append(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    track_rows_processed: bool = True,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """Append the rows produced by a query or dataframe to ``table_name``.

    Args:
        table_name: Destination table.
        query_or_df: Query or dataframe supplying the rows.
        target_columns_to_types: Optional column names mapped to their data types.
        track_rows_processed: Forwarded to ``_insert_append_source_queries``.
        source_columns: Optional list of source column names.
    """
    queries, columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=table_name,
        source_columns=source_columns,
    )
    self._insert_append_source_queries(
        table_name, queries, columns_to_types, track_rows_processed
    )
def insert_overwrite_by_partition(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    partitioned_by: t.List[exp.Expression],
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """Overwrite the partitions of ``table_name`` touched by the new data.

    Engines whose ``INSERT_OVERWRITE_STRATEGY`` is not insert-overwrite fall back to
    ``_replace_by_key``, keyed on ``partitioned_by`` (non-unique). Otherwise an
    insert-overwrite is issued, and ``partitioned_by`` is not used.
    """
    if not self.INSERT_OVERWRITE_STRATEGY.is_insert_overwrite:
        self._replace_by_key(
            table_name,
            query_or_df,
            target_columns_to_types,
            partitioned_by,
            is_unique_key=False,
            source_columns=source_columns,
        )
        return

    queries, resolved_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=exp.to_table(table_name),
        source_columns=source_columns,
    )
    self._insert_overwrite_by_condition(
        table_name, queries, target_columns_to_types=resolved_types
    )
def insert_overwrite_by_time_partition(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    start: TimeLike,
    end: TimeLike,
    time_formatter: t.Callable[
        [TimeLike, t.Optional[t.Dict[str, exp.DataType]]], exp.Expression
    ],
    time_column: TimeColumn | exp.Expression | str,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Overwrite the rows of ``table_name`` whose time column falls in ``[start, end]``.

    When the resolved column types are missing or not all known, the target table's own
    columns are used when formatting the time bounds.
    """
    queries, columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types,
        target_table=table_name,
        source_columns=source_columns,
    )
    if not (columns_to_types and columns_to_types_all_known(columns_to_types)):
        columns_to_types = self.columns(table_name)

    low, high = [
        time_formatter(bound, columns_to_types)
        for bound in make_inclusive(start, end, self.dialect)
    ]

    if isinstance(time_column, TimeColumn):
        time_column = time_column.column
    column_expression = (
        exp.to_column(time_column) if isinstance(time_column, str) else time_column
    )
    condition = exp.Between(this=column_expression, low=low, high=high)
    return self._insert_overwrite_by_time_partition(
        table_name, queries, columns_to_types, condition, **kwargs
    )
def scd_type_2_by_time(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    unique_key: t.Sequence[exp.Expression],
    valid_from_col: exp.Column,
    valid_to_col: exp.Column,
    execution_time: t.Union[TimeLike, exp.Column],
    updated_at_col: exp.Column,
    invalidate_hard_deletes: bool = True,
    updated_at_as_valid_from: bool = False,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    truncate: bool = False,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Apply an SCD type 2 load that detects changes through ``updated_at_col``.

    All arguments are forwarded by keyword to ``_scd_type_2``.
    """
    options = dict(
        target_table=target_table,
        source_table=source_table,
        unique_key=unique_key,
        valid_from_col=valid_from_col,
        valid_to_col=valid_to_col,
        execution_time=execution_time,
        updated_at_col=updated_at_col,
        invalidate_hard_deletes=invalidate_hard_deletes,
        updated_at_as_valid_from=updated_at_as_valid_from,
        target_columns_to_types=target_columns_to_types,
        table_description=table_description,
        column_descriptions=column_descriptions,
        truncate=truncate,
        source_columns=source_columns,
    )
    self._scd_type_2(**options, **kwargs)
def scd_type_2_by_column(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    unique_key: t.Sequence[exp.Expression],
    valid_from_col: exp.Column,
    valid_to_col: exp.Column,
    execution_time: t.Union[TimeLike, exp.Column],
    check_columns: t.Union[exp.Star, t.Sequence[exp.Expression]],
    invalidate_hard_deletes: bool = True,
    execution_time_as_valid_from: bool = False,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    truncate: bool = False,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Apply an SCD type 2 load that detects changes by comparing ``check_columns``.

    All arguments are forwarded by keyword to ``_scd_type_2``.
    """
    options = dict(
        target_table=target_table,
        source_table=source_table,
        unique_key=unique_key,
        valid_from_col=valid_from_col,
        valid_to_col=valid_to_col,
        execution_time=execution_time,
        check_columns=check_columns,
        target_columns_to_types=target_columns_to_types,
        invalidate_hard_deletes=invalidate_hard_deletes,
        execution_time_as_valid_from=execution_time_as_valid_from,
        table_description=table_description,
        column_descriptions=column_descriptions,
        truncate=truncate,
        source_columns=source_columns,
    )
    self._scd_type_2(**options, **kwargs)
def merge(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expression],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expression] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Merge (upsert) the source rows into ``target_table`` on ``unique_key``.

    Args:
        target_table: The table being merged into.
        source_table: Query or dataframe providing the incoming rows.
        target_columns_to_types: Column names mapped to their data types; when unresolved,
            the target table's columns are used.
        unique_key: Expressions that must match between target and source rows.
        when_matched: Optional custom WHEN clauses. When omitted, matched rows have every
            column updated from the source.
        merge_filter: Optional extra predicate AND-ed in front of the key match condition.
        source_columns: Optional list of source column names.
        kwargs: Not used by this base implementation.
    """
    source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
        source_table,
        target_columns_to_types,
        target_table=target_table,
        source_columns=source_columns,
    )
    target_columns_to_types = target_columns_to_types or self.columns(target_table)
    # ON condition: every unique-key part equal between the target and source aliases.
    on = exp.and_(
        *(
            add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS))
            for part in unique_key
        )
    )
    if merge_filter:
        on = exp.and_(merge_filter, on)

    if not when_matched:
        # Default WHEN MATCHED: overwrite every target column with the source value.
        match_expressions = [
            exp.When(
                matched=True,
                source=False,
                then=exp.Update(
                    expressions=[
                        exp.column(col, MERGE_TARGET_ALIAS).eq(
                            exp.column(col, MERGE_SOURCE_ALIAS)
                        )
                        for col in target_columns_to_types
                    ],
                ),
            )
        ]
    else:
        # Copy so appending the INSERT clause below does not mutate the caller's expression.
        match_expressions = when_matched.copy().expressions

    # WHEN NOT MATCHED: insert all columns from the source row (always added last).
    match_expressions.append(
        exp.When(
            matched=False,
            source=False,
            then=exp.Insert(
                this=exp.Tuple(
                    expressions=[exp.column(col) for col in target_columns_to_types]
                ),
                expression=exp.Tuple(
                    expressions=[
                        exp.column(col, MERGE_SOURCE_ALIAS) for col in target_columns_to_types
                    ]
                ),
            ),
        )
    )
    # One MERGE statement per source query, all sharing the same ON and WHEN clauses.
    for source_query in source_queries:
        with source_query as query:
            self._merge(
                target_table=target_table,
                query=query,
                on=on,
                whens=exp.Whens(expressions=match_expressions),
            )
def rename_table(
    self,
    old_table_name: TableName,
    new_table_name: TableName,
) -> None:
    """Rename a table and invalidate cached lookups for both names.

    If the new name carries a catalog, it must match the old table's catalog. When the old
    name has no catalog, the current catalog is used for the comparison.

    Raises:
        UnsupportedCatalogOperationError: If the rename would cross catalogs.
    """
    destination = exp.to_table(new_table_name)
    if destination.catalog:
        source_catalog = exp.to_table(old_table_name).catalog or self.get_current_catalog()
        if source_catalog != destination.catalog:
            raise UnsupportedCatalogOperationError(
                "Tried to rename table across catalogs which is not supported"
            )
    self._rename_table(old_table_name, new_table_name)
    for name in (old_table_name, new_table_name):
        self._clear_data_object_cache(name)
def get_data_object(
    self, target_name: TableName, safe_to_cache: bool = False
) -> t.Optional[DataObject]:
    """Looks up a single data object by its (optionally qualified) name.

    Args:
        target_name: The table/view name, optionally qualified with schema and catalog.
        safe_to_cache: Forwarded to ``get_data_objects``.

    Returns:
        The first matching data object, or ``None`` when nothing matches.
    """
    table = exp.to_table(target_name)
    matches = self.get_data_objects(
        schema_(table.db, table.catalog),
        {table.name},
        safe_to_cache=safe_to_cache,
    )
    return matches[0] if matches else None
def get_data_objects(
    self,
    schema_name: SchemaName,
    object_names: t.Optional[t.Set[str]] = None,
    safe_to_cache: bool = False,
) -> t.List[DataObject]:
    """Lists all data objects in the target schema.

    When ``object_names`` is given, each name is first looked up in
    ``self._data_object_cache``. A cached ``None`` marks a name that an earlier
    lookup did not find. Names absent from the cache are fetched from the engine
    in batches of ``DATA_OBJECT_FILTER_BATCH_SIZE``. The cache is only written to
    (found objects and "not found" markers) when ``safe_to_cache`` is True.

    Without ``object_names``, the whole schema is listed from the engine. The
    cache is not read on that path, only populated when ``safe_to_cache`` is True.

    Args:
        schema_name: The name of the schema to list data objects from.
        object_names: If provided, only return data objects with these names.
        safe_to_cache: Whether it is safe to cache the results of this call.

    Returns:
        A list of data objects in the target schema. Cache hits come first,
        followed by freshly fetched objects.
    """
    if object_names is None:
        every_object = self._get_data_objects(schema_name)
        if safe_to_cache:
            for data_object in every_object:
                key = _get_data_object_cache_key(
                    data_object.catalog, data_object.schema_name, data_object.name
                )
                self._data_object_cache[key] = data_object
        return every_object

    if not object_names:
        return []

    schema = to_schema(schema_name)
    hits: t.List[DataObject] = []
    misses: t.Set[str] = set()
    for object_name in object_names:
        key = _get_data_object_cache_key(schema.catalog, schema.db, object_name)
        if key not in self._data_object_cache:
            logger.debug("Data object cache miss: %s", key)
            misses.add(object_name)
            continue
        logger.debug("Data object cache hit: %s", key)
        cached = self._data_object_cache[key]
        # A falsy cached value records a previous lookup that found nothing.
        if cached:
            hits.append(cached)

    if not misses:
        return hits

    pending = list(misses)
    batch_size = self.DATA_OBJECT_FILTER_BATCH_SIZE
    fetched: t.List[DataObject] = []
    found_names: t.Set[str] = set()
    for start in range(0, len(pending), batch_size):
        batch = set(pending[start : start + batch_size])
        for data_object in self._get_data_objects(schema_name, batch):
            if safe_to_cache:
                key = _get_data_object_cache_key(
                    data_object.catalog, data_object.schema_name, data_object.name
                )
                self._data_object_cache[key] = data_object
            fetched.append(data_object)
            found_names.add(data_object.name)

    if safe_to_cache:
        # Remember names the engine did not return so later lookups can skip them.
        for absent_name in misses - found_names:
            key = _get_data_object_cache_key(schema.catalog, schema.db, absent_name)
            self._data_object_cache[key] = None

    return hits + fetched
Lists all data objects in the target schema.
Arguments:
- schema_name: The name of the schema to list data objects from.
- object_names: If provided, only return data objects with these names.
- safe_to_cache: Whether it is safe to cache the results of this call.
Returns:
A list of data objects in the target schema.
def fetchone(
    self,
    query: t.Union[exp.Expression, str],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = False,
) -> t.Optional[t.Tuple]:
    """Runs ``query`` inside ``self.transaction()`` and returns one result row.

    Args:
        query: The SQL expression or raw SQL string to run.
        ignore_unsupported_errors: Forwarded to ``execute``.
        quote_identifiers: Forwarded to ``execute``.

    Returns:
        Whatever ``self.cursor.fetchone()`` returns after the query runs.
    """
    execute_options = dict(
        ignore_unsupported_errors=ignore_unsupported_errors,
        quote_identifiers=quote_identifiers,
    )
    with self.transaction():
        self.execute(query, **execute_options)
        return self.cursor.fetchone()
def fetchall(
    self,
    query: t.Union[exp.Expression, str],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = False,
) -> t.List[t.Tuple]:
    """Runs ``query`` inside ``self.transaction()`` and returns all result rows.

    Args:
        query: The SQL expression or raw SQL string to run.
        ignore_unsupported_errors: Forwarded to ``execute``.
        quote_identifiers: Forwarded to ``execute``.

    Returns:
        Whatever ``self.cursor.fetchall()`` returns after the query runs.
    """
    with self.transaction():
        self.execute(
            query,
            ignore_unsupported_errors=ignore_unsupported_errors,
            quote_identifiers=quote_identifiers,
        )
        rows = self.cursor.fetchall()
    return rows
def fetchdf(
    self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
) -> pd.DataFrame:
    """Fetches a Pandas DataFrame from the cursor.

    Delegates to ``self._fetch_native_df`` and checks that the result really is a
    pandas DataFrame.

    Args:
        query: The SQL expression or raw SQL string to run.
        quote_identifiers: Forwarded to ``_fetch_native_df``.

    Returns:
        The DataFrame produced by ``_fetch_native_df``.

    Raises:
        NotImplementedError: If ``_fetch_native_df`` returns something other than a
            pandas DataFrame.
    """
    import pandas as pd

    native = self._fetch_native_df(query, quote_identifiers=quote_identifiers)
    if isinstance(native, pd.DataFrame):
        return native
    raise NotImplementedError(
        "The cursor's `fetch_native_df` method is not returning a pandas DataFrame. Need to update `fetchdf` so a Pandas DataFrame is returned"
    )
Fetches a Pandas DataFrame from the cursor
def fetch_pyspark_df(
    self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
) -> PySparkDataFrame:
    """Fetches a PySpark DataFrame from the cursor.

    The base adapter has no PySpark support, so this implementation always raises.

    Raises:
        NotImplementedError: Always, naming the adapter's type.
    """
    unsupported = f"Engine does not support PySpark DataFrames: {type(self)}"
    raise NotImplementedError(unsupported)
Fetches a PySpark DataFrame from the cursor
@property
def wap_enabled(self) -> bool:
    """Returns whether WAP is enabled for this engine.

    Reads the ``"wap_enabled"`` key of ``self._extra_config``; a missing key means False.
    """
    extra_config = self._extra_config
    return extra_config.get("wap_enabled", False)
Returns whether WAP is enabled for this engine.
def wap_supported(self, table_name: TableName) -> bool:
    """Returns whether WAP for the target table is supported.

    The base implementation reports no WAP support for any table.

    Args:
        table_name: The name of the target table (unused here).
    """
    supported = False
    return supported
Returns whether WAP for the target table is supported.
def wap_table_name(self, table_name: TableName, wap_id: str) -> str:
    """Returns the updated table name for the given WAP ID.

    Args:
        table_name: The name of the target table.
        wap_id: The WAP ID to prepare.

    Returns:
        The updated table name that should be used for writing.

    Raises:
        NotImplementedError: Always in the base adapter, which does not support WAP.
    """
    unsupported = f"Engine does not support WAP: {type(self)}"
    raise NotImplementedError(unsupported)
Returns the updated table name for the given WAP ID.
Arguments:
- table_name: The name of the target table.
- wap_id: The WAP ID to prepare.
Returns:
The updated table name that should be used for writing.
def wap_prepare(self, table_name: TableName, wap_id: str) -> str:
    """Prepares the target table for WAP and returns the updated table name.

    Args:
        table_name: The name of the target table.
        wap_id: The WAP ID to prepare.

    Returns:
        The updated table name that should be used for writing.

    Raises:
        NotImplementedError: Always in the base adapter, which does not support WAP.
    """
    unsupported = f"Engine does not support WAP: {type(self)}"
    raise NotImplementedError(unsupported)
Prepares the target table for WAP and returns the updated table name.
Arguments:
- table_name: The name of the target table.
- wap_id: The WAP ID to prepare.
Returns:
The updated table name that should be used for writing.
def wap_publish(self, table_name: TableName, wap_id: str) -> None:
    """Publishes changes with the given WAP ID to the target table.

    Args:
        table_name: The name of the target table.
        wap_id: The WAP ID to publish.

    Raises:
        NotImplementedError: Always in the base adapter, which does not support WAP.
    """
    unsupported = f"Engine does not support WAP: {type(self)}"
    raise NotImplementedError(unsupported)
Publishes changes with the given WAP ID to the target table.
Arguments:
- table_name: The name of the target table.
- wap_id: The WAP ID to publish.
def sync_grants_config(
    self,
    table: exp.Table,
    grants_config: GrantsConfig,
    table_type: DataObjectType = DataObjectType.TABLE,
) -> None:
    """Applies the grants_config to a table authoritatively.

    The table's current grants are read and diffed against ``grants_config``.
    Only the differences are applied: revocations first, then new grants, all
    sent in a single ``execute`` call. Nothing is executed when there is no
    difference.

    Args:
        table: The table/view to apply grants to.
        grants_config: Dictionary mapping privileges to lists of grantees.
        table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW).

    Raises:
        NotImplementedError: If the engine does not support grants (``SUPPORTS_GRANTS``).
    """
    if not self.SUPPORTS_GRANTS:
        raise NotImplementedError(f"Engine does not support grants: {type(self)}")

    to_grant, to_revoke = self._diff_grants_configs(
        grants_config, self._get_current_grants_config(table)
    )
    revoke_statements = self._revoke_grants_config_expr(table, to_revoke, table_type)
    grant_statements = self._apply_grants_config_expr(table, to_grant, table_type)
    statements = revoke_statements + grant_statements

    if statements:
        self.execute(statements)
Applies the grants_config to a table authoritatively. It first compares the specified grants against the current grants, and then applies the diffs to the table by revoking and granting privileges as needed.
Arguments:
- table: The table/view to apply grants to.
- grants_config: Dictionary mapping privileges to lists of grantees.
- table_type: The type of database object (TABLE, VIEW, MATERIALIZED_VIEW).
@contextlib.contextmanager
def transaction(
    self,
    condition: t.Optional[bool] = None,
) -> t.Iterator[None]:
    """A transaction context manager.

    Begins a transaction on the connection pool, commits it when the block exits
    normally, and rolls it back when the block raises anything, including
    ``KeyboardInterrupt``/``SystemExit``. The exception is always re-raised.

    The block runs without opening a transaction when one is already active on
    the pool, when ``SUPPORTS_TRANSACTIONS`` is False, or when ``condition`` is
    not None and falsy.

    Args:
        condition: Pass a falsy value (e.g. ``False``) to skip opening a
            transaction. ``None`` (the default) does not skip.
    """
    if (
        self._connection_pool.is_transaction_active
        or not self.SUPPORTS_TRANSACTIONS
        or (condition is not None and not condition)
    ):
        # Nested call, unsupported engine, or caller opted out: just run the block.
        yield
        return

    if self._pre_ping:
        try:
            logger.debug("Pinging the database to check the connection")
            self.ping()
        except Exception:
            # Deliberate best effort: a failed ping is treated as a lost connection and the
            # pool is closed (presumably so begin() below opens a fresh connection).
            logger.info("Connection to the database was lost. Reconnecting...")
            self._connection_pool.close()

    self._connection_pool.begin()
    try:
        yield
    except BaseException:
        # Catch BaseException, not just Exception: otherwise an interrupt inside the block
        # would leave the pool with a transaction that is neither committed nor rolled back.
        self._connection_pool.rollback()
        raise
    else:
        self._connection_pool.commit()
A transaction context manager.
@contextlib.contextmanager
def session(self, properties: SessionProperties) -> t.Iterator[None]:
    """A session context manager.

    If no session is active, one is begun with ``properties`` and always ended
    when the block exits, even on error. If a session is already active, the
    block simply runs inside it.

    Args:
        properties: Session properties passed to ``_begin_session``.
    """
    if not self._is_session_active():
        self._begin_session(properties)
        try:
            yield
        finally:
            self._end_session()
    else:
        yield
A session context manager.
def execute(
    self,
    expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = True,
    track_rows_processed: bool = False,
    **kwargs: t.Any,
) -> None:
    """Executes one or more SQL statements inside ``self.transaction()``.

    SQLGlot expressions are checked with ``_check_identifier_length`` and rendered
    with ``_to_sql``; raw strings are used as-is. Each statement then goes through
    ``_attach_correlation_id``, is logged via ``_log_sql``, and is run with ``_execute``.

    Args:
        expressions: A raw SQL string, a single expression, or a sequence of expressions.
        ignore_unsupported_errors: Render expressions with ``ErrorLevel.IGNORE`` for
            unsupported features instead of the default error level.
        quote_identifiers: Passed as ``quote`` to ``_to_sql`` and to ``_log_sql``.
        track_rows_processed: Forwarded to ``_execute``.
        **kwargs: Forwarded to ``_execute``.
    """
    to_sql_kwargs: t.Dict[str, t.Any] = {}
    if ignore_unsupported_errors:
        to_sql_kwargs["unsupported_level"] = ErrorLevel.IGNORE

    with self.transaction():
        for statement in ensure_list(expressions):
            expression = statement if isinstance(statement, exp.Expression) else None
            if expression is not None:
                self._check_identifier_length(expression)
                sql = self._to_sql(expression, quote=quote_identifiers, **to_sql_kwargs)
            else:
                sql = t.cast(str, statement)

            sql = self._attach_correlation_id(sql)
            self._log_sql(sql, expression=expression, quote_identifiers=quote_identifiers)
            self._execute(sql, track_rows_processed, **kwargs)
Executes one or more SQL statements.
@contextlib.contextmanager
def temp_table(
    self,
    query_or_df: QueryOrDF,
    name: TableName = "diff",
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> t.Iterator[exp.Table]:
    """A context manager for working with a temp table.

    The temp table's name is produced by ``_get_temp_table`` from ``name`` (the
    original documentation describes it as using a random guid). The table is
    created and populated inside a transaction, and it is dropped when the block
    exits, including when the block raises.

    Args:
        query_or_df: The query or df to create a temp table for.
        name: The base name of the temp table. If it has a schema but no catalog,
            the adapter's default catalog (when set) is applied.
        target_columns_to_types: A mapping between the column name and its data type.
        source_columns: Optional explicit list of source column names, forwarded to
            ``_get_source_queries_and_columns_to_types``.
        **kwargs: Forwarded to ``_create_table_from_source_queries``.

    Yields:
        The table expression
    """
    base_table = exp.to_table(name)
    needs_default_catalog = (
        isinstance(base_table, exp.Table)
        and not base_table.catalog
        and base_table.db
        and self.default_catalog
    )
    if needs_default_catalog:
        base_table.set("catalog", exp.parse_identifier(self.default_catalog))

    source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
        query_or_df,
        target_columns_to_types=target_columns_to_types,
        target_table=base_table,
        source_columns=source_columns,
    )

    with self.transaction():
        table = self._get_temp_table(base_table)
        if table.db:
            self.create_schema(schema_(table.args["db"], table.args.get("catalog")))
        self._create_table_from_source_queries(
            table,
            source_queries,
            columns_to_types,
            exists=True,
            table_description=None,
            column_descriptions=None,
            track_rows_processed=False,
            **kwargs,
        )

    try:
        yield table
    finally:
        self.drop_table(table)
A context manager for working with a temp table.
The table will be created with a random guid and cleaned up after the block.
Arguments:
- query_or_df: The query or df to create a temp table for.
- name: The base name of the temp table.
- target_columns_to_types: A mapping between the column name and its data type.
Yields:
The table expression
def drop_data_object_on_type_mismatch(
    self, data_object: t.Optional[DataObject], expected_type: DataObjectType
) -> bool:
    """Drops a data object if it exists and is not of the expected type.

    A warning naming the object and both types is logged before the drop.

    Args:
        data_object: The data object to check.
        expected_type: The expected type of the data object.

    Returns:
        True if the data object was dropped, False otherwise.
    """
    if data_object is None:
        return False
    if data_object.type == expected_type:
        return False

    table_sql = data_object.to_table().sql(dialect=self.dialect)
    logger.warning(
        "Target data object '%s' is a %s and not a %s, dropping it",
        table_sql,
        data_object.type.value,
        expected_type.value,
    )
    self.drop_data_object(data_object)
    return True
Drops a data object if it exists and is not of the expected type.
Arguments:
- data_object: The data object to check.
- expected_type: The expected type of the data object.
Returns:
True if the data object was dropped, False otherwise.
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: either a callable that produces a new Database API-compliant connection on every call, or an existing connection pool.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts