sqlmesh.core.engine_adapter.mixins
from __future__ import annotations

import abc
import logging
import typing as t
from dataclasses import dataclass

from sqlglot import exp, parse_one
from sqlglot.helper import seq_get
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers

from sqlmesh.core.engine_adapter.base import EngineAdapter
from sqlmesh.core.engine_adapter.shared import DataObjectType
from sqlmesh.core.node import IntervalUnit
from sqlmesh.core.dialect import schema_
from sqlmesh.core.schema_diff import TableAlterOperation
from sqlmesh.utils.errors import SQLMeshError

if t.TYPE_CHECKING:
    from sqlmesh.core._typing import TableName
    from sqlmesh.core.engine_adapter._typing import (
        DCL,
        DF,
        GrantsConfig,
        QueryOrDF,
    )
    # NOTE(review): this re-imports `QueryOrDF`, shadowing the name imported from
    # `_typing` just above; one of the two imports is likely redundant.
    from sqlmesh.core.engine_adapter.base import QueryOrDF

logger = logging.getLogger(__name__)

# strftime-style formats used by RowDiffMixin to render dates / timestamps as strings
# that can be compared across engines.
NORMALIZED_DATE_FORMAT = "%Y-%m-%d"
NORMALIZED_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


class LogicalMergeMixin(EngineAdapter):
    """Implements `merge` with `logical_merge` for engines without a native MERGE statement."""

    def merge(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        unique_key: t.Sequence[exp.Expr],
        when_matched: t.Optional[exp.Whens] = None,
        merge_filter: t.Optional[exp.Expr] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Merge `source_table` into `target_table` by delegating to `logical_merge`.

        Extra keyword arguments are accepted for interface compatibility and ignored.

        Raises:
            SQLMeshError: If `when_matched` or `merge_filter` is given (raised by `logical_merge`).
        """
        logical_merge(
            self,
            target_table,
            source_table,
            target_columns_to_types,
            unique_key,
            when_matched=when_matched,
            merge_filter=merge_filter,
            source_columns=source_columns,
        )


class PandasNativeFetchDFSupportMixin(EngineAdapter):
    """Fetches query results as pandas DataFrames via `pandas.io.sql.read_sql_query`."""

    def _fetch_native_df(
        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
    ) -> DF:
        """Fetches a Pandas DataFrame from a SQL query.

        Args:
            query: A sqlglot expression (rendered with `_to_sql`) or a raw SQL string.
            quote_identifiers: Whether identifiers are quoted when rendering an expression.

        Returns:
            The DataFrame produced by `read_sql_query`, executed inside `self.transaction()`.
        """
        from warnings import catch_warnings, filterwarnings

        from pandas.io.sql import read_sql_query

        sql = self._to_sql(query, quote=quote_identifiers) if isinstance(query, exp.Expr) else query
        logger.debug(f"Executing SQL:\n{sql}")
        with catch_warnings(), self.transaction():
            # pandas warns when handed a connection that is not a SQLAlchemy connectable;
            # the pooled connection passed below is presumably a plain DB-API connection.
            filterwarnings(
                "ignore",
                category=UserWarning,
                message=".*pandas only supports SQLAlchemy connectable.*",
            )
            df = read_sql_query(sql, self._connection_pool.get())
        return df


class HiveMetastoreTablePropertiesMixin(EngineAdapter):
    """Builds Hive-Metastore-style table/view properties: file format, partitioning, comments."""

    MAX_TABLE_COMMENT_LENGTH = 4000
    MAX_COLUMN_COMMENT_LENGTH = 4000

    def _build_partitioned_by_exp(
        self,
        partitioned_by: t.List[exp.Expr],
        *,
        catalog_name: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Union[exp.PartitionedByProperty, exp.Property]:
        """Build the partitioning property for a CREATE TABLE statement.

        On Trino with an iceberg catalog this is a `PARTITIONING = ARRAY[...]` property of
        rendered strings; otherwise a `PARTITIONED BY (...)` property of plain columns.
        Other keyword arguments (e.g. `partition_interval_unit`) are ignored.

        Raises:
            SQLMeshError: If a non-column expression is used outside Trino/iceberg.
        """
        if (
            self.dialect == "trino"
            and self.get_catalog_type(catalog_name or self.get_current_catalog()) == "iceberg"
        ):
            # On the Trino Iceberg catalog, the table property is called "partitioning" - not "partitioned_by"
            # In addition, partition column transform expressions like `day(col)` or `bucket(col, 5)` are allowed
            # Also, column names and transforms need to be strings and supplied as an ARRAY[varchar]
            # ref: https://trino.io/docs/current/connector/iceberg.html#table-properties
            return exp.Property(
                this=exp.var("PARTITIONING"),
                value=exp.array(
                    *(exp.Literal.string(e.sql(dialect=self.dialect)) for e in partitioned_by)
                ),
            )
        for expr in partitioned_by:
            if not isinstance(expr, exp.Column):
                raise SQLMeshError(
                    f"PARTITIONED BY contains non-column value '{expr.sql(dialect=self.dialect)}'."
                )
        return exp.PartitionedByProperty(
            this=exp.Schema(expressions=partitioned_by),
        )

    def _build_table_properties_exp(
        self,
        catalog_name: t.Optional[str] = None,
        table_format: t.Optional[str] = None,
        storage_format: t.Optional[str] = None,
        partitioned_by: t.Optional[t.List[exp.Expr]] = None,
        partition_interval_unit: t.Optional[IntervalUnit] = None,
        clustered_by: t.Optional[t.List[exp.Expr]] = None,
        table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Collect table properties into an `exp.Properties`, or return None if there are none.

        `clustered_by`, `target_columns_to_types` and `table_kind` are accepted for interface
        compatibility but not used here.
        """
        properties: t.List[exp.Expr] = []

        if table_format and self.dialect == "spark":
            # On Spark the table format is the file format; a storage format, if also given,
            # becomes the `write.format.default` table property.
            properties.append(exp.FileFormatProperty(this=exp.Var(this=table_format)))
            if storage_format:
                properties.append(
                    exp.Property(
                        this="write.format.default", value=exp.Literal.string(storage_format)
                    )
                )
        elif storage_format:
            properties.append(exp.FileFormatProperty(this=exp.Var(this=storage_format)))

        if partitioned_by:
            properties.append(
                self._build_partitioned_by_exp(
                    partitioned_by,
                    partition_interval_unit=partition_interval_unit,
                    catalog_name=catalog_name,
                )
            )

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        properties.extend(self._table_or_view_properties_to_expressions(table_properties))

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _build_view_properties_exp(
        self,
        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
        table_description: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Creates a SQLGlot table properties expression for view"""
        properties: t.List[exp.Expr] = []

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        properties.extend(self._table_or_view_properties_to_expressions(view_properties))

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
        """Return `comment` untouched for iceberg/delta_lake catalogs, else defer to the base class."""
        # iceberg and delta do not have a comment length limit
        if self.current_catalog_type in ("iceberg", "delta_lake"):
            return comment
        return super()._truncate_comment(comment, length)


class GetCurrentCatalogFromFunctionMixin(EngineAdapter):
    """Resolves the current catalog by selecting `CURRENT_CATALOG_EXPRESSION`."""

    CURRENT_CATALOG_EXPRESSION: exp.Expr = exp.func("current_catalog")

    def get_current_catalog(self) -> t.Optional[str]:
        """Returns the catalog name of the current connection, or None if no row comes back."""
        result = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
        if result:
            return result[0]
        return None


class NonTransactionalTruncateMixin(EngineAdapter):
    """Avoids TRUNCATE while a transaction is open, using an unconditional DELETE instead."""

    def _truncate_table(self, table_name: TableName) -> None:
        # Truncate forces a commit of the current transaction so we want to do an unconditional delete to
        # preserve the transaction if one exists otherwise we can truncate
        if self._connection_pool.is_transaction_active:
            return self.execute(exp.Delete(this=exp.to_table(table_name)))
        super()._truncate_table(table_name)


class VarcharSizeWorkaroundMixin(EngineAdapter):
    """Works around CTAS statements with `LIMIT 0` that produce truncated string types."""

    def _default_precision_to_max(
        self, columns_to_types: t.Dict[str, exp.DataType]
    ) -> t.Dict[str, exp.DataType]:
        """Rewrite, in place, default-length type parameters to `max`.

        Only types listed in both `schema_differ.max_parameter_length` and
        `schema_differ.parameterized_type_defaults` are considered. A column whose type
        parameters equal that type's default becomes `<TYPE>(max)`.

        Returns:
            The same `columns_to_types` mapping, mutated in place.
        """
        # get default lengths for types that support "max" length
        types_with_max_default_param = {
            k: [self.schema_differ.parameterized_type_defaults[k][0][0]]
            for k in self.schema_differ.max_parameter_length
            if k in self.schema_differ.parameterized_type_defaults
        }

        # Redshift and MSSQL have a bug where CTAS statements have non-deterministic types. If a LIMIT
        # is applied to a CTAS statement, VARCHAR (and possibly other) types sometimes revert to their
        # default length of 256 (Redshift) or 1 (MSSQL). If we detect that a type has its default length
        # and supports "max" length, we convert it to "max" length to prevent inadvertent data truncation.
        for col_type in columns_to_types.values():
            if col_type.this in types_with_max_default_param and col_type.expressions:
                parameter = self.schema_differ.get_type_parameters(col_type)
                type_default = types_with_max_default_param[col_type.this]
                if parameter == type_default:
                    col_type.set("expressions", [exp.DataTypeParam(this=exp.var("max"))])

        return columns_to_types

    def _build_create_table_exp(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expr],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> exp.Create:
        """Build a CREATE TABLE, replacing a `LIMIT 0` CTAS with an explicit-schema CREATE.

        When the query has a top-level `LIMIT 0`, the query (with every `LIMIT 0` and WHERE
        clause removed) is materialized as a temporary view. The view's column types are read
        back, widened via `_default_precision_to_max`, and used to build a CREATE TABLE with
        an explicit schema and no query. The temporary view is always dropped afterwards.
        """
        statement = super()._build_create_table_exp(
            table_name_or_schema,
            expression=expression,
            exists=exists,
            replace=replace,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            table_kind=table_kind,
            **kwargs,
        )

        if (
            statement.expression
            and statement.expression.args.get("limit") is not None
            and statement.expression.args["limit"].expression.this == "0"
        ):
            # NOTE(review): assumes callers never pass a Schema together with a LIMIT 0 query;
            # the schema is rebuilt below from the temporary view's columns.
            assert not isinstance(table_name_or_schema, exp.Schema)

            # redshift and mssql have a bug where CTAS statements have non-deterministic types. if a limit
            # is applied to a ctas statement, VARCHAR types default to 1 in some instances.
            select_statement = statement.expression.copy()
            for select_or_union in select_statement.find_all(exp.Select, exp.SetOperation):
                limit = select_or_union.args.get("limit")
                if limit is not None and limit.expression.this == "0":
                    limit.pop()

                select_or_union.set("where", None)

            temp_view_name = self._get_temp_table("ctas")

            self.create_view(temp_view_name, select_statement, replace=False)
            try:
                columns_to_types_from_view = self._default_precision_to_max(
                    self.columns(temp_view_name)
                )

                schema = self._build_schema_exp(
                    exp.to_table(table_name_or_schema),
                    columns_to_types_from_view,
                )
                # NOTE(review): unlike the first call above, `table_kind` is not forwarded
                # here, so the rebuilt statement uses the base class default kind; confirm intent.
                statement = super()._build_create_table_exp(
                    schema,
                    None,
                    exists=exists,
                    replace=replace,
                    target_columns_to_types=columns_to_types_from_view,
                    table_description=table_description,
                    **kwargs,
                )
            finally:
                self.drop_view(temp_view_name)

        return statement


@dataclass(frozen=True)
class TableAlterClusterByOperation(TableAlterOperation, abc.ABC):
    """Base class for ALTER TABLE operations that change a table's clustering."""

    pass


@dataclass(frozen=True)
class TableAlterChangeClusterKeyOperation(TableAlterClusterByOperation):
    """ALTER TABLE ... CLUSTER BY <clustering_key>; neither additive nor destructive."""

    # Clustering key as a SQL string, parsed with `dialect` when the action is built.
    clustering_key: str
    dialect: str

    @property
    def is_additive(self) -> bool:
        return False

    @property
    def is_destructive(self) -> bool:
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        return [exp.Cluster(expressions=self.cluster_key_expressions)]

    @property
    def cluster_key_expressions(self) -> t.List[exp.Expr]:
        # Note: Assumes `clustering_key` as a string like:
        # - "(col_a)"
        # - "(col_a, col_b)"
        # - "func(col_a, transform(col_b))"
        parsed_cluster_key = parse_one(self.clustering_key, dialect=self.dialect)
        return parsed_cluster_key.expressions or [parsed_cluster_key.this]


@dataclass(frozen=True)
class TableAlterDropClusterKeyOperation(TableAlterClusterByOperation):
    """ALTER TABLE ... DROP CLUSTERING KEY; neither additive nor destructive."""

    @property
    def is_additive(self) -> bool:
        return False

    @property
    def is_destructive(self) -> bool:
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        return [exp.Command(this="DROP", expression="CLUSTERING KEY")]


class ClusteredByMixin(EngineAdapter):
    """Adds CLUSTER BY support and clustering-key changes to schema migrations."""

    def _build_clustered_by_exp(
        self,
        clustered_by: t.List[exp.Expr],
        **kwargs: t.Any,
    ) -> t.Optional[exp.Cluster]:
        # Copies so the caller's expressions are not re-parented under the new Cluster node.
        return exp.Cluster(expressions=[c.copy() for c in clustered_by])

    def get_alter_operations(
        self,
        current_table_name: TableName,
        target_table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterOperation]:
        """Extend the base alter operations with a clustering-key change, if any.

        If both tables exist: when the target is clustered with a key that differs from the
        current one, a change-key operation is appended; when only the current table is
        clustered, a drop-key operation is appended.
        """
        operations = super().get_alter_operations(
            current_table_name,
            target_table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

        # check for a change in clustering
        current_table = exp.to_table(current_table_name)
        target_table = exp.to_table(target_table_name)

        current_table_schema = schema_(current_table.db, catalog=current_table.catalog)
        target_table_schema = schema_(target_table.db, catalog=target_table.catalog)

        current_table_info = seq_get(
            self.get_data_objects(current_table_schema, {current_table.name}), 0
        )
        target_table_info = seq_get(
            self.get_data_objects(target_table_schema, {target_table.name}), 0
        )

        if current_table_info and target_table_info:
            if target_table_info.is_clustered:
                if target_table_info.clustering_key and (
                    current_table_info.clustering_key != target_table_info.clustering_key
                ):
                    operations.append(
                        TableAlterChangeClusterKeyOperation(
                            target_table=current_table,
                            clustering_key=target_table_info.clustering_key,
                            dialect=self.dialect,
                        )
                    )
            elif current_table_info.is_clustered:
                operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))

        return operations


def logical_merge(
    engine_adapter: EngineAdapter,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expr],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expr] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """
    Merge implementation for engine adapters that do not support merge natively.

    The merge is executed as follows:
        1. Create a temporary table containing the new data to merge.
        2. Delete rows from target table where unique_key cols match a row in the temporary table.
        3. Insert the temporary table contents into the target table. Any duplicate, non-unique rows
           within the temporary table are omitted.
        4. Drop the temporary table.

    The steps are carried out by `engine_adapter._replace_by_key(..., is_unique_key=True)`.

    Raises:
        SQLMeshError: If `when_matched` or `merge_filter` is provided, since these require a
            native MERGE statement.
    """
    if when_matched or merge_filter:
        prop = "when_matched" if when_matched else "merge_filter"
        raise SQLMeshError(
            f"This engine does not support MERGE expressions and therefore `{prop}` is not supported."
        )

    engine_adapter._replace_by_key(
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        is_unique_key=True,
        source_columns=source_columns,
    )


class RowDiffMixin(EngineAdapter):
    """Builds SQL expressions that render rows as comparable strings for cross-engine diffs."""

    # The maximum supported value for n in timestamp(n).
    # Most databases are microsecond (6) but some can only handle millisecond (3) while others go to nanosecond (9)
    MAX_TIMESTAMP_PRECISION = 6

    def concat_columns(
        self,
        columns_to_types: t.Dict[str, exp.DataType],
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
        delimiter: str = ",",
    ) -> exp.Expr:
        """
        Produce an expression that generates a string version of a record, that is:
            - Every column converted to a string representation (see `normalize_value`), with
              NULLs replaced by an empty string, joined together into a single string using
              the specified :delimiter
        """
        expressions_to_concat: t.List[exp.Expr] = []
        last_idx = len(columns_to_types) - 1
        for idx, (column, column_type) in enumerate(columns_to_types.items()):
            expressions_to_concat.append(
                exp.func(
                    "COALESCE",
                    self.normalize_value(
                        exp.to_column(column), column_type, decimal_precision, timestamp_precision
                    ),
                    exp.Literal.string(""),
                )
            )
            if idx < last_idx:
                expressions_to_concat.append(exp.Literal.string(delimiter))

        return exp.func("CONCAT", *expressions_to_concat)

    def normalize_value(
        self,
        expr: exp.Expr,
        type: exp.DataType,
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
    ) -> exp.Expr:
        """
        Return an expression that converts the value of `expr` (of data type `type`) to a normalized string.

        This string should be comparable across database engines, eg:
            - `boolean` columns -> cast to INT, i.e. '1' or '0'
            - integer columns -> cast to BIGINT
            - `float` / `double` / `decimal` -> Value formatted to :decimal_precision decimal places
              (decimals with an explicit scale of 0 are treated as integers)
            - `date` columns -> YYYY-MM-DD string
            - `datetime`/`timestamp`/`timestamptz` columns -> 'YYYY-MM-DD HH:MM:SS.ffffff' string cut to
              :timestamp_precision digits of subsecond precision
            - nested types -> `_normalize_nested_value` (identity by default)

        The result is always cast to VARCHAR. NULLs stay NULL here; `concat_columns` turns them
        into "" (empty string).
        """
        value: exp.Expr
        if type.is_type(exp.DataType.Type.BOOLEAN):
            value = self._normalize_boolean_value(expr)
        elif type.is_type(*exp.DataType.INTEGER_TYPES):
            value = self._normalize_integer_value(expr)
        elif type.is_type(*exp.DataType.REAL_TYPES):
            # If there is no scale on the decimal type, treat it like an integer when comparing
            # Some databases like Snowflake deliberately create all integer types as NUMERIC(<size>, 0)
            # and they should be treated as integers and not decimals
            type_params = list(type.find_all(exp.DataTypeParam))
            if len(type_params) == 2 and type_params[-1].this.to_py() == 0:
                value = self._normalize_integer_value(expr)
            else:
                value = self._normalize_decimal_value(expr, decimal_precision)
        elif type.is_type(*exp.DataType.TEMPORAL_TYPES):
            value = self._normalize_timestamp_value(expr, type, timestamp_precision)
        elif type.is_type(*exp.DataType.NESTED_TYPES):
            value = self._normalize_nested_value(expr)
        else:
            value = expr

        return exp.cast(value, to=exp.DataType.build("VARCHAR"))

    def _normalize_nested_value(self, expr: exp.Expr) -> exp.Expr:
        """Hook for engines to normalize nested (array/struct/map) values; identity by default."""
        return expr

    def _normalize_timestamp_value(
        self, expr: exp.Expr, type: exp.DataType, precision: int
    ) -> exp.Expr:
        """Render a date/timestamp as a string with `precision` fractional-second digits.

        Raises:
            ValueError: If `precision` exceeds `MAX_TIMESTAMP_PRECISION`.
        """
        if precision > self.MAX_TIMESTAMP_PRECISION:
            raise ValueError(
                f"Requested timestamp precision '{precision}' exceeds maximum supported precision: {self.MAX_TIMESTAMP_PRECISION}"
            )

        is_date = type.is_type(exp.DataType.Type.DATE, exp.DataType.Type.DATE32)

        time_format = NORMALIZED_DATE_FORMAT if is_date else NORMALIZED_TIMESTAMP_FORMAT

        # NOTE(review): TIMESTAMPNTZ ("no time zone") is included in this branch even though the
        # comment below speaks of timezone-aware values; confirm this is intended.
        if type.is_type(
            exp.DataType.Type.TIMESTAMPTZ,
            exp.DataType.Type.TIMESTAMPLTZ,
            exp.DataType.Type.TIMESTAMPNTZ,
        ):
            # Convert all timezone-aware values to UTC for comparison
            expr = exp.AtTimeZone(this=expr, zone=exp.Literal.string("UTC"))

        # 6 = max precision across all adapters and also the max amount of digits TimeToStr will
        # render since its based on `strftime` and `%f` only renders to microseconds
        digits_to_chop_off = 6 - precision

        expr = exp.TimeToStr(this=expr, format=exp.Literal.string(time_format))
        if digits_to_chop_off > 0:
            # Keep only the leading characters of the full-length timestamp string. Date strings
            # are shorter than the cut-off, so they are unaffected. With precision 0 the
            # trailing '.' is kept.
            expr = exp.func(
                "SUBSTRING", expr, 1, len("2023-01-01 12:13:14.000000") - digits_to_chop_off
            )

        return expr

    def _normalize_integer_value(self, expr: exp.Expr) -> exp.Expr:
        return exp.cast(expr, "BIGINT")

    def _normalize_decimal_value(self, expr: exp.Expr, precision: int) -> exp.Expr:
        return exp.cast(expr, f"DECIMAL(38,{precision})")

    def _normalize_boolean_value(self, expr: exp.Expr) -> exp.Expr:
        return exp.cast(expr, "INT")


