sqlmesh.core.engine_adapter.mixins
"""Reusable mixin classes and helpers shared by engine adapters."""

from __future__ import annotations

import abc
import logging
import typing as t
from dataclasses import dataclass

from sqlglot import exp, parse_one
from sqlglot.helper import seq_get
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers

from sqlmesh.core.engine_adapter.base import EngineAdapter
from sqlmesh.core.engine_adapter.shared import DataObjectType
from sqlmesh.core.node import IntervalUnit
from sqlmesh.core.dialect import schema_
from sqlmesh.core.schema_diff import TableAlterOperation
from sqlmesh.utils.errors import SQLMeshError

if t.TYPE_CHECKING:
    from sqlmesh.core._typing import TableName
    from sqlmesh.core.engine_adapter._typing import (
        DCL,
        DF,
        GrantsConfig,
        QueryOrDF,
    )

    # NOTE(review): QueryOrDF is imported a second time here, rebinding the name
    # imported just above - confirm which source is the intended one.
    from sqlmesh.core.engine_adapter.base import QueryOrDF

logger = logging.getLogger(__name__)

# strftime-style formats used when rendering temporal values as normalized strings
NORMALIZED_DATE_FORMAT = "%Y-%m-%d"
NORMALIZED_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


class LogicalMergeMixin(EngineAdapter):
    """Mixin whose `merge` is implemented by delegating to `logical_merge`."""

    def merge(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        unique_key: t.Sequence[exp.Expression],
        when_matched: t.Optional[exp.Whens] = None,
        merge_filter: t.Optional[exp.Expression] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Merges `source_table` into `target_table` via `logical_merge`.

        Any extra keyword arguments are accepted but not forwarded.
        """
        logical_merge(
            self,
            target_table,
            source_table,
            target_columns_to_types,
            unique_key,
            when_matched=when_matched,
            merge_filter=merge_filter,
            source_columns=source_columns,
        )


class PandasNativeFetchDFSupportMixin(EngineAdapter):
    """Mixin that fetches query results as a DataFrame through `pandas.io.sql.read_sql_query`."""

    def _fetch_native_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> DF:
        """Fetches a Pandas DataFrame from a SQL query."""
        from warnings import catch_warnings, filterwarnings

        from pandas.io.sql import read_sql_query

        # Expressions are rendered to SQL text; plain strings are used as-is
        sql = (
            self._to_sql(query, quote=quote_identifiers)
            if isinstance(query, exp.Expression)
            else query
        )
        logger.debug(f"Executing SQL:\n{sql}")
        with catch_warnings(), self.transaction():
            # The connection handed to pandas is not a SQLAlchemy connectable, so
            # silence the warning pandas emits about that for the duration of the read
            filterwarnings(
                "ignore",
                category=UserWarning,
                message=".*pandas only supports SQLAlchemy connectable.*",
            )
            df = read_sql_query(sql, self._connection_pool.get())
        return df


class HiveMetastoreTablePropertiesMixin(EngineAdapter):
    """Mixin that builds table and view property expressions (format, partitioning, comment)."""

    MAX_TABLE_COMMENT_LENGTH = 4000
    MAX_COLUMN_COMMENT_LENGTH = 4000

    def _build_partitioned_by_exp(
        self,
        partitioned_by: t.List[exp.Expression],
        *,
        catalog_name: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Union[exp.PartitionedByProperty, exp.Property]:
        """Builds the partitioning property for a table.

        Args:
            partitioned_by: The partition expressions.
            catalog_name: Catalog used to look up the catalog type; the current
                catalog is used when omitted.

        Returns:
            A `PARTITIONING` property holding an array of SQL strings for the Trino
            Iceberg catalog, otherwise a `PartitionedByProperty`.

        Raises:
            SQLMeshError: If a non-column expression is supplied outside of the Trino
                Iceberg case.
        """
        if (
            self.dialect == "trino"
            and self.get_catalog_type(catalog_name or self.get_current_catalog()) == "iceberg"
        ):
            # On the Trino Iceberg catalog, the table property is called "partitioning" - not "partitioned_by"
            # In addition, partition column transform expressions like `day(col)` or `bucket(col, 5)` are allowed
            # Also, column names and transforms need to be strings and supplied as an ARRAY[varchar]
            # ref: https://trino.io/docs/current/connector/iceberg.html#table-properties
            return exp.Property(
                this=exp.var("PARTITIONING"),
                value=exp.array(
                    *(exp.Literal.string(e.sql(dialect=self.dialect)) for e in partitioned_by)
                ),
            )
        # Everywhere else only plain columns are accepted as partition expressions
        for expr in partitioned_by:
            if not isinstance(expr, exp.Column):
                raise SQLMeshError(
                    f"PARTITIONED BY contains non-column value '{expr.sql(dialect=self.dialect)}'."
                )
        return exp.PartitionedByProperty(
            this=exp.Schema(expressions=partitioned_by),
        )

    def _build_table_properties_exp(
        self,
        catalog_name: t.Optional[str] = None,
        table_format: t.Optional[str] = None,
        storage_format: t.Optional[str] = None,
        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
        partition_interval_unit: t.Optional[IntervalUnit] = None,
        clustered_by: t.Optional[t.List[exp.Expression]] = None,
        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Creates a SQLGlot properties expression for a table.

        The properties are emitted in this order: file format, partitioning, comment and
        finally the user supplied `table_properties`. `clustered_by`,
        `target_columns_to_types` and `table_kind` are accepted but not used here.

        Returns:
            The properties expression, or None when there is nothing to emit.
        """
        properties: t.List[exp.Expression] = []

        if table_format and self.dialect == "spark":
            # On Spark the table format becomes the file format property and the storage
            # format, if any, is passed as the "write.format.default" property instead
            properties.append(exp.FileFormatProperty(this=exp.Var(this=table_format)))
            if storage_format:
                properties.append(
                    exp.Property(
                        this="write.format.default", value=exp.Literal.string(storage_format)
                    )
                )
        elif storage_format:
            properties.append(exp.FileFormatProperty(this=exp.Var(this=storage_format)))

        if partitioned_by:
            properties.append(
                self._build_partitioned_by_exp(
                    partitioned_by,
                    partition_interval_unit=partition_interval_unit,
                    catalog_name=catalog_name,
                )
            )

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        properties.extend(self._table_or_view_properties_to_expressions(table_properties))

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _build_view_properties_exp(
        self,
        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        table_description: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Creates a SQLGlot table properties expression for view"""
        properties: t.List[exp.Expression] = []

        if table_description:
            properties.append(
                exp.SchemaCommentProperty(
                    this=exp.Literal.string(self._truncate_table_comment(table_description))
                )
            )

        properties.extend(self._table_or_view_properties_to_expressions(view_properties))

        if properties:
            return exp.Properties(expressions=properties)
        return None

    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
        """Returns the comment unchanged for iceberg / delta_lake catalogs, otherwise defers to the parent."""
        # iceberg and delta do not have a comment length limit
        if self.current_catalog_type in ("iceberg", "delta_lake"):
            return comment
        return super()._truncate_comment(comment, length)


class GetCurrentCatalogFromFunctionMixin(EngineAdapter):
    """Mixin that determines the current catalog by selecting `CURRENT_CATALOG_EXPRESSION`."""

    # Expression that is selected in order to obtain the current catalog name
    CURRENT_CATALOG_EXPRESSION: exp.Expression = exp.func("current_catalog")

    def get_current_catalog(self) -> t.Optional[str]:
        """Returns the catalog name of the current connection."""
        result = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
        if result:
            return result[0]
        return None


class NonTransactionalTruncateMixin(EngineAdapter):
    """Mixin that replaces TRUNCATE with an unconditional DELETE while a transaction is active."""

    def _truncate_table(self, table_name: TableName) -> None:
        """Empties `table_name` without breaking out of an active transaction."""
        # Truncate forces a commit of the current transaction so we want to do an unconditional delete to
        # preserve the transaction if one exists otherwise we can truncate
        if self._connection_pool.is_transaction_active:
            return self.execute(exp.Delete(this=exp.to_table(table_name)))
        super()._truncate_table(table_name)


class VarcharSizeWorkaroundMixin(EngineAdapter):
    """Mixin that works around default-length character types produced by `CTAS ... LIMIT 0`."""

    def _default_precision_to_max(
        self, columns_to_types: t.Dict[str, exp.DataType]
    ) -> t.Dict[str, exp.DataType]:
        """Replaces default type parameters with "max" for types that support it.

        The data types are modified in place and the same mapping is returned.
        """
        # get default lengths for types that support "max" length
        types_with_max_default_param = {
            k: [self.schema_differ.parameterized_type_defaults[k][0][0]]
            for k in self.schema_differ.max_parameter_length
            if k in self.schema_differ.parameterized_type_defaults
        }

        # Redshift and MSSQL have a bug where CTAS statements have non-deterministic types. If a LIMIT
        # is applied to a CTAS statement, VARCHAR (and possibly other) types sometimes revert to their
        # default length of 256 (Redshift) or 1 (MSSQL). If we detect that a type has its default length
        # and supports "max" length, we convert it to "max" length to prevent inadvertent data truncation.
        for col_name, col_type in columns_to_types.items():
            if col_type.this in types_with_max_default_param and col_type.expressions:
                parameter = self.schema_differ.get_type_parameters(col_type)
                type_default = types_with_max_default_param[col_type.this]
                if parameter == type_default:
                    col_type.set("expressions", [exp.DataTypeParam(this=exp.var("max"))])

        return columns_to_types

    def _build_create_table_exp(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expression],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> exp.Create:
        """Builds the CREATE TABLE expression, rewriting `CTAS ... LIMIT 0` into an explicit schema.

        When the statement built by the parent is a query with `LIMIT 0`, the query
        (with its limits and WHERE clauses removed) is materialized as a temporary view,
        the column types are read back from that view and a CREATE TABLE statement with
        an explicit schema is built instead. The temporary view is always dropped.
        """
        statement = super()._build_create_table_exp(
            table_name_or_schema,
            expression=expression,
            exists=exists,
            replace=replace,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            table_kind=table_kind,
            **kwargs,
        )

        if (
            statement.expression
            and statement.expression.args.get("limit") is not None
            and statement.expression.args["limit"].expression.this == "0"
        ):
            assert not isinstance(table_name_or_schema, exp.Schema)

            # Redshift and MSSQL have a bug where CTAS statements have non-deterministic types. If a limit
            # is applied to a CTAS statement, VARCHAR types default to 1 in some instances.
            select_statement = statement.expression.copy()
            for select_or_union in select_statement.find_all(exp.Select, exp.SetOperation):
                limit = select_or_union.args.get("limit")
                if limit is not None and limit.expression.this == "0":
                    limit.pop()

                select_or_union.set("where", None)

            temp_view_name = self._get_temp_table("ctas")

            self.create_view(temp_view_name, select_statement, replace=False)
            try:
                columns_to_types_from_view = self._default_precision_to_max(
                    self.columns(temp_view_name)
                )

                schema = self._build_schema_exp(
                    exp.to_table(table_name_or_schema),
                    columns_to_types_from_view,
                )
                # NOTE(review): table_kind is not forwarded here, unlike in the first
                # call above - confirm that this is intentional.
                statement = super()._build_create_table_exp(
                    schema,
                    None,
                    exists=exists,
                    replace=replace,
                    target_columns_to_types=columns_to_types_from_view,
                    table_description=table_description,
                    **kwargs,
                )
            finally:
                self.drop_view(temp_view_name)

        return statement


@dataclass(frozen=True)
class TableAlterClusterByOperation(TableAlterOperation, abc.ABC):
    """Abstract base for alter operations that affect a table's clustering key."""

    pass


@dataclass(frozen=True)
class TableAlterChangeClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that sets the clustering key of a table."""

    # The clustering key as a SQL string, parsed using `dialect`
    clustering_key: str
    dialect: str

    @property
    def is_additive(self) -> bool:
        return False

    @property
    def is_destructive(self) -> bool:
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        return [exp.Cluster(expressions=self.cluster_key_expressions)]

    @property
    def cluster_key_expressions(self) -> t.List[exp.Expression]:
        """The expressions that make up the parsed clustering key."""
        # Note: Assumes `clustering_key` as a string like:
        # - "(col_a)"
        # - "(col_a, col_b)"
        # - "func(col_a, transform(col_b))"
        parsed_cluster_key = parse_one(self.clustering_key, dialect=self.dialect)
        return parsed_cluster_key.expressions or [parsed_cluster_key.this]


@dataclass(frozen=True)
class TableAlterDropClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that drops the clustering key of a table."""

    @property
    def is_additive(self) -> bool:
        return False

    @property
    def is_destructive(self) -> bool:
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        return [exp.Command(this="DROP", expression="CLUSTERING KEY")]


class ClusteredByMixin(EngineAdapter):
    """Mixin that builds CLUSTER BY expressions and detects clustering key changes."""

    def _build_clustered_by_exp(
        self,
        clustered_by: t.List[exp.Expression],
        **kwargs: t.Any,
    ) -> t.Optional[exp.Cluster]:
        """Builds a Cluster expression from copies of the `clustered_by` expressions."""
        return exp.Cluster(expressions=[c.copy() for c in clustered_by])

    def get_alter_operations(
        self,
        current_table_name: TableName,
        target_table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterOperation]:
        """Returns the parent's alter operations plus any needed to align the clustering key.

        A change operation is appended when the target table is clustered with a
        clustering key that differs from the current table's; a drop operation is
        appended when only the current table is clustered.
        """
        operations = super().get_alter_operations(
            current_table_name,
            target_table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

        # check for a change in clustering
        current_table = exp.to_table(current_table_name)
        target_table = exp.to_table(target_table_name)

        current_table_schema = schema_(current_table.db, catalog=current_table.catalog)
        target_table_schema = schema_(target_table.db, catalog=target_table.catalog)

        current_table_info = seq_get(
            self.get_data_objects(current_table_schema, {current_table.name}), 0
        )
        target_table_info = seq_get(
            self.get_data_objects(target_table_schema, {target_table.name}), 0
        )

        # Clustering can only be compared when both tables were found
        if current_table_info and target_table_info:
            if target_table_info.is_clustered:
                if target_table_info.clustering_key and (
                    current_table_info.clustering_key != target_table_info.clustering_key
                ):
                    operations.append(
                        TableAlterChangeClusterKeyOperation(
                            target_table=current_table,
                            clustering_key=target_table_info.clustering_key,
                            dialect=self.dialect,
                        )
                    )
            elif current_table_info.is_clustered:
                operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))

        return operations


def logical_merge(
    engine_adapter: EngineAdapter,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expression],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expression] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """
    Merge implementation for engine adapters that do not support merge natively.

    The merge is executed as follows:
    1. Create a temporary table containing the new data to merge.
    2. Delete rows from target table where unique_key cols match a row in the temporary table.
    3. Insert the temporary table contents into the target table. Any duplicate, non-unique rows
    within the temporary table are omitted.
    4. Drop the temporary table.

    Raises:
        SQLMeshError: If `when_matched` or `merge_filter` is provided.
    """
    if when_matched or merge_filter:
        prop = "when_matched" if when_matched else "merge_filter"
        raise SQLMeshError(
            f"This engine does not support MERGE expressions and therefore `{prop}` is not supported."
        )

    engine_adapter._replace_by_key(
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        is_unique_key=True,
        source_columns=source_columns,
    )


class RowDiffMixin(EngineAdapter):
    """Mixin that builds expressions which render column values as normalized strings."""

    # The maximum supported value for n in timestamp(n).
    # Most databases are microsecond (6) but some can only handle millisecond (3) while others go to nanosecond (9)
    MAX_TIMESTAMP_PRECISION = 6

    def concat_columns(
        self,
        columns_to_types: t.Dict[str, exp.DataType],
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
        delimiter: str = ",",
    ) -> exp.Expression:
        """
        Produce an expression that generates a string version of a record, that is:
            - Every column converted to a string representation, joined together into a single string using the specified :delimiter
        """
        expressions_to_concat: t.List[exp.Expression] = []
        for idx, (column, type) in enumerate(columns_to_types.items()):
            # NULL values are replaced by an empty string
            expressions_to_concat.append(
                exp.func(
                    "COALESCE",
                    self.normalize_value(
                        exp.to_column(column), type, decimal_precision, timestamp_precision
                    ),
                    exp.Literal.string(""),
                )
            )
            # The delimiter is only placed between columns, not after the last one
            if idx < len(columns_to_types) - 1:
                expressions_to_concat.append(exp.Literal.string(delimiter))

        return exp.func("CONCAT", *expressions_to_concat)

    def normalize_value(
        self,
        expr: exp.Expression,
        type: exp.DataType,
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
    ) -> exp.Expression:
        """
        Return an expression that converts the values inside the column `col` to a normalized string

        This string should be comparable across database engines, eg:
            - `date` columns -> YYYY-MM-DD string
            - `datetime`/`timestamp`/`timestamptz` columns -> ISO-8601 string to :timestamp_precision digits of subsecond precision
            - `float` / `double` / `decimal` -> Value formatted to :decimal_precision decimal places
            - `boolean` columns -> '1' or '0'
            - NULLS -> "" (empty string)
        """
        if type.is_type(exp.DataType.Type.BOOLEAN):
            value = self._normalize_boolean_value(expr)
        elif type.is_type(*exp.DataType.INTEGER_TYPES):
            value = self._normalize_integer_value(expr)
        elif type.is_type(*exp.DataType.REAL_TYPES):
            # If there is no scale on the decimal type, treat it like an integer when comparing
            # Some databases like Snowflake deliberately create all integer types as NUMERIC(<size>, 0)
            # and they should be treated as integers and not decimals
            type_params = list(type.find_all(exp.DataTypeParam))
            if len(type_params) == 2 and type_params[-1].this.to_py() == 0:
                value = self._normalize_integer_value(expr)
            else:
                value = self._normalize_decimal_value(expr, decimal_precision)
        elif type.is_type(*exp.DataType.TEMPORAL_TYPES):
            value = self._normalize_timestamp_value(expr, type, timestamp_precision)
        elif type.is_type(*exp.DataType.NESTED_TYPES):
            value = self._normalize_nested_value(expr)
        else:
            value = expr

        return exp.cast(value, to=exp.DataType.build("VARCHAR"))

    def _normalize_nested_value(self, expr: exp.Expression) -> exp.Expression:
        """Returns the nested value expression unchanged."""
        return expr

    def _normalize_timestamp_value(
        self, expr: exp.Expression, type: exp.DataType, precision: int
    ) -> exp.Expression:
        """Builds an expression that renders a temporal value as a formatted string.

        Raises:
            ValueError: If `precision` exceeds `MAX_TIMESTAMP_PRECISION`.
        """
        if precision > self.MAX_TIMESTAMP_PRECISION:
            raise ValueError(
                f"Requested timestamp precision '{precision}' exceeds maximum supported precision: {self.MAX_TIMESTAMP_PRECISION}"
            )

        is_date = type.is_type(exp.DataType.Type.DATE, exp.DataType.Type.DATE32)

        format = NORMALIZED_DATE_FORMAT if is_date else NORMALIZED_TIMESTAMP_FORMAT

        if type.is_type(
            exp.DataType.Type.TIMESTAMPTZ,
            exp.DataType.Type.TIMESTAMPLTZ,
            exp.DataType.Type.TIMESTAMPNTZ,
        ):
            # Convert all timezone-aware values to UTC for comparison
            expr = exp.AtTimeZone(this=expr, zone=exp.Literal.string("UTC"))

        digits_to_chop_off = (
            6 - precision
        )  # 6 = max precision across all adapters and also the max amount of digits TimeToStr will render since its based on `strftime` and `%f` only renders to microseconds

        expr = exp.TimeToStr(this=expr, format=exp.Literal.string(format))
        if digits_to_chop_off > 0:
            # Trim the rendered string so that only `precision` subsecond digits remain
            expr = exp.func(
                "SUBSTRING", expr, 1, len("2023-01-01 12:13:14.000000") - digits_to_chop_off
            )

        return expr

    def _normalize_integer_value(self, expr: exp.Expression) -> exp.Expression:
        """Casts the value to BIGINT."""
        return exp.cast(expr, "BIGINT")

    def _normalize_decimal_value(self, expr: exp.Expression, precision: int) -> exp.Expression:
        """Casts the value to DECIMAL(38, precision)."""
        return exp.cast(expr, f"DECIMAL(38,{precision})")

    def _normalize_boolean_value(self, expr: exp.Expression) -> exp.Expression:
        """Casts the value to INT."""
        return exp.cast(expr, "INT")


class GrantsFromInfoSchemaMixin(EngineAdapter):
    """Mixin that builds GRANT / REVOKE expressions and reads current grants from the information schema."""

    # Expression compared against the grantor / grantee columns when fetching grants
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expression = exp.func("current_user")
    # When False, a separate statement is generated for every principal
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
    # When True, the catalog is added to the information schema table and filter
    USE_CATALOG_IN_GRANTS = False
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "table_privileges"

    @staticmethod
    @abc.abstractmethod
    def _grant_object_kind(table_type: DataObjectType) -> t.Optional[str]:
        """Returns the object kind to set on GRANT / REVOKE expressions, or None to omit it."""
        pass

    @abc.abstractmethod
    def _get_current_schema(self) -> str:
        """Returns the schema name used when the table does not specify one."""
        pass

    def _dcl_grants_config_expr(
        self,
        dcl_cmd: t.Type[DCL],
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """Builds `dcl_cmd` expressions for every privilege in `grants_config`.

        Args:
            dcl_cmd: The expression class to instantiate (see the GRANT / REVOKE wrappers).
            table: The table the privileges apply to.
            grants_config: Mapping of privilege to principals.
            table_type: The type of the data object, used to determine the object kind.

        Returns:
            One expression per privilege if multiple principals are supported, otherwise
            one expression per privilege and principal.
        """
        expressions: t.List[exp.Expression] = []
        if not grants_config:
            return expressions

        object_kind = self._grant_object_kind(table_type)
        for privilege, principals in grants_config.items():
            args: t.Dict[str, t.Any] = {
                "privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
                "securable": table.copy(),
            }
            if object_kind:
                args["kind"] = exp.Var(this=object_kind)
            if self.SUPPORTS_MULTIPLE_GRANT_PRINCIPALS:
                args["principals"] = [
                    normalize_identifiers(
                        parse_one(principal, into=exp.GrantPrincipal, dialect=self.dialect),
                        dialect=self.dialect,
                    )
                    for principal in principals
                ]
                expressions.append(dcl_cmd(**args))  # type: ignore[arg-type]
            else:
                for principal in principals:
                    args["principals"] = [
                        normalize_identifiers(
                            parse_one(principal, into=exp.GrantPrincipal, dialect=self.dialect),
                            dialect=self.dialect,
                        )
                    ]
                    expressions.append(dcl_cmd(**args))  # type: ignore[arg-type]

        return expressions

    def _apply_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """Builds the GRANT expressions for `grants_config`."""
        return self._dcl_grants_config_expr(exp.Grant, table, grants_config, table_type)

    def _revoke_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """Builds the REVOKE expressions for `grants_config`."""
        return self._dcl_grants_config_expr(exp.Revoke, table, grants_config, table_type)

    def _get_grant_expression(self, table: exp.Table) -> exp.Expression:
        """Builds the query that selects (privilege_type, grantee) rows for `table`.

        Only rows granted by the current user / role to somebody else are selected.

        Raises:
            SQLMeshError: If the catalog is required but cannot be determined.
        """
        # Fall back to the current schema when the table is not schema qualified
        schema_identifier = table.args.get("db") or normalize_identifiers(
            exp.to_identifier(self._get_current_schema(), quoted=True), dialect=self.dialect
        )
        schema_name = schema_identifier.this
        table_name = table.args.get("this").this  # type: ignore

        grant_conditions = [
            exp.column("table_schema").eq(exp.Literal.string(schema_name)),
            exp.column("table_name").eq(exp.Literal.string(table_name)),
            exp.column("grantor").eq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
            exp.column("grantee").neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
        ]

        info_schema_table = normalize_identifiers(
            exp.table_(self.GRANT_INFORMATION_SCHEMA_TABLE_NAME, db="information_schema"),
            dialect=self.dialect,
        )
        if self.USE_CATALOG_IN_GRANTS:
            catalog_identifier = table.args.get("catalog")
            if not catalog_identifier:
                catalog_name = self.get_current_catalog()
                if not catalog_name:
                    raise SQLMeshError(
                        "Current catalog could not be determined for fetching grants. This is unexpected."
                    )
                catalog_identifier = normalize_identifiers(
                    exp.to_identifier(catalog_name, quoted=True), dialect=self.dialect
                )
            catalog_name = catalog_identifier.this
            info_schema_table.set("catalog", catalog_identifier.copy())
            grant_conditions.insert(
                0, exp.column("table_catalog").eq(exp.Literal.string(catalog_name))
            )

        return (
            exp.select("privilege_type", "grantee")
            .from_(info_schema_table)
            .where(exp.and_(*grant_conditions))
        )

    def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
        """Fetches the current grants of `table` as a mapping of privilege to grantees.

        Rows with a missing or empty privilege / grantee are skipped and every grantee
        is listed at most once per privilege.
        """
        grant_expr = self._get_grant_expression(table)

        results = self.fetchall(grant_expr)

        grants_dict: GrantsConfig = {}
        for privilege_raw, grantee_raw in results:
            if privilege_raw is None or grantee_raw is None:
                continue

            privilege = str(privilege_raw)
            grantee = str(grantee_raw)
            if not privilege or not grantee:
                continue

            grantees = grants_dict.setdefault(privilege, [])
            if grantee not in grantees:
                grantees.append(grantee)

        return grants_dict
class LogicalMergeMixin(EngineAdapter):
    """Engine adapter mixin whose `merge` is carried out by `logical_merge`."""

    def merge(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        unique_key: t.Sequence[exp.Expression],
        when_matched: t.Optional[exp.Whens] = None,
        merge_filter: t.Optional[exp.Expression] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Merge `source_table` into `target_table` by handing off to `logical_merge`.

        Every named argument is passed through as-is; additional keyword arguments
        are accepted for signature compatibility and are not forwarded.
        """
        logical_merge(
            engine_adapter=self,
            target_table=target_table,
            source_table=source_table,
            target_columns_to_types=target_columns_to_types,
            unique_key=unique_key,
            when_matched=when_matched,
            merge_filter=merge_filter,
            source_columns=source_columns,
        )
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def merge(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expression],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expression] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Delegate the merge of `source_table` into `target_table` to `logical_merge`.

    Extra keyword arguments are accepted but are not passed on.
    """
    # The optional settings are grouped so that the positional part of the call stays short
    optional_settings = {
        "when_matched": when_matched,
        "merge_filter": merge_filter,
        "source_columns": source_columns,
    }
    logical_merge(
        self,
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        **optional_settings,
    )
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class PandasNativeFetchDFSupportMixin(EngineAdapter):
    """Engine adapter mixin that reads query results through `pandas.io.sql.read_sql_query`."""

    def _fetch_native_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> DF:
        """Run `query` and return its result as a Pandas DataFrame.

        Args:
            query: A SQLGlot expression, which is rendered to SQL first, or a SQL string
                that is used as-is.
            quote_identifiers: Passed as `quote` when rendering an expression to SQL.
        """
        from warnings import catch_warnings, filterwarnings

        from pandas.io.sql import read_sql_query

        if isinstance(query, exp.Expression):
            sql = self._to_sql(query, quote=quote_identifiers)
        else:
            sql = query

        logger.debug(f"Executing SQL:\n{sql}")

        with catch_warnings():
            with self.transaction():
                # pandas warns when it is handed something other than a SQLAlchemy
                # connectable; the warning is suppressed only while reading
                filterwarnings(
                    "ignore",
                    category=UserWarning,
                    message=".*pandas only supports SQLAlchemy connectable.*",
                )
                connection = self._connection_pool.get()
                frame = read_sql_query(sql, connection)
        return frame
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class HiveMetastoreTablePropertiesMixin(EngineAdapter):
    """Builds partitioning, table and view property expressions for adapters using this mixin."""

    MAX_TABLE_COMMENT_LENGTH = 4000
    MAX_COLUMN_COMMENT_LENGTH = 4000

    def _build_partitioned_by_exp(
        self,
        partitioned_by: t.List[exp.Expression],
        *,
        catalog_name: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Union[exp.PartitionedByProperty, exp.Property]:
        """Build the partitioning property for a CREATE statement.

        Args:
            partitioned_by: The partition expressions.
            catalog_name: Catalog whose type is inspected; falls back to the current catalog.
            kwargs: Accepted for signature compatibility; not used here.

        Returns:
            A `PARTITIONING` property when the dialect is Trino and the catalog type is
            Iceberg, otherwise a `PartitionedByProperty` wrapping the given columns.

        Raises:
            SQLMeshError: If, outside the Trino/Iceberg case, an entry is not a plain column.
        """
        targets_trino_iceberg = (
            self.dialect == "trino"
            and self.get_catalog_type(catalog_name or self.get_current_catalog()) == "iceberg"
        )
        if targets_trino_iceberg:
            # On the Trino Iceberg catalog, the table property is called "partitioning" - not "partitioned_by"
            # In addition, partition column transform expressions like `day(col)` or `bucket(col, 5)` are allowed
            # Also, column names and transforms need to be strings and supplied as an ARRAY[varchar]
            # ref: https://trino.io/docs/current/connector/iceberg.html#table-properties
            rendered_partitions = [
                exp.Literal.string(partition.sql(dialect=self.dialect))
                for partition in partitioned_by
            ]
            return exp.Property(
                this=exp.var("PARTITIONING"),
                value=exp.array(*rendered_partitions),
            )

        # Everywhere else only plain column references are accepted.
        for candidate in partitioned_by:
            if isinstance(candidate, exp.Column):
                continue
            raise SQLMeshError(
                f"PARTITIONED BY contains non-column value '{candidate.sql(dialect=self.dialect)}'."
            )

        return exp.PartitionedByProperty(this=exp.Schema(expressions=partitioned_by))

    def _build_table_properties_exp(
        self,
        catalog_name: t.Optional[str] = None,
        table_format: t.Optional[str] = None,
        storage_format: t.Optional[str] = None,
        partitioned_by: t.Optional[t.List[exp.Expression]] = None,
        partition_interval_unit: t.Optional[IntervalUnit] = None,
        clustered_by: t.Optional[t.List[exp.Expression]] = None,
        table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Assemble the properties of a CREATE TABLE statement.

        Properties are emitted in this order: file format (plus `write.format.default`
        when a Spark table format is combined with a storage format), partitioning,
        table comment, then the user supplied table properties.

        Returns:
            An `exp.Properties` node, or None when there is nothing to emit.
        """
        collected: t.List[exp.Expression] = []

        # With a table format on Spark, the table format becomes the file format and the
        # storage format is demoted to a `write.format.default` property.
        uses_spark_table_format = bool(table_format and self.dialect == "spark")
        file_format = table_format if uses_spark_table_format else storage_format
        if file_format:
            collected.append(exp.FileFormatProperty(this=exp.Var(this=file_format)))
        if uses_spark_table_format and storage_format:
            collected.append(
                exp.Property(
                    this="write.format.default", value=exp.Literal.string(storage_format)
                )
            )

        if partitioned_by:
            partitioning = self._build_partitioned_by_exp(
                partitioned_by,
                partition_interval_unit=partition_interval_unit,
                catalog_name=catalog_name,
            )
            collected.append(partitioning)

        if table_description:
            comment = exp.Literal.string(self._truncate_table_comment(table_description))
            collected.append(exp.SchemaCommentProperty(this=comment))

        collected.extend(self._table_or_view_properties_to_expressions(table_properties))

        return exp.Properties(expressions=collected) if collected else None

    def _build_view_properties_exp(
        self,
        view_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
        table_description: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Properties]:
        """Assemble the properties of a CREATE VIEW statement.

        Returns:
            An `exp.Properties` node holding the (truncated) comment followed by the
            view properties, or None when there is nothing to emit.
        """
        collected: t.List[exp.Expression] = []

        if table_description:
            comment = exp.Literal.string(self._truncate_table_comment(table_description))
            collected.append(exp.SchemaCommentProperty(this=comment))

        collected.extend(self._table_or_view_properties_to_expressions(view_properties))

        return exp.Properties(expressions=collected) if collected else None

    def _truncate_comment(self, comment: str, length: t.Optional[int]) -> str:
        """Return the comment untouched for iceberg/delta_lake catalogs, else defer to the parent."""
        # iceberg and delta do not have a comment length limit
        if self.current_catalog_type not in ("iceberg", "delta_lake"):
            return super()._truncate_comment(comment, length)
        return comment
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class GetCurrentCatalogFromFunctionMixin(EngineAdapter):
    """Resolves the current catalog by selecting `CURRENT_CATALOG_EXPRESSION`."""

    # Expression that is selected to obtain the catalog name.
    CURRENT_CATALOG_EXPRESSION: exp.Expression = exp.func("current_catalog")

    def get_current_catalog(self) -> t.Optional[str]:
        """Return the first column of the fetched row, or None when no row comes back."""
        row = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
        return row[0] if row else None
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def get_current_catalog(self) -> t.Optional[str]:
    """Return the first column of the fetched row, or None when no row comes back."""
    row = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
    return row[0] if row else None
Returns the catalog name of the current connection.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class NonTransactionalTruncateMixin(EngineAdapter):
    """Empties tables with DELETE while a transaction is active and with the parent's truncate otherwise."""

    def _truncate_table(self, table_name: TableName) -> None:
        """Remove every row from `table_name` without breaking an open transaction."""
        # Truncate forces a commit of the current transaction, so it is only used when no
        # transaction is active; inside a transaction an unconditional delete preserves it.
        if not self._connection_pool.is_transaction_active:
            super()._truncate_table(table_name)
            return None

        delete_all_rows = exp.Delete(this=exp.to_table(table_name))
        return self.execute(delete_all_rows)
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class VarcharSizeWorkaroundMixin(EngineAdapter):
    """Works around CTAS statements that produce non-deterministic column types.

    When a `LIMIT 0` CTAS is detected, the column types are read from a temporary view
    of the query and an explicit-schema CREATE TABLE is built instead.
    """

    def _default_precision_to_max(
        self, columns_to_types: t.Dict[str, exp.DataType]
    ) -> t.Dict[str, exp.DataType]:
        """Widen columns that carry their type's default length to the "max" length.

        Args:
            columns_to_types: Mapping of column name to data type. The data type objects
                are modified in place.

        Returns:
            The same mapping that was passed in.
        """
        # get default lengths for types that support "max" length
        types_with_max_default_param = {
            k: [self.schema_differ.parameterized_type_defaults[k][0][0]]
            for k in self.schema_differ.max_parameter_length
            if k in self.schema_differ.parameterized_type_defaults
        }

        # Redshift and MSSQL have a bug where CTAS statements have non-deterministic types. If a LIMIT
        # is applied to a CTAS statement, VARCHAR (and possibly other) types sometimes revert to their
        # default length of 256 (Redshift) or 1 (MSSQL). If we detect that a type has its default length
        # and supports "max" length, we convert it to "max" length to prevent inadvertent data truncation.
        for col_name, col_type in columns_to_types.items():
            if col_type.this in types_with_max_default_param and col_type.expressions:
                parameter = self.schema_differ.get_type_parameters(col_type)
                type_default = types_with_max_default_param[col_type.this]
                if parameter == type_default:
                    col_type.set("expressions", [exp.DataTypeParam(this=exp.var("max"))])

        return columns_to_types

    def _build_create_table_exp(
        self,
        table_name_or_schema: t.Union[exp.Schema, TableName],
        expression: t.Optional[exp.Expression],
        exists: bool = True,
        replace: bool = False,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        table_description: t.Optional[str] = None,
        table_kind: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> exp.Create:
        """Build a CREATE TABLE expression, replacing `LIMIT 0` CTAS with an explicit schema.

        The statement is first built by the parent implementation. If its query carries
        a `LIMIT 0`, a temporary view of the query (with the zero limits and all WHERE
        clauses removed) is created, its columns are read and widened through
        `_default_precision_to_max`, and the statement is rebuilt from that schema with
        no query attached. The temporary view is always dropped afterwards.

        Returns:
            The CREATE expression to execute.
        """
        statement = super()._build_create_table_exp(
            table_name_or_schema,
            expression=expression,
            exists=exists,
            replace=replace,
            target_columns_to_types=target_columns_to_types,
            table_description=table_description,
            table_kind=table_kind,
            **kwargs,
        )

        if (
            statement.expression
            and statement.expression.args.get("limit") is not None
            and statement.expression.args["limit"].expression.this == "0"
        ):
            assert not isinstance(table_name_or_schema, exp.Schema)

            # redshift and mssql have a bug where CTAS statements have non-deterministic types. if a limit
            # is applied to a ctas statement, VARCHAR types default to 1 in some instances.
            select_statement = statement.expression.copy()
            for select_or_union in select_statement.find_all(exp.Select, exp.SetOperation):
                limit = select_or_union.args.get("limit")
                if limit is not None and limit.expression.this == "0":
                    limit.pop()

                select_or_union.set("where", None)

            temp_view_name = self._get_temp_table("ctas")

            self.create_view(temp_view_name, select_statement, replace=False)
            try:
                columns_to_types_from_view = self._default_precision_to_max(
                    self.columns(temp_view_name)
                )

                schema = self._build_schema_exp(
                    exp.to_table(table_name_or_schema),
                    columns_to_types_from_view,
                )
                # Forward table_kind here as well so the rebuilt statement keeps the
                # table kind that the caller requested for the original statement.
                statement = super()._build_create_table_exp(
                    schema,
                    None,
                    exists=exists,
                    replace=replace,
                    target_columns_to_types=columns_to_types_from_view,
                    table_description=table_description,
                    table_kind=table_kind,
                    **kwargs,
                )
            finally:
                self.drop_view(temp_view_name)

        return statement
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
@dataclass(frozen=True)
class TableAlterChangeClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that sets a table's clustering key to `clustering_key`."""

    # Clustering key as a SQL string; parsed with `dialect` when the action is built.
    clustering_key: str
    dialect: str

    @property
    def is_additive(self) -> bool:
        """This operation is never reported as additive."""
        return False

    @property
    def is_destructive(self) -> bool:
        """This operation is never reported as destructive."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        """Single `exp.Cluster` action carrying the parsed clustering key expressions."""
        cluster_action = exp.Cluster(expressions=self.cluster_key_expressions)
        return [cluster_action]

    @property
    def cluster_key_expressions(self) -> t.List[exp.Expression]:
        """Parse `clustering_key` into a list of expressions.

        The key is assumed to be a string such as:
          - "(col_a)"
          - "(col_a, col_b)"
          - "func(col_a, transform(col_b))"
        """
        parsed_key = parse_one(self.clustering_key, dialect=self.dialect)
        members = parsed_key.expressions
        if members:
            return members
        # No expression list on the parsed node: fall back to its `this` argument.
        return [parsed_key.this]
@property
def cluster_key_expressions(self) -> t.List[exp.Expression]:
    """Parse `clustering_key` into a list of expressions.

    The key is assumed to be a string such as:
      - "(col_a)"
      - "(col_a, col_b)"
      - "func(col_a, transform(col_b))"
    """
    parsed_key = parse_one(self.clustering_key, dialect=self.dialect)
    members = parsed_key.expressions
    if members:
        return members
    # No expression list on the parsed node: fall back to its `this` argument.
    return [parsed_key.this]
Inherited Members
@dataclass(frozen=True)
class TableAlterDropClusterKeyOperation(TableAlterClusterByOperation):
    """Alter operation that emits a `DROP CLUSTERING KEY` command."""

    @property
    def is_additive(self) -> bool:
        """This operation is never reported as additive."""
        return False

    @property
    def is_destructive(self) -> bool:
        """This operation is never reported as destructive."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        """Single raw command action: `DROP` followed by `CLUSTERING KEY`."""
        drop_command = exp.Command(this="DROP", expression="CLUSTERING KEY")
        return [drop_command]
Inherited Members
class ClusteredByMixin(EngineAdapter):
    """Adds CLUSTER BY expression building and clustering-key change detection."""

    def _build_clustered_by_exp(
        self,
        clustered_by: t.List[exp.Expression],
        **kwargs: t.Any,
    ) -> t.Optional[exp.Cluster]:
        """Wrap copies of the clustering expressions in an `exp.Cluster` node."""
        cluster_columns = [column.copy() for column in clustered_by]
        return exp.Cluster(expressions=cluster_columns)

    def get_alter_operations(
        self,
        current_table_name: TableName,
        target_table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterOperation]:
        """Return the parent's alter operations plus any clustering key change.

        Both tables are looked up through `get_data_objects`. When both are found:
          - a change operation is appended if the target is clustered with a key that
            differs from the current table's key;
          - a drop operation is appended if the target is not clustered but the
            current table is.
        The appended operation always targets the current table.
        """
        alterations = super().get_alter_operations(
            current_table_name,
            target_table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

        # check for a change in clustering
        current = exp.to_table(current_table_name)
        target = exp.to_table(target_table_name)

        current_schema = schema_(current.db, catalog=current.catalog)
        target_schema = schema_(target.db, catalog=target.catalog)

        current_info = seq_get(self.get_data_objects(current_schema, {current.name}), 0)
        target_info = seq_get(self.get_data_objects(target_schema, {target.name}), 0)

        # Without metadata for both tables there is nothing to compare.
        if not (current_info and target_info):
            return alterations

        if target_info.is_clustered:
            desired_key = target_info.clustering_key
            if desired_key and current_info.clustering_key != desired_key:
                alterations.append(
                    TableAlterChangeClusterKeyOperation(
                        target_table=current,
                        clustering_key=desired_key,
                        dialect=self.dialect,
                    )
                )
        elif current_info.is_clustered:
            alterations.append(TableAlterDropClusterKeyOperation(target_table=current))

        return alterations
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def get_alter_operations(
    self,
    current_table_name: TableName,
    target_table_name: TableName,
    *,
    ignore_destructive: bool = False,
    ignore_additive: bool = False,
) -> t.List[TableAlterOperation]:
    """Return the parent's alter operations plus any clustering key change.

    Both tables are looked up through `get_data_objects`. When both are found:
      - a change operation is appended if the target is clustered with a key that
        differs from the current table's key;
      - a drop operation is appended if the target is not clustered but the
        current table is.
    The appended operation always targets the current table.
    """
    alterations = super().get_alter_operations(
        current_table_name,
        target_table_name,
        ignore_destructive=ignore_destructive,
        ignore_additive=ignore_additive,
    )

    # check for a change in clustering
    current = exp.to_table(current_table_name)
    target = exp.to_table(target_table_name)

    current_schema = schema_(current.db, catalog=current.catalog)
    target_schema = schema_(target.db, catalog=target.catalog)

    current_info = seq_get(self.get_data_objects(current_schema, {current.name}), 0)
    target_info = seq_get(self.get_data_objects(target_schema, {target.name}), 0)

    # Without metadata for both tables there is nothing to compare.
    if not (current_info and target_info):
        return alterations

    if target_info.is_clustered:
        desired_key = target_info.clustering_key
        if desired_key and current_info.clustering_key != desired_key:
            alterations.append(
                TableAlterChangeClusterKeyOperation(
                    target_table=current,
                    clustering_key=desired_key,
                    dialect=self.dialect,
                )
            )
    elif current_info.is_clustered:
        alterations.append(TableAlterDropClusterKeyOperation(target_table=current))

    return alterations
Determines the alter statements needed to change the current table into the structure of the target table.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
def logical_merge(
    engine_adapter: EngineAdapter,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expression],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expression] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """
    Merge implementation for engine adapters that do not support merge natively.

    The work is delegated to the adapter's `_replace_by_key` with `is_unique_key=True`.
    The merge is executed as follows:
    1. Create a temporary table containing the new data to merge.
    2. Delete rows from target table where unique_key cols match a row in the temporary table.
    3. Insert the temporary table contents into the target table. Any duplicate, non-unique rows
       within the temporary table are omitted.
    4. Drop the temporary table.

    Raises:
        SQLMeshError: If `when_matched` or `merge_filter` is provided, since neither can
            be honoured without a native MERGE.
    """
    # Report `when_matched` first when both unsupported options are supplied.
    if when_matched:
        unsupported_option: t.Optional[str] = "when_matched"
    elif merge_filter:
        unsupported_option = "merge_filter"
    else:
        unsupported_option = None

    if unsupported_option is not None:
        prop = unsupported_option
        raise SQLMeshError(
            f"This engine does not support MERGE expressions and therefore `{prop}` is not supported."
        )

    engine_adapter._replace_by_key(
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        is_unique_key=True,
        source_columns=source_columns,
    )
Merge implementation for engine adapters that do not support merge natively.
The merge is executed as follows:
- Create a temporary table containing the new data to merge.
- Delete rows from target table where unique_key cols match a row in the temporary table.
- Insert the temporary table contents into the target table. Any duplicate, non-unique rows within the temporary table are omitted.
- Drop the temporary table.
class RowDiffMixin(EngineAdapter):
    """Builds expressions that render column values as normalized strings for row comparison."""

    # The maximum supported value for n in timestamp(n).
    # Most databases are microsecond (6) but some can only handle millisecond (3) while others go to nanosecond (9)
    MAX_TIMESTAMP_PRECISION = 6

    def concat_columns(
        self,
        columns_to_types: t.Dict[str, exp.DataType],
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
        delimiter: str = ",",
    ) -> exp.Expression:
        """
        Produce an expression that renders a whole record as one string.

        Every column is normalized via `normalize_value`, wrapped in COALESCE(..., '') and
        the pieces are joined with a `delimiter` literal inside a single CONCAT call.
        """
        pieces: t.List[exp.Expression] = []
        for column_name, column_type in columns_to_types.items():
            # A fresh delimiter literal goes between consecutive columns, never at the ends.
            if pieces:
                pieces.append(exp.Literal.string(delimiter))
            normalized = self.normalize_value(
                exp.to_column(column_name), column_type, decimal_precision, timestamp_precision
            )
            pieces.append(exp.func("COALESCE", normalized, exp.Literal.string("")))

        return exp.func("CONCAT", *pieces)

    def normalize_value(
        self,
        expr: exp.Expression,
        type: exp.DataType,
        decimal_precision: int = 3,
        timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
    ) -> exp.Expression:
        """
        Return an expression that converts the values produced by `expr` to a normalized string

        This string should be comparable across database engines, eg:
            - `date` columns -> YYYY-MM-DD string
            - `datetime`/`timestamp`/`timestamptz` columns -> ISO-8601 string to :timestamp_precision digits of subsecond precision
            - `float` / `double` / `decimal` -> Value formatted to :decimal_precision decimal places
            - `boolean` columns -> '1' or '0'
            - NULLS -> "" (empty string)

        Types matched by none of the branches are cast to VARCHAR unchanged.
        """
        if type.is_type(exp.DataType.Type.BOOLEAN):
            normalized = self._normalize_boolean_value(expr)
        elif type.is_type(*exp.DataType.INTEGER_TYPES):
            normalized = self._normalize_integer_value(expr)
        elif type.is_type(*exp.DataType.REAL_TYPES):
            # If the decimal type has a scale of zero, treat it like an integer when comparing
            # Some databases like Snowflake deliberately create all integer types as NUMERIC(<size>, 0)
            # and they should be treated as integers and not decimals
            params = list(type.find_all(exp.DataTypeParam))
            scale_is_zero = len(params) == 2 and params[-1].this.to_py() == 0
            normalized = (
                self._normalize_integer_value(expr)
                if scale_is_zero
                else self._normalize_decimal_value(expr, decimal_precision)
            )
        elif type.is_type(*exp.DataType.TEMPORAL_TYPES):
            normalized = self._normalize_timestamp_value(expr, type, timestamp_precision)
        elif type.is_type(*exp.DataType.NESTED_TYPES):
            normalized = self._normalize_nested_value(expr)
        else:
            normalized = expr

        return exp.cast(normalized, to=exp.DataType.build("VARCHAR"))

    def _normalize_nested_value(self, expr: exp.Expression) -> exp.Expression:
        """Return nested values unchanged."""
        return expr

    def _normalize_timestamp_value(
        self, expr: exp.Expression, type: exp.DataType, precision: int
    ) -> exp.Expression:
        """Render a temporal value as a string with `precision` subsecond digits.

        Raises:
            ValueError: If `precision` exceeds `MAX_TIMESTAMP_PRECISION`.
        """
        if precision > self.MAX_TIMESTAMP_PRECISION:
            raise ValueError(
                f"Requested timestamp precision '{precision}' exceeds maximum supported precision: {self.MAX_TIMESTAMP_PRECISION}"
            )

        if type.is_type(exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
            render_format = NORMALIZED_DATE_FORMAT
        else:
            render_format = NORMALIZED_TIMESTAMP_FORMAT

        source = expr
        if type.is_type(
            exp.DataType.Type.TIMESTAMPTZ,
            exp.DataType.Type.TIMESTAMPLTZ,
            exp.DataType.Type.TIMESTAMPNTZ,
        ):
            # Convert these values to UTC so they compare consistently
            source = exp.AtTimeZone(this=expr, zone=exp.Literal.string("UTC"))

        rendered = exp.TimeToStr(this=source, format=exp.Literal.string(render_format))

        # 6 = max precision across all adapters and also the max amount of digits TimeToStr
        # will render since its based on `strftime` and `%f` only renders to microseconds
        surplus_digits = 6 - precision
        if surplus_digits <= 0:
            return rendered

        # Trim the trailing subsecond digits that exceed the requested precision.
        return exp.func(
            "SUBSTRING", rendered, 1, len("2023-01-01 12:13:14.000000") - surplus_digits
        )

    def _normalize_integer_value(self, expr: exp.Expression) -> exp.Expression:
        """Cast the value to BIGINT."""
        return exp.cast(expr, "BIGINT")

    def _normalize_decimal_value(self, expr: exp.Expression, precision: int) -> exp.Expression:
        """Cast the value to DECIMAL(38, precision)."""
        return exp.cast(expr, f"DECIMAL(38,{precision})")

    def _normalize_boolean_value(self, expr: exp.Expression) -> exp.Expression:
        """Cast the value to INT."""
        return exp.cast(expr, "INT")
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def concat_columns(
    self,
    columns_to_types: t.Dict[str, exp.DataType],
    decimal_precision: int = 3,
    timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
    delimiter: str = ",",
) -> exp.Expression:
    """
    Produce an expression that renders a whole record as one string.

    Every column is normalized via `normalize_value`, wrapped in COALESCE(..., '') and
    the pieces are joined with a `delimiter` literal inside a single CONCAT call.
    """
    pieces: t.List[exp.Expression] = []
    for column_name, column_type in columns_to_types.items():
        # A fresh delimiter literal goes between consecutive columns, never at the ends.
        if pieces:
            pieces.append(exp.Literal.string(delimiter))
        normalized = self.normalize_value(
            exp.to_column(column_name), column_type, decimal_precision, timestamp_precision
        )
        pieces.append(exp.func("COALESCE", normalized, exp.Literal.string("")))

    return exp.func("CONCAT", *pieces)
Produce an expression that generates a string version of a record, that is: - Every column converted to a string representation, joined together into a single string using the specified :delimiter
def normalize_value(
    self,
    expr: exp.Expression,
    type: exp.DataType,
    decimal_precision: int = 3,
    timestamp_precision: int = MAX_TIMESTAMP_PRECISION,
) -> exp.Expression:
    """
    Return an expression that converts the values produced by `expr` to a normalized string.

    The resulting string is meant to be comparable across database engines. The
    normalization applied is chosen from :type, checked in this order:
        - boolean types -> `_normalize_boolean_value`
        - integer types -> `_normalize_integer_value`
        - real types (float / double / decimal) -> `_normalize_decimal_value` using
          :decimal_precision, unless the type carries exactly two parameters and the
          last one is 0, in which case it is handled like an integer
        - temporal types -> `_normalize_timestamp_value` using :timestamp_precision
        - nested types -> `_normalize_nested_value`
        - anything else -> `expr` is used as-is

    Whatever branch is taken, the result is finally cast to VARCHAR.

    Note: no NULL substitution happens in this method; `concat_columns` wraps the
    returned expression in `COALESCE(<value>, '')`.
    """
    data_type_cls = exp.DataType

    if type.is_type(data_type_cls.Type.BOOLEAN):
        normalized = self._normalize_boolean_value(expr)
    elif type.is_type(*data_type_cls.INTEGER_TYPES):
        normalized = self._normalize_integer_value(expr)
    elif type.is_type(*data_type_cls.REAL_TYPES):
        # If there is no scale on the decimal type, treat it like an integer when comparing.
        # Some databases like Snowflake deliberately create all integer types as
        # NUMERIC(<size>, 0) and they should be treated as integers and not decimals.
        params = list(type.find_all(exp.DataTypeParam))
        has_zero_scale = len(params) == 2 and params[-1].this.to_py() == 0
        if has_zero_scale:
            normalized = self._normalize_integer_value(expr)
        else:
            normalized = self._normalize_decimal_value(expr, decimal_precision)
    elif type.is_type(*data_type_cls.TEMPORAL_TYPES):
        normalized = self._normalize_timestamp_value(expr, type, timestamp_precision)
    elif type.is_type(*data_type_cls.NESTED_TYPES):
        normalized = self._normalize_nested_value(expr)
    else:
        normalized = expr

    varchar = exp.DataType.build("VARCHAR")
    return exp.cast(normalized, to=varchar)
Return an expression that converts the values produced by the column expression expr to a normalized string
This string should be comparable across database engines, e.g.:
- date columns -> YYYY-MM-DD string
- datetime/timestamp/timestamptz columns -> ISO-8601 string to :timestamp_precision digits of subsecond precision
- float / double / decimal -> Value formatted to :decimal_precision decimal places
- boolean columns -> '1' or '0'
- NULLS -> "" (empty string)
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class GrantsFromInfoSchemaMixin(EngineAdapter):
    """
    Mixin that builds GRANT / REVOKE expressions and reads the currently applied grants
    of a table by querying a table under `information_schema`.

    Subclasses must implement `_grant_object_kind` and `_get_current_schema` and may
    override the class-level switches below.
    """

    # Expression the `grantor` / `grantee` columns are compared against when reading grants.
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expression = exp.func("current_user")
    # True: one statement per privilege listing every principal.
    # False: one statement per (privilege, principal) pair.
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
    # True: the grants query is qualified by, and filtered on, the catalog as well.
    USE_CATALOG_IN_GRANTS = False
    # Name of the table under `information_schema` that is queried for grants.
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "table_privileges"

    @staticmethod
    @abc.abstractmethod
    def _grant_object_kind(table_type: DataObjectType) -> t.Optional[str]:
        """Return the object-kind keyword to set on the DCL statement for :table_type, or None to omit it."""

    @abc.abstractmethod
    def _get_current_schema(self) -> str:
        """Return the name of the schema used when the table carries no schema of its own."""

    def _dcl_grants_config_expr(
        self,
        dcl_cmd: t.Type[DCL],
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """
        Build the DCL expressions (of type :dcl_cmd) that cover :grants_config for :table.

        Args:
            dcl_cmd: Expression class to instantiate for every statement.
            table: The securable the statements refer to.
            grants_config: Privilege -> principals. An empty config yields an empty list.
            table_type: Passed to `_grant_object_kind` to determine the statement's kind.

        Returns:
            One expression per privilege when `SUPPORTS_MULTIPLE_GRANT_PRINCIPALS` is set,
            otherwise one expression per (privilege, principal) pair.
        """
        statements: t.List[exp.Expression] = []
        if not grants_config:
            return statements

        def to_principal(raw_principal: str) -> exp.Expression:
            # Parse the configured principal in the adapter's dialect and normalize it.
            parsed = parse_one(raw_principal, into=exp.GrantPrincipal, dialect=self.dialect)
            return normalize_identifiers(parsed, dialect=self.dialect)

        object_kind = self._grant_object_kind(table_type)
        for privilege, principals in grants_config.items():
            dcl_args: t.Dict[str, t.Any] = {
                "privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
                "securable": table.copy(),
            }
            if object_kind:
                dcl_args["kind"] = exp.Var(this=object_kind)

            if self.SUPPORTS_MULTIPLE_GRANT_PRINCIPALS:
                dcl_args["principals"] = [to_principal(principal) for principal in principals]
                statements.append(dcl_cmd(**dcl_args))  # type: ignore[arg-type]
                continue

            # The same argument dict is reused for every principal of this privilege;
            # only the "principals" entry is replaced between statements.
            for principal in principals:
                dcl_args["principals"] = [to_principal(principal)]
                statements.append(dcl_cmd(**dcl_args))  # type: ignore[arg-type]

        return statements

    def _apply_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """Return the `exp.Grant` expressions for :grants_config on :table."""
        grant_cmd = exp.Grant
        return self._dcl_grants_config_expr(grant_cmd, table, grants_config, table_type)

    def _revoke_grants_config_expr(
        self,
        table: exp.Table,
        grants_config: GrantsConfig,
        table_type: DataObjectType = DataObjectType.TABLE,
    ) -> t.List[exp.Expression]:
        """Return the `exp.Revoke` expressions for :grants_config on :table."""
        revoke_cmd = exp.Revoke
        return self._dcl_grants_config_expr(revoke_cmd, table, grants_config, table_type)

    def _get_grant_expression(self, table: exp.Table) -> exp.Expression:
        """
        Build the SELECT that lists `privilege_type` and `grantee` for :table.

        Rows are restricted to the table's schema and name, to grants whose grantor equals
        `CURRENT_USER_OR_ROLE_EXPRESSION` and whose grantee differs from it. When
        `USE_CATALOG_IN_GRANTS` is set, the information-schema table is qualified with the
        catalog and a `table_catalog` filter is placed first.

        Raises:
            SQLMeshError: If the catalog is required, the table has none and the current
                catalog cannot be determined.
        """
        schema_identifier = table.args.get("db")
        if not schema_identifier:
            # The table has no schema part: fall back to the current schema.
            schema_identifier = normalize_identifiers(
                exp.to_identifier(self._get_current_schema(), quoted=True), dialect=self.dialect
            )
        schema_name = schema_identifier.this
        table_name = table.args.get("this").this  # type: ignore

        filters = [
            exp.column("table_schema").eq(exp.Literal.string(schema_name)),
            exp.column("table_name").eq(exp.Literal.string(table_name)),
            exp.column("grantor").eq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
            exp.column("grantee").neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
        ]

        privileges_table = normalize_identifiers(
            exp.table_(self.GRANT_INFORMATION_SCHEMA_TABLE_NAME, db="information_schema"),
            dialect=self.dialect,
        )

        if self.USE_CATALOG_IN_GRANTS:
            catalog_identifier = table.args.get("catalog")
            if not catalog_identifier:
                current_catalog = self.get_current_catalog()
                if not current_catalog:
                    raise SQLMeshError(
                        "Current catalog could not be determined for fetching grants. This is unexpected."
                    )
                catalog_identifier = normalize_identifiers(
                    exp.to_identifier(current_catalog, quoted=True), dialect=self.dialect
                )
            privileges_table.set("catalog", catalog_identifier.copy())
            catalog_filter = exp.column("table_catalog").eq(
                exp.Literal.string(catalog_identifier.this)
            )
            filters.insert(0, catalog_filter)

        query = exp.select("privilege_type", "grantee").from_(privileges_table)
        return query.where(exp.and_(*filters))

    def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
        """
        Fetch the grants currently present on :table as a privilege -> grantees mapping.

        Rows with a NULL or empty privilege or grantee are ignored, and a grantee is
        listed at most once per privilege, in the order first encountered.
        """
        rows = self.fetchall(self._get_grant_expression(table))

        current_grants: GrantsConfig = {}
        for raw_privilege, raw_grantee in rows:
            if raw_privilege is None or raw_grantee is None:
                continue

            privilege, grantee = str(raw_privilege), str(raw_grantee)
            if not (privilege and grantee):
                continue

            known_grantees = current_grants.setdefault(privilege, [])
            if grantee not in known_grantees:
                known_grantees.append(grantee)

        return current_grants
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts