sqlmesh.core.engine_adapter.bigquery
1from __future__ import annotations 2 3import logging 4import typing as t 5from collections import defaultdict 6 7from sqlglot import exp, parse_one 8from sqlglot.transforms import remove_precision_parameterized_types 9 10from sqlmesh.core.dialect import to_schema 11from sqlmesh.core.engine_adapter.base import _get_data_object_cache_key 12from sqlmesh.core.engine_adapter.mixins import ( 13 ClusteredByMixin, 14 GrantsFromInfoSchemaMixin, 15 RowDiffMixin, 16 TableAlterClusterByOperation, 17) 18from sqlmesh.core.engine_adapter.shared import ( 19 CatalogSupport, 20 DataObject, 21 DataObjectType, 22 SourceQuery, 23 set_catalog, 24 InsertOverwriteStrategy, 25) 26from sqlmesh.core.node import IntervalUnit 27from sqlmesh.core.schema_diff import TableAlterOperation, NestedSupport 28from sqlmesh.utils import optional_import, get_source_columns_to_types 29from sqlmesh.utils.date import to_datetime 30from sqlmesh.utils.errors import SQLMeshError 31from sqlmesh.utils.pandas import columns_to_types_from_dtypes 32 33if t.TYPE_CHECKING: 34 import pandas as pd 35 from google.api_core.retry import Retry 36 from google.cloud import bigquery 37 from google.cloud.bigquery import StandardSqlDataType 38 from google.cloud.bigquery.client import Client as BigQueryClient 39 from google.cloud.bigquery.job import QueryJob 40 from google.cloud.bigquery.job.base import _AsyncJob as BigQueryQueryResult 41 from google.cloud.bigquery.table import Table as BigQueryTable 42 43 from sqlmesh.core._typing import SchemaName, SessionProperties, TableName 44 from sqlmesh.core.engine_adapter._typing import BigframeSession, DCL, DF, GrantsConfig, Query 45 from sqlmesh.core.engine_adapter.base import QueryOrDF 46 47 48logger = logging.getLogger(__name__) 49 50bigframes = optional_import("bigframes") 51bigframes_pd = optional_import("bigframes.pandas") 52 53 54NestedField = t.Tuple[str, str, t.List[str]] 55NestedFieldsDict = t.Dict[str, t.List[NestedField]] 56 57 58@set_catalog() 59class 
BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin, GrantsFromInfoSchemaMixin): 60 """ 61 BigQuery Engine Adapter using the `google-cloud-bigquery` library's DB API. 62 """ 63 64 DIALECT = "bigquery" 65 DEFAULT_BATCH_SIZE = 1000 66 SUPPORTS_TRANSACTIONS = False 67 SUPPORTS_MATERIALIZED_VIEWS = True 68 SUPPORTS_CLONING = True 69 SUPPORTS_GRANTS = True 70 CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.func("session_user") 71 SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True 72 USE_CATALOG_IN_GRANTS = True 73 GRANT_INFORMATION_SCHEMA_TABLE_NAME = "OBJECT_PRIVILEGES" 74 MAX_TABLE_COMMENT_LENGTH = 1024 75 MAX_COLUMN_COMMENT_LENGTH = 1024 76 SUPPORTS_QUERY_EXECUTION_TRACKING = True 77 SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"] 78 INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.MERGE 79 80 SCHEMA_DIFFER_KWARGS = { 81 "compatible_types": { 82 exp.DataType.build("INT64", dialect=DIALECT): { 83 exp.DataType.build("NUMERIC", dialect=DIALECT), 84 exp.DataType.build("FLOAT64", dialect=DIALECT), 85 exp.DataType.build("BIGNUMERIC", dialect=DIALECT), 86 }, 87 exp.DataType.build("NUMERIC", dialect=DIALECT): { 88 exp.DataType.build("FLOAT64", dialect=DIALECT), 89 exp.DataType.build("BIGNUMERIC", dialect=DIALECT), 90 }, 91 exp.DataType.build("DATE", dialect=DIALECT): { 92 exp.DataType.build("DATETIME", dialect=DIALECT), 93 }, 94 }, 95 "coerceable_types": { 96 exp.DataType.build("FLOAT64", dialect=DIALECT): { 97 exp.DataType.build("BIGNUMERIC", dialect=DIALECT), 98 }, 99 }, 100 "support_coercing_compatible_types": True, 101 "parameterized_type_defaults": { 102 exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(38, 9), (0,)], 103 exp.DataType.build("BIGDECIMAL", dialect=DIALECT).this: [(76.76, 38), (0,)], 104 }, 105 "types_with_unlimited_length": { 106 # parameterized `STRING(n)` can ALTER to unparameterized `STRING` 107 exp.DataType.build("STRING", dialect=DIALECT).this: { 108 exp.DataType.build("STRING", dialect=DIALECT).this, 109 }, 110 # parameterized `BYTES(n)` can ALTER 
to unparameterized `BYTES` 111 exp.DataType.build("BYTES", dialect=DIALECT).this: { 112 exp.DataType.build("BYTES", dialect=DIALECT).this, 113 }, 114 }, 115 "nested_support": NestedSupport.ALL_BUT_DROP, 116 } 117 118 @property 119 def client(self) -> BigQueryClient: 120 return self.connection._client 121 122 @property 123 def bigframe(self) -> t.Optional[BigframeSession]: 124 if bigframes: 125 options = bigframes.BigQueryOptions( 126 credentials=self.client._credentials, 127 project=self.client.project, 128 location=self.client.location, 129 ) 130 return bigframes.connect(context=options) 131 return None 132 133 @property 134 def _job_params(self) -> t.Dict[str, t.Any]: 135 from sqlmesh.core.config.connection import BigQueryPriority 136 137 params = { 138 "use_legacy_sql": False, 139 "priority": self._extra_config.get( 140 "priority", BigQueryPriority.INTERACTIVE.bigquery_constant 141 ), 142 } 143 if self._extra_config.get("maximum_bytes_billed") is not None: 144 params["maximum_bytes_billed"] = self._extra_config.get("maximum_bytes_billed") 145 if self._extra_config.get("reservation") is not None: 146 params["reservation"] = self._extra_config.get("reservation") 147 if self.correlation_id: 148 # BigQuery label keys must be lowercase 149 key = self.correlation_id.job_type.value.lower() 150 params["labels"] = {key: self.correlation_id.job_id} 151 return params 152 153 @property 154 def catalog_support(self) -> CatalogSupport: 155 return CatalogSupport.FULL_SUPPORT 156 157 def _df_to_source_queries( 158 self, 159 df: DF, 160 target_columns_to_types: t.Dict[str, exp.DataType], 161 batch_size: int, 162 target_table: TableName, 163 source_columns: t.Optional[t.List[str]] = None, 164 ) -> t.List[SourceQuery]: 165 import pandas as pd 166 167 source_columns_to_types = get_source_columns_to_types( 168 target_columns_to_types, source_columns 169 ) 170 171 temp_bq_table = self.__get_temp_bq_table( 172 self._get_temp_table(target_table or "pandas"), source_columns_to_types 173 
) 174 temp_table = exp.table_( 175 temp_bq_table.table_id, 176 db=temp_bq_table.dataset_id, 177 catalog=temp_bq_table.project, 178 ) 179 180 def query_factory() -> Query: 181 ordered_df = df[list(source_columns_to_types)] 182 if bigframes_pd and isinstance(ordered_df, bigframes_pd.DataFrame): 183 ordered_df.to_gbq( 184 f"{temp_bq_table.project}.{temp_bq_table.dataset_id}.{temp_bq_table.table_id}", 185 if_exists="replace", 186 ) 187 elif not self.table_exists(temp_table): 188 # Make mypy happy 189 assert isinstance(ordered_df, pd.DataFrame) 190 self._db_call(self.client.create_table, table=temp_bq_table, exists_ok=False) 191 result = self.__load_pandas_to_table( 192 temp_bq_table, ordered_df, source_columns_to_types, replace=False 193 ) 194 if result.errors: 195 raise SQLMeshError(result.errors) 196 return exp.select( 197 *self._casted_columns(target_columns_to_types, source_columns=source_columns) 198 ).from_(temp_table) 199 200 return [ 201 SourceQuery( 202 query_factory=query_factory, 203 cleanup_func=lambda: self.drop_table(temp_table), 204 ) 205 ] 206 207 def close(self) -> t.Any: 208 # Cancel all pending query jobs across all threads 209 all_query_jobs = self._connection_pool.get_all_attributes("query_job") 210 for query_job in all_query_jobs: 211 if query_job: 212 try: 213 if not self._db_call(query_job.done): 214 self._db_call(query_job.cancel) 215 logger.debug( 216 "Cancelled BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s", 217 query_job.project, 218 query_job.location, 219 query_job.job_id, 220 ) 221 except Exception as ex: 222 logger.debug( 223 "Failed to cancel BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s. 
%s", 224 query_job.project, 225 query_job.location, 226 query_job.job_id, 227 str(ex), 228 ) 229 230 return super().close() 231 232 def _begin_session(self, properties: SessionProperties) -> None: 233 from google.cloud.bigquery import QueryJobConfig 234 235 query_label_property = properties.get("query_label") 236 parsed_query_label: list[tuple[str, str]] = [] 237 if isinstance(query_label_property, (exp.Array, exp.Paren, exp.Tuple)): 238 label_tuples = ( 239 [query_label_property.unnest()] 240 if isinstance(query_label_property, exp.Paren) 241 else query_label_property.expressions 242 ) 243 244 # query_label is a Paren, Array or Tuple of 2-tuples and validated at load time 245 parsed_query_label.extend( 246 (label_tuple.expressions[0].name, label_tuple.expressions[1].name) 247 for label_tuple in label_tuples 248 ) 249 elif query_label_property is not None: 250 raise SQLMeshError( 251 "Invalid value for `session_properties.query_label`. Must be an array or tuple." 252 ) 253 254 if self.correlation_id: 255 parsed_query_label.append( 256 (self.correlation_id.job_type.value.lower(), self.correlation_id.job_id) 257 ) 258 259 if parsed_query_label: 260 query_label_str = ",".join([":".join(label) for label in parsed_query_label]) 261 query = f'SET @@query_label = "{query_label_str}";SELECT 1;' 262 else: 263 query = "SELECT 1;" 264 265 job = self.client.query( 266 query, 267 job_config=QueryJobConfig(create_session=True), 268 ) 269 session_info = job.session_info 270 session_id = session_info.session_id if session_info else None 271 self._session_id = session_id 272 job.result() 273 274 def _end_session(self) -> None: 275 self._session_id = None 276 277 def _is_session_active(self) -> bool: 278 return self._session_id is not None 279 280 def get_current_catalog(self) -> t.Optional[str]: 281 """Returns the catalog name of the current connection.""" 282 return self.client.project 283 284 def set_current_catalog(self, catalog: str) -> None: 285 """Sets the catalog name of the 
current connection.""" 286 self.client.project = catalog 287 288 def create_schema( 289 self, 290 schema_name: SchemaName, 291 ignore_if_exists: bool = True, 292 warn_on_error: bool = True, 293 properties: t.List[exp.Expr] = [], 294 ) -> None: 295 """Create a schema from a name or qualified table name.""" 296 from google.api_core.exceptions import Conflict 297 298 try: 299 super().create_schema( 300 schema_name, 301 ignore_if_exists=ignore_if_exists, 302 warn_on_error=False, 303 ) 304 except Exception as e: 305 is_already_exists_error = isinstance(e, Conflict) and "Already Exists:" in str(e) 306 if is_already_exists_error and ignore_if_exists: 307 return 308 if not warn_on_error: 309 raise 310 logger.warning("Failed to create schema '%s': %s", schema_name, e) 311 312 def get_bq_schema(self, table_name: TableName) -> t.List[bigquery.SchemaField]: 313 table = exp.to_table(table_name) 314 if len(table.parts) == 3 and "." in table.name: 315 self.execute(exp.select("*").from_(table).limit(0)) 316 query_job = self._query_job 317 assert query_job is not None 318 return query_job._query_results.schema 319 return self._get_table(table).schema 320 321 def columns( 322 self, table_name: TableName, include_pseudo_columns: bool = False 323 ) -> t.Dict[str, exp.DataType]: 324 """Fetches column names and types for the target table.""" 325 326 def dtype_to_sql( 327 dtype: t.Optional[StandardSqlDataType], field: bigquery.SchemaField 328 ) -> str: 329 assert dtype 330 assert field 331 332 kind = dtype.type_kind 333 assert kind 334 335 # Not using the enum value to preserve compatibility with older versions 336 # of the BigQuery library. 
337 if kind.name == "ARRAY": 338 return f"ARRAY<{dtype_to_sql(dtype.array_element_type, field)}>" 339 if kind.name == "STRUCT": 340 struct_type = dtype.struct_type 341 assert struct_type 342 fields = ", ".join( 343 f"{struct_field.name} {dtype_to_sql(struct_field.type, nested_field)}" 344 for struct_field, nested_field in zip(struct_type.fields, field.fields) 345 ) 346 return f"STRUCT<{fields}>" 347 if kind.name == "TYPE_KIND_UNSPECIFIED": 348 field_type = field.field_type 349 350 if field_type == "RANGE": 351 # If the field is a RANGE then `range_element_type` should be set to 352 # one of `"DATE"`, `"DATETIME"` or `"TIMESTAMP"`. 353 return f"RANGE<{field.range_element_type.element_type}>" 354 355 return field_type 356 357 return kind.name 358 359 def create_mapping_schema( 360 schema: t.Sequence[bigquery.SchemaField], 361 ) -> t.Dict[str, exp.DataType]: 362 return { 363 field.name: exp.DataType.build( 364 dtype_to_sql(field.to_standard_sql().type, field), dialect=self.dialect 365 ) 366 for field in schema 367 } 368 369 table = exp.to_table(table_name) 370 if len(table.parts) == 3 and "." 
in table.name: 371 # The client's `get_table` method can't handle paths with >3 identifiers 372 self.execute(exp.select("*").from_(table).limit(0)) 373 query_job = self._query_job 374 assert query_job is not None 375 376 query_results = query_job._query_results 377 columns = create_mapping_schema(query_results.schema) 378 else: 379 bq_table = self._get_table(table) 380 columns = create_mapping_schema(bq_table.schema) 381 382 if include_pseudo_columns: 383 if bq_table.time_partitioning and not bq_table.time_partitioning.field: 384 columns["_PARTITIONTIME"] = exp.DataType.build("TIMESTAMP", dialect="bigquery") 385 if bq_table.time_partitioning.type_ == "DAY": 386 columns["_PARTITIONDATE"] = exp.DataType.build("DATE") 387 if bq_table.table_id.endswith("*"): 388 columns["_TABLE_SUFFIX"] = exp.DataType.build("STRING", dialect="bigquery") 389 if ( 390 bq_table.external_data_configuration is not None 391 and bq_table.external_data_configuration.source_format 392 in ( 393 "CSV", 394 "NEWLINE_DELIMITED_JSON", 395 "AVRO", 396 "PARQUET", 397 "ORC", 398 "DATASTORE_BACKUP", 399 ) 400 ): 401 columns["_FILE_NAME"] = exp.DataType.build("STRING", dialect="bigquery") 402 403 return columns 404 405 def alter_table( 406 self, 407 alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]], 408 ) -> None: 409 """ 410 Performs the alter statements to change the current table into the structure of the target table, 411 and uses the API to add columns to structs, where SQL is not supported. 
412 """ 413 if not alter_expressions: 414 return 415 416 cluster_by_operations, alter_statements = [], [] 417 for e in alter_expressions: 418 if isinstance(e, TableAlterClusterByOperation): 419 cluster_by_operations.append(e) 420 elif isinstance(e, TableAlterOperation): 421 alter_statements.append(e.expression) 422 else: 423 alter_statements.append(e) 424 425 for op in cluster_by_operations: 426 self._update_clustering_key(op) 427 428 nested_fields, non_nested_expressions = self._split_alter_expressions(alter_statements) 429 430 if nested_fields: 431 self._update_table_schema_nested_fields(nested_fields, alter_statements[0].this) 432 433 if non_nested_expressions: 434 super().alter_table(non_nested_expressions) 435 436 def fetchone( 437 self, 438 query: t.Union[exp.Expr, str], 439 ignore_unsupported_errors: bool = False, 440 quote_identifiers: bool = False, 441 ) -> t.Optional[t.Tuple]: 442 """ 443 BigQuery's `fetchone` method doesn't call execute and therefore would not benefit from the execute 444 configuration we have in place. Therefore this implementation calls execute instead. 445 """ 446 self.execute( 447 query, 448 ignore_unsupported_errors=ignore_unsupported_errors, 449 quote_identifiers=quote_identifiers, 450 ) 451 try: 452 return next(self._query_data) 453 except StopIteration: 454 return None 455 456 def fetchall( 457 self, 458 query: t.Union[exp.Expr, str], 459 ignore_unsupported_errors: bool = False, 460 quote_identifiers: bool = False, 461 ) -> t.List[t.Tuple]: 462 """ 463 BigQuery's `fetchone` method doesn't call execute and therefore would not benefit from the execute 464 configuration we have in place. Therefore this implementation calls execute instead. 
465 """ 466 self.execute( 467 query, 468 ignore_unsupported_errors=ignore_unsupported_errors, 469 quote_identifiers=quote_identifiers, 470 ) 471 return list(self._query_data) 472 473 def _split_alter_expressions( 474 self, 475 alter_expressions: t.List[exp.Alter], 476 ) -> t.Tuple[NestedFieldsDict, t.List[exp.Alter]]: 477 """ 478 Returns a dictionary of the nested fields to add and a list of the non-nested alter expressions. 479 """ 480 nested_fields_to_add: NestedFieldsDict = defaultdict(list) 481 non_nested_expressions = [] 482 483 for alter_expression in alter_expressions: 484 action = alter_expression.args["actions"][0] 485 if ( 486 isinstance(action, exp.ColumnDef) 487 and isinstance(action.this, exp.Dot) 488 and isinstance(action.kind, exp.DataType) 489 ): 490 root_field, *leaf_fields = action.this.this.sql(dialect=self.dialect).split(".") 491 new_field = action.this.expression.sql(dialect=self.dialect) 492 data_type = action.kind.sql(dialect=self.dialect) 493 nested_fields_to_add[root_field].append((new_field, data_type, leaf_fields)) 494 else: 495 non_nested_expressions.append(alter_expression) 496 497 return nested_fields_to_add, non_nested_expressions 498 499 def _build_nested_fields( 500 self, 501 current_fields: t.List[bigquery.SchemaField], 502 fields_to_add: t.List[NestedField], 503 ) -> t.List[bigquery.SchemaField]: 504 """ 505 Recursively builds and updates the schema fields with the new nested fields. 
506 """ 507 from google.cloud import bigquery 508 509 new_fields = [] 510 root: t.List[t.Tuple[str, str]] = [] 511 leaves: NestedFieldsDict = defaultdict(list) 512 for new_field, data_type, leaf_fields in fields_to_add: 513 if leaf_fields: 514 leaves[leaf_fields[0]].append((new_field, data_type, leaf_fields[1:])) 515 else: 516 root.append((new_field, data_type)) 517 518 for field in current_fields: 519 # If the new fields are nested, we need to recursively build them 520 if field.name in leaves: 521 subfields = list(field.fields) 522 subfields = self._build_nested_fields(subfields, leaves[field.name]) 523 new_fields.append( 524 bigquery.SchemaField( 525 field.name, "RECORD", mode=field.mode, fields=tuple(subfields) 526 ) 527 ) 528 else: 529 new_fields.append(field) 530 531 # Build and append the new root-level fields 532 new_fields.extend( 533 self.__get_bq_schemafield( 534 new_field[0], exp.DataType.build(new_field[1], dialect=self.dialect) 535 ) 536 for new_field in root 537 ) 538 return new_fields 539 540 def _update_table_schema_nested_fields( 541 self, nested_fields_to_add: NestedFieldsDict, table_name: str 542 ) -> None: 543 """ 544 Updates a BigQuery table schema by adding the new nested fields provided. 
545 """ 546 from google.cloud import bigquery 547 548 table = self._get_table(table_name) 549 original_schema = table.schema 550 new_schema = [] 551 for field in original_schema: 552 if field.name in nested_fields_to_add: 553 fields = self._build_nested_fields( 554 list(field.fields), nested_fields_to_add[field.name] 555 ) 556 new_schema.append( 557 bigquery.SchemaField( 558 field.name, 559 "RECORD", 560 mode=field.mode, 561 fields=tuple(fields), 562 ) 563 ) 564 else: 565 new_schema.append(field) 566 567 if new_schema != original_schema: 568 table.schema = new_schema 569 self.client.update_table(table, ["schema"]) 570 571 def __load_pandas_to_table( 572 self, 573 table: bigquery.Table, 574 df: pd.DataFrame, 575 columns_to_types: t.Dict[str, exp.DataType], 576 replace: bool = False, 577 ) -> BigQueryQueryResult: 578 """ 579 Loads a pandas dataframe into a table in BigQuery. Will do an overwrite if replace is True. Note that 580 the replace will replace the entire table, not just the rows that are in the dataframe. 581 """ 582 from google.cloud import bigquery 583 584 job_config = bigquery.job.LoadJobConfig(schema=self.__get_bq_schema(columns_to_types)) 585 if replace: 586 job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE 587 logger.info(f"Loading dataframe to BigQuery. Table Path: {table.path}") 588 # This client call does not support retry so we don't use the `_db_call` method. 
589 result = self.__retry( 590 self.__db_load_table_from_dataframe, 591 )(df=df, table=table, job_config=job_config) 592 if result.errors: 593 raise SQLMeshError(result.errors) 594 return result 595 596 def __db_load_table_from_dataframe( 597 self, df: pd.DataFrame, table: bigquery.Table, job_config: bigquery.LoadJobConfig 598 ) -> BigQueryQueryResult: 599 job = self.client.load_table_from_dataframe( 600 dataframe=df, destination=table, job_config=job_config 601 ) 602 return self._db_call(job.result) 603 604 def __get_bq_schemafield(self, name: str, tpe: exp.DataType) -> bigquery.SchemaField: 605 from google.cloud import bigquery 606 607 mode = "NULLABLE" 608 if tpe.is_type(exp.DataType.Type.ARRAY): 609 mode = "REPEATED" 610 tpe = tpe.expressions[0] 611 612 field_type = tpe.sql(dialect=self.dialect) 613 fields = [] 614 if tpe.is_type(*exp.DataType.NESTED_TYPES): 615 field_type = "RECORD" 616 for inner_field in tpe.expressions: 617 if isinstance(inner_field, exp.ColumnDef): 618 inner_name = inner_field.this.sql(dialect=self.dialect) 619 inner_type = inner_field.kind 620 if inner_type is None: 621 raise ValueError( 622 f"cannot convert unknown type to BQ schema field {inner_field}" 623 ) 624 fields.append(self.__get_bq_schemafield(name=inner_name, tpe=inner_type)) 625 else: 626 raise ValueError(f"unexpected nested expression {inner_field}") 627 628 return bigquery.SchemaField( 629 name=name, 630 field_type=field_type, 631 mode=mode, 632 fields=fields, 633 ) 634 635 def __get_bq_schema( 636 self, columns_to_types: t.Dict[str, exp.DataType] 637 ) -> t.List[bigquery.SchemaField]: 638 """ 639 Returns a bigquery schema object from a dictionary of column names to types. 
640 """ 641 642 precisionless_col_to_types = { 643 col_name: remove_precision_parameterized_types(col_type) 644 for col_name, col_type in columns_to_types.items() 645 } 646 return [ 647 self.__get_bq_schemafield(name=col_name, tpe=t.cast(exp.DataType, col_type)) 648 for col_name, col_type in precisionless_col_to_types.items() 649 ] 650 651 def __get_temp_bq_table( 652 self, table: exp.Table, columns_to_type: t.Dict[str, exp.DataType] 653 ) -> bigquery.Table: 654 """ 655 Returns a bigquery table object that is temporary and will expire in 3 hours. 656 """ 657 bq_table = self.__get_bq_table(table, columns_to_type) 658 bq_table.expires = to_datetime("in 3 hours") 659 return bq_table 660 661 def __get_bq_table( 662 self, table: TableName, columns_to_type: t.Dict[str, exp.DataType] 663 ) -> bigquery.Table: 664 """ 665 Returns a bigquery table object with a schema defines that matches the columns_to_type dictionary. 666 """ 667 from google.cloud import bigquery 668 669 table_ = exp.to_table(table).copy() 670 671 if not table_.catalog: 672 table_.set("catalog", exp.to_identifier(self.default_catalog)) 673 674 return bigquery.Table( 675 table_ref=self._table_name(table_), 676 schema=self.__get_bq_schema(columns_to_type), 677 ) 678 679 @property 680 def __retry(self) -> Retry: 681 from google.api_core import retry 682 683 return retry.Retry( 684 predicate=_ErrorCounter(self._extra_config["job_retries"]).should_retry, 685 deadline=self._extra_config.get("job_retry_deadline_seconds"), 686 initial=1.0, 687 maximum=3.0, 688 ) 689 690 def insert_overwrite_by_partition( 691 self, 692 table_name: TableName, 693 query_or_df: QueryOrDF, 694 partitioned_by: t.List[exp.Expr], 695 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 696 source_columns: t.Optional[t.List[str]] = None, 697 ) -> None: 698 if len(partitioned_by) != 1: 699 raise SQLMeshError( 700 f"Bigquery only supports partitioning by one column, {len(partitioned_by)} were provided." 
701 ) 702 703 partition_exp = partitioned_by[0] 704 partition_column = partition_exp.find(exp.Column) 705 706 granularity = partition_exp.args.get("unit") 707 if granularity: 708 granularity = granularity.name.lower() 709 710 if not partition_column: 711 partition_sql = partition_exp.sql(dialect=self.dialect) 712 raise SQLMeshError( 713 f"The partition expression '{partition_sql}' doesn't contain a column." 714 ) 715 with ( 716 self.session({}), 717 self.temp_table( 718 query_or_df, 719 name=table_name, 720 partitioned_by=partitioned_by, 721 source_columns=source_columns, 722 ) as temp_table_name, 723 ): 724 if target_columns_to_types is None or target_columns_to_types[ 725 partition_column.name 726 ] == exp.DataType.build("unknown"): 727 target_columns_to_types = self.columns(table_name) 728 729 partition_type_sql = target_columns_to_types[partition_column.name].sql( 730 dialect=self.dialect 731 ) 732 733 select_array_agg_partitions = select_partitions_expr( 734 temp_table_name.db, 735 temp_table_name.name, 736 partition_type_sql, 737 granularity=granularity, 738 agg_func="ARRAY_AGG", 739 catalog=temp_table_name.catalog or self.default_catalog, 740 ) 741 742 self.execute( 743 f"DECLARE _sqlmesh_target_partitions_ ARRAY<{partition_type_sql}> DEFAULT ({select_array_agg_partitions});" 744 ) 745 746 where = t.cast(exp.Condition, partition_exp).isin(unnest="_sqlmesh_target_partitions_") 747 748 self._insert_overwrite_by_condition( 749 table_name, 750 [SourceQuery(query_factory=lambda: exp.select("*").from_(temp_table_name))], 751 target_columns_to_types, 752 where=where, 753 ) 754 755 def table_exists(self, table_name: TableName) -> bool: 756 table = exp.to_table(table_name) 757 data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name) 758 if data_object_cache_key in self._data_object_cache: 759 logger.debug("Table existence cache hit: %s", data_object_cache_key) 760 return self._data_object_cache[data_object_cache_key] is not None 761 762 
try: 763 from google.cloud.exceptions import NotFound 764 except ModuleNotFoundError: 765 from google.api_core.exceptions import NotFound 766 767 try: 768 self._get_table(table_name) 769 return True 770 except NotFound: 771 return False 772 773 def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]: 774 from sqlmesh.utils.date import to_timestamp 775 776 datasets_to_tables: t.DefaultDict[str, t.List[str]] = defaultdict(list) 777 for table_name in table_names: 778 table = exp.to_table(table_name) 779 datasets_to_tables[table.db].append(table.name) 780 781 results = [] 782 783 for dataset, tables in datasets_to_tables.items(): 784 query = ( 785 f"SELECT TIMESTAMP_MILLIS(last_modified_time) FROM `{dataset}.__TABLES__` WHERE " 786 ) 787 for i, table_name in enumerate(tables): 788 query += f"TABLE_ID = '{table_name}'" 789 if i < len(tables) - 1: 790 query += " OR " 791 results.extend(self.fetchall(query)) 792 793 return [to_timestamp(row[0]) for row in results] 794 795 def _get_table(self, table_name: TableName) -> BigQueryTable: 796 """ 797 Returns a BigQueryTable object for the given table name. 798 799 Raises: `google.cloud.exceptions.NotFound` if the table does not exist. 
800 """ 801 return self._db_call(self.client.get_table, table=self._table_name(table_name)) 802 803 def _table_name(self, table_name: TableName) -> str: 804 # the api doesn't support backticks, so we can't call exp.table_name or sql 805 return ".".join(part.name for part in exp.to_table(table_name).parts) 806 807 def _fetch_native_df( 808 self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False 809 ) -> DF: 810 self.execute(query, quote_identifiers=quote_identifiers) 811 query_job = self._query_job 812 assert query_job is not None 813 return query_job.to_dataframe() 814 815 def _create_column_comments( 816 self, 817 table_name: TableName, 818 column_comments: t.Dict[str, str], 819 table_kind: str = "TABLE", 820 materialized_view: bool = False, 821 ) -> None: 822 if not (table_kind == "VIEW" and materialized_view): 823 table = self._get_table(table_name) 824 825 # convert Table object to dict 826 table_def = table.to_api_repr() 827 828 # Set column descriptions, supporting nested fields (e.g. record.field.nested_field) 829 for column, comment in column_comments.items(): 830 fields = table_def["schema"]["fields"] 831 field_names = column.split(".") 832 last_index = len(field_names) - 1 833 834 # Traverse the fields with nested fields down to leaf level 835 for idx, name in enumerate(field_names): 836 if field := next((field for field in fields if field["name"] == name), None): 837 if idx == last_index: 838 field["description"] = self._truncate_comment( 839 comment, self.MAX_COLUMN_COMMENT_LENGTH 840 ) 841 else: 842 fields = field.get("fields") or [] 843 844 # An "etag" is BQ versioning metadata that changes when an object is updated/modified. `update_table` 845 # compares the etags of the table object passed to it and the remote table, erroring if the etags 846 # don't match. We set the local etag to None to avoid this check. 
847 table_def["etag"] = None 848 849 # convert dict back to a Table object 850 table = table.from_api_repr(table_def) 851 852 # update table schema 853 logger.info(f"Registering column comments for table {table_name}") 854 self._db_call(self.client.update_table, table=table, fields=["schema"]) 855 856 def _build_description_property_exp( 857 self, 858 description: str, 859 trunc_method: t.Callable, 860 ) -> exp.Property: 861 return exp.Property( 862 this=exp.to_identifier("description", quoted=True), 863 value=exp.Literal.string(trunc_method(description)), 864 ) 865 866 def _build_partitioned_by_exp( 867 self, 868 partitioned_by: t.List[exp.Expr], 869 *, 870 partition_interval_unit: t.Optional[IntervalUnit] = None, 871 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 872 **kwargs: t.Any, 873 ) -> t.Optional[exp.PartitionedByProperty]: 874 if len(partitioned_by) > 1: 875 raise SQLMeshError("BigQuery only supports partitioning by a single column") 876 877 this = partitioned_by[0] 878 if ( 879 isinstance(this, exp.Column) 880 and partition_interval_unit is not None 881 and not partition_interval_unit.is_minute 882 ): 883 column_type: t.Optional[exp.DataType] = (target_columns_to_types or {}).get(this.name) 884 885 if column_type == exp.DataType.build( 886 "date", dialect=self.dialect 887 ) and partition_interval_unit in ( 888 IntervalUnit.MONTH, 889 IntervalUnit.YEAR, 890 ): 891 trunc_func = "DATE_TRUNC" 892 elif column_type == exp.DataType.build("timestamp", dialect=self.dialect): 893 trunc_func = "TIMESTAMP_TRUNC" 894 elif column_type == exp.DataType.build("datetime", dialect=self.dialect): 895 trunc_func = "DATETIME_TRUNC" 896 else: 897 trunc_func = "" 898 899 if trunc_func: 900 this = exp.func( 901 trunc_func, 902 this, 903 exp.var(partition_interval_unit.value.upper()), 904 dialect=self.dialect, 905 ) 906 907 return exp.PartitionedByProperty(this=this) 908 909 def _build_table_properties_exp( 910 self, 911 catalog_name: t.Optional[str] = 
None, 912 table_format: t.Optional[str] = None, 913 storage_format: t.Optional[str] = None, 914 partitioned_by: t.Optional[t.List[exp.Expr]] = None, 915 partition_interval_unit: t.Optional[IntervalUnit] = None, 916 clustered_by: t.Optional[t.List[exp.Expr]] = None, 917 table_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 918 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 919 table_description: t.Optional[str] = None, 920 table_kind: t.Optional[str] = None, 921 **kwargs: t.Any, 922 ) -> t.Optional[exp.Properties]: 923 properties: t.List[exp.Expr] = [] 924 925 if partitioned_by and ( 926 partitioned_by_prop := self._build_partitioned_by_exp( 927 partitioned_by, 928 partition_interval_unit=partition_interval_unit, 929 target_columns_to_types=target_columns_to_types, 930 ) 931 ): 932 properties.append(partitioned_by_prop) 933 934 if clustered_by and (clustered_by_exp := self._build_clustered_by_exp(clustered_by)): 935 properties.append(clustered_by_exp) 936 937 if table_description: 938 properties.append( 939 self._build_description_property_exp( 940 table_description, self._truncate_table_comment 941 ), 942 ) 943 944 properties.extend(self._table_or_view_properties_to_expressions(table_properties)) 945 946 if properties: 947 return exp.Properties(expressions=properties) 948 return None 949 950 def _build_column_def( 951 self, 952 col_name: str, 953 column_descriptions: t.Optional[t.Dict[str, str]] = None, 954 engine_supports_schema_comments: bool = False, 955 col_type: t.Optional[exp.DATA_TYPE] = None, 956 nested_names: t.List[str] = [], 957 ) -> exp.ColumnDef: 958 # Helper function to build column definitions with column descriptions 959 def _build_struct_with_descriptions( 960 col_type: exp.DataType, 961 nested_names: t.List[str], 962 ) -> exp.DataType: 963 column_expressions = [] 964 for column_def in col_type.expressions: 965 # This is expected to be true, but this check is included as a 966 # precautionary measure in case of an 
unexpected edge case 967 if isinstance(column_def, exp.ColumnDef): 968 column = self._build_column_def( 969 col_name=column_def.name, 970 column_descriptions=column_descriptions, 971 engine_supports_schema_comments=engine_supports_schema_comments, 972 col_type=column_def.kind, 973 nested_names=nested_names, 974 ) 975 else: 976 column = column_def 977 column_expressions.append(column) 978 return exp.DataType(this=col_type.this, expressions=column_expressions, nested=True) 979 980 # Recursively build column definitions for BigQuery's RECORDs (struct) and REPEATED RECORDs (array of struct) 981 if isinstance(col_type, exp.DataType) and col_type.expressions: 982 expressions = col_type.expressions 983 if col_type.is_type(exp.DataType.Type.STRUCT): 984 col_type = _build_struct_with_descriptions(col_type, nested_names + [col_name]) 985 elif col_type.is_type(exp.DataType.Type.ARRAY) and expressions[0].is_type( 986 exp.DataType.Type.STRUCT 987 ): 988 col_type = exp.DataType( 989 this=exp.DataType.Type.ARRAY, 990 expressions=[ 991 _build_struct_with_descriptions( 992 col_type.expressions[0], nested_names + [col_name] 993 ) 994 ], 995 nested=True, 996 ) 997 998 return exp.ColumnDef( 999 this=exp.to_identifier(col_name), 1000 kind=col_type, 1001 constraints=( 1002 self._build_col_comment_exp( 1003 ".".join(nested_names + [col_name]), column_descriptions 1004 ) 1005 if engine_supports_schema_comments and self.comments_enabled and column_descriptions 1006 else None 1007 ), 1008 ) 1009 1010 def _build_col_comment_exp( 1011 self, col_name: str, column_descriptions: t.Dict[str, str] 1012 ) -> t.List[exp.ColumnConstraint]: 1013 comment = column_descriptions.get(col_name, None) 1014 if comment: 1015 return [ 1016 exp.ColumnConstraint( 1017 kind=exp.Properties( 1018 expressions=[ 1019 self._build_description_property_exp( 1020 comment, self._truncate_column_comment 1021 ), 1022 ] 1023 ) 1024 ) 1025 ] 1026 return [] 1027 1028 def _build_view_properties_exp( 1029 self, 1030 
view_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 1031 table_description: t.Optional[str] = None, 1032 **kwargs: t.Any, 1033 ) -> t.Optional[exp.Properties]: 1034 """Creates a SQLGlot table properties expression for view""" 1035 properties: t.List[exp.Expr] = [] 1036 1037 if table_description: 1038 properties.append( 1039 self._build_description_property_exp( 1040 table_description, self._truncate_table_comment 1041 ), 1042 ) 1043 1044 properties.extend(self._table_or_view_properties_to_expressions(view_properties)) 1045 1046 if properties: 1047 return exp.Properties(expressions=properties) 1048 return None 1049 1050 def _build_create_comment_table_exp( 1051 self, table: exp.Table, table_comment: str, table_kind: str 1052 ) -> exp.Comment | str: 1053 table_sql = table.sql(dialect=self.dialect, identify=True) 1054 1055 truncated_comment = self._truncate_table_comment(table_comment) 1056 comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect) 1057 1058 return f"ALTER {table_kind} {table_sql} SET OPTIONS(description = {comment_sql})" 1059 1060 def _build_create_comment_column_exp( 1061 self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" 1062 ) -> exp.Comment | str: 1063 table_sql = table.sql(dialect=self.dialect, identify=True) 1064 column_sql = exp.column(column_name).sql(dialect=self.dialect, identify=True) 1065 1066 truncated_comment = self._truncate_column_comment(column_comment) 1067 comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect) 1068 1069 return f"ALTER {table_kind} {table_sql} ALTER COLUMN {column_sql} SET OPTIONS(description = {comment_sql})" 1070 1071 def create_state_table( 1072 self, 1073 table_name: str, 1074 target_columns_to_types: t.Dict[str, exp.DataType], 1075 primary_key: t.Optional[t.Tuple[str, ...]] = None, 1076 ) -> None: 1077 self.create_table( 1078 table_name, 1079 target_columns_to_types, 1080 ) 1081 1082 def _db_call(self, func: t.Callable[..., 
t.Any], *args: t.Any, **kwargs: t.Any) -> t.Any: 1083 return func( 1084 retry=self.__retry, 1085 *args, 1086 **kwargs, 1087 ) 1088 1089 def _execute( 1090 self, 1091 sql: str, 1092 track_rows_processed: bool = False, 1093 **kwargs: t.Any, 1094 ) -> None: 1095 """Execute a sql query.""" 1096 from google.cloud.bigquery import QueryJobConfig 1097 from google.cloud.bigquery.query import ConnectionProperty 1098 1099 # BigQuery's Python DB API implementation does not support retries, so we have to implement them ourselves. 1100 # So we update the cursor's query job and query data with the results of the new query job. This makes sure 1101 # that other cursor based operations execute correctly. 1102 session_id = self._session_id 1103 connection_properties = ( 1104 [ 1105 ConnectionProperty(key="session_id", value=session_id), 1106 ] 1107 if session_id 1108 else [] 1109 ) 1110 1111 # Create job config 1112 job_config = QueryJobConfig(**self._job_params, connection_properties=connection_properties) 1113 1114 self._query_job = self._db_call( 1115 self.client.query, 1116 query=sql, 1117 job_config=job_config, 1118 timeout=self._extra_config.get("job_creation_timeout_seconds"), 1119 ) 1120 query_job = self._query_job 1121 assert query_job is not None 1122 1123 logger.debug( 1124 "BigQuery job created: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s", 1125 query_job.project, 1126 query_job.location, 1127 query_job.job_id, 1128 ) 1129 1130 results = self._db_call( 1131 query_job.result, 1132 timeout=self._extra_config.get("job_execution_timeout_seconds"), # type: ignore 1133 ) 1134 1135 self._query_data = iter(results) if results.total_rows else iter([]) 1136 query_results = query_job._query_results 1137 self.cursor._set_rowcount(query_results) 1138 self.cursor._set_description(query_results.schema) 1139 1140 if ( 1141 track_rows_processed 1142 and self._query_execution_tracker 1143 and self._query_execution_tracker.is_tracking() 1144 ): 1145 num_rows = None 1146 
if query_job.statement_type == "CREATE_TABLE_AS_SELECT": 1147 # since table was just created, number rows in table == number rows processed 1148 query_table = self.client.get_table(query_job.destination) 1149 num_rows = query_table.num_rows 1150 elif query_job.statement_type in ["INSERT", "DELETE", "MERGE", "UPDATE"]: 1151 num_rows = query_job.num_dml_affected_rows 1152 1153 self._query_execution_tracker.record_execution( 1154 sql, num_rows, query_job.total_bytes_processed 1155 ) 1156 1157 def _get_data_objects( 1158 self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None 1159 ) -> t.List[DataObject]: 1160 """ 1161 Returns all the data objects that exist in the given schema and optionally catalog. 1162 """ 1163 1164 # The BigQuery Client's list_tables method does not support filtering by table name, so we have to 1165 # resort to using SQL instead. 1166 schema = to_schema(schema_name) 1167 catalog = schema.catalog or self.default_catalog 1168 query = ( 1169 exp.select( 1170 exp.column("table_catalog").as_("catalog"), 1171 exp.column("table_name").as_("name"), 1172 exp.column("table_schema").as_("schema_name"), 1173 exp.case() 1174 .when(exp.column("table_type").eq("BASE TABLE"), exp.Literal.string("TABLE")) 1175 .when(exp.column("table_type").eq("CLONE"), exp.Literal.string("TABLE")) 1176 .when(exp.column("table_type").eq("EXTERNAL"), exp.Literal.string("TABLE")) 1177 .when(exp.column("table_type").eq("SNAPSHOT"), exp.Literal.string("TABLE")) 1178 .when(exp.column("table_type").eq("VIEW"), exp.Literal.string("VIEW")) 1179 .when( 1180 exp.column("table_type").eq("MATERIALIZED VIEW"), 1181 exp.Literal.string("MATERIALIZED_VIEW"), 1182 ) 1183 .else_(exp.column("table_type")) 1184 .as_("type"), 1185 exp.column("clustering_key", "ci").as_("clustering_key"), 1186 ) 1187 .with_( 1188 "clustering_info", 1189 as_=exp.select( 1190 exp.column("table_catalog"), 1191 exp.column("table_schema"), 1192 exp.column("table_name"), 1193 parse_one( 1194 
"string_agg(column_name order by clustering_ordinal_position)", 1195 dialect=self.dialect, 1196 ).as_("clustering_key"), 1197 ) 1198 .from_( 1199 exp.to_table( 1200 f"`{catalog}`.`{schema.db}`.INFORMATION_SCHEMA.COLUMNS", 1201 dialect=self.dialect, 1202 ) 1203 ) 1204 .where(exp.column("clustering_ordinal_position").is_(exp.not_(exp.null()))) 1205 .group_by("1", "2", "3"), 1206 ) 1207 .from_( 1208 exp.to_table( 1209 f"`{catalog}`.`{schema.db}`.INFORMATION_SCHEMA.TABLES", dialect=self.dialect 1210 ) 1211 ) 1212 .join( 1213 "clustering_info", 1214 using=["table_catalog", "table_schema", "table_name"], 1215 join_type="left", 1216 join_alias="ci", 1217 ) 1218 ) 1219 if object_names: 1220 query = query.where(exp.column("table_name").isin(*object_names)) 1221 1222 try: 1223 df = self.fetchdf(query, quote_identifiers=True) 1224 except Exception as e: 1225 if "Not found" in str(e): 1226 return [] 1227 raise 1228 1229 if df.empty: 1230 return [] 1231 return [ 1232 DataObject( 1233 catalog=row.catalog, # type: ignore 1234 schema=row.schema_name, # type: ignore 1235 name=row.name, # type: ignore 1236 type=DataObjectType.from_str(row.type), # type: ignore 1237 clustering_key=f"({row.clustering_key})" if row.clustering_key else None, # type: ignore 1238 ) 1239 for row in df.itertuples() 1240 ] 1241 1242 def _update_clustering_key(self, operation: TableAlterClusterByOperation) -> None: 1243 cluster_key_expressions = getattr(operation, "cluster_key_expressions", []) 1244 bq_table = self._get_table(operation.target_table) 1245 1246 rendered_columns = [c.sql(dialect=self.dialect) for c in cluster_key_expressions] 1247 bq_table.clustering_fields = ( 1248 rendered_columns or None 1249 ) # causes a drop of the key if cluster_by is empty or None 1250 1251 self._db_call(self.client.update_table, table=bq_table, fields=["clustering_fields"]) 1252 1253 if cluster_key_expressions: 1254 # BigQuery only applies new clustering going forward, so this rewrites the columns to apply the new 
clustering to historical data 1255 # ref: https://cloud.google.com/bigquery/docs/creating-clustered-tables#modifying-cluster-spec 1256 self.execute( 1257 exp.update( 1258 operation.target_table, 1259 {c: c for c in cluster_key_expressions}, 1260 where=exp.true(), 1261 ) 1262 ) 1263 1264 def _normalize_decimal_value(self, col: exp.Expr, precision: int) -> exp.Expr: 1265 return exp.func("FORMAT", exp.Literal.string(f"%.{precision}f"), col) 1266 1267 def _normalize_nested_value(self, col: exp.Expr) -> exp.Expr: 1268 return exp.func("TO_JSON_STRING", col, dialect=self.dialect) 1269 1270 @t.overload 1271 def _columns_to_types( 1272 self, 1273 query_or_df: DF, 1274 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1275 source_columns: t.Optional[t.List[str]] = None, 1276 ) -> t.Tuple[t.Dict[str, exp.DataType], t.List[str]]: ... 1277 1278 @t.overload 1279 def _columns_to_types( 1280 self, 1281 query_or_df: Query, 1282 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1283 source_columns: t.Optional[t.List[str]] = None, 1284 ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: ... 
1285 1286 def _columns_to_types( 1287 self, 1288 query_or_df: QueryOrDF, 1289 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1290 source_columns: t.Optional[t.List[str]] = None, 1291 ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: 1292 if ( 1293 not target_columns_to_types 1294 and bigframes 1295 and isinstance(query_or_df, bigframes.dataframe.DataFrame) 1296 ): 1297 # using dry_run=True attempts to prevent the DataFrame from being materialized just to read the column types from it 1298 dtypes = query_or_df.to_pandas(dry_run=True).columnDtypes 1299 target_columns_to_types = columns_to_types_from_dtypes(dtypes.items()) 1300 return target_columns_to_types, list(source_columns or target_columns_to_types) 1301 1302 return super()._columns_to_types( 1303 query_or_df, target_columns_to_types, source_columns=source_columns 1304 ) 1305 1306 def _native_df_to_pandas_df( 1307 self, 1308 query_or_df: QueryOrDF, 1309 ) -> t.Union[Query, pd.DataFrame]: 1310 if bigframes and isinstance(query_or_df, bigframes.dataframe.DataFrame): 1311 return query_or_df.to_pandas() 1312 1313 return super()._native_df_to_pandas_df(query_or_df) 1314 1315 @property 1316 def _query_data(self) -> t.Any: 1317 return self._connection_pool.get_attribute("query_data") 1318 1319 @_query_data.setter 1320 def _query_data(self, value: t.Any) -> None: 1321 self._connection_pool.set_attribute("query_data", value) 1322 1323 @property 1324 def _query_job(self) -> t.Optional[QueryJob]: 1325 return self._connection_pool.get_attribute("query_job") 1326 1327 @_query_job.setter 1328 def _query_job(self, value: t.Any) -> None: 1329 self._connection_pool.set_attribute("query_job", value) 1330 1331 @property 1332 def _session_id(self) -> t.Any: 1333 return self._connection_pool.get_attribute("session_id") 1334 1335 @_session_id.setter 1336 def _session_id(self, value: t.Any) -> None: 1337 self._connection_pool.set_attribute("session_id", value) 1338 1339 def 
_get_current_schema(self) -> str: 1340 raise NotImplementedError("BigQuery does not support current schema") 1341 1342 def _get_bq_dataset_location(self, project: str, dataset: str) -> str: 1343 return self._db_call(self.client.get_dataset, dataset_ref=f"{project}.{dataset}").location 1344 1345 def _get_grant_expression(self, table: exp.Table) -> exp.Expr: 1346 if not table.db: 1347 raise ValueError( 1348 f"Table {table.sql(dialect=self.dialect)} does not have a schema (dataset)" 1349 ) 1350 project = table.catalog or self.get_current_catalog() 1351 if not project: 1352 raise ValueError( 1353 f"Table {table.sql(dialect=self.dialect)} does not have a catalog (project)" 1354 ) 1355 1356 dataset = table.db 1357 table_name = table.name 1358 location = self._get_bq_dataset_location(project, dataset) 1359 1360 # https://cloud.google.com/bigquery/docs/information-schema-object-privileges 1361 # OBJECT_PRIVILEGES is a project-level INFORMATION_SCHEMA view with regional qualifier 1362 object_privileges_table = exp.to_table( 1363 f"`{project}`.`region-{location}`.INFORMATION_SCHEMA.{self.GRANT_INFORMATION_SCHEMA_TABLE_NAME}", 1364 dialect=self.dialect, 1365 ) 1366 return ( 1367 exp.select("privilege_type", "grantee") 1368 .from_(object_privileges_table) 1369 .where( 1370 exp.and_( 1371 exp.column("object_schema").eq(exp.Literal.string(dataset)), 1372 exp.column("object_name").eq(exp.Literal.string(table_name)), 1373 # Filter out current_user 1374 # BigQuery grantees format: "user:email" or "group:name" 1375 exp.func("split", exp.column("grantee"), exp.Literal.string(":"))[ 1376 exp.func("OFFSET", exp.Literal.number("1")) 1377 ].neq(self.CURRENT_USER_OR_ROLE_EXPRESSION), 1378 ) 1379 ) 1380 ) 1381 1382 @staticmethod 1383 def _grant_object_kind(table_type: DataObjectType) -> str: 1384 if table_type == DataObjectType.VIEW: 1385 return "VIEW" 1386 if table_type == DataObjectType.MATERIALIZED_VIEW: 1387 # We actually need to use "MATERIALIZED VIEW" here even though it's not listed 
1388 # as a supported resource_type in the BigQuery DCL doc: 1389 # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language 1390 return "MATERIALIZED VIEW" 1391 return "TABLE" 1392 1393 def _dcl_grants_config_expr( 1394 self, 1395 dcl_cmd: t.Type[DCL], 1396 table: exp.Table, 1397 grants_config: GrantsConfig, 1398 table_type: DataObjectType = DataObjectType.TABLE, 1399 ) -> t.List[exp.Expr]: 1400 expressions: t.List[exp.Expr] = [] 1401 if not grants_config: 1402 return expressions 1403 1404 # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language 1405 1406 def normalize_principal(p: str) -> str: 1407 if ":" not in p: 1408 raise ValueError(f"Principal '{p}' missing a prefix label") 1409 1410 # allUsers and allAuthenticatedUsers special groups that are cas-sensitive and must start with "specialGroup:" 1411 if p.endswith("allUsers") or p.endswith("allAuthenticatedUsers"): 1412 if not p.startswith("specialGroup:"): 1413 raise ValueError( 1414 f"Special group principal '{p}' must start with 'specialGroup:' prefix label" 1415 ) 1416 return p 1417 1418 label, principal = p.split(":", 1) 1419 # always lowercase principals 1420 return f"{label}:{principal.lower()}" 1421 1422 object_kind = self._grant_object_kind(table_type) 1423 for privilege, principals in grants_config.items(): 1424 if not principals: 1425 continue 1426 1427 noramlized_principals = [exp.Literal.string(normalize_principal(p)) for p in principals] 1428 args: t.Dict[str, t.Any] = { 1429 "privileges": [exp.GrantPrivilege(this=exp.to_identifier(privilege, quoted=True))], 1430 "securable": table.copy(), 1431 "principals": noramlized_principals, 1432 } 1433 1434 if object_kind: 1435 args["kind"] = exp.Var(this=object_kind) 1436 1437 expressions.append(dcl_cmd(**args)) # type: ignore[arg-type] 1438 1439 return expressions 1440 1441 1442class _ErrorCounter: 1443 """ 1444 A class that counts errors and determines whether or not to retry based on the number of 
errors and the error 1445 type. 1446 1447 Reference implementation: https://github.com/dbt-labs/dbt-bigquery/blob/8339a034929b12e027f0a143abf46582f3f6ffbc/dbt/adapters/bigquery/connections.py#L672 1448 1449 TODO: Implement a retry configuration that works across all engines 1450 """ 1451 1452 def __init__(self, num_retries: int) -> None: 1453 self.num_retries = num_retries 1454 self.error_count = 0 1455 1456 @property 1457 def retryable_errors(self) -> t.Tuple[t.Type[Exception], ...]: 1458 try: 1459 from google.cloud.exceptions import ServerError 1460 except ModuleNotFoundError: 1461 from google.api_core.exceptions import ServerError 1462 from requests.exceptions import ConnectionError 1463 1464 return (ServerError, ConnectionError) 1465 1466 def _is_retryable(self, error: BaseException) -> bool: 1467 from google.api_core.exceptions import Forbidden 1468 1469 if isinstance(error, self.retryable_errors): 1470 return True 1471 if isinstance(error, Forbidden) and any( 1472 e["reason"] == "rateLimitExceeded" for e in error.errors 1473 ): 1474 return True 1475 return False 1476 1477 def should_retry(self, error: BaseException) -> bool: 1478 if self.num_retries == 0: 1479 return False 1480 self.error_count += 1 1481 if self._is_retryable(error) and self.error_count <= self.num_retries: 1482 logger.info(f"Retry Num {self.error_count} of {self.num_retries}. Error: {repr(error)}") 1483 return True 1484 return False 1485 1486 1487def select_partitions_expr( 1488 schema: str, 1489 table_name: str, 1490 data_type: t.Union[str, exp.DataType], 1491 granularity: t.Optional[str] = None, 1492 agg_func: str = "MAX", 1493 catalog: t.Optional[str] = None, 1494) -> str: 1495 """Generates a SQL expression that aggregates partition values for a table. 1496 1497 Args: 1498 schema: The schema (BigQuery dataset) of the table. 1499 table_name: The name of the table. 1500 data_type: The data type of the partition column. 1501 granularity: The granularity of the partition. 
Supported values are: 'day', 'month', 'year' and 'hour'. 1502 agg_func: The aggregation function to use. 1503 catalog: The catalog (BigQuery project ID) of the table. 1504 1505 Returns: 1506 A SELECT statement that aggregates partition values for a table. 1507 """ 1508 partitions_table_name = f"`{schema}`.INFORMATION_SCHEMA.PARTITIONS" 1509 if catalog: 1510 partitions_table_name = f"`{catalog}`.{partitions_table_name}" 1511 1512 if isinstance(data_type, exp.DataType): 1513 data_type = data_type.sql(dialect="bigquery") 1514 data_type = data_type.upper() 1515 1516 parse_fun = f"PARSE_{data_type}" if data_type in ("DATE", "DATETIME", "TIMESTAMP") else None 1517 if parse_fun: 1518 granularity = granularity or "day" 1519 parse_format = GRANULARITY_TO_PARTITION_FORMAT[granularity.lower()] 1520 partition_expr = exp.func( 1521 parse_fun, 1522 exp.Literal.string(parse_format), 1523 exp.column("partition_id"), 1524 dialect="bigquery", 1525 ) 1526 else: 1527 partition_expr = exp.cast(exp.column("partition_id"), "INT64", dialect="bigquery") 1528 1529 return ( 1530 exp.select(exp.func(agg_func, partition_expr)) 1531 .from_(partitions_table_name, dialect="bigquery") 1532 .where( 1533 f"table_name = '{table_name}' AND partition_id IS NOT NULL AND partition_id != '__NULL__'", 1534 copy=False, 1535 ) 1536 .sql(dialect="bigquery") 1537 ) 1538 1539 1540GRANULARITY_TO_PARTITION_FORMAT = { 1541 "day": "%Y%m%d", 1542 "month": "%Y%m", 1543 "year": "%Y", 1544 "hour": "%Y%m%d%H", 1545}
59@set_catalog() 60class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin, GrantsFromInfoSchemaMixin): 61 """ 62 BigQuery Engine Adapter using the `google-cloud-bigquery` library's DB API. 63 """ 64 65 DIALECT = "bigquery" 66 DEFAULT_BATCH_SIZE = 1000 67 SUPPORTS_TRANSACTIONS = False 68 SUPPORTS_MATERIALIZED_VIEWS = True 69 SUPPORTS_CLONING = True 70 SUPPORTS_GRANTS = True 71 CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.func("session_user") 72 SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True 73 USE_CATALOG_IN_GRANTS = True 74 GRANT_INFORMATION_SCHEMA_TABLE_NAME = "OBJECT_PRIVILEGES" 75 MAX_TABLE_COMMENT_LENGTH = 1024 76 MAX_COLUMN_COMMENT_LENGTH = 1024 77 SUPPORTS_QUERY_EXECUTION_TRACKING = True 78 SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"] 79 INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.MERGE 80 81 SCHEMA_DIFFER_KWARGS = { 82 "compatible_types": { 83 exp.DataType.build("INT64", dialect=DIALECT): { 84 exp.DataType.build("NUMERIC", dialect=DIALECT), 85 exp.DataType.build("FLOAT64", dialect=DIALECT), 86 exp.DataType.build("BIGNUMERIC", dialect=DIALECT), 87 }, 88 exp.DataType.build("NUMERIC", dialect=DIALECT): { 89 exp.DataType.build("FLOAT64", dialect=DIALECT), 90 exp.DataType.build("BIGNUMERIC", dialect=DIALECT), 91 }, 92 exp.DataType.build("DATE", dialect=DIALECT): { 93 exp.DataType.build("DATETIME", dialect=DIALECT), 94 }, 95 }, 96 "coerceable_types": { 97 exp.DataType.build("FLOAT64", dialect=DIALECT): { 98 exp.DataType.build("BIGNUMERIC", dialect=DIALECT), 99 }, 100 }, 101 "support_coercing_compatible_types": True, 102 "parameterized_type_defaults": { 103 exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(38, 9), (0,)], 104 exp.DataType.build("BIGDECIMAL", dialect=DIALECT).this: [(76.76, 38), (0,)], 105 }, 106 "types_with_unlimited_length": { 107 # parameterized `STRING(n)` can ALTER to unparameterized `STRING` 108 exp.DataType.build("STRING", dialect=DIALECT).this: { 109 exp.DataType.build("STRING", dialect=DIALECT).this, 110 }, 111 # 
parameterized `BYTES(n)` can ALTER to unparameterized `BYTES` 112 exp.DataType.build("BYTES", dialect=DIALECT).this: { 113 exp.DataType.build("BYTES", dialect=DIALECT).this, 114 }, 115 }, 116 "nested_support": NestedSupport.ALL_BUT_DROP, 117 } 118 119 @property 120 def client(self) -> BigQueryClient: 121 return self.connection._client 122 123 @property 124 def bigframe(self) -> t.Optional[BigframeSession]: 125 if bigframes: 126 options = bigframes.BigQueryOptions( 127 credentials=self.client._credentials, 128 project=self.client.project, 129 location=self.client.location, 130 ) 131 return bigframes.connect(context=options) 132 return None 133 134 @property 135 def _job_params(self) -> t.Dict[str, t.Any]: 136 from sqlmesh.core.config.connection import BigQueryPriority 137 138 params = { 139 "use_legacy_sql": False, 140 "priority": self._extra_config.get( 141 "priority", BigQueryPriority.INTERACTIVE.bigquery_constant 142 ), 143 } 144 if self._extra_config.get("maximum_bytes_billed") is not None: 145 params["maximum_bytes_billed"] = self._extra_config.get("maximum_bytes_billed") 146 if self._extra_config.get("reservation") is not None: 147 params["reservation"] = self._extra_config.get("reservation") 148 if self.correlation_id: 149 # BigQuery label keys must be lowercase 150 key = self.correlation_id.job_type.value.lower() 151 params["labels"] = {key: self.correlation_id.job_id} 152 return params 153 154 @property 155 def catalog_support(self) -> CatalogSupport: 156 return CatalogSupport.FULL_SUPPORT 157 158 def _df_to_source_queries( 159 self, 160 df: DF, 161 target_columns_to_types: t.Dict[str, exp.DataType], 162 batch_size: int, 163 target_table: TableName, 164 source_columns: t.Optional[t.List[str]] = None, 165 ) -> t.List[SourceQuery]: 166 import pandas as pd 167 168 source_columns_to_types = get_source_columns_to_types( 169 target_columns_to_types, source_columns 170 ) 171 172 temp_bq_table = self.__get_temp_bq_table( 173 self._get_temp_table(target_table or 
"pandas"), source_columns_to_types 174 ) 175 temp_table = exp.table_( 176 temp_bq_table.table_id, 177 db=temp_bq_table.dataset_id, 178 catalog=temp_bq_table.project, 179 ) 180 181 def query_factory() -> Query: 182 ordered_df = df[list(source_columns_to_types)] 183 if bigframes_pd and isinstance(ordered_df, bigframes_pd.DataFrame): 184 ordered_df.to_gbq( 185 f"{temp_bq_table.project}.{temp_bq_table.dataset_id}.{temp_bq_table.table_id}", 186 if_exists="replace", 187 ) 188 elif not self.table_exists(temp_table): 189 # Make mypy happy 190 assert isinstance(ordered_df, pd.DataFrame) 191 self._db_call(self.client.create_table, table=temp_bq_table, exists_ok=False) 192 result = self.__load_pandas_to_table( 193 temp_bq_table, ordered_df, source_columns_to_types, replace=False 194 ) 195 if result.errors: 196 raise SQLMeshError(result.errors) 197 return exp.select( 198 *self._casted_columns(target_columns_to_types, source_columns=source_columns) 199 ).from_(temp_table) 200 201 return [ 202 SourceQuery( 203 query_factory=query_factory, 204 cleanup_func=lambda: self.drop_table(temp_table), 205 ) 206 ] 207 208 def close(self) -> t.Any: 209 # Cancel all pending query jobs across all threads 210 all_query_jobs = self._connection_pool.get_all_attributes("query_job") 211 for query_job in all_query_jobs: 212 if query_job: 213 try: 214 if not self._db_call(query_job.done): 215 self._db_call(query_job.cancel) 216 logger.debug( 217 "Cancelled BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s", 218 query_job.project, 219 query_job.location, 220 query_job.job_id, 221 ) 222 except Exception as ex: 223 logger.debug( 224 "Failed to cancel BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s. 
%s", 225 query_job.project, 226 query_job.location, 227 query_job.job_id, 228 str(ex), 229 ) 230 231 return super().close() 232 233 def _begin_session(self, properties: SessionProperties) -> None: 234 from google.cloud.bigquery import QueryJobConfig 235 236 query_label_property = properties.get("query_label") 237 parsed_query_label: list[tuple[str, str]] = [] 238 if isinstance(query_label_property, (exp.Array, exp.Paren, exp.Tuple)): 239 label_tuples = ( 240 [query_label_property.unnest()] 241 if isinstance(query_label_property, exp.Paren) 242 else query_label_property.expressions 243 ) 244 245 # query_label is a Paren, Array or Tuple of 2-tuples and validated at load time 246 parsed_query_label.extend( 247 (label_tuple.expressions[0].name, label_tuple.expressions[1].name) 248 for label_tuple in label_tuples 249 ) 250 elif query_label_property is not None: 251 raise SQLMeshError( 252 "Invalid value for `session_properties.query_label`. Must be an array or tuple." 253 ) 254 255 if self.correlation_id: 256 parsed_query_label.append( 257 (self.correlation_id.job_type.value.lower(), self.correlation_id.job_id) 258 ) 259 260 if parsed_query_label: 261 query_label_str = ",".join([":".join(label) for label in parsed_query_label]) 262 query = f'SET @@query_label = "{query_label_str}";SELECT 1;' 263 else: 264 query = "SELECT 1;" 265 266 job = self.client.query( 267 query, 268 job_config=QueryJobConfig(create_session=True), 269 ) 270 session_info = job.session_info 271 session_id = session_info.session_id if session_info else None 272 self._session_id = session_id 273 job.result() 274 275 def _end_session(self) -> None: 276 self._session_id = None 277 278 def _is_session_active(self) -> bool: 279 return self._session_id is not None 280 281 def get_current_catalog(self) -> t.Optional[str]: 282 """Returns the catalog name of the current connection.""" 283 return self.client.project 284 285 def set_current_catalog(self, catalog: str) -> None: 286 """Sets the catalog name of the 
current connection.""" 287 self.client.project = catalog 288 289 def create_schema( 290 self, 291 schema_name: SchemaName, 292 ignore_if_exists: bool = True, 293 warn_on_error: bool = True, 294 properties: t.List[exp.Expr] = [], 295 ) -> None: 296 """Create a schema from a name or qualified table name.""" 297 from google.api_core.exceptions import Conflict 298 299 try: 300 super().create_schema( 301 schema_name, 302 ignore_if_exists=ignore_if_exists, 303 warn_on_error=False, 304 ) 305 except Exception as e: 306 is_already_exists_error = isinstance(e, Conflict) and "Already Exists:" in str(e) 307 if is_already_exists_error and ignore_if_exists: 308 return 309 if not warn_on_error: 310 raise 311 logger.warning("Failed to create schema '%s': %s", schema_name, e) 312 313 def get_bq_schema(self, table_name: TableName) -> t.List[bigquery.SchemaField]: 314 table = exp.to_table(table_name) 315 if len(table.parts) == 3 and "." in table.name: 316 self.execute(exp.select("*").from_(table).limit(0)) 317 query_job = self._query_job 318 assert query_job is not None 319 return query_job._query_results.schema 320 return self._get_table(table).schema 321 322 def columns( 323 self, table_name: TableName, include_pseudo_columns: bool = False 324 ) -> t.Dict[str, exp.DataType]: 325 """Fetches column names and types for the target table.""" 326 327 def dtype_to_sql( 328 dtype: t.Optional[StandardSqlDataType], field: bigquery.SchemaField 329 ) -> str: 330 assert dtype 331 assert field 332 333 kind = dtype.type_kind 334 assert kind 335 336 # Not using the enum value to preserve compatibility with older versions 337 # of the BigQuery library. 
338 if kind.name == "ARRAY": 339 return f"ARRAY<{dtype_to_sql(dtype.array_element_type, field)}>" 340 if kind.name == "STRUCT": 341 struct_type = dtype.struct_type 342 assert struct_type 343 fields = ", ".join( 344 f"{struct_field.name} {dtype_to_sql(struct_field.type, nested_field)}" 345 for struct_field, nested_field in zip(struct_type.fields, field.fields) 346 ) 347 return f"STRUCT<{fields}>" 348 if kind.name == "TYPE_KIND_UNSPECIFIED": 349 field_type = field.field_type 350 351 if field_type == "RANGE": 352 # If the field is a RANGE then `range_element_type` should be set to 353 # one of `"DATE"`, `"DATETIME"` or `"TIMESTAMP"`. 354 return f"RANGE<{field.range_element_type.element_type}>" 355 356 return field_type 357 358 return kind.name 359 360 def create_mapping_schema( 361 schema: t.Sequence[bigquery.SchemaField], 362 ) -> t.Dict[str, exp.DataType]: 363 return { 364 field.name: exp.DataType.build( 365 dtype_to_sql(field.to_standard_sql().type, field), dialect=self.dialect 366 ) 367 for field in schema 368 } 369 370 table = exp.to_table(table_name) 371 if len(table.parts) == 3 and "." 
in table.name: 372 # The client's `get_table` method can't handle paths with >3 identifiers 373 self.execute(exp.select("*").from_(table).limit(0)) 374 query_job = self._query_job 375 assert query_job is not None 376 377 query_results = query_job._query_results 378 columns = create_mapping_schema(query_results.schema) 379 else: 380 bq_table = self._get_table(table) 381 columns = create_mapping_schema(bq_table.schema) 382 383 if include_pseudo_columns: 384 if bq_table.time_partitioning and not bq_table.time_partitioning.field: 385 columns["_PARTITIONTIME"] = exp.DataType.build("TIMESTAMP", dialect="bigquery") 386 if bq_table.time_partitioning.type_ == "DAY": 387 columns["_PARTITIONDATE"] = exp.DataType.build("DATE") 388 if bq_table.table_id.endswith("*"): 389 columns["_TABLE_SUFFIX"] = exp.DataType.build("STRING", dialect="bigquery") 390 if ( 391 bq_table.external_data_configuration is not None 392 and bq_table.external_data_configuration.source_format 393 in ( 394 "CSV", 395 "NEWLINE_DELIMITED_JSON", 396 "AVRO", 397 "PARQUET", 398 "ORC", 399 "DATASTORE_BACKUP", 400 ) 401 ): 402 columns["_FILE_NAME"] = exp.DataType.build("STRING", dialect="bigquery") 403 404 return columns 405 406 def alter_table( 407 self, 408 alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]], 409 ) -> None: 410 """ 411 Performs the alter statements to change the current table into the structure of the target table, 412 and uses the API to add columns to structs, where SQL is not supported. 
413 """ 414 if not alter_expressions: 415 return 416 417 cluster_by_operations, alter_statements = [], [] 418 for e in alter_expressions: 419 if isinstance(e, TableAlterClusterByOperation): 420 cluster_by_operations.append(e) 421 elif isinstance(e, TableAlterOperation): 422 alter_statements.append(e.expression) 423 else: 424 alter_statements.append(e) 425 426 for op in cluster_by_operations: 427 self._update_clustering_key(op) 428 429 nested_fields, non_nested_expressions = self._split_alter_expressions(alter_statements) 430 431 if nested_fields: 432 self._update_table_schema_nested_fields(nested_fields, alter_statements[0].this) 433 434 if non_nested_expressions: 435 super().alter_table(non_nested_expressions) 436 437 def fetchone( 438 self, 439 query: t.Union[exp.Expr, str], 440 ignore_unsupported_errors: bool = False, 441 quote_identifiers: bool = False, 442 ) -> t.Optional[t.Tuple]: 443 """ 444 BigQuery's `fetchone` method doesn't call execute and therefore would not benefit from the execute 445 configuration we have in place. Therefore this implementation calls execute instead. 446 """ 447 self.execute( 448 query, 449 ignore_unsupported_errors=ignore_unsupported_errors, 450 quote_identifiers=quote_identifiers, 451 ) 452 try: 453 return next(self._query_data) 454 except StopIteration: 455 return None 456 457 def fetchall( 458 self, 459 query: t.Union[exp.Expr, str], 460 ignore_unsupported_errors: bool = False, 461 quote_identifiers: bool = False, 462 ) -> t.List[t.Tuple]: 463 """ 464 BigQuery's `fetchone` method doesn't call execute and therefore would not benefit from the execute 465 configuration we have in place. Therefore this implementation calls execute instead. 
466 """ 467 self.execute( 468 query, 469 ignore_unsupported_errors=ignore_unsupported_errors, 470 quote_identifiers=quote_identifiers, 471 ) 472 return list(self._query_data) 473 474 def _split_alter_expressions( 475 self, 476 alter_expressions: t.List[exp.Alter], 477 ) -> t.Tuple[NestedFieldsDict, t.List[exp.Alter]]: 478 """ 479 Returns a dictionary of the nested fields to add and a list of the non-nested alter expressions. 480 """ 481 nested_fields_to_add: NestedFieldsDict = defaultdict(list) 482 non_nested_expressions = [] 483 484 for alter_expression in alter_expressions: 485 action = alter_expression.args["actions"][0] 486 if ( 487 isinstance(action, exp.ColumnDef) 488 and isinstance(action.this, exp.Dot) 489 and isinstance(action.kind, exp.DataType) 490 ): 491 root_field, *leaf_fields = action.this.this.sql(dialect=self.dialect).split(".") 492 new_field = action.this.expression.sql(dialect=self.dialect) 493 data_type = action.kind.sql(dialect=self.dialect) 494 nested_fields_to_add[root_field].append((new_field, data_type, leaf_fields)) 495 else: 496 non_nested_expressions.append(alter_expression) 497 498 return nested_fields_to_add, non_nested_expressions 499 500 def _build_nested_fields( 501 self, 502 current_fields: t.List[bigquery.SchemaField], 503 fields_to_add: t.List[NestedField], 504 ) -> t.List[bigquery.SchemaField]: 505 """ 506 Recursively builds and updates the schema fields with the new nested fields. 
507 """ 508 from google.cloud import bigquery 509 510 new_fields = [] 511 root: t.List[t.Tuple[str, str]] = [] 512 leaves: NestedFieldsDict = defaultdict(list) 513 for new_field, data_type, leaf_fields in fields_to_add: 514 if leaf_fields: 515 leaves[leaf_fields[0]].append((new_field, data_type, leaf_fields[1:])) 516 else: 517 root.append((new_field, data_type)) 518 519 for field in current_fields: 520 # If the new fields are nested, we need to recursively build them 521 if field.name in leaves: 522 subfields = list(field.fields) 523 subfields = self._build_nested_fields(subfields, leaves[field.name]) 524 new_fields.append( 525 bigquery.SchemaField( 526 field.name, "RECORD", mode=field.mode, fields=tuple(subfields) 527 ) 528 ) 529 else: 530 new_fields.append(field) 531 532 # Build and append the new root-level fields 533 new_fields.extend( 534 self.__get_bq_schemafield( 535 new_field[0], exp.DataType.build(new_field[1], dialect=self.dialect) 536 ) 537 for new_field in root 538 ) 539 return new_fields 540 541 def _update_table_schema_nested_fields( 542 self, nested_fields_to_add: NestedFieldsDict, table_name: str 543 ) -> None: 544 """ 545 Updates a BigQuery table schema by adding the new nested fields provided. 
546 """ 547 from google.cloud import bigquery 548 549 table = self._get_table(table_name) 550 original_schema = table.schema 551 new_schema = [] 552 for field in original_schema: 553 if field.name in nested_fields_to_add: 554 fields = self._build_nested_fields( 555 list(field.fields), nested_fields_to_add[field.name] 556 ) 557 new_schema.append( 558 bigquery.SchemaField( 559 field.name, 560 "RECORD", 561 mode=field.mode, 562 fields=tuple(fields), 563 ) 564 ) 565 else: 566 new_schema.append(field) 567 568 if new_schema != original_schema: 569 table.schema = new_schema 570 self.client.update_table(table, ["schema"]) 571 572 def __load_pandas_to_table( 573 self, 574 table: bigquery.Table, 575 df: pd.DataFrame, 576 columns_to_types: t.Dict[str, exp.DataType], 577 replace: bool = False, 578 ) -> BigQueryQueryResult: 579 """ 580 Loads a pandas dataframe into a table in BigQuery. Will do an overwrite if replace is True. Note that 581 the replace will replace the entire table, not just the rows that are in the dataframe. 582 """ 583 from google.cloud import bigquery 584 585 job_config = bigquery.job.LoadJobConfig(schema=self.__get_bq_schema(columns_to_types)) 586 if replace: 587 job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE 588 logger.info(f"Loading dataframe to BigQuery. Table Path: {table.path}") 589 # This client call does not support retry so we don't use the `_db_call` method. 
590 result = self.__retry( 591 self.__db_load_table_from_dataframe, 592 )(df=df, table=table, job_config=job_config) 593 if result.errors: 594 raise SQLMeshError(result.errors) 595 return result 596 597 def __db_load_table_from_dataframe( 598 self, df: pd.DataFrame, table: bigquery.Table, job_config: bigquery.LoadJobConfig 599 ) -> BigQueryQueryResult: 600 job = self.client.load_table_from_dataframe( 601 dataframe=df, destination=table, job_config=job_config 602 ) 603 return self._db_call(job.result) 604 605 def __get_bq_schemafield(self, name: str, tpe: exp.DataType) -> bigquery.SchemaField: 606 from google.cloud import bigquery 607 608 mode = "NULLABLE" 609 if tpe.is_type(exp.DataType.Type.ARRAY): 610 mode = "REPEATED" 611 tpe = tpe.expressions[0] 612 613 field_type = tpe.sql(dialect=self.dialect) 614 fields = [] 615 if tpe.is_type(*exp.DataType.NESTED_TYPES): 616 field_type = "RECORD" 617 for inner_field in tpe.expressions: 618 if isinstance(inner_field, exp.ColumnDef): 619 inner_name = inner_field.this.sql(dialect=self.dialect) 620 inner_type = inner_field.kind 621 if inner_type is None: 622 raise ValueError( 623 f"cannot convert unknown type to BQ schema field {inner_field}" 624 ) 625 fields.append(self.__get_bq_schemafield(name=inner_name, tpe=inner_type)) 626 else: 627 raise ValueError(f"unexpected nested expression {inner_field}") 628 629 return bigquery.SchemaField( 630 name=name, 631 field_type=field_type, 632 mode=mode, 633 fields=fields, 634 ) 635 636 def __get_bq_schema( 637 self, columns_to_types: t.Dict[str, exp.DataType] 638 ) -> t.List[bigquery.SchemaField]: 639 """ 640 Returns a bigquery schema object from a dictionary of column names to types. 
641 """ 642 643 precisionless_col_to_types = { 644 col_name: remove_precision_parameterized_types(col_type) 645 for col_name, col_type in columns_to_types.items() 646 } 647 return [ 648 self.__get_bq_schemafield(name=col_name, tpe=t.cast(exp.DataType, col_type)) 649 for col_name, col_type in precisionless_col_to_types.items() 650 ] 651 652 def __get_temp_bq_table( 653 self, table: exp.Table, columns_to_type: t.Dict[str, exp.DataType] 654 ) -> bigquery.Table: 655 """ 656 Returns a bigquery table object that is temporary and will expire in 3 hours. 657 """ 658 bq_table = self.__get_bq_table(table, columns_to_type) 659 bq_table.expires = to_datetime("in 3 hours") 660 return bq_table 661 662 def __get_bq_table( 663 self, table: TableName, columns_to_type: t.Dict[str, exp.DataType] 664 ) -> bigquery.Table: 665 """ 666 Returns a bigquery table object with a schema defines that matches the columns_to_type dictionary. 667 """ 668 from google.cloud import bigquery 669 670 table_ = exp.to_table(table).copy() 671 672 if not table_.catalog: 673 table_.set("catalog", exp.to_identifier(self.default_catalog)) 674 675 return bigquery.Table( 676 table_ref=self._table_name(table_), 677 schema=self.__get_bq_schema(columns_to_type), 678 ) 679 680 @property 681 def __retry(self) -> Retry: 682 from google.api_core import retry 683 684 return retry.Retry( 685 predicate=_ErrorCounter(self._extra_config["job_retries"]).should_retry, 686 deadline=self._extra_config.get("job_retry_deadline_seconds"), 687 initial=1.0, 688 maximum=3.0, 689 ) 690 691 def insert_overwrite_by_partition( 692 self, 693 table_name: TableName, 694 query_or_df: QueryOrDF, 695 partitioned_by: t.List[exp.Expr], 696 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 697 source_columns: t.Optional[t.List[str]] = None, 698 ) -> None: 699 if len(partitioned_by) != 1: 700 raise SQLMeshError( 701 f"Bigquery only supports partitioning by one column, {len(partitioned_by)} were provided." 
702 ) 703 704 partition_exp = partitioned_by[0] 705 partition_column = partition_exp.find(exp.Column) 706 707 granularity = partition_exp.args.get("unit") 708 if granularity: 709 granularity = granularity.name.lower() 710 711 if not partition_column: 712 partition_sql = partition_exp.sql(dialect=self.dialect) 713 raise SQLMeshError( 714 f"The partition expression '{partition_sql}' doesn't contain a column." 715 ) 716 with ( 717 self.session({}), 718 self.temp_table( 719 query_or_df, 720 name=table_name, 721 partitioned_by=partitioned_by, 722 source_columns=source_columns, 723 ) as temp_table_name, 724 ): 725 if target_columns_to_types is None or target_columns_to_types[ 726 partition_column.name 727 ] == exp.DataType.build("unknown"): 728 target_columns_to_types = self.columns(table_name) 729 730 partition_type_sql = target_columns_to_types[partition_column.name].sql( 731 dialect=self.dialect 732 ) 733 734 select_array_agg_partitions = select_partitions_expr( 735 temp_table_name.db, 736 temp_table_name.name, 737 partition_type_sql, 738 granularity=granularity, 739 agg_func="ARRAY_AGG", 740 catalog=temp_table_name.catalog or self.default_catalog, 741 ) 742 743 self.execute( 744 f"DECLARE _sqlmesh_target_partitions_ ARRAY<{partition_type_sql}> DEFAULT ({select_array_agg_partitions});" 745 ) 746 747 where = t.cast(exp.Condition, partition_exp).isin(unnest="_sqlmesh_target_partitions_") 748 749 self._insert_overwrite_by_condition( 750 table_name, 751 [SourceQuery(query_factory=lambda: exp.select("*").from_(temp_table_name))], 752 target_columns_to_types, 753 where=where, 754 ) 755 756 def table_exists(self, table_name: TableName) -> bool: 757 table = exp.to_table(table_name) 758 data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name) 759 if data_object_cache_key in self._data_object_cache: 760 logger.debug("Table existence cache hit: %s", data_object_cache_key) 761 return self._data_object_cache[data_object_cache_key] is not None 762 763 
try: 764 from google.cloud.exceptions import NotFound 765 except ModuleNotFoundError: 766 from google.api_core.exceptions import NotFound 767 768 try: 769 self._get_table(table_name) 770 return True 771 except NotFound: 772 return False 773 774 def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]: 775 from sqlmesh.utils.date import to_timestamp 776 777 datasets_to_tables: t.DefaultDict[str, t.List[str]] = defaultdict(list) 778 for table_name in table_names: 779 table = exp.to_table(table_name) 780 datasets_to_tables[table.db].append(table.name) 781 782 results = [] 783 784 for dataset, tables in datasets_to_tables.items(): 785 query = ( 786 f"SELECT TIMESTAMP_MILLIS(last_modified_time) FROM `{dataset}.__TABLES__` WHERE " 787 ) 788 for i, table_name in enumerate(tables): 789 query += f"TABLE_ID = '{table_name}'" 790 if i < len(tables) - 1: 791 query += " OR " 792 results.extend(self.fetchall(query)) 793 794 return [to_timestamp(row[0]) for row in results] 795 796 def _get_table(self, table_name: TableName) -> BigQueryTable: 797 """ 798 Returns a BigQueryTable object for the given table name. 799 800 Raises: `google.cloud.exceptions.NotFound` if the table does not exist. 
801 """ 802 return self._db_call(self.client.get_table, table=self._table_name(table_name)) 803 804 def _table_name(self, table_name: TableName) -> str: 805 # the api doesn't support backticks, so we can't call exp.table_name or sql 806 return ".".join(part.name for part in exp.to_table(table_name).parts) 807 808 def _fetch_native_df( 809 self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False 810 ) -> DF: 811 self.execute(query, quote_identifiers=quote_identifiers) 812 query_job = self._query_job 813 assert query_job is not None 814 return query_job.to_dataframe() 815 816 def _create_column_comments( 817 self, 818 table_name: TableName, 819 column_comments: t.Dict[str, str], 820 table_kind: str = "TABLE", 821 materialized_view: bool = False, 822 ) -> None: 823 if not (table_kind == "VIEW" and materialized_view): 824 table = self._get_table(table_name) 825 826 # convert Table object to dict 827 table_def = table.to_api_repr() 828 829 # Set column descriptions, supporting nested fields (e.g. record.field.nested_field) 830 for column, comment in column_comments.items(): 831 fields = table_def["schema"]["fields"] 832 field_names = column.split(".") 833 last_index = len(field_names) - 1 834 835 # Traverse the fields with nested fields down to leaf level 836 for idx, name in enumerate(field_names): 837 if field := next((field for field in fields if field["name"] == name), None): 838 if idx == last_index: 839 field["description"] = self._truncate_comment( 840 comment, self.MAX_COLUMN_COMMENT_LENGTH 841 ) 842 else: 843 fields = field.get("fields") or [] 844 845 # An "etag" is BQ versioning metadata that changes when an object is updated/modified. `update_table` 846 # compares the etags of the table object passed to it and the remote table, erroring if the etags 847 # don't match. We set the local etag to None to avoid this check. 
848 table_def["etag"] = None 849 850 # convert dict back to a Table object 851 table = table.from_api_repr(table_def) 852 853 # update table schema 854 logger.info(f"Registering column comments for table {table_name}") 855 self._db_call(self.client.update_table, table=table, fields=["schema"]) 856 857 def _build_description_property_exp( 858 self, 859 description: str, 860 trunc_method: t.Callable, 861 ) -> exp.Property: 862 return exp.Property( 863 this=exp.to_identifier("description", quoted=True), 864 value=exp.Literal.string(trunc_method(description)), 865 ) 866 867 def _build_partitioned_by_exp( 868 self, 869 partitioned_by: t.List[exp.Expr], 870 *, 871 partition_interval_unit: t.Optional[IntervalUnit] = None, 872 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 873 **kwargs: t.Any, 874 ) -> t.Optional[exp.PartitionedByProperty]: 875 if len(partitioned_by) > 1: 876 raise SQLMeshError("BigQuery only supports partitioning by a single column") 877 878 this = partitioned_by[0] 879 if ( 880 isinstance(this, exp.Column) 881 and partition_interval_unit is not None 882 and not partition_interval_unit.is_minute 883 ): 884 column_type: t.Optional[exp.DataType] = (target_columns_to_types or {}).get(this.name) 885 886 if column_type == exp.DataType.build( 887 "date", dialect=self.dialect 888 ) and partition_interval_unit in ( 889 IntervalUnit.MONTH, 890 IntervalUnit.YEAR, 891 ): 892 trunc_func = "DATE_TRUNC" 893 elif column_type == exp.DataType.build("timestamp", dialect=self.dialect): 894 trunc_func = "TIMESTAMP_TRUNC" 895 elif column_type == exp.DataType.build("datetime", dialect=self.dialect): 896 trunc_func = "DATETIME_TRUNC" 897 else: 898 trunc_func = "" 899 900 if trunc_func: 901 this = exp.func( 902 trunc_func, 903 this, 904 exp.var(partition_interval_unit.value.upper()), 905 dialect=self.dialect, 906 ) 907 908 return exp.PartitionedByProperty(this=this) 909 910 def _build_table_properties_exp( 911 self, 912 catalog_name: t.Optional[str] = 
None, 913 table_format: t.Optional[str] = None, 914 storage_format: t.Optional[str] = None, 915 partitioned_by: t.Optional[t.List[exp.Expr]] = None, 916 partition_interval_unit: t.Optional[IntervalUnit] = None, 917 clustered_by: t.Optional[t.List[exp.Expr]] = None, 918 table_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 919 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 920 table_description: t.Optional[str] = None, 921 table_kind: t.Optional[str] = None, 922 **kwargs: t.Any, 923 ) -> t.Optional[exp.Properties]: 924 properties: t.List[exp.Expr] = [] 925 926 if partitioned_by and ( 927 partitioned_by_prop := self._build_partitioned_by_exp( 928 partitioned_by, 929 partition_interval_unit=partition_interval_unit, 930 target_columns_to_types=target_columns_to_types, 931 ) 932 ): 933 properties.append(partitioned_by_prop) 934 935 if clustered_by and (clustered_by_exp := self._build_clustered_by_exp(clustered_by)): 936 properties.append(clustered_by_exp) 937 938 if table_description: 939 properties.append( 940 self._build_description_property_exp( 941 table_description, self._truncate_table_comment 942 ), 943 ) 944 945 properties.extend(self._table_or_view_properties_to_expressions(table_properties)) 946 947 if properties: 948 return exp.Properties(expressions=properties) 949 return None 950 951 def _build_column_def( 952 self, 953 col_name: str, 954 column_descriptions: t.Optional[t.Dict[str, str]] = None, 955 engine_supports_schema_comments: bool = False, 956 col_type: t.Optional[exp.DATA_TYPE] = None, 957 nested_names: t.List[str] = [], 958 ) -> exp.ColumnDef: 959 # Helper function to build column definitions with column descriptions 960 def _build_struct_with_descriptions( 961 col_type: exp.DataType, 962 nested_names: t.List[str], 963 ) -> exp.DataType: 964 column_expressions = [] 965 for column_def in col_type.expressions: 966 # This is expected to be true, but this check is included as a 967 # precautionary measure in case of an 
unexpected edge case 968 if isinstance(column_def, exp.ColumnDef): 969 column = self._build_column_def( 970 col_name=column_def.name, 971 column_descriptions=column_descriptions, 972 engine_supports_schema_comments=engine_supports_schema_comments, 973 col_type=column_def.kind, 974 nested_names=nested_names, 975 ) 976 else: 977 column = column_def 978 column_expressions.append(column) 979 return exp.DataType(this=col_type.this, expressions=column_expressions, nested=True) 980 981 # Recursively build column definitions for BigQuery's RECORDs (struct) and REPEATED RECORDs (array of struct) 982 if isinstance(col_type, exp.DataType) and col_type.expressions: 983 expressions = col_type.expressions 984 if col_type.is_type(exp.DataType.Type.STRUCT): 985 col_type = _build_struct_with_descriptions(col_type, nested_names + [col_name]) 986 elif col_type.is_type(exp.DataType.Type.ARRAY) and expressions[0].is_type( 987 exp.DataType.Type.STRUCT 988 ): 989 col_type = exp.DataType( 990 this=exp.DataType.Type.ARRAY, 991 expressions=[ 992 _build_struct_with_descriptions( 993 col_type.expressions[0], nested_names + [col_name] 994 ) 995 ], 996 nested=True, 997 ) 998 999 return exp.ColumnDef( 1000 this=exp.to_identifier(col_name), 1001 kind=col_type, 1002 constraints=( 1003 self._build_col_comment_exp( 1004 ".".join(nested_names + [col_name]), column_descriptions 1005 ) 1006 if engine_supports_schema_comments and self.comments_enabled and column_descriptions 1007 else None 1008 ), 1009 ) 1010 1011 def _build_col_comment_exp( 1012 self, col_name: str, column_descriptions: t.Dict[str, str] 1013 ) -> t.List[exp.ColumnConstraint]: 1014 comment = column_descriptions.get(col_name, None) 1015 if comment: 1016 return [ 1017 exp.ColumnConstraint( 1018 kind=exp.Properties( 1019 expressions=[ 1020 self._build_description_property_exp( 1021 comment, self._truncate_column_comment 1022 ), 1023 ] 1024 ) 1025 ) 1026 ] 1027 return [] 1028 1029 def _build_view_properties_exp( 1030 self, 1031 
view_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 1032 table_description: t.Optional[str] = None, 1033 **kwargs: t.Any, 1034 ) -> t.Optional[exp.Properties]: 1035 """Creates a SQLGlot table properties expression for view""" 1036 properties: t.List[exp.Expr] = [] 1037 1038 if table_description: 1039 properties.append( 1040 self._build_description_property_exp( 1041 table_description, self._truncate_table_comment 1042 ), 1043 ) 1044 1045 properties.extend(self._table_or_view_properties_to_expressions(view_properties)) 1046 1047 if properties: 1048 return exp.Properties(expressions=properties) 1049 return None 1050 1051 def _build_create_comment_table_exp( 1052 self, table: exp.Table, table_comment: str, table_kind: str 1053 ) -> exp.Comment | str: 1054 table_sql = table.sql(dialect=self.dialect, identify=True) 1055 1056 truncated_comment = self._truncate_table_comment(table_comment) 1057 comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect) 1058 1059 return f"ALTER {table_kind} {table_sql} SET OPTIONS(description = {comment_sql})" 1060 1061 def _build_create_comment_column_exp( 1062 self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" 1063 ) -> exp.Comment | str: 1064 table_sql = table.sql(dialect=self.dialect, identify=True) 1065 column_sql = exp.column(column_name).sql(dialect=self.dialect, identify=True) 1066 1067 truncated_comment = self._truncate_column_comment(column_comment) 1068 comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect) 1069 1070 return f"ALTER {table_kind} {table_sql} ALTER COLUMN {column_sql} SET OPTIONS(description = {comment_sql})" 1071 1072 def create_state_table( 1073 self, 1074 table_name: str, 1075 target_columns_to_types: t.Dict[str, exp.DataType], 1076 primary_key: t.Optional[t.Tuple[str, ...]] = None, 1077 ) -> None: 1078 self.create_table( 1079 table_name, 1080 target_columns_to_types, 1081 ) 1082 1083 def _db_call(self, func: t.Callable[..., 
t.Any], *args: t.Any, **kwargs: t.Any) -> t.Any: 1084 return func( 1085 retry=self.__retry, 1086 *args, 1087 **kwargs, 1088 ) 1089 1090 def _execute( 1091 self, 1092 sql: str, 1093 track_rows_processed: bool = False, 1094 **kwargs: t.Any, 1095 ) -> None: 1096 """Execute a sql query.""" 1097 from google.cloud.bigquery import QueryJobConfig 1098 from google.cloud.bigquery.query import ConnectionProperty 1099 1100 # BigQuery's Python DB API implementation does not support retries, so we have to implement them ourselves. 1101 # So we update the cursor's query job and query data with the results of the new query job. This makes sure 1102 # that other cursor based operations execute correctly. 1103 session_id = self._session_id 1104 connection_properties = ( 1105 [ 1106 ConnectionProperty(key="session_id", value=session_id), 1107 ] 1108 if session_id 1109 else [] 1110 ) 1111 1112 # Create job config 1113 job_config = QueryJobConfig(**self._job_params, connection_properties=connection_properties) 1114 1115 self._query_job = self._db_call( 1116 self.client.query, 1117 query=sql, 1118 job_config=job_config, 1119 timeout=self._extra_config.get("job_creation_timeout_seconds"), 1120 ) 1121 query_job = self._query_job 1122 assert query_job is not None 1123 1124 logger.debug( 1125 "BigQuery job created: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s", 1126 query_job.project, 1127 query_job.location, 1128 query_job.job_id, 1129 ) 1130 1131 results = self._db_call( 1132 query_job.result, 1133 timeout=self._extra_config.get("job_execution_timeout_seconds"), # type: ignore 1134 ) 1135 1136 self._query_data = iter(results) if results.total_rows else iter([]) 1137 query_results = query_job._query_results 1138 self.cursor._set_rowcount(query_results) 1139 self.cursor._set_description(query_results.schema) 1140 1141 if ( 1142 track_rows_processed 1143 and self._query_execution_tracker 1144 and self._query_execution_tracker.is_tracking() 1145 ): 1146 num_rows = None 1147 
if query_job.statement_type == "CREATE_TABLE_AS_SELECT": 1148 # since table was just created, number rows in table == number rows processed 1149 query_table = self.client.get_table(query_job.destination) 1150 num_rows = query_table.num_rows 1151 elif query_job.statement_type in ["INSERT", "DELETE", "MERGE", "UPDATE"]: 1152 num_rows = query_job.num_dml_affected_rows 1153 1154 self._query_execution_tracker.record_execution( 1155 sql, num_rows, query_job.total_bytes_processed 1156 ) 1157 1158 def _get_data_objects( 1159 self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None 1160 ) -> t.List[DataObject]: 1161 """ 1162 Returns all the data objects that exist in the given schema and optionally catalog. 1163 """ 1164 1165 # The BigQuery Client's list_tables method does not support filtering by table name, so we have to 1166 # resort to using SQL instead. 1167 schema = to_schema(schema_name) 1168 catalog = schema.catalog or self.default_catalog 1169 query = ( 1170 exp.select( 1171 exp.column("table_catalog").as_("catalog"), 1172 exp.column("table_name").as_("name"), 1173 exp.column("table_schema").as_("schema_name"), 1174 exp.case() 1175 .when(exp.column("table_type").eq("BASE TABLE"), exp.Literal.string("TABLE")) 1176 .when(exp.column("table_type").eq("CLONE"), exp.Literal.string("TABLE")) 1177 .when(exp.column("table_type").eq("EXTERNAL"), exp.Literal.string("TABLE")) 1178 .when(exp.column("table_type").eq("SNAPSHOT"), exp.Literal.string("TABLE")) 1179 .when(exp.column("table_type").eq("VIEW"), exp.Literal.string("VIEW")) 1180 .when( 1181 exp.column("table_type").eq("MATERIALIZED VIEW"), 1182 exp.Literal.string("MATERIALIZED_VIEW"), 1183 ) 1184 .else_(exp.column("table_type")) 1185 .as_("type"), 1186 exp.column("clustering_key", "ci").as_("clustering_key"), 1187 ) 1188 .with_( 1189 "clustering_info", 1190 as_=exp.select( 1191 exp.column("table_catalog"), 1192 exp.column("table_schema"), 1193 exp.column("table_name"), 1194 parse_one( 1195 
"string_agg(column_name order by clustering_ordinal_position)", 1196 dialect=self.dialect, 1197 ).as_("clustering_key"), 1198 ) 1199 .from_( 1200 exp.to_table( 1201 f"`{catalog}`.`{schema.db}`.INFORMATION_SCHEMA.COLUMNS", 1202 dialect=self.dialect, 1203 ) 1204 ) 1205 .where(exp.column("clustering_ordinal_position").is_(exp.not_(exp.null()))) 1206 .group_by("1", "2", "3"), 1207 ) 1208 .from_( 1209 exp.to_table( 1210 f"`{catalog}`.`{schema.db}`.INFORMATION_SCHEMA.TABLES", dialect=self.dialect 1211 ) 1212 ) 1213 .join( 1214 "clustering_info", 1215 using=["table_catalog", "table_schema", "table_name"], 1216 join_type="left", 1217 join_alias="ci", 1218 ) 1219 ) 1220 if object_names: 1221 query = query.where(exp.column("table_name").isin(*object_names)) 1222 1223 try: 1224 df = self.fetchdf(query, quote_identifiers=True) 1225 except Exception as e: 1226 if "Not found" in str(e): 1227 return [] 1228 raise 1229 1230 if df.empty: 1231 return [] 1232 return [ 1233 DataObject( 1234 catalog=row.catalog, # type: ignore 1235 schema=row.schema_name, # type: ignore 1236 name=row.name, # type: ignore 1237 type=DataObjectType.from_str(row.type), # type: ignore 1238 clustering_key=f"({row.clustering_key})" if row.clustering_key else None, # type: ignore 1239 ) 1240 for row in df.itertuples() 1241 ] 1242 1243 def _update_clustering_key(self, operation: TableAlterClusterByOperation) -> None: 1244 cluster_key_expressions = getattr(operation, "cluster_key_expressions", []) 1245 bq_table = self._get_table(operation.target_table) 1246 1247 rendered_columns = [c.sql(dialect=self.dialect) for c in cluster_key_expressions] 1248 bq_table.clustering_fields = ( 1249 rendered_columns or None 1250 ) # causes a drop of the key if cluster_by is empty or None 1251 1252 self._db_call(self.client.update_table, table=bq_table, fields=["clustering_fields"]) 1253 1254 if cluster_key_expressions: 1255 # BigQuery only applies new clustering going forward, so this rewrites the columns to apply the new 
clustering to historical data 1256 # ref: https://cloud.google.com/bigquery/docs/creating-clustered-tables#modifying-cluster-spec 1257 self.execute( 1258 exp.update( 1259 operation.target_table, 1260 {c: c for c in cluster_key_expressions}, 1261 where=exp.true(), 1262 ) 1263 ) 1264 1265 def _normalize_decimal_value(self, col: exp.Expr, precision: int) -> exp.Expr: 1266 return exp.func("FORMAT", exp.Literal.string(f"%.{precision}f"), col) 1267 1268 def _normalize_nested_value(self, col: exp.Expr) -> exp.Expr: 1269 return exp.func("TO_JSON_STRING", col, dialect=self.dialect) 1270 1271 @t.overload 1272 def _columns_to_types( 1273 self, 1274 query_or_df: DF, 1275 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1276 source_columns: t.Optional[t.List[str]] = None, 1277 ) -> t.Tuple[t.Dict[str, exp.DataType], t.List[str]]: ... 1278 1279 @t.overload 1280 def _columns_to_types( 1281 self, 1282 query_or_df: Query, 1283 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1284 source_columns: t.Optional[t.List[str]] = None, 1285 ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: ... 
1286 1287 def _columns_to_types( 1288 self, 1289 query_or_df: QueryOrDF, 1290 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 1291 source_columns: t.Optional[t.List[str]] = None, 1292 ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: 1293 if ( 1294 not target_columns_to_types 1295 and bigframes 1296 and isinstance(query_or_df, bigframes.dataframe.DataFrame) 1297 ): 1298 # using dry_run=True attempts to prevent the DataFrame from being materialized just to read the column types from it 1299 dtypes = query_or_df.to_pandas(dry_run=True).columnDtypes 1300 target_columns_to_types = columns_to_types_from_dtypes(dtypes.items()) 1301 return target_columns_to_types, list(source_columns or target_columns_to_types) 1302 1303 return super()._columns_to_types( 1304 query_or_df, target_columns_to_types, source_columns=source_columns 1305 ) 1306 1307 def _native_df_to_pandas_df( 1308 self, 1309 query_or_df: QueryOrDF, 1310 ) -> t.Union[Query, pd.DataFrame]: 1311 if bigframes and isinstance(query_or_df, bigframes.dataframe.DataFrame): 1312 return query_or_df.to_pandas() 1313 1314 return super()._native_df_to_pandas_df(query_or_df) 1315 1316 @property 1317 def _query_data(self) -> t.Any: 1318 return self._connection_pool.get_attribute("query_data") 1319 1320 @_query_data.setter 1321 def _query_data(self, value: t.Any) -> None: 1322 self._connection_pool.set_attribute("query_data", value) 1323 1324 @property 1325 def _query_job(self) -> t.Optional[QueryJob]: 1326 return self._connection_pool.get_attribute("query_job") 1327 1328 @_query_job.setter 1329 def _query_job(self, value: t.Any) -> None: 1330 self._connection_pool.set_attribute("query_job", value) 1331 1332 @property 1333 def _session_id(self) -> t.Any: 1334 return self._connection_pool.get_attribute("session_id") 1335 1336 @_session_id.setter 1337 def _session_id(self, value: t.Any) -> None: 1338 self._connection_pool.set_attribute("session_id", value) 1339 1340 def 
_get_current_schema(self) -> str: 1341 raise NotImplementedError("BigQuery does not support current schema") 1342 1343 def _get_bq_dataset_location(self, project: str, dataset: str) -> str: 1344 return self._db_call(self.client.get_dataset, dataset_ref=f"{project}.{dataset}").location 1345 1346 def _get_grant_expression(self, table: exp.Table) -> exp.Expr: 1347 if not table.db: 1348 raise ValueError( 1349 f"Table {table.sql(dialect=self.dialect)} does not have a schema (dataset)" 1350 ) 1351 project = table.catalog or self.get_current_catalog() 1352 if not project: 1353 raise ValueError( 1354 f"Table {table.sql(dialect=self.dialect)} does not have a catalog (project)" 1355 ) 1356 1357 dataset = table.db 1358 table_name = table.name 1359 location = self._get_bq_dataset_location(project, dataset) 1360 1361 # https://cloud.google.com/bigquery/docs/information-schema-object-privileges 1362 # OBJECT_PRIVILEGES is a project-level INFORMATION_SCHEMA view with regional qualifier 1363 object_privileges_table = exp.to_table( 1364 f"`{project}`.`region-{location}`.INFORMATION_SCHEMA.{self.GRANT_INFORMATION_SCHEMA_TABLE_NAME}", 1365 dialect=self.dialect, 1366 ) 1367 return ( 1368 exp.select("privilege_type", "grantee") 1369 .from_(object_privileges_table) 1370 .where( 1371 exp.and_( 1372 exp.column("object_schema").eq(exp.Literal.string(dataset)), 1373 exp.column("object_name").eq(exp.Literal.string(table_name)), 1374 # Filter out current_user 1375 # BigQuery grantees format: "user:email" or "group:name" 1376 exp.func("split", exp.column("grantee"), exp.Literal.string(":"))[ 1377 exp.func("OFFSET", exp.Literal.number("1")) 1378 ].neq(self.CURRENT_USER_OR_ROLE_EXPRESSION), 1379 ) 1380 ) 1381 ) 1382 1383 @staticmethod 1384 def _grant_object_kind(table_type: DataObjectType) -> str: 1385 if table_type == DataObjectType.VIEW: 1386 return "VIEW" 1387 if table_type == DataObjectType.MATERIALIZED_VIEW: 1388 # We actually need to use "MATERIALIZED VIEW" here even though it's not listed 
1389 # as a supported resource_type in the BigQuery DCL doc: 1390 # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language 1391 return "MATERIALIZED VIEW" 1392 return "TABLE" 1393 1394 def _dcl_grants_config_expr( 1395 self, 1396 dcl_cmd: t.Type[DCL], 1397 table: exp.Table, 1398 grants_config: GrantsConfig, 1399 table_type: DataObjectType = DataObjectType.TABLE, 1400 ) -> t.List[exp.Expr]: 1401 expressions: t.List[exp.Expr] = [] 1402 if not grants_config: 1403 return expressions 1404 1405 # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language 1406 1407 def normalize_principal(p: str) -> str: 1408 if ":" not in p: 1409 raise ValueError(f"Principal '{p}' missing a prefix label") 1410 1411 # allUsers and allAuthenticatedUsers special groups that are cas-sensitive and must start with "specialGroup:" 1412 if p.endswith("allUsers") or p.endswith("allAuthenticatedUsers"): 1413 if not p.startswith("specialGroup:"): 1414 raise ValueError( 1415 f"Special group principal '{p}' must start with 'specialGroup:' prefix label" 1416 ) 1417 return p 1418 1419 label, principal = p.split(":", 1) 1420 # always lowercase principals 1421 return f"{label}:{principal.lower()}" 1422 1423 object_kind = self._grant_object_kind(table_type) 1424 for privilege, principals in grants_config.items(): 1425 if not principals: 1426 continue 1427 1428 noramlized_principals = [exp.Literal.string(normalize_principal(p)) for p in principals] 1429 args: t.Dict[str, t.Any] = { 1430 "privileges": [exp.GrantPrivilege(this=exp.to_identifier(privilege, quoted=True))], 1431 "securable": table.copy(), 1432 "principals": noramlized_principals, 1433 } 1434 1435 if object_kind: 1436 args["kind"] = exp.Var(this=object_kind) 1437 1438 expressions.append(dcl_cmd(**args)) # type: ignore[arg-type] 1439 1440 return expressions
BigQuery Engine Adapter using the google-cloud-bigquery library's DB API.
@property
def bigframe(self) -> t.Optional[BigframeSession]:
    """Open a BigQuery DataFrames (bigframes) session for this adapter's client.

    The session is configured with the client's credentials, project and location,
    and ``bigframes.connect`` is called on every access of this property.

    Returns:
        The connected session, or None when the optional ``bigframes`` package is
        not available.
    """
    if not bigframes:
        return None
    session_options = bigframes.BigQueryOptions(
        credentials=self.client._credentials,
        project=self.client.project,
        location=self.client.location,
    )
    return bigframes.connect(context=session_options)
def close(self) -> t.Any:
    """Close all open connections and release all allocated resources.

    Before delegating to the base implementation, every query job tracked on any
    thread's connection that is not yet done is cancelled. Cancellation is
    best-effort: any failure is logged at debug level and does not stop the
    base ``close`` from running.
    """
    tracked_jobs = self._connection_pool.get_all_attributes("query_job")
    for job in filter(None, tracked_jobs):
        try:
            if self._db_call(job.done):
                continue
            self._db_call(job.cancel)
            logger.debug(
                "Cancelled BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s",
                job.project,
                job.location,
                job.job_id,
            )
        except Exception as ex:
            logger.debug(
                "Failed to cancel BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s. %s",
                job.project,
                job.location,
                job.job_id,
                str(ex),
            )

    return super().close()
Closes all open connections and releases all allocated resources.
def get_current_catalog(self) -> t.Optional[str]:
    """Return the catalog name of the current connection.

    For BigQuery the catalog is the project the client is configured with.
    """
    client = self.client
    return client.project
Returns the catalog name of the current connection.
def set_current_catalog(self, catalog: str) -> None:
    """Point the current connection at another catalog (BigQuery project).

    Args:
        catalog: The project to assign to the underlying client.
    """
    client = self.client
    client.project = catalog
Sets the catalog name of the current connection.
def create_schema(
    self,
    schema_name: SchemaName,
    ignore_if_exists: bool = True,
    warn_on_error: bool = True,
    properties: t.Optional[t.List[exp.Expr]] = None,
) -> None:
    """Create a schema (BigQuery dataset) from a name or qualified table name.

    Args:
        schema_name: The schema to create.
        ignore_if_exists: If True, a ``Conflict`` error whose message contains
            "Already Exists:" is silently ignored.
        warn_on_error: If True, any other failure is logged as a warning instead of
            being raised.
        properties: Currently unused by this implementation.
    """
    # NOTE(review): `properties` is not forwarded to the base create_schema; confirm
    # whether dataset options are expected to be applied here.
    from google.api_core.exceptions import Conflict

    try:
        # The base call always raises (warn_on_error=False) so the decision to ignore,
        # warn or re-raise is made below with BigQuery-specific error detection.
        super().create_schema(
            schema_name,
            ignore_if_exists=ignore_if_exists,
            warn_on_error=False,
        )
    except Exception as e:
        is_already_exists_error = isinstance(e, Conflict) and "Already Exists:" in str(e)
        if is_already_exists_error and ignore_if_exists:
            return
        if not warn_on_error:
            raise
        logger.warning("Failed to create schema '%s': %s", schema_name, e)
Create a schema from a name or qualified table name.
def get_bq_schema(self, table_name: TableName) -> t.List[bigquery.SchemaField]:
    """Return the BigQuery schema fields of ``table_name``.

    For a three-part path whose table name itself contains a dot, the client's table
    lookup is not used. Instead, an empty ``SELECT * ... LIMIT 0`` is executed and the
    schema is taken from that query's results.
    """
    table = exp.to_table(table_name)
    resolve_via_query = len(table.parts) == 3 and "." in table.name
    if not resolve_via_query:
        return self._get_table(table).schema

    self.execute(exp.select("*").from_(table).limit(0))
    job = self._query_job
    assert job is not None
    return job._query_results.schema
def columns(
    self, table_name: TableName, include_pseudo_columns: bool = False
) -> t.Dict[str, exp.DataType]:
    """Fetches column names and types for the target table.

    Args:
        table_name: The table to inspect.
        include_pseudo_columns: If True, also add the BigQuery pseudo columns that apply
            to the table (``_PARTITIONTIME``, ``_PARTITIONDATE``, ``_TABLE_SUFFIX``,
            ``_FILE_NAME``). These are derived from the table metadata. They are therefore
            only added when that metadata is fetched directly, not for paths that must
            be resolved through a query.

    Returns:
        A mapping from column name to its data type.
    """

    def dtype_to_sql(
        dtype: t.Optional[StandardSqlDataType], field: bigquery.SchemaField
    ) -> str:
        # Render a BigQuery standard SQL type as a type string, recursing into
        # ARRAY element types and STRUCT fields.
        assert dtype
        assert field

        kind = dtype.type_kind
        assert kind

        # Not using the enum value to preserve compatibility with older versions
        # of the BigQuery library.
        if kind.name == "ARRAY":
            return f"ARRAY<{dtype_to_sql(dtype.array_element_type, field)}>"
        if kind.name == "STRUCT":
            struct_type = dtype.struct_type
            assert struct_type
            fields = ", ".join(
                f"{struct_field.name} {dtype_to_sql(struct_field.type, nested_field)}"
                for struct_field, nested_field in zip(struct_type.fields, field.fields)
            )
            return f"STRUCT<{fields}>"
        if kind.name == "TYPE_KIND_UNSPECIFIED":
            field_type = field.field_type

            if field_type == "RANGE":
                # If the field is a RANGE then `range_element_type` should be set to
                # one of `"DATE"`, `"DATETIME"` or `"TIMESTAMP"`.
                return f"RANGE<{field.range_element_type.element_type}>"

            return field_type

        return kind.name

    def create_mapping_schema(
        schema: t.Sequence[bigquery.SchemaField],
    ) -> t.Dict[str, exp.DataType]:
        # Map each schema field name to its parsed sqlglot data type.
        return {
            field.name: exp.DataType.build(
                dtype_to_sql(field.to_standard_sql().type, field), dialect=self.dialect
            )
            for field in schema
        }

    table = exp.to_table(table_name)
    if len(table.parts) == 3 and "." in table.name:
        # The client's `get_table` method can't handle paths with >3 identifiers, so the
        # schema is read from an empty query instead. No table metadata is available on
        # this path, so pseudo columns cannot be derived and are not added.
        self.execute(exp.select("*").from_(table).limit(0))
        query_job = self._query_job
        assert query_job is not None
        return create_mapping_schema(query_job._query_results.schema)

    bq_table = self._get_table(table)
    columns = create_mapping_schema(bq_table.schema)

    if include_pseudo_columns:
        time_partitioning = bq_table.time_partitioning
        # Time-partitioned without a partitioning column: expose _PARTITIONTIME (and
        # _PARTITIONDATE for daily partitioning).
        if time_partitioning and not time_partitioning.field:
            columns["_PARTITIONTIME"] = exp.DataType.build("TIMESTAMP", dialect="bigquery")
            if time_partitioning.type_ == "DAY":
                columns["_PARTITIONDATE"] = exp.DataType.build("DATE", dialect="bigquery")
        # Wildcard table references expose the matched suffix.
        if bq_table.table_id.endswith("*"):
            columns["_TABLE_SUFFIX"] = exp.DataType.build("STRING", dialect="bigquery")
        # External tables backed by these file formats expose the source file name.
        if (
            bq_table.external_data_configuration is not None
            and bq_table.external_data_configuration.source_format
            in (
                "CSV",
                "NEWLINE_DELIMITED_JSON",
                "AVRO",
                "PARQUET",
                "ORC",
                "DATASTORE_BACKUP",
            )
        ):
            columns["_FILE_NAME"] = exp.DataType.build("STRING", dialect="bigquery")

    return columns
Fetches column names and types for the target table.
def alter_table(
    self,
    alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
) -> None:
    """Apply alter operations to turn the current table into the target structure.

    Clustering-key operations are applied first through ``_update_clustering_key``.
    The remaining statements are split so that nested (struct) field changes, which
    SQL does not support, go through the API. Everything else is delegated to the
    base SQL implementation.
    """
    if not alter_expressions:
        return

    cluster_by_operations = [
        item for item in alter_expressions if isinstance(item, TableAlterClusterByOperation)
    ]
    # Unwrap TableAlterOperation objects to their underlying expression; raw
    # exp.Alter statements are passed through unchanged.
    alter_statements = [
        item.expression if isinstance(item, TableAlterOperation) else item
        for item in alter_expressions
        if not isinstance(item, TableAlterClusterByOperation)
    ]

    for cluster_op in cluster_by_operations:
        self._update_clustering_key(cluster_op)

    nested_fields, non_nested_expressions = self._split_alter_expressions(alter_statements)

    if nested_fields:
        self._update_table_schema_nested_fields(nested_fields, alter_statements[0].this)

    if non_nested_expressions:
        super().alter_table(non_nested_expressions)
Performs the alter statements to change the current table into the structure of the target table, and uses the API to add columns to structs, where SQL is not supported.
def fetchone(
    self,
    query: t.Union[exp.Expr, str],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = False,
) -> t.Optional[t.Tuple]:
    """Execute ``query`` and return its first row, or None if it produced no rows.

    BigQuery's ``fetchone`` does not go through ``execute`` and so would bypass the
    execute configuration in place; this implementation runs ``execute`` and reads
    the first row from the resulting data instead.
    """
    self.execute(
        query,
        ignore_unsupported_errors=ignore_unsupported_errors,
        quote_identifiers=quote_identifiers,
    )
    return next(self._query_data, None)
BigQuery's fetchone method doesn't call execute and therefore would not benefit from the execute
configuration we have in place. Therefore this implementation calls execute instead.
def fetchall(
    self,
    query: t.Union[exp.Expr, str],
    ignore_unsupported_errors: bool = False,
    quote_identifiers: bool = False,
) -> t.List[t.Tuple]:
    """Execute ``query`` and return all of its rows as a list.

    BigQuery's ``fetchall`` method doesn't call execute and therefore would not benefit
    from the execute configuration we have in place. Therefore this implementation calls
    execute instead and materializes the resulting rows.
    """
    self.execute(
        query,
        ignore_unsupported_errors=ignore_unsupported_errors,
        quote_identifiers=quote_identifiers,
    )
    return list(self._query_data)
BigQuery's fetchall method doesn't call execute and therefore would not benefit from the execute
configuration we have in place. Therefore this implementation calls execute instead.
def insert_overwrite_by_partition(
    self,
    table_name: TableName,
    query_or_df: QueryOrDF,
    partitioned_by: t.List[exp.Expr],
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    source_columns: t.Optional[t.List[str]] = None,
) -> None:
    """Overwrite only the partitions of ``table_name`` that appear in ``query_or_df``.

    The source is staged in a temporary table. The partition values of that staged
    table are read from its ``INFORMATION_SCHEMA.PARTITIONS`` metadata and aggregated
    with ``ARRAY_AGG`` into the script variable ``_sqlmesh_target_partitions_``. Rows
    of the target whose partition expression is in that array are then overwritten.

    Raises:
        SQLMeshError: If the number of partition expressions is not exactly one, or if
            the partition expression does not contain a column.
    """
    partition_count = len(partitioned_by)
    if partition_count != 1:
        raise SQLMeshError(
            f"Bigquery only supports partitioning by one column, {partition_count} were provided."
        )

    (partition_exp,) = partitioned_by
    partition_column = partition_exp.find(exp.Column)
    if not partition_column:
        partition_sql = partition_exp.sql(dialect=self.dialect)
        raise SQLMeshError(
            f"The partition expression '{partition_sql}' doesn't contain a column."
        )

    # A time-unit argument (if any) on the partition expression selects the format
    # used to parse partition ids.
    unit = partition_exp.args.get("unit")
    granularity = unit.name.lower() if unit else unit

    with (
        self.session({}),
        self.temp_table(
            query_or_df,
            name=table_name,
            partitioned_by=partitioned_by,
            source_columns=source_columns,
        ) as temp_table_name,
    ):
        # Fall back to the live table schema when no types were given or the
        # partition column's type is unknown.
        if target_columns_to_types is None or target_columns_to_types[
            partition_column.name
        ] == exp.DataType.build("unknown"):
            target_columns_to_types = self.columns(table_name)

        partition_type_sql = target_columns_to_types[partition_column.name].sql(
            dialect=self.dialect
        )

        partitions_query = select_partitions_expr(
            temp_table_name.db,
            temp_table_name.name,
            partition_type_sql,
            granularity=granularity,
            agg_func="ARRAY_AGG",
            catalog=temp_table_name.catalog or self.default_catalog,
        )

        self.execute(
            f"DECLARE _sqlmesh_target_partitions_ ARRAY<{partition_type_sql}> DEFAULT ({partitions_query});"
        )

        overwrite_condition = t.cast(exp.Condition, partition_exp).isin(
            unnest="_sqlmesh_target_partitions_"
        )

        self._insert_overwrite_by_condition(
            table_name,
            [SourceQuery(query_factory=lambda: exp.select("*").from_(temp_table_name))],
            target_columns_to_types,
            where=overwrite_condition,
        )
def table_exists(self, table_name: TableName) -> bool:
    """Return whether ``table_name`` exists.

    The data object cache is consulted first. On a hit, the result is whether a
    non-None object is cached for the table. On a miss, the table metadata is fetched,
    and a ``NotFound`` error is treated as "does not exist".
    """
    table = exp.to_table(table_name)
    cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
    if cache_key in self._data_object_cache:
        logger.debug("Table existence cache hit: %s", cache_key)
        return self._data_object_cache[cache_key] is not None

    # Prefer google.cloud.exceptions.NotFound; fall back to google.api_core when that
    # module cannot be imported.
    try:
        from google.cloud.exceptions import NotFound
    except ModuleNotFoundError:
        from google.api_core.exceptions import NotFound

    try:
        self._get_table(table_name)
    except NotFound:
        return False
    return True
def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]:
    """Return the last-modified timestamp of each given table, in input order.

    Timestamps are read from the ``__TABLES__`` metadata view, with one query per
    (project, dataset) pair. Each table is looked up in its own project when it is
    qualified with one. Tables that are not found in the metadata view are omitted
    from the result.

    Args:
        table_names: The tables to look up.

    Returns:
        The last-modified time of each found table, converted with ``to_timestamp``.
    """
    from sqlmesh.utils.date import to_timestamp

    tables = [exp.to_table(table_name) for table_name in table_names]

    # Group table names by (project, dataset) so each metadata view is queried once.
    names_by_dataset: t.DefaultDict[t.Tuple[str, str], t.List[str]] = defaultdict(list)
    for table in tables:
        names_by_dataset[(table.catalog, table.db)].append(table.name)

    last_modified: t.Dict[t.Tuple[str, str, str], t.Any] = {}
    for (project, dataset), names in names_by_dataset.items():
        # Built with sqlglot so identifiers are quoted and table names are emitted as
        # escaped string literals instead of being interpolated raw into the SQL.
        query = (
            exp.select("table_id", "TIMESTAMP_MILLIS(last_modified_time)", dialect=self.dialect)
            .from_(exp.table_("__TABLES__", db=dataset, catalog=project or None, quoted=True))
            .where(exp.column("table_id").isin(*(exp.Literal.string(name) for name in names)))
        )
        for row in self.fetchall(query):
            last_modified[(project, dataset, row[0])] = row[1]

    lookup_keys = [(table.catalog, table.db, table.name) for table in tables]
    return [to_timestamp(last_modified[key]) for key in lookup_keys if key in last_modified]
def create_state_table(
    self,
    table_name: str,
    target_columns_to_types: t.Dict[str, exp.DataType],
    primary_key: t.Optional[t.Tuple[str, ...]] = None,
) -> None:
    """Create a table to store SQLMesh internal state.

    Args:
        table_name: The name of the table to create. Can be fully qualified or just table name.
        target_columns_to_types: A mapping between the column name and its data type.
        primary_key: Accepted for interface compatibility but not passed on; the table is
            created from the name and column types only.
    """
    self.create_table(table_name, target_columns_to_types)
Create a table to store SQLMesh internal state.
Arguments:
- table_name: The name of the table to create. Can be fully qualified or just table name.
- target_columns_to_types: A mapping between the column name and its data type.
- primary_key: Determines the table primary key.
def get_alter_operations(
    self,
    current_table_name: TableName,
    target_table_name: TableName,
    *,
    ignore_destructive: bool = False,
    ignore_additive: bool = False,
) -> t.List[TableAlterOperation]:
    """Determines the alter statements needed to change the current table into the structure of the target table.

    On top of the base column operations, a clustering change is detected from the
    data objects of both tables:
    - If the target is clustered with a key that differs from the current key, a
      change-cluster-key operation is appended.
    - If only the current table is clustered, a drop-cluster-key operation is appended.
    """
    operations = super().get_alter_operations(
        current_table_name,
        target_table_name,
        ignore_destructive=ignore_destructive,
        ignore_additive=ignore_additive,
    )

    current_table = exp.to_table(current_table_name)
    target_table = exp.to_table(target_table_name)

    def first_data_object(table: exp.Table) -> t.Any:
        # Look up the single data object for `table` within its schema, if any.
        table_schema = schema_(table.db, catalog=table.catalog)
        return seq_get(self.get_data_objects(table_schema, {table.name}), 0)

    current_info = first_data_object(current_table)
    target_info = first_data_object(target_table)

    if not (current_info and target_info):
        return operations

    if target_info.is_clustered:
        new_key = target_info.clustering_key
        if new_key and current_info.clustering_key != new_key:
            operations.append(
                TableAlterChangeClusterKeyOperation(
                    target_table=current_table,
                    clustering_key=new_key,
                    dialect=self.dialect,
                )
            )
    elif current_info.is_clustered:
        operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))

    return operations
Determines the alter statements needed to change the current table into the structure of the target table.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_INDEXES
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- comments_enabled
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- create_view
- drop_schema
- drop_view
- create_catalog
- drop_catalog
- delete_from
- insert_append
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
def select_partitions_expr(
    schema: str,
    table_name: str,
    data_type: t.Union[str, exp.DataType],
    granularity: t.Optional[str] = None,
    agg_func: str = "MAX",
    catalog: t.Optional[str] = None,
) -> str:
    """Generates a SQL expression that aggregates partition values for a table.

    Args:
        schema: The schema (BigQuery dataset) of the table.
        table_name: The name of the table.
        data_type: The data type of the partition column.
        granularity: The granularity of the partition. Supported values are: 'day', 'month', 'year' and 'hour'.
        agg_func: The aggregation function to use.
        catalog: The catalog (BigQuery project ID) of the table.

    Returns:
        A SELECT statement that aggregates partition values for a table.
    """
    partitions_table_name = f"`{schema}`.INFORMATION_SCHEMA.PARTITIONS"
    if catalog:
        partitions_table_name = f"`{catalog}`.{partitions_table_name}"

    if isinstance(data_type, exp.DataType):
        data_type = data_type.sql(dialect="bigquery")
    data_type = data_type.upper()

    # For DATE/DATETIME/TIMESTAMP, partition_id is parsed with the PARSE_<type> function
    # and a granularity-specific format (default granularity: day). Otherwise it is cast
    # to INT64.
    parse_fun = f"PARSE_{data_type}" if data_type in ("DATE", "DATETIME", "TIMESTAMP") else None
    if parse_fun:
        granularity = granularity or "day"
        parse_format = GRANULARITY_TO_PARTITION_FORMAT[granularity.lower()]
        partition_expr = exp.func(
            parse_fun,
            exp.Literal.string(parse_format),
            exp.column("partition_id"),
            dialect="bigquery",
        )
    else:
        partition_expr = exp.cast(exp.column("partition_id"), "INT64", dialect="bigquery")

    # The filter is built from expression nodes so `table_name` is always rendered as a
    # properly escaped string literal, even when it contains quote characters.
    condition = exp.and_(
        exp.column("table_name").eq(exp.Literal.string(table_name)),
        exp.column("partition_id").is_(exp.null()).not_(),
        exp.column("partition_id").neq(exp.Literal.string("__NULL__")),
    )

    return (
        exp.select(exp.func(agg_func, partition_expr))
        .from_(partitions_table_name, dialect="bigquery")
        .where(condition, copy=False)
        .sql(dialect="bigquery")
    )
Generates a SQL expression that aggregates partition values for a table.
Arguments:
- schema: The schema (BigQuery dataset) of the table.
- table_name: The name of the table.
- data_type: The data type of the partition column.
- granularity: The granularity of the partition. Supported values are: 'day', 'month', 'year' and 'hour'.
- agg_func: The aggregation function to use.
- catalog: The catalog (BigQuery project ID) of the table.
Returns:
A SELECT statement that aggregates partition values for a table.