class GrantsFromInfoSchemaMixin(EngineAdapter):
    """Applies, revokes and reads table grants using `information_schema` privilege tables."""

    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.func("current_user")
    # Whether one GRANT/REVOKE statement may list several principals.
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
    # Whether the catalog is used to qualify the information_schema lookup.
    USE_CATALOG_IN_GRANTS = False
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "table_privileges"

    @staticmethod
    @abc.abstractmethod
    def _grant_object_kind(table_type: DataObjectType) -> t.Optional[str]:
        pass

    @abc.abstractmethod
    def _get_current_schema(self) -> str:
        pass

    def _dcl_grants_config_expr(
        self,
        dcl_cmd: t.Type[DCL],
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Build GRANT/REVOKE statements for every privilege in `grants_config`.

        Args:
            dcl_cmd: The statement class to instantiate (`exp.Grant` or `exp.Revoke` in this file).
            table: The securable object; it is copied for each statement, never mutated.
            grants_config: Mapping of privilege name to the principals it applies to.
            table_type: Passed to `_grant_object_kind`; a truthy result becomes the statement `kind`.

        Returns:
            One statement per privilege when `SUPPORTS_MULTIPLE_GRANT_PRINCIPALS` is set,
            otherwise one statement per (privilege, principal) pair. Empty if `grants_config`
            is empty.
        """
        expressions: t.List[exp.Expr] = []
        if not grants_config:
            return expressions

        object_kind = self._grant_object_kind(table_type)

        def parse_principal(principal: str) -> exp.Expr:
            return normalize_identifiers(
                parse_one(principal, into=exp.GrantPrincipal, dialect=self.dialect),
                dialect=self.dialect,
            )

        def build_statement(privilege: str, principal_exprs: t.List[exp.Expr]) -> exp.Expr:
            # Every node is created fresh for each statement: sqlglot expressions track a single
            # parent, so sharing one securable/privilege/kind node between statements would
            # entangle their trees.
            args: t.Dict[str, t.Any] = {
                "privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
                "securable": table.copy(),
            }
            if object_kind:
                args["kind"] = exp.Var(this=object_kind)
            args["principals"] = principal_exprs
            return dcl_cmd(**args)  # type: ignore[arg-type]

        for privilege, principals in grants_config.items():
            if self.SUPPORTS_MULTIPLE_GRANT_PRINCIPALS:
                expressions.append(
                    build_statement(privilege, [parse_principal(p) for p in principals])
                )
            else:
                for principal in principals:
                    expressions.append(build_statement(privilege, [parse_principal(principal)]))

        return expressions

    def _apply_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Build GRANT statements for `grants_config` on `table`."""
        return self._dcl_grants_config_expr(exp.Grant, table, grants_config, table_type)

    def _revoke_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Build REVOKE statements for `grants_config` on `table`."""
        return self._dcl_grants_config_expr(exp.Revoke, table, grants_config, table_type)

    def _get_grant_expression(self, table: exp.Table) -> exp.Expr:
        """Build a SELECT of (privilege_type, grantee) for grants on `table`.

        Only grants made by the current user/role to someone else are selected. The table's
        schema falls back to `_get_current_schema()`. When `USE_CATALOG_IN_GRANTS` is set, the
        lookup is also scoped to the table's (or current) catalog.

        Raises:
            SQLMeshError: If the catalog is needed but cannot be determined.
        """
        schema_identifier = table.args.get("db") or normalize_identifiers(
            exp.to_identifier(self._get_current_schema(), quoted=True), dialect=self.dialect
        )
        schema_name = schema_identifier.this
        table_name = table.args.get("this").this  # type: ignore

        grant_conditions = [
            exp.column("table_schema").eq(exp.Literal.string(schema_name)),
            exp.column("table_name").eq(exp.Literal.string(table_name)),
            exp.column("grantor").eq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
            exp.column("grantee").neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
        ]

        info_schema_table = normalize_identifiers(
            exp.table_(self.GRANT_INFORMATION_SCHEMA_TABLE_NAME, db="information_schema"),
            dialect=self.dialect,
        )
        if self.USE_CATALOG_IN_GRANTS:
            catalog_identifier = table.args.get("catalog")
            if not catalog_identifier:
                catalog_name = self.get_current_catalog()
                if not catalog_name:
                    raise SQLMeshError(
                        "Current catalog could not be determined for fetching grants. This is unexpected."
                    )
                catalog_identifier = normalize_identifiers(
                    exp.to_identifier(catalog_name, quoted=True), dialect=self.dialect
                )
            catalog_name = catalog_identifier.this
            info_schema_table.set("catalog", catalog_identifier.copy())
            grant_conditions.insert(
                0, exp.column("table_catalog").eq(exp.Literal.string(catalog_name))
            )

        return (
            exp.select("privilege_type", "grantee")
            .from_(info_schema_table)
            .where(exp.and_(*grant_conditions))
        )

    def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
        """Read the current grants on `table` as {privilege: [grantee, ...]}.

        Rows with a NULL or empty privilege/grantee are skipped; grantees are de-duplicated
        per privilege while keeping first-seen order.
        """
        grant_expr = self._get_grant_expression(table)

        results = self.fetchall(grant_expr)

        grants_dict: GrantsConfig = {}
        for privilege_raw, grantee_raw in results:
            if privilege_raw is None or grantee_raw is None:
                continue

            privilege = str(privilege_raw)
            grantee = str(grantee_raw)
            if not privilege or not grantee:
                continue

            grantees = grants_dict.setdefault(privilege, [])
            if grantee not in grantees:
                grantees.append(grantee)

        return grants_dict
class LogicalMergeMixin(EngineAdapter):
    """Engine adapter mixin that emulates MERGE with `logical_merge` (delete matching keys, then insert)."""

    def merge(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        unique_key: t.Sequence[exp.Expr],
        when_matched: t.Optional[exp.Whens] = None,
        merge_filter: t.Optional[exp.Expr] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Forward every merge argument to `logical_merge`; extra `kwargs` are ignored."""
        logical_merge(
            engine_adapter=self,
            target_table=target_table,
            source_table=source_table,
            target_columns_to_types=target_columns_to_types,
            unique_key=unique_key,
            when_matched=when_matched,
            merge_filter=merge_filter,
            source_columns=source_columns,
        )
Base class wrapping a Database API-compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def merge(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expr],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expr] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Merge `source_table` into `target_table` through `logical_merge`.

    Any extra keyword arguments are accepted for interface compatibility and dropped.
    """
    logical_merge(
        engine_adapter=self,
        target_table=target_table,
        source_table=source_table,
        target_columns_to_types=target_columns_to_types,
        unique_key=unique_key,
        when_matched=when_matched,
        merge_filter=merge_filter,
        source_columns=source_columns,
    )
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class PandasNativeFetchDFSupportMixin(EngineAdapter):
    """Engine adapter mixin that reads query results with `pandas.io.sql.read_sql_query`."""

    def _fetch_native_df(
        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
    ) -> DF:
        """Run `query` inside a transaction and return the rows as a pandas DataFrame.

        Args:
            query: Either a sqlglot expression, rendered through `_to_sql`, or raw SQL text.
            quote_identifiers: Forwarded as `quote=` when rendering an expression.
        """
        from warnings import catch_warnings, filterwarnings

        from pandas.io.sql import read_sql_query

        if isinstance(query, exp.Expr):
            rendered_sql = self._to_sql(query, quote=quote_identifiers)
        else:
            rendered_sql = query

        logger.debug(f"Executing SQL:\n{rendered_sql}")

        with catch_warnings():
            with self.transaction():
                # Silence pandas' complaint about receiving a non-SQLAlchemy connection object.
                filterwarnings(
                    "ignore",
                    category=UserWarning,
                    message=".*pandas only supports SQLAlchemy connectable.*",
                )
                return read_sql_query(rendered_sql, self._connection_pool.get())
Base class wrapping a Database API-compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class HiveMetastoreTablePropertiesMixin(EngineAdapter):
    """Engine adapter mixin producing Hive-Metastore-style table and view properties."""

    # Comment length limits for tables and columns (ignored for iceberg / delta_lake
    # catalogs by `_truncate_comment` below).
    MAX_TABLE_COMMENT_LENGTH = 4000
    MAX_COLUMN_COMMENT_LENGTH = 4000

    def _build_partitioned_by_exp(
        self,
        partitioned_by: t.List[exp.Expr],
        *,
        catalog_name: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Union[exp.PartitionedByProperty, exp.Property]:
        """Return the partitioning property for `partitioned_by`.

        Trino + iceberg catalogs get `PARTITIONING = ARRAY['<expr sql>', ...]` (transforms such
        as `day(col)` or `bucket(col, 5)` allowed, see
        https://trino.io/docs/current/connector/iceberg.html#table-properties); everything else
        gets `PARTITIONED BY (<columns>)`, which must consist of plain columns only.
        """
        is_trino_iceberg = (
            self.dialect == "trino"
            and self.get_catalog_type(catalog_name or self.get_current_catalog()) == "iceberg"
        )
        if is_trino_iceberg:
            partition_strings = [
                exp.Literal.string(part.sql(dialect=self.dialect)) for part in partitioned_by
            ]
            return exp.Property(
                this=exp.var("PARTITIONING"),
                value=exp.array(*partition_strings),
            )

        for candidate in partitioned_by:
            if isinstance(candidate, exp.Column):
                continue
            raise SQLMeshError(
                f"PARTITIONED BY contains non-column value '{candidate.sql(dialect=self.dialect)}'."
            )

        return exp.PartitionedByProperty(this=exp.Schema(expressions=partitioned_by))

    def _build_table_properties_exp(
        self,
        catalog_name: t.Optional[str] = None,
        table_format: t.Optional[str] = None,
        storage_format: t.Optional[str] = None,
        partitioned_by: t.Optional[t.List[exp.Expr]] = None,
        partition_interval_unit: t.Optional[IntervalUnit] = None,
        clustered_by: t.Optional[t.List[exp.Expr]] = None,
        table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Assemble file format, partitioning, comment and custom properties for a table.

        Returns None when no property applies.
        """
        props: t.List[exp.Expr] = []

        spark_table_format = bool(table_format) and self.dialect == "spark"
        if spark_table_format:
            props.append(exp.FileFormatProperty(this=exp.Var(this=table_format)))
            if storage_format:
                # Alongside a Spark table format, the storage format is recorded as the
                # default write format.
                props.append(
                    exp.Property(
                        this="write.format.default",
                        value=exp.Literal.string(storage_format),
                    )
                )
        elif storage_format:
            props.append(exp.FileFormatProperty(this=exp.Var(this=storage_format)))

        if partitioned_by:
            partition_prop = self._build_partitioned_by_exp(
                partitioned_by,
                partition_interval_unit=partition_interval_unit,
                catalog_name=catalog_name,
            )
            props.append(partition_prop)

        if table_description:
            truncated = self._truncate_table_comment(table_description)
            props.append(exp.SchemaCommentProperty(this=exp.Literal.string(truncated)))

        props.extend(self._table_or_view_properties_to_expressions(table_properties))

        return exp.Properties(expressions=props) if props else None

    def _build_view_properties_exp(
        self,
        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
        table_description: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Assemble comment and custom properties for a view; None when there are none."""
        props: t.List[exp.Expr] = []

        if table_description:
            truncated = self._truncate_table_comment(table_description)
            props.append(exp.SchemaCommentProperty(this=exp.Literal.string(truncated)))

        props.extend(self._table_or_view_properties_to_expressions(view_properties))

        return exp.Properties(expressions=props) if props else None

    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
        """Skip truncation for iceberg and delta_lake catalogs, which have no comment length limit."""
        if self.current_catalog_type not in ("iceberg", "delta_lake"):
            return super()._truncate_comment(comment, length)
        return comment
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class GetCurrentCatalogFromFunctionMixin(EngineAdapter):
    """Resolves the current catalog by selecting a SQL expression on the connection.

    The expression is read from the ``CURRENT_CATALOG_EXPRESSION`` class attribute, so an
    adapter can point it at a different function if needed.
    """

    # Expression selected to obtain the catalog name; defaults to ``current_catalog()``.
    CURRENT_CATALOG_EXPRESSION: exp.Expr = exp.func("current_catalog")

    def get_current_catalog(self) -> t.Optional[str]:
        """Returns the catalog name of the current connection.

        Returns:
            The first column of the first row returned by selecting
            ``CURRENT_CATALOG_EXPRESSION``, or ``None`` when no row comes back.
        """
        query = exp.select(self.CURRENT_CATALOG_EXPRESSION)
        row = self.fetchone(query)
        return row[0] if row else None
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def get_current_catalog(self) -> t.Optional[str]:
    """Returns the catalog name of the current connection.

    Returns:
        The first column of the first row returned by selecting
        ``CURRENT_CATALOG_EXPRESSION``, or ``None`` when no row comes back.
    """
    query = exp.select(self.CURRENT_CATALOG_EXPRESSION)
    row = self.fetchone(query)
    return row[0] if row else None
Returns the catalog name of the current connection.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class NonTransactionalTruncateMixin(EngineAdapter):
    """Avoids TRUNCATE while a transaction is open, because TRUNCATE forces a commit."""

    def _truncate_table(self, table_name: TableName) -> None:
        """Removes every row from ``table_name`` without committing an open transaction.

        If the connection pool reports an active transaction, an unconditional ``DELETE``
        is executed so the transaction stays open. Otherwise the parent class's truncate
        implementation is used.
        """
        if not self._connection_pool.is_transaction_active:
            super()._truncate_table(table_name)
            return None

        # An unfiltered DELETE empties the table without the implicit commit of TRUNCATE.
        delete_all_rows = exp.Delete(this=exp.to_table(table_name))
        return self.execute(delete_all_rows)
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class VarcharSizeWorkaroundMixin(EngineAdapter):
    """Works around non-deterministic column types produced by CTAS with ``LIMIT 0``.

    Redshift and MSSQL can give columns of a CTAS statement that has a ``LIMIT`` applied
    their default length (256 on Redshift, 1 on MSSQL) instead of the real one. For such
    statements this mixin derives column types from a temporary view instead, and widens
    default-length types to ``max`` where the type supports it.
    """

    def _default_precision_to_max(
        self, columns_to_types: t.Dict[str, exp.DataType]
    ) -> t.Dict[str, exp.DataType]:
        """Replaces default-length type parameters with ``max`` where supported.

        A type qualifies if it appears in ``schema_differ.max_parameter_length`` and has an
        entry in ``schema_differ.parameterized_type_defaults``. A column of such a type whose
        parameters equal that type's first default parameter is rewritten to ``max``.

        Args:
            columns_to_types: Mapping of column name to data type. The data types are
                modified in place.

        Returns:
            The same ``columns_to_types`` mapping.
        """
        # get default lengths for types that support "max" length
        types_with_max_default_param = {
            k: [self.schema_differ.parameterized_type_defaults[k][0][0]]
            for k in self.schema_differ.max_parameter_length
            if k in self.schema_differ.parameterized_type_defaults
        }

        # Redshift and MSSQL have a bug where CTAS statements have non-deterministic types. If a LIMIT
        # is applied to a CTAS statement, VARCHAR (and possibly other) types sometimes revert to their
        # default length of 256 (Redshift) or 1 (MSSQL). If we detect that a type has its default length
        # and supports "max" length, we convert it to "max" length to prevent inadvertent data truncation.
        for col_type in columns_to_types.values():
            if col_type.this in types_with_max_default_param and col_type.expressions:
                parameter = self.schema_differ.get_type_parameters(col_type)
                type_default = types_with_max_default_param[col_type.this]
                if parameter == type_default:
                    col_type.set("expressions", [exp.DataTypeParam(this=exp.var("max"))])

        return columns_to_types

    def _build_create_table_exp(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expr],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> exp.Create:
        """Builds a CREATE TABLE expression, working around CTAS type drift under ``LIMIT 0``.

        If the parent implementation yields a statement whose query has a top-level
        ``LIMIT 0``, that query is copied and every ``LIMIT 0`` and every ``WHERE`` clause
        is removed from it. The copy is created as a temporary view, and the view's column
        types are read back and passed through ``_default_precision_to_max``. The statement
        is then rebuilt as a CREATE TABLE with an explicit schema and no query. The
        temporary view is always dropped, even if rebuilding fails.

        Returns:
            The CREATE expression to execute.
        """
        statement = super()._build_create_table_exp(
            table_name_or_schema,
            expression=expression,
            exists=exists,
            replace=replace,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            table_kind=table_kind,
            **kwargs,
        )

        if (
            statement.expression
            and statement.expression.args.get("limit") is not None
            and statement.expression.args["limit"].expression.this == "0"
        ):
            assert not isinstance(table_name_or_schema, exp.Schema)

            # redshift and mssql have a bug where CTAS statements have non deterministic types. if a limit
            # is applied to a ctas statement, VARCHAR types default to 1 in some instances.
            select_statement = statement.expression.copy()
            for select_or_union in select_statement.find_all(exp.Select, exp.SetOperation):
                limit = select_or_union.args.get("limit")
                if limit is not None and limit.expression.this == "0":
                    limit.pop()

                # The view is only used to read back column types, so filters are dropped too.
                select_or_union.set("where", None)

            temp_view_name = self._get_temp_table("ctas")

            self.create_view(temp_view_name, select_statement, replace=False)
            try:
                columns_to_types_from_view = self._default_precision_to_max(
                    self.columns(temp_view_name)
                )

                schema = self._build_schema_exp(
                    exp.to_table(table_name_or_schema),
                    columns_to_types_from_view,
                )
                # Forward table_kind as in the initial build so the rebuilt statement
                # does not silently lose the requested table kind.
                statement = super()._build_create_table_exp(
                    schema,
                    None,
                    exists=exists,
                    replace=replace,
                    target_columns_to_types=columns_to_types_from_view,
                    table_description=table_description,
                    table_kind=table_kind,
                    **kwargs,
                )
            finally:
                self.drop_view(temp_view_name)

        return statement
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
@dataclass(frozen=True)
class TableAlterChangeClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that sets a table's clustering key to ``clustering_key``."""

    # Clustering key as a SQL string; parsed by ``cluster_key_expressions``.
    clustering_key: str
    # Dialect used when parsing ``clustering_key``.
    dialect: str

    @property
    def is_additive(self) -> bool:
        """Changing the clustering key is not reported as additive."""
        return False

    @property
    def is_destructive(self) -> bool:
        """Changing the clustering key is not reported as destructive."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        """A single CLUSTER BY action over the parsed clustering key expressions."""
        cluster_action = exp.Cluster(expressions=self.cluster_key_expressions)
        return [cluster_action]

    @property
    def cluster_key_expressions(self) -> t.List[exp.Expr]:
        """Parses ``clustering_key`` into the expressions to cluster by.

        Assumes ``clustering_key`` is a string like ``"(col_a)"``, ``"(col_a, col_b)"`` or
        ``"func(col_a, transform(col_b))"``. The parsed node's ``expressions`` are returned
        when non-empty; otherwise a one-element list holding its ``this`` is returned.
        """
        parsed = parse_one(self.clustering_key, dialect=self.dialect)
        if parsed.expressions:
            return parsed.expressions
        return [parsed.this]
@property
def cluster_key_expressions(self) -> t.List[exp.Expr]:
    """Parses ``clustering_key`` into the expressions to cluster by.

    Assumes ``clustering_key`` is a string like ``"(col_a)"``, ``"(col_a, col_b)"`` or
    ``"func(col_a, transform(col_b))"``. The parsed node's ``expressions`` are returned
    when non-empty; otherwise a one-element list holding its ``this`` is returned.
    """
    parsed = parse_one(self.clustering_key, dialect=self.dialect)
    if parsed.expressions:
        return parsed.expressions
    return [parsed.this]
Inherited Members
@dataclass(frozen=True)
class TableAlterDropClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that removes the clustering key from a table."""

    @property
    def is_additive(self) -> bool:
        """Dropping the clustering key is not reported as additive."""
        return False

    @property
    def is_destructive(self) -> bool:
        """Dropping the clustering key is not reported as destructive."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        """A single raw ``DROP CLUSTERING KEY`` command."""
        drop_clustering_key = exp.Command(this="DROP", expression="CLUSTERING KEY")
        return [drop_clustering_key]
Inherited Members
class ClusteredByMixin(EngineAdapter):
    """Adds CLUSTER BY support and clustering-aware alter operations to an engine adapter."""

    def _build_clustered_by_exp(
        self,
        clustered_by: t.List[exp.Expr],
        **kwargs: t.Any,
    ) -> t.Optional[exp.Cluster]:
        """Builds a CLUSTER BY expression from copies of ``clustered_by``.

        The inputs are copied (presumably so the caller's expression trees are left
        untouched).
        """
        key_parts = [part.copy() for part in clustered_by]
        return exp.Cluster(expressions=key_parts)

    def get_alter_operations(
        self,
        current_table_name: TableName,
        target_table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterOperation]:
        """Determines the alter statements needed to change the current table into the structure of the target table.

        Extends the parent class's operations with clustering changes, based on the
        data-object metadata of both tables:
          - if the target is clustered with a non-empty key that differs from the current
            key, a ``TableAlterChangeClusterKeyOperation`` is appended;
          - if the target is not clustered but the current table is, a
            ``TableAlterDropClusterKeyOperation`` is appended.
        No clustering operation is added when metadata for either table is missing.
        """
        operations = super().get_alter_operations(
            current_table_name,
            target_table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

        def fetch_info(table: exp.Table) -> t.Any:
            # First data object matching the table's name in its own schema, or None.
            table_schema = schema_(table.db, catalog=table.catalog)
            return seq_get(self.get_data_objects(table_schema, {table.name}), 0)

        current_table = exp.to_table(current_table_name)
        target_table = exp.to_table(target_table_name)
        current_info = fetch_info(current_table)
        target_info = fetch_info(target_table)

        if not (current_info and target_info):
            return operations

        if target_info.is_clustered:
            target_key = target_info.clustering_key
            if target_key and current_info.clustering_key != target_key:
                operations.append(
                    TableAlterChangeClusterKeyOperation(
                        target_table=current_table,
                        clustering_key=target_key,
                        dialect=self.dialect,
                    )
                )
        elif current_info.is_clustered:
            operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))

        return operations
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def get_alter_operations(
    self,
    current_table_name: TableName,
    target_table_name: TableName,
    *,
    ignore_destructive: bool = False,
    ignore_additive: bool = False,
) -> t.List[TableAlterOperation]:
    """Determines the alter statements needed to change the current table into the structure of the target table.

    Extends the parent class's operations with clustering changes, based on the
    data-object metadata of both tables:
      - if the target is clustered with a non-empty key that differs from the current
        key, a ``TableAlterChangeClusterKeyOperation`` is appended;
      - if the target is not clustered but the current table is, a
        ``TableAlterDropClusterKeyOperation`` is appended.
    No clustering operation is added when metadata for either table is missing.
    """
    operations = super().get_alter_operations(
        current_table_name,
        target_table_name,
        ignore_destructive=ignore_destructive,
        ignore_additive=ignore_additive,
    )

    def fetch_info(table: exp.Table) -> t.Any:
        # First data object matching the table's name in its own schema, or None.
        table_schema = schema_(table.db, catalog=table.catalog)
        return seq_get(self.get_data_objects(table_schema, {table.name}), 0)

    current_table = exp.to_table(current_table_name)
    target_table = exp.to_table(target_table_name)
    current_info = fetch_info(current_table)
    target_info = fetch_info(target_table)

    if not (current_info and target_info):
        return operations

    if target_info.is_clustered:
        target_key = target_info.clustering_key
        if target_key and current_info.clustering_key != target_key:
            operations.append(
                TableAlterChangeClusterKeyOperation(
                    target_table=current_table,
                    clustering_key=target_key,
                    dialect=self.dialect,
                )
            )
    elif current_info.is_clustered:
        operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))

    return operations
Determines the alter statements needed to change the current table into the structure of the target table.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
def logical_merge(
    engine_adapter: EngineAdapter,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expr],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expr] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """
    Merge implementation for engine adapters that do not support merge natively.

    The work is delegated to ``engine_adapter._replace_by_key`` with ``is_unique_key=True``.
    The merge is executed as follows:
        1. Create a temporary table containing the new data to merge.
        2. Delete rows from the target table whose unique_key columns match a row in the
           temporary table.
        3. Insert the temporary table contents into the target table. Any duplicate,
           non-unique rows within the temporary table are omitted.
        4. Drop the temporary table.

    Raises:
        SQLMeshError: If ``when_matched`` or ``merge_filter`` is given, since both require
            a native MERGE statement. ``when_matched`` is reported when both are given.
    """
    unsupported_options = (("when_matched", when_matched), ("merge_filter", merge_filter))
    for option_name, option_value in unsupported_options:
        if option_value:
            raise SQLMeshError(
                f"This engine does not support MERGE expressions and therefore `{option_name}` is not supported."
            )

    engine_adapter._replace_by_key(
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        is_unique_key=True,
        source_columns=source_columns,
    )
Merge implementation for engine adapters that do not support merge natively.
The merge is executed as follows:
- Create a temporary table containing the new data to merge.
- Delete rows from target table where unique_key cols match a row in the temporary table.
- Insert the temporary table contents into the target table. Any duplicate, non-unique rows within the temporary table are omitted.
- Drop the temporary table.
class RowDiffMixin(EngineAdapter):
    """Builds SQL expressions that render rows as normalized strings for comparison."""

    # The maximum supported value for n in timestamp(n).
    # Most databases are microsecond (6) but some can only handle millisecond (3) while others go to nanosecond (9)
    MAX_TIMESTAMP_PRECISION = 6

    def concat_columns(
        self,
        columns_to_types: t.Dict[str, exp.DataType],
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
        delimiter: str = ",",
    ) -> exp.Expr:
        """
        Produce an expression that renders a record as a single string.

        Each column is normalized with ``normalize_value``, and NULLs are replaced by an
        empty string via COALESCE. The pieces are concatenated in the iteration order of
        ``columns_to_types``, separated by ``delimiter``.
        """
        parts: t.List[exp.Expr] = []
        for position, (column_name, column_type) in enumerate(columns_to_types.items()):
            if position:
                # A fresh literal per separator, so no node is shared between parents.
                parts.append(exp.Literal.string(delimiter))
            normalized = self.normalize_value(
                exp.to_column(column_name), column_type, decimal_precision, timestamp_precision
            )
            parts.append(exp.func("COALESCE", normalized, exp.Literal.string("")))

        return exp.func("CONCAT", *parts)

    def normalize_value(
        self,
        expr: exp.Expr,
        type: exp.DataType,
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
    ) -> exp.Expr:
        """
        Return an expression that converts the values of ``expr`` to a normalized VARCHAR.

        This string should be comparable across database engines, eg:
        - `date` columns -> YYYY-MM-DD string
        - `datetime`/`timestamp`/`timestamptz` columns -> "YYYY-MM-DD HH:MM:SS.ffffff"-style
          string truncated to :timestamp_precision digits of sub-second precision
        - `float` / `double` / `decimal` -> Value formatted to :decimal_precision decimal places
          (decimals with a scale of 0 are treated as integers)
        - `boolean` columns -> '1' or '0'
        - other types are cast to VARCHAR unchanged
        """
        if type.is_type(exp.DataType.Type.BOOLEAN):
            normalized = self._normalize_boolean_value(expr)
        elif type.is_type(*exp.DataType.INTEGER_TYPES):
            normalized = self._normalize_integer_value(expr)
        elif type.is_type(*exp.DataType.REAL_TYPES):
            # Some databases like Snowflake deliberately create all integer types as NUMERIC(<size>, 0)
            # and they should be treated as integers and not decimals
            params = list(type.find_all(exp.DataTypeParam))
            has_zero_scale = len(params) == 2 and params[-1].this.to_py() == 0
            normalized = (
                self._normalize_integer_value(expr)
                if has_zero_scale
                else self._normalize_decimal_value(expr, decimal_precision)
            )
        elif type.is_type(*exp.DataType.TEMPORAL_TYPES):
            normalized = self._normalize_timestamp_value(expr, type, timestamp_precision)
        elif type.is_type(*exp.DataType.NESTED_TYPES):
            normalized = self._normalize_nested_value(expr)
        else:
            normalized = expr

        return exp.cast(normalized, to=exp.DataType.build("VARCHAR"))

    def _normalize_nested_value(self, expr: exp.Expr) -> exp.Expr:
        """Nested values are passed through unchanged by default."""
        return expr

    def _normalize_timestamp_value(
        self, expr: exp.Expr, type: exp.DataType, precision: int
    ) -> exp.Expr:
        """Formats a temporal value as a string with at most ``precision`` fractional digits.

        Raises:
            ValueError: If ``precision`` exceeds ``MAX_TIMESTAMP_PRECISION``.
        """
        if precision > self.MAX_TIMESTAMP_PRECISION:
            raise ValueError(
                f"Requested timestamp precision '{precision}' exceeds maximum supported precision: {self.MAX_TIMESTAMP_PRECISION}"
            )

        if type.is_type(exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
            time_format = NORMALIZED_DATE_FORMAT
        else:
            time_format = NORMALIZED_TIMESTAMP_FORMAT

        if type.is_type(
            exp.DataType.Type.TIMESTAMPTZ,
            exp.DataType.Type.TIMESTAMPLTZ,
            exp.DataType.Type.TIMESTAMPNTZ,
        ):
            # Render these timestamp variants in UTC so engines agree on the wall-clock value.
            expr = exp.AtTimeZone(this=expr, zone=exp.Literal.string("UTC"))

        rendered: exp.Expr = exp.TimeToStr(this=expr, format=exp.Literal.string(time_format))

        # 6 = max precision across all adapters and also the max amount of digits TimeToStr will
        # render since its based on `strftime` and `%f` only renders to microseconds
        digits_to_chop_off = 6 - precision
        if digits_to_chop_off <= 0:
            return rendered

        full_length = len("2023-01-01 12:13:14.000000")
        return exp.func("SUBSTRING", rendered, 1, full_length - digits_to_chop_off)

    def _normalize_integer_value(self, expr: exp.Expr) -> exp.Expr:
        """Casts integer-like values to BIGINT."""
        return exp.cast(expr, "BIGINT")

    def _normalize_decimal_value(self, expr: exp.Expr, precision: int) -> exp.Expr:
        """Casts to DECIMAL(38, ``precision``) to fix the number of decimal places."""
        return exp.cast(expr, f"DECIMAL(38,{precision})")

    def _normalize_boolean_value(self, expr: exp.Expr) -> exp.Expr:
        """Casts booleans to INT."""
        return exp.cast(expr, "INT")
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def concat_columns(
    self,
    columns_to_types: t.Dict[str, exp.DataType],
    decimal_precision: int = 3,
    timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
    delimiter: str = ",",
) -> exp.Expr:
    """
    Produce an expression that renders a record as a single string.

    Each column is normalized with ``normalize_value``, and NULLs are replaced by an
    empty string via COALESCE. The pieces are concatenated in the iteration order of
    ``columns_to_types``, separated by ``delimiter``.
    """
    parts: t.List[exp.Expr] = []
    for position, (column_name, column_type) in enumerate(columns_to_types.items()):
        if position:
            # A fresh literal per separator, so no node is shared between parents.
            parts.append(exp.Literal.string(delimiter))
        normalized = self.normalize_value(
            exp.to_column(column_name), column_type, decimal_precision, timestamp_precision
        )
        parts.append(exp.func("COALESCE", normalized, exp.Literal.string("")))

    return exp.func("CONCAT", *parts)
Produce an expression that generates a string version of a record: every column is converted to a string representation, and the results are joined into a single string using the specified :delimiter.
def normalize_value(
    self,
    expr: exp.Expr,
    type: exp.DataType,
    decimal_precision: int = 3,
    timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
) -> exp.Expr:
    """
    Return an expression that converts the values of ``expr`` to a normalized VARCHAR.

    This string should be comparable across database engines, eg:
    - `date` columns -> YYYY-MM-DD string
    - `datetime`/`timestamp`/`timestamptz` columns -> "YYYY-MM-DD HH:MM:SS.ffffff"-style
      string truncated to :timestamp_precision digits of sub-second precision
    - `float` / `double` / `decimal` -> Value formatted to :decimal_precision decimal places
      (decimals with a scale of 0 are treated as integers)
    - `boolean` columns -> '1' or '0'
    - other types are cast to VARCHAR unchanged
    """
    if type.is_type(exp.DataType.Type.BOOLEAN):
        normalized = self._normalize_boolean_value(expr)
    elif type.is_type(*exp.DataType.INTEGER_TYPES):
        normalized = self._normalize_integer_value(expr)
    elif type.is_type(*exp.DataType.REAL_TYPES):
        # Some databases like Snowflake deliberately create all integer types as NUMERIC(<size>, 0)
        # and they should be treated as integers and not decimals
        params = list(type.find_all(exp.DataTypeParam))
        has_zero_scale = len(params) == 2 and params[-1].this.to_py() == 0
        normalized = (
            self._normalize_integer_value(expr)
            if has_zero_scale
            else self._normalize_decimal_value(expr, decimal_precision)
        )
    elif type.is_type(*exp.DataType.TEMPORAL_TYPES):
        normalized = self._normalize_timestamp_value(expr, type, timestamp_precision)
    elif type.is_type(*exp.DataType.NESTED_TYPES):
        normalized = self._normalize_nested_value(expr)
    else:
        normalized = expr

    return exp.cast(normalized, to=exp.DataType.build("VARCHAR"))
Return an expression that converts the values of the column expression expr to a normalized string.
This string should be comparable across database engines, eg:
- date columns -> YYYY-MM-DD string
- datetime/timestamp/timestamptz columns -> ISO-8601 string to :timestamp_precision digits of subsecond precision
- float / double / decimal -> Value formatted to :decimal_precision decimal places
- boolean columns -> '1' or '0'
- NULLS -> "" (empty string)
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class GrantsFromInfoSchemaMixin(EngineAdapter):
    """Grant management built on GRANT/REVOKE expressions and an information_schema table.

    The mixin builds GRANT / REVOKE expressions from a grants config and reads the grants
    currently applied to a table from an information_schema privileges table. Subclasses tune
    the dialect-specific details via the class attributes below. They must also implement
    ``_grant_object_kind`` and ``_get_current_schema``.
    """

    # Expression identifying the current user/role. Only grants whose grantor is this principal
    # (and whose grantee is not) are read back.
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.func("current_user")
    # When True, one DCL statement lists all principals of a privilege; otherwise one statement
    # is emitted per (privilege, principal) pair.
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
    # When True, the information_schema table and the grant filter are qualified by catalog.
    USE_CATALOG_IN_GRANTS = False
    # Name of the information_schema table that is queried for existing privileges.
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "table_privileges"

    @staticmethod
    @abc.abstractmethod
    def _grant_object_kind(table_type: DataObjectType) -> t.Optional[str]:
        """Return the object kind keyword to emit in DCL for ``table_type``, or None to omit it."""
        pass

    @abc.abstractmethod
    def _get_current_schema(self) -> str:
        """Return the name of the session's current schema."""
        pass

    def _dcl_grants_config_expr(
        self,
        dcl_cmd: t.Type[DCL],
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Build GRANT/REVOKE expressions for every privilege and principal in ``grants_config``.

        Args:
            dcl_cmd: The DCL expression class to instantiate (``exp.Grant`` or ``exp.Revoke``).
            table: The securable table. A fresh copy is attached to every emitted statement.
            grants_config: Mapping of privilege name to the principals it applies to.
            table_type: The data object type, used to determine the object kind keyword.

        Returns:
            One expression per privilege when ``SUPPORTS_MULTIPLE_GRANT_PRINCIPALS`` is set,
            otherwise one expression per (privilege, principal) pair. Empty when
            ``grants_config`` is empty.
        """
        expressions: t.List[exp.Expr] = []
        if not grants_config:
            return expressions

        object_kind = self._grant_object_kind(table_type)

        def _principal_expr(principal: str) -> exp.Expr:
            # Parse a principal string and normalize its identifiers for this dialect.
            return normalize_identifiers(
                parse_one(principal, into=exp.GrantPrincipal, dialect=self.dialect),
                dialect=self.dialect,
            )

        def _build(privilege: str, principals: t.List[exp.Expr]) -> exp.Expr:
            # Build one DCL statement from freshly created child nodes. sqlglot re-parents a node
            # whenever it is attached to an expression. Sharing the securable / privilege / kind
            # nodes between statements would therefore leave earlier statements pointing at
            # nodes owned by a different tree.
            args: t.Dict[str, t.Any] = {
                "privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
                "securable": table.copy(),
            }
            if object_kind:
                args["kind"] = exp.Var(this=object_kind)
            args["principals"] = principals
            return dcl_cmd(**args)  # type: ignore[arg-type]

        for privilege, principals in grants_config.items():
            if self.SUPPORTS_MULTIPLE_GRANT_PRINCIPALS:
                expressions.append(
                    _build(privilege, [_principal_expr(principal) for principal in principals])
                )
            else:
                expressions.extend(
                    _build(privilege, [_principal_expr(principal)]) for principal in principals
                )

        return expressions

    def _apply_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Return the GRANT expressions needed to apply ``grants_config`` to ``table``."""
        return self._dcl_grants_config_expr(exp.Grant, table, grants_config, table_type)

    def _revoke_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expr]:
        """Return the REVOKE expressions needed to remove ``grants_config`` from ``table``."""
        return self._dcl_grants_config_expr(exp.Revoke, table, grants_config, table_type)

    def _get_grant_expression(self, table: exp.Table) -> exp.Expr:
        """Build the query that lists the grants on ``table`` made by the current user/role.

        The query selects ``privilege_type`` and ``grantee`` from the information_schema
        privileges table. It keeps rows that match ``table``'s schema and name, whose grantor is
        ``CURRENT_USER_OR_ROLE_EXPRESSION`` and whose grantee is not. When
        ``USE_CATALOG_IN_GRANTS`` is set, the information_schema table and the filter are
        also qualified by catalog.

        Raises:
            SQLMeshError: If catalogs are used in grants and neither ``table`` nor the
                current session provides a catalog.
        """
        # Fall back to the session's current schema when the table is not schema-qualified.
        schema_identifier = table.args.get("db") or normalize_identifiers(
            exp.to_identifier(self._get_current_schema(), quoted=True), dialect=self.dialect
        )
        schema_name = schema_identifier.this
        table_name = table.args.get("this").this  # type: ignore

        grant_conditions = [
            exp.column("table_schema").eq(exp.Literal.string(schema_name)),
            exp.column("table_name").eq(exp.Literal.string(table_name)),
            exp.column("grantor").eq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
            exp.column("grantee").neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
        ]

        info_schema_table = normalize_identifiers(
            exp.table_(self.GRANT_INFORMATION_SCHEMA_TABLE_NAME, db="information_schema"),
            dialect=self.dialect,
        )
        if self.USE_CATALOG_IN_GRANTS:
            catalog_identifier = table.args.get("catalog")
            if not catalog_identifier:
                catalog_name = self.get_current_catalog()
                if not catalog_name:
                    raise SQLMeshError(
                        "Current catalog could not be determined for fetching grants. This is unexpected."
                    )
                catalog_identifier = normalize_identifiers(
                    exp.to_identifier(catalog_name, quoted=True), dialect=self.dialect
                )
            catalog_name = catalog_identifier.this
            info_schema_table.set("catalog", catalog_identifier.copy())
            grant_conditions.insert(
                0, exp.column("table_catalog").eq(exp.Literal.string(catalog_name))
            )

        return (
            exp.select("privilege_type", "grantee")
            .from_(info_schema_table)
            .where(exp.and_(*grant_conditions))
        )

    def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
        """Return the grants currently applied to ``table`` as a privilege -> grantees mapping.

        Rows with a NULL or empty privilege or grantee are skipped. Each grantee is listed at
        most once per privilege, in first-seen order.
        """
        grant_expr = self._get_grant_expression(table)

        results = self.fetchall(grant_expr)

        grants_dict: GrantsConfig = {}
        for privilege_raw, grantee_raw in results:
            if privilege_raw is None or grantee_raw is None:
                continue

            privilege = str(privilege_raw)
            grantee = str(grantee_raw)
            if not privilege or not grantee:
                continue

            grantees = grants_dict.setdefault(privilege, [])
            if grantee not in grantees:
                grantees.append(grantee)

        return grants_dict
Base class wrapping a Database API-compliant connection.
The EngineAdapter is an easily subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts