Edit on GitHub

sqlmesh.core.engine_adapter.bigquery

   1from __future__ import annotations
   2
   3import logging
   4import typing as t
   5from collections import defaultdict
   6
   7from sqlglot import exp, parse_one
   8from sqlglot.transforms import remove_precision_parameterized_types
   9
  10from sqlmesh.core.dialect import to_schema
  11from sqlmesh.core.engine_adapter.base import _get_data_object_cache_key
  12from sqlmesh.core.engine_adapter.mixins import (
  13    ClusteredByMixin,
  14    GrantsFromInfoSchemaMixin,
  15    RowDiffMixin,
  16    TableAlterClusterByOperation,
  17)
  18from sqlmesh.core.engine_adapter.shared import (
  19    CatalogSupport,
  20    DataObject,
  21    DataObjectType,
  22    SourceQuery,
  23    set_catalog,
  24    InsertOverwriteStrategy,
  25)
  26from sqlmesh.core.node import IntervalUnit
  27from sqlmesh.core.schema_diff import TableAlterOperation, NestedSupport
  28from sqlmesh.utils import optional_import, get_source_columns_to_types
  29from sqlmesh.utils.date import to_datetime
  30from sqlmesh.utils.errors import SQLMeshError
  31from sqlmesh.utils.pandas import columns_to_types_from_dtypes
  32
  33if t.TYPE_CHECKING:
  34    import pandas as pd
  35    from google.api_core.retry import Retry
  36    from google.cloud import bigquery
  37    from google.cloud.bigquery import StandardSqlDataType
  38    from google.cloud.bigquery.client import Client as BigQueryClient
  39    from google.cloud.bigquery.job import QueryJob
  40    from google.cloud.bigquery.job.base import _AsyncJob as BigQueryQueryResult
  41    from google.cloud.bigquery.table import Table as BigQueryTable
  42
  43    from sqlmesh.core._typing import SchemaName, SessionProperties, TableName
  44    from sqlmesh.core.engine_adapter._typing import BigframeSession, DCL, DF, GrantsConfig, Query
  45    from sqlmesh.core.engine_adapter.base import QueryOrDF
  46
  47
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Optional bigframes dependencies. The adapter checks these for truthiness before using
# them, so `optional_import` presumably returns a falsy value when the package is not
# installed (TODO confirm against `optional_import`).
bigframes = optional_import("bigframes")
bigframes_pd = optional_import("bigframes.pandas")


# A nested field to add: (new field name, data type SQL, remaining path of parent field names).
NestedField = t.Tuple[str, str, t.List[str]]
# Maps a parent field name to the nested fields that should be added beneath it.
NestedFieldsDict = t.Dict[str, t.List[NestedField]]
  56
  57
@set_catalog()
class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin, GrantsFromInfoSchemaMixin):
    """
    BigQuery Engine Adapter using the `google-cloud-bigquery` library's DB API.
    """

    # Static adapter configuration. These flags and limits are presumably read by the
    # base engine adapter and the mixins above (NOTE(review): confirm their exact
    # semantics where they are consumed).
    DIALECT = "bigquery"
    DEFAULT_BATCH_SIZE = 1000
    SUPPORTS_TRANSACTIONS = False
    SUPPORTS_MATERIALIZED_VIEWS = True
    SUPPORTS_CLONING = True
    SUPPORTS_GRANTS = True
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.func("session_user")
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True
    USE_CATALOG_IN_GRANTS = True
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "OBJECT_PRIVILEGES"
    MAX_TABLE_COMMENT_LENGTH = 1024
    MAX_COLUMN_COMMENT_LENGTH = 1024
    SUPPORTS_QUERY_EXECUTION_TRACKING = True
    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]
    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.MERGE

    # Type-change rules for BigQuery, keyed by option name. Presumably forwarded as keyword
    # arguments to the schema differ (NOTE(review): confirm against the base adapter).
    SCHEMA_DIFFER_KWARGS = {
        "compatible_types": {
            exp.DataType.build("INT64", dialect=DIALECT): {
                exp.DataType.build("NUMERIC", dialect=DIALECT),
                exp.DataType.build("FLOAT64", dialect=DIALECT),
                exp.DataType.build("BIGNUMERIC", dialect=DIALECT),
            },
            exp.DataType.build("NUMERIC", dialect=DIALECT): {
                exp.DataType.build("FLOAT64", dialect=DIALECT),
                exp.DataType.build("BIGNUMERIC", dialect=DIALECT),
            },
            exp.DataType.build("DATE", dialect=DIALECT): {
                exp.DataType.build("DATETIME", dialect=DIALECT),
            },
        },
        "coerceable_types": {
            exp.DataType.build("FLOAT64", dialect=DIALECT): {
                exp.DataType.build("BIGNUMERIC", dialect=DIALECT),
            },
        },
        "support_coercing_compatible_types": True,
        "parameterized_type_defaults": {
            exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(38, 9), (0,)],
            exp.DataType.build("BIGDECIMAL", dialect=DIALECT).this: [(76.76, 38), (0,)],
        },
        "types_with_unlimited_length": {
            # parameterized `STRING(n)` can ALTER to unparameterized `STRING`
            exp.DataType.build("STRING", dialect=DIALECT).this: {
                exp.DataType.build("STRING", dialect=DIALECT).this,
            },
            # parameterized `BYTES(n)` can ALTER to unparameterized `BYTES`
            exp.DataType.build("BYTES", dialect=DIALECT).this: {
                exp.DataType.build("BYTES", dialect=DIALECT).this,
            },
        },
        "nested_support": NestedSupport.ALL_BUT_DROP,
    }
 117
    @property
    def client(self) -> BigQueryClient:
        """Returns the BigQuery client stored on the underlying connection (its `_client` attribute)."""
        return self.connection._client
 121
 122    @property
 123    def bigframe(self) -> t.Optional[BigframeSession]:
 124        if bigframes:
 125            options = bigframes.BigQueryOptions(
 126                credentials=self.client._credentials,
 127                project=self.client.project,
 128                location=self.client.location,
 129            )
 130            return bigframes.connect(context=options)
 131        return None
 132
 133    @property
 134    def _job_params(self) -> t.Dict[str, t.Any]:
 135        from sqlmesh.core.config.connection import BigQueryPriority
 136
 137        params = {
 138            "use_legacy_sql": False,
 139            "priority": self._extra_config.get(
 140                "priority", BigQueryPriority.INTERACTIVE.bigquery_constant
 141            ),
 142        }
 143        if self._extra_config.get("maximum_bytes_billed") is not None:
 144            params["maximum_bytes_billed"] = self._extra_config.get("maximum_bytes_billed")
 145        if self._extra_config.get("reservation") is not None:
 146            params["reservation"] = self._extra_config.get("reservation")
 147        if self.correlation_id:
 148            # BigQuery label keys must be lowercase
 149            key = self.correlation_id.job_type.value.lower()
 150            params["labels"] = {key: self.correlation_id.job_id}
 151        return params
 152
    @property
    def catalog_support(self) -> CatalogSupport:
        """Reports the catalog support level of this adapter, which is always `FULL_SUPPORT`."""
        return CatalogSupport.FULL_SUPPORT
 156
    def _df_to_source_queries(
        self,
        df: DF,
        target_columns_to_types: t.Dict[str, exp.DataType],
        batch_size: int,
        target_table: TableName,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> t.List[SourceQuery]:
        """
        Stages a dataframe in a temporary BigQuery table and returns a query that selects from it.

        Args:
            df: The dataframe to load (pandas, or a bigframes dataframe when bigframes is available).
            target_columns_to_types: Column names and types used to build the casted select list.
            batch_size: Not used by this implementation.
            target_table: The table the temporary table name is derived from ("pandas" when falsy).
            source_columns: Optional subset/order of columns present in the dataframe.

        Returns:
            A single `SourceQuery` whose factory loads the data lazily and whose cleanup
            function drops the temporary table.

        Raises:
            SQLMeshError: If the load job reports errors.
        """
        import pandas as pd

        source_columns_to_types = get_source_columns_to_types(
            target_columns_to_types, source_columns
        )

        # Only the table object is built here; the table itself is created inside the factory.
        temp_bq_table = self.__get_temp_bq_table(
            self._get_temp_table(target_table or "pandas"), source_columns_to_types
        )
        temp_table = exp.table_(
            temp_bq_table.table_id,
            db=temp_bq_table.dataset_id,
            catalog=temp_bq_table.project,
        )

        def query_factory() -> Query:
            # Reorder the dataframe columns to match the source column order.
            ordered_df = df[list(source_columns_to_types)]
            if bigframes_pd and isinstance(ordered_df, bigframes_pd.DataFrame):
                ordered_df.to_gbq(
                    f"{temp_bq_table.project}.{temp_bq_table.dataset_id}.{temp_bq_table.table_id}",
                    if_exists="replace",
                )
            elif not self.table_exists(temp_table):
                # Make mypy happy
                assert isinstance(ordered_df, pd.DataFrame)
                self._db_call(self.client.create_table, table=temp_bq_table, exists_ok=False)
                result = self.__load_pandas_to_table(
                    temp_bq_table, ordered_df, source_columns_to_types, replace=False
                )
                # NOTE(review): `__load_pandas_to_table` already raises on errors, so this
                # check looks redundant — confirm before removing.
                if result.errors:
                    raise SQLMeshError(result.errors)
            return exp.select(
                *self._casted_columns(target_columns_to_types, source_columns=source_columns)
            ).from_(temp_table)

        return [
            SourceQuery(
                query_factory=query_factory,
                cleanup_func=lambda: self.drop_table(temp_table),
            )
        ]
 206
 207    def close(self) -> t.Any:
 208        # Cancel all pending query jobs across all threads
 209        all_query_jobs = self._connection_pool.get_all_attributes("query_job")
 210        for query_job in all_query_jobs:
 211            if query_job:
 212                try:
 213                    if not self._db_call(query_job.done):
 214                        self._db_call(query_job.cancel)
 215                        logger.debug(
 216                            "Cancelled BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s",
 217                            query_job.project,
 218                            query_job.location,
 219                            query_job.job_id,
 220                        )
 221                except Exception as ex:
 222                    logger.debug(
 223                        "Failed to cancel BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s. %s",
 224                        query_job.project,
 225                        query_job.location,
 226                        query_job.job_id,
 227                        str(ex),
 228                    )
 229
 230        return super().close()
 231
 232    def _begin_session(self, properties: SessionProperties) -> None:
 233        from google.cloud.bigquery import QueryJobConfig
 234
 235        query_label_property = properties.get("query_label")
 236        parsed_query_label: list[tuple[str, str]] = []
 237        if isinstance(query_label_property, (exp.Array, exp.Paren, exp.Tuple)):
 238            label_tuples = (
 239                [query_label_property.unnest()]
 240                if isinstance(query_label_property, exp.Paren)
 241                else query_label_property.expressions
 242            )
 243
 244            # query_label is a Paren, Array or Tuple of 2-tuples and validated at load time
 245            parsed_query_label.extend(
 246                (label_tuple.expressions[0].name, label_tuple.expressions[1].name)
 247                for label_tuple in label_tuples
 248            )
 249        elif query_label_property is not None:
 250            raise SQLMeshError(
 251                "Invalid value for `session_properties.query_label`. Must be an array or tuple."
 252            )
 253
 254        if self.correlation_id:
 255            parsed_query_label.append(
 256                (self.correlation_id.job_type.value.lower(), self.correlation_id.job_id)
 257            )
 258
 259        if parsed_query_label:
 260            query_label_str = ",".join([":".join(label) for label in parsed_query_label])
 261            query = f'SET @@query_label = "{query_label_str}";SELECT 1;'
 262        else:
 263            query = "SELECT 1;"
 264
 265        job = self.client.query(
 266            query,
 267            job_config=QueryJobConfig(create_session=True),
 268        )
 269        session_info = job.session_info
 270        session_id = session_info.session_id if session_info else None
 271        self._session_id = session_id
 272        job.result()
 273
    def _end_session(self) -> None:
        """Forgets the stored session id; no request is sent from this method."""
        self._session_id = None
 276
    def _is_session_active(self) -> bool:
        """Returns True when a session id is currently stored on the adapter."""
        return self._session_id is not None
 279
    def get_current_catalog(self) -> t.Optional[str]:
        """Returns the catalog name of the current connection, i.e. the client's project."""
        return self.client.project
 283
    def set_current_catalog(self, catalog: str) -> None:
        """Sets the catalog name of the current connection by assigning the client's project."""
        self.client.project = catalog
 287
 288    def create_schema(
 289        self,
 290        schema_name: SchemaName,
 291        ignore_if_exists: bool = True,
 292        warn_on_error: bool = True,
 293        properties: t.List[exp.Expr] = [],
 294    ) -> None:
 295        """Create a schema from a name or qualified table name."""
 296        from google.api_core.exceptions import Conflict
 297
 298        try:
 299            super().create_schema(
 300                schema_name,
 301                ignore_if_exists=ignore_if_exists,
 302                warn_on_error=False,
 303            )
 304        except Exception as e:
 305            is_already_exists_error = isinstance(e, Conflict) and "Already Exists:" in str(e)
 306            if is_already_exists_error and ignore_if_exists:
 307                return
 308            if not warn_on_error:
 309                raise
 310            logger.warning("Failed to create schema '%s': %s", schema_name, e)
 311
 312    def get_bq_schema(self, table_name: TableName) -> t.List[bigquery.SchemaField]:
 313        table = exp.to_table(table_name)
 314        if len(table.parts) == 3 and "." in table.name:
 315            self.execute(exp.select("*").from_(table).limit(0))
 316            query_job = self._query_job
 317            assert query_job is not None
 318            return query_job._query_results.schema
 319        return self._get_table(table).schema
 320
    def columns(
        self, table_name: TableName, include_pseudo_columns: bool = False
    ) -> t.Dict[str, exp.DataType]:
        """Fetches column names and types for the target table.

        Args:
            table_name: The table to inspect.
            include_pseudo_columns: When True and the table is looked up through `_get_table`,
                also adds `_PARTITIONTIME` / `_PARTITIONDATE`, `_TABLE_SUFFIX` and `_FILE_NAME`
                entries where the table metadata indicates they apply.

        Returns:
            A mapping of column name to data type.
        """

        def dtype_to_sql(
            dtype: t.Optional[StandardSqlDataType], field: bigquery.SchemaField
        ) -> str:
            # Renders a standard SQL data type as a type string, recursing into arrays and structs.
            assert dtype
            assert field

            kind = dtype.type_kind
            assert kind

            # Not using the enum value to preserve compatibility with older versions
            # of the BigQuery library.
            if kind.name == "ARRAY":
                return f"ARRAY<{dtype_to_sql(dtype.array_element_type, field)}>"
            if kind.name == "STRUCT":
                struct_type = dtype.struct_type
                assert struct_type
                # Struct members are paired positionally with the schema field's sub-fields.
                fields = ", ".join(
                    f"{struct_field.name} {dtype_to_sql(struct_field.type, nested_field)}"
                    for struct_field, nested_field in zip(struct_type.fields, field.fields)
                )
                return f"STRUCT<{fields}>"
            if kind.name == "TYPE_KIND_UNSPECIFIED":
                # Fall back to the raw field type when the standard SQL kind is unspecified.
                field_type = field.field_type

                if field_type == "RANGE":
                    # If the field is a RANGE then `range_element_type` should be set to
                    # one of `"DATE"`, `"DATETIME"` or `"TIMESTAMP"`.
                    return f"RANGE<{field.range_element_type.element_type}>"

                return field_type

            return kind.name

        def create_mapping_schema(
            schema: t.Sequence[bigquery.SchemaField],
        ) -> t.Dict[str, exp.DataType]:
            # Converts schema fields into a {column name: DataType} mapping.
            return {
                field.name: exp.DataType.build(
                    dtype_to_sql(field.to_standard_sql().type, field), dialect=self.dialect
                )
                for field in schema
            }

        table = exp.to_table(table_name)
        if len(table.parts) == 3 and "." in table.name:
            # The client's `get_table` method can't handle paths with >3 identifiers
            self.execute(exp.select("*").from_(table).limit(0))
            query_job = self._query_job
            assert query_job is not None

            query_results = query_job._query_results
            columns = create_mapping_schema(query_results.schema)
        else:
            bq_table = self._get_table(table)
            columns = create_mapping_schema(bq_table.schema)

            if include_pseudo_columns:
                # Time partitioning without an explicit field adds the partition pseudo-columns.
                if bq_table.time_partitioning and not bq_table.time_partitioning.field:
                    columns["_PARTITIONTIME"] = exp.DataType.build("TIMESTAMP", dialect="bigquery")
                    if bq_table.time_partitioning.type_ == "DAY":
                        columns["_PARTITIONDATE"] = exp.DataType.build("DATE")
                # A table id ending in "*" gets the `_TABLE_SUFFIX` pseudo-column.
                if bq_table.table_id.endswith("*"):
                    columns["_TABLE_SUFFIX"] = exp.DataType.build("STRING", dialect="bigquery")
                # External tables in the listed source formats get the `_FILE_NAME` pseudo-column.
                if (
                    bq_table.external_data_configuration is not None
                    and bq_table.external_data_configuration.source_format
                    in (
                        "CSV",
                        "NEWLINE_DELIMITED_JSON",
                        "AVRO",
                        "PARQUET",
                        "ORC",
                        "DATASTORE_BACKUP",
                    )
                ):
                    columns["_FILE_NAME"] = exp.DataType.build("STRING", dialect="bigquery")

        return columns
 404
 405    def alter_table(
 406        self,
 407        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
 408    ) -> None:
 409        """
 410        Performs the alter statements to change the current table into the structure of the target table,
 411        and uses the API to add columns to structs, where SQL is not supported.
 412        """
 413        if not alter_expressions:
 414            return
 415
 416        cluster_by_operations, alter_statements = [], []
 417        for e in alter_expressions:
 418            if isinstance(e, TableAlterClusterByOperation):
 419                cluster_by_operations.append(e)
 420            elif isinstance(e, TableAlterOperation):
 421                alter_statements.append(e.expression)
 422            else:
 423                alter_statements.append(e)
 424
 425        for op in cluster_by_operations:
 426            self._update_clustering_key(op)
 427
 428        nested_fields, non_nested_expressions = self._split_alter_expressions(alter_statements)
 429
 430        if nested_fields:
 431            self._update_table_schema_nested_fields(nested_fields, alter_statements[0].this)
 432
 433        if non_nested_expressions:
 434            super().alter_table(non_nested_expressions)
 435
 436    def fetchone(
 437        self,
 438        query: t.Union[exp.Expr, str],
 439        ignore_unsupported_errors: bool = False,
 440        quote_identifiers: bool = False,
 441    ) -> t.Optional[t.Tuple]:
 442        """
 443        BigQuery's `fetchone` method doesn't call execute and therefore would not benefit from the execute
 444        configuration we have in place. Therefore this implementation calls execute instead.
 445        """
 446        self.execute(
 447            query,
 448            ignore_unsupported_errors=ignore_unsupported_errors,
 449            quote_identifiers=quote_identifiers,
 450        )
 451        try:
 452            return next(self._query_data)
 453        except StopIteration:
 454            return None
 455
    def fetchall(
        self,
        query: t.Union[exp.Expr, str],
        ignore_unsupported_errors: bool = False,
        quote_identifiers: bool = False,
    ) -> t.List[t.Tuple]:
        """
        BigQuery's `fetchall` method doesn't call execute and therefore would not benefit from the execute
        configuration we have in place. Therefore this implementation calls execute instead.

        Returns:
            All rows of the query result, as a list.
        """
        self.execute(
            query,
            ignore_unsupported_errors=ignore_unsupported_errors,
            quote_identifiers=quote_identifiers,
        )
        return list(self._query_data)
 472
 473    def _split_alter_expressions(
 474        self,
 475        alter_expressions: t.List[exp.Alter],
 476    ) -> t.Tuple[NestedFieldsDict, t.List[exp.Alter]]:
 477        """
 478        Returns a dictionary of the nested fields to add and a list of the non-nested alter expressions.
 479        """
 480        nested_fields_to_add: NestedFieldsDict = defaultdict(list)
 481        non_nested_expressions = []
 482
 483        for alter_expression in alter_expressions:
 484            action = alter_expression.args["actions"][0]
 485            if (
 486                isinstance(action, exp.ColumnDef)
 487                and isinstance(action.this, exp.Dot)
 488                and isinstance(action.kind, exp.DataType)
 489            ):
 490                root_field, *leaf_fields = action.this.this.sql(dialect=self.dialect).split(".")
 491                new_field = action.this.expression.sql(dialect=self.dialect)
 492                data_type = action.kind.sql(dialect=self.dialect)
 493                nested_fields_to_add[root_field].append((new_field, data_type, leaf_fields))
 494            else:
 495                non_nested_expressions.append(alter_expression)
 496
 497        return nested_fields_to_add, non_nested_expressions
 498
 499    def _build_nested_fields(
 500        self,
 501        current_fields: t.List[bigquery.SchemaField],
 502        fields_to_add: t.List[NestedField],
 503    ) -> t.List[bigquery.SchemaField]:
 504        """
 505        Recursively builds and updates the schema fields with the new nested fields.
 506        """
 507        from google.cloud import bigquery
 508
 509        new_fields = []
 510        root: t.List[t.Tuple[str, str]] = []
 511        leaves: NestedFieldsDict = defaultdict(list)
 512        for new_field, data_type, leaf_fields in fields_to_add:
 513            if leaf_fields:
 514                leaves[leaf_fields[0]].append((new_field, data_type, leaf_fields[1:]))
 515            else:
 516                root.append((new_field, data_type))
 517
 518        for field in current_fields:
 519            # If the new fields are nested, we need to recursively build them
 520            if field.name in leaves:
 521                subfields = list(field.fields)
 522                subfields = self._build_nested_fields(subfields, leaves[field.name])
 523                new_fields.append(
 524                    bigquery.SchemaField(
 525                        field.name, "RECORD", mode=field.mode, fields=tuple(subfields)
 526                    )
 527                )
 528            else:
 529                new_fields.append(field)
 530
 531        # Build and append the new root-level fields
 532        new_fields.extend(
 533            self.__get_bq_schemafield(
 534                new_field[0], exp.DataType.build(new_field[1], dialect=self.dialect)
 535            )
 536            for new_field in root
 537        )
 538        return new_fields
 539
 540    def _update_table_schema_nested_fields(
 541        self, nested_fields_to_add: NestedFieldsDict, table_name: str
 542    ) -> None:
 543        """
 544        Updates a BigQuery table schema by adding the new nested fields provided.
 545        """
 546        from google.cloud import bigquery
 547
 548        table = self._get_table(table_name)
 549        original_schema = table.schema
 550        new_schema = []
 551        for field in original_schema:
 552            if field.name in nested_fields_to_add:
 553                fields = self._build_nested_fields(
 554                    list(field.fields), nested_fields_to_add[field.name]
 555                )
 556                new_schema.append(
 557                    bigquery.SchemaField(
 558                        field.name,
 559                        "RECORD",
 560                        mode=field.mode,
 561                        fields=tuple(fields),
 562                    )
 563                )
 564            else:
 565                new_schema.append(field)
 566
 567        if new_schema != original_schema:
 568            table.schema = new_schema
 569            self.client.update_table(table, ["schema"])
 570
 571    def __load_pandas_to_table(
 572        self,
 573        table: bigquery.Table,
 574        df: pd.DataFrame,
 575        columns_to_types: t.Dict[str, exp.DataType],
 576        replace: bool = False,
 577    ) -> BigQueryQueryResult:
 578        """
 579        Loads a pandas dataframe into a table in BigQuery. Will do an overwrite if replace is True. Note that
 580        the replace will replace the entire table, not just the rows that are in the dataframe.
 581        """
 582        from google.cloud import bigquery
 583
 584        job_config = bigquery.job.LoadJobConfig(schema=self.__get_bq_schema(columns_to_types))
 585        if replace:
 586            job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
 587        logger.info(f"Loading dataframe to BigQuery. Table Path: {table.path}")
 588        # This client call does not support retry so we don't use the `_db_call` method.
 589        result = self.__retry(
 590            self.__db_load_table_from_dataframe,
 591        )(df=df, table=table, job_config=job_config)
 592        if result.errors:
 593            raise SQLMeshError(result.errors)
 594        return result
 595
 596    def __db_load_table_from_dataframe(
 597        self, df: pd.DataFrame, table: bigquery.Table, job_config: bigquery.LoadJobConfig
 598    ) -> BigQueryQueryResult:
 599        job = self.client.load_table_from_dataframe(
 600            dataframe=df, destination=table, job_config=job_config
 601        )
 602        return self._db_call(job.result)
 603
 604    def __get_bq_schemafield(self, name: str, tpe: exp.DataType) -> bigquery.SchemaField:
 605        from google.cloud import bigquery
 606
 607        mode = "NULLABLE"
 608        if tpe.is_type(exp.DataType.Type.ARRAY):
 609            mode = "REPEATED"
 610            tpe = tpe.expressions[0]
 611
 612        field_type = tpe.sql(dialect=self.dialect)
 613        fields = []
 614        if tpe.is_type(*exp.DataType.NESTED_TYPES):
 615            field_type = "RECORD"
 616            for inner_field in tpe.expressions:
 617                if isinstance(inner_field, exp.ColumnDef):
 618                    inner_name = inner_field.this.sql(dialect=self.dialect)
 619                    inner_type = inner_field.kind
 620                    if inner_type is None:
 621                        raise ValueError(
 622                            f"cannot convert unknown type to BQ schema field {inner_field}"
 623                        )
 624                    fields.append(self.__get_bq_schemafield(name=inner_name, tpe=inner_type))
 625                else:
 626                    raise ValueError(f"unexpected nested expression {inner_field}")
 627
 628        return bigquery.SchemaField(
 629            name=name,
 630            field_type=field_type,
 631            mode=mode,
 632            fields=fields,
 633        )
 634
 635    def __get_bq_schema(
 636        self, columns_to_types: t.Dict[str, exp.DataType]
 637    ) -> t.List[bigquery.SchemaField]:
 638        """
 639        Returns a bigquery schema object from a dictionary of column names to types.
 640        """
 641
 642        precisionless_col_to_types = {
 643            col_name: remove_precision_parameterized_types(col_type)
 644            for col_name, col_type in columns_to_types.items()
 645        }
 646        return [
 647            self.__get_bq_schemafield(name=col_name, tpe=t.cast(exp.DataType, col_type))
 648            for col_name, col_type in precisionless_col_to_types.items()
 649        ]
 650
 651    def __get_temp_bq_table(
 652        self, table: exp.Table, columns_to_type: t.Dict[str, exp.DataType]
 653    ) -> bigquery.Table:
 654        """
 655        Returns a bigquery table object that is temporary and will expire in 3 hours.
 656        """
 657        bq_table = self.__get_bq_table(table, columns_to_type)
 658        bq_table.expires = to_datetime("in 3 hours")
 659        return bq_table
 660
 661    def __get_bq_table(
 662        self, table: TableName, columns_to_type: t.Dict[str, exp.DataType]
 663    ) -> bigquery.Table:
 664        """
 665        Returns a bigquery table object with a schema defines that matches the columns_to_type dictionary.
 666        """
 667        from google.cloud import bigquery
 668
 669        table_ = exp.to_table(table).copy()
 670
 671        if not table_.catalog:
 672            table_.set("catalog", exp.to_identifier(self.default_catalog))
 673
 674        return bigquery.Table(
 675            table_ref=self._table_name(table_),
 676            schema=self.__get_bq_schema(columns_to_type),
 677        )
 678
    @property
    def __retry(self) -> Retry:
        """
        Builds a new `Retry` object each time the property is read.

        The predicate comes from an `_ErrorCounter` created with the configured `job_retries`
        (required in the extra config). The deadline comes from the optional
        `job_retry_deadline_seconds` setting.
        """
        from google.api_core import retry

        return retry.Retry(
            predicate=_ErrorCounter(self._extra_config["job_retries"]).should_retry,
            deadline=self._extra_config.get("job_retry_deadline_seconds"),
            # NOTE(review): presumably the initial and maximum delay between attempts,
            # in seconds — confirm against the google.api_core `Retry` documentation.
            initial=1.0,
            maximum=3.0,
        )
 689
    def insert_overwrite_by_partition(
        self,
        table_name: TableName,
        query_or_df: QueryOrDF,
        partitioned_by: t.List[exp.Expr],
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        """
        Overwrites the partitions of `table_name` that are present in `query_or_df`.

        The source data is first written to a temporary table. The partition values found
        there are stored in a script variable, and the target table is then overwritten
        where the partition expression matches one of those values.

        Args:
            table_name: The table to overwrite.
            query_or_df: The source query or dataframe.
            partitioned_by: The partition expressions; exactly one is required.
            target_columns_to_types: Column types of the target table. They are fetched from
                the table when omitted or when the partition column's type is unknown.
            source_columns: Optional columns present in the source.

        Raises:
            SQLMeshError: If `partitioned_by` does not hold exactly one expression, or that
                expression does not contain a column.
        """
        if len(partitioned_by) != 1:
            raise SQLMeshError(
                f"Bigquery only supports partitioning by one column, {len(partitioned_by)} were provided."
            )

        partition_exp = partitioned_by[0]
        partition_column = partition_exp.find(exp.Column)

        # The partition expression may carry a `unit` argument, used here as the granularity.
        granularity = partition_exp.args.get("unit")
        if granularity:
            granularity = granularity.name.lower()

        if not partition_column:
            partition_sql = partition_exp.sql(dialect=self.dialect)
            raise SQLMeshError(
                f"The partition expression '{partition_sql}' doesn't contain a column."
            )
        # The DECLARE below and the overwrite both run inside the same session block.
        with (
            self.session({}),
            self.temp_table(
                query_or_df,
                name=table_name,
                partitioned_by=partitioned_by,
                source_columns=source_columns,
            ) as temp_table_name,
        ):
            if target_columns_to_types is None or target_columns_to_types[
                partition_column.name
            ] == exp.DataType.build("unknown"):
                target_columns_to_types = self.columns(table_name)

            partition_type_sql = target_columns_to_types[partition_column.name].sql(
                dialect=self.dialect
            )

            # NOTE(review): `select_partitions_expr` is defined outside this view; it presumably
            # builds a query aggregating the temp table's partition values — confirm.
            select_array_agg_partitions = select_partitions_expr(
                temp_table_name.db,
                temp_table_name.name,
                partition_type_sql,
                granularity=granularity,
                agg_func="ARRAY_AGG",
                catalog=temp_table_name.catalog or self.default_catalog,
            )

            self.execute(
                f"DECLARE _sqlmesh_target_partitions_ ARRAY<{partition_type_sql}> DEFAULT ({select_array_agg_partitions});"
            )

            # Restrict the overwrite to rows whose partition value is in the declared array.
            where = t.cast(exp.Condition, partition_exp).isin(unnest="_sqlmesh_target_partitions_")

            self._insert_overwrite_by_condition(
                table_name,
                [SourceQuery(query_factory=lambda: exp.select("*").from_(temp_table_name))],
                target_columns_to_types,
                where=where,
            )
 754
 755    def table_exists(self, table_name: TableName) -> bool:
 756        table = exp.to_table(table_name)
 757        data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
 758        if data_object_cache_key in self._data_object_cache:
 759            logger.debug("Table existence cache hit: %s", data_object_cache_key)
 760            return self._data_object_cache[data_object_cache_key] is not None
 761
 762        try:
 763            from google.cloud.exceptions import NotFound
 764        except ModuleNotFoundError:
 765            from google.api_core.exceptions import NotFound
 766
 767        try:
 768            self._get_table(table_name)
 769            return True
 770        except NotFound:
 771            return False
 772
 773    def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]:
 774        from sqlmesh.utils.date import to_timestamp
 775
 776        datasets_to_tables: t.DefaultDict[str, t.List[str]] = defaultdict(list)
 777        for table_name in table_names:
 778            table = exp.to_table(table_name)
 779            datasets_to_tables[table.db].append(table.name)
 780
 781        results = []
 782
 783        for dataset, tables in datasets_to_tables.items():
 784            query = (
 785                f"SELECT TIMESTAMP_MILLIS(last_modified_time) FROM `{dataset}.__TABLES__` WHERE "
 786            )
 787            for i, table_name in enumerate(tables):
 788                query += f"TABLE_ID = '{table_name}'"
 789                if i < len(tables) - 1:
 790                    query += " OR "
 791            results.extend(self.fetchall(query))
 792
 793        return [to_timestamp(row[0]) for row in results]
 794
 795    def _get_table(self, table_name: TableName) -> BigQueryTable:
 796        """
 797        Returns a BigQueryTable object for the given table name.
 798
 799        Raises: `google.cloud.exceptions.NotFound` if the table does not exist.
 800        """
 801        return self._db_call(self.client.get_table, table=self._table_name(table_name))
 802
 803    def _table_name(self, table_name: TableName) -> str:
 804        # the api doesn't support backticks, so we can't call exp.table_name or sql
 805        return ".".join(part.name for part in exp.to_table(table_name).parts)
 806
 807    def _fetch_native_df(
 808        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
 809    ) -> DF:
 810        self.execute(query, quote_identifiers=quote_identifiers)
 811        query_job = self._query_job
 812        assert query_job is not None
 813        return query_job.to_dataframe()
 814
 815    def _create_column_comments(
 816        self,
 817        table_name: TableName,
 818        column_comments: t.Dict[str, str],
 819        table_kind: str = "TABLE",
 820        materialized_view: bool = False,
 821    ) -> None:
 822        if not (table_kind == "VIEW" and materialized_view):
 823            table = self._get_table(table_name)
 824
 825            # convert Table object to dict
 826            table_def = table.to_api_repr()
 827
 828            # Set column descriptions, supporting nested fields (e.g. record.field.nested_field)
 829            for column, comment in column_comments.items():
 830                fields = table_def["schema"]["fields"]
 831                field_names = column.split(".")
 832                last_index = len(field_names) - 1
 833
 834                # Traverse the fields with nested fields down to leaf level
 835                for idx, name in enumerate(field_names):
 836                    if field := next((field for field in fields if field["name"] == name), None):
 837                        if idx == last_index:
 838                            field["description"] = self._truncate_comment(
 839                                comment, self.MAX_COLUMN_COMMENT_LENGTH
 840                            )
 841                        else:
 842                            fields = field.get("fields") or []
 843
 844            # An "etag" is BQ versioning metadata that changes when an object is updated/modified. `update_table`
 845            # compares the etags of the table object passed to it and the remote table, erroring if the etags
 846            # don't match. We set the local etag to None to avoid this check.
 847            table_def["etag"] = None
 848
 849            # convert dict back to a Table object
 850            table = table.from_api_repr(table_def)
 851
 852            # update table schema
 853            logger.info(f"Registering column comments for table {table_name}")
 854            self._db_call(self.client.update_table, table=table, fields=["schema"])
 855
 856    def _build_description_property_exp(
 857        self,
 858        description: str,
 859        trunc_method: t.Callable,
 860    ) -> exp.Property:
 861        return exp.Property(
 862            this=exp.to_identifier("description", quoted=True),
 863            value=exp.Literal.string(trunc_method(description)),
 864        )
 865
 866    def _build_partitioned_by_exp(
 867        self,
 868        partitioned_by: t.List[exp.Expr],
 869        *,
 870        partition_interval_unit: t.Optional[IntervalUnit] = None,
 871        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 872        **kwargs: t.Any,
 873    ) -> t.Optional[exp.PartitionedByProperty]:
 874        if len(partitioned_by) > 1:
 875            raise SQLMeshError("BigQuery only supports partitioning by a single column")
 876
 877        this = partitioned_by[0]
 878        if (
 879            isinstance(this, exp.Column)
 880            and partition_interval_unit is not None
 881            and not partition_interval_unit.is_minute
 882        ):
 883            column_type: t.Optional[exp.DataType] = (target_columns_to_types or {}).get(this.name)
 884
 885            if column_type == exp.DataType.build(
 886                "date", dialect=self.dialect
 887            ) and partition_interval_unit in (
 888                IntervalUnit.MONTH,
 889                IntervalUnit.YEAR,
 890            ):
 891                trunc_func = "DATE_TRUNC"
 892            elif column_type == exp.DataType.build("timestamp", dialect=self.dialect):
 893                trunc_func = "TIMESTAMP_TRUNC"
 894            elif column_type == exp.DataType.build("datetime", dialect=self.dialect):
 895                trunc_func = "DATETIME_TRUNC"
 896            else:
 897                trunc_func = ""
 898
 899            if trunc_func:
 900                this = exp.func(
 901                    trunc_func,
 902                    this,
 903                    exp.var(partition_interval_unit.value.upper()),
 904                    dialect=self.dialect,
 905                )
 906
 907        return exp.PartitionedByProperty(this=this)
 908
 909    def _build_table_properties_exp(
 910        self,
 911        catalog_name: t.Optional[str] = None,
 912        table_format: t.Optional[str] = None,
 913        storage_format: t.Optional[str] = None,
 914        partitioned_by: t.Optional[t.List[exp.Expr]] = None,
 915        partition_interval_unit: t.Optional[IntervalUnit] = None,
 916        clustered_by: t.Optional[t.List[exp.Expr]] = None,
 917        table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
 918        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 919        table_description: t.Optional[str] = None,
 920        table_kind: t.Optional[str] = None,
 921        **kwargs: t.Any,
 922    ) -> t.Optional[exp.Properties]:
 923        properties: t.List[exp.Expr] = []
 924
 925        if partitioned_by and (
 926            partitioned_by_prop := self._build_partitioned_by_exp(
 927                partitioned_by,
 928                partition_interval_unit=partition_interval_unit,
 929                target_columns_to_types=target_columns_to_types,
 930            )
 931        ):
 932            properties.append(partitioned_by_prop)
 933
 934        if clustered_by and (clustered_by_exp := self._build_clustered_by_exp(clustered_by)):
 935            properties.append(clustered_by_exp)
 936
 937        if table_description:
 938            properties.append(
 939                self._build_description_property_exp(
 940                    table_description, self._truncate_table_comment
 941                ),
 942            )
 943
 944        properties.extend(self._table_or_view_properties_to_expressions(table_properties))
 945
 946        if properties:
 947            return exp.Properties(expressions=properties)
 948        return None
 949
 950    def _build_column_def(
 951        self,
 952        col_name: str,
 953        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 954        engine_supports_schema_comments: bool = False,
 955        col_type: t.Optional[exp.DATA_TYPE] = None,
 956        nested_names: t.List[str] = [],
 957    ) -> exp.ColumnDef:
 958        # Helper function to build column definitions with column descriptions
 959        def _build_struct_with_descriptions(
 960            col_type: exp.DataType,
 961            nested_names: t.List[str],
 962        ) -> exp.DataType:
 963            column_expressions = []
 964            for column_def in col_type.expressions:
 965                # This is expected to  be true, but this check is included as a
 966                # precautionary measure in case of an unexpected edge case
 967                if isinstance(column_def, exp.ColumnDef):
 968                    column = self._build_column_def(
 969                        col_name=column_def.name,
 970                        column_descriptions=column_descriptions,
 971                        engine_supports_schema_comments=engine_supports_schema_comments,
 972                        col_type=column_def.kind,
 973                        nested_names=nested_names,
 974                    )
 975                else:
 976                    column = column_def
 977                column_expressions.append(column)
 978            return exp.DataType(this=col_type.this, expressions=column_expressions, nested=True)
 979
 980        # Recursively build column definitions for BigQuery's RECORDs (struct) and REPEATED RECORDs (array of struct)
 981        if isinstance(col_type, exp.DataType) and col_type.expressions:
 982            expressions = col_type.expressions
 983            if col_type.is_type(exp.DataType.Type.STRUCT):
 984                col_type = _build_struct_with_descriptions(col_type, nested_names + [col_name])
 985            elif col_type.is_type(exp.DataType.Type.ARRAY) and expressions[0].is_type(
 986                exp.DataType.Type.STRUCT
 987            ):
 988                col_type = exp.DataType(
 989                    this=exp.DataType.Type.ARRAY,
 990                    expressions=[
 991                        _build_struct_with_descriptions(
 992                            col_type.expressions[0], nested_names + [col_name]
 993                        )
 994                    ],
 995                    nested=True,
 996                )
 997
 998        return exp.ColumnDef(
 999            this=exp.to_identifier(col_name),
1000            kind=col_type,
1001            constraints=(
1002                self._build_col_comment_exp(
1003                    ".".join(nested_names + [col_name]), column_descriptions
1004                )
1005                if engine_supports_schema_comments and self.comments_enabled and column_descriptions
1006                else None
1007            ),
1008        )
1009
1010    def _build_col_comment_exp(
1011        self, col_name: str, column_descriptions: t.Dict[str, str]
1012    ) -> t.List[exp.ColumnConstraint]:
1013        comment = column_descriptions.get(col_name, None)
1014        if comment:
1015            return [
1016                exp.ColumnConstraint(
1017                    kind=exp.Properties(
1018                        expressions=[
1019                            self._build_description_property_exp(
1020                                comment, self._truncate_column_comment
1021                            ),
1022                        ]
1023                    )
1024                )
1025            ]
1026        return []
1027
1028    def _build_view_properties_exp(
1029        self,
1030        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
1031        table_description: t.Optional[str] = None,
1032        **kwargs: t.Any,
1033    ) -> t.Optional[exp.Properties]:
1034        """Creates a SQLGlot table properties expression for view"""
1035        properties: t.List[exp.Expr] = []
1036
1037        if table_description:
1038            properties.append(
1039                self._build_description_property_exp(
1040                    table_description, self._truncate_table_comment
1041                ),
1042            )
1043
1044        properties.extend(self._table_or_view_properties_to_expressions(view_properties))
1045
1046        if properties:
1047            return exp.Properties(expressions=properties)
1048        return None
1049
1050    def _build_create_comment_table_exp(
1051        self, table: exp.Table, table_comment: str, table_kind: str
1052    ) -> exp.Comment | str:
1053        table_sql = table.sql(dialect=self.dialect, identify=True)
1054
1055        truncated_comment = self._truncate_table_comment(table_comment)
1056        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
1057
1058        return f"ALTER {table_kind} {table_sql} SET OPTIONS(description = {comment_sql})"
1059
1060    def _build_create_comment_column_exp(
1061        self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE"
1062    ) -> exp.Comment | str:
1063        table_sql = table.sql(dialect=self.dialect, identify=True)
1064        column_sql = exp.column(column_name).sql(dialect=self.dialect, identify=True)
1065
1066        truncated_comment = self._truncate_column_comment(column_comment)
1067        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
1068
1069        return f"ALTER {table_kind} {table_sql} ALTER COLUMN {column_sql} SET OPTIONS(description = {comment_sql})"
1070
1071    def create_state_table(
1072        self,
1073        table_name: str,
1074        target_columns_to_types: t.Dict[str, exp.DataType],
1075        primary_key: t.Optional[t.Tuple[str, ...]] = None,
1076    ) -> None:
1077        self.create_table(
1078            table_name,
1079            target_columns_to_types,
1080        )
1081
1082    def _db_call(self, func: t.Callable[..., t.Any], *args: t.Any, **kwargs: t.Any) -> t.Any:
1083        return func(
1084            retry=self.__retry,
1085            *args,
1086            **kwargs,
1087        )
1088
1089    def _execute(
1090        self,
1091        sql: str,
1092        track_rows_processed: bool = False,
1093        **kwargs: t.Any,
1094    ) -> None:
1095        """Execute a sql query."""
1096        from google.cloud.bigquery import QueryJobConfig
1097        from google.cloud.bigquery.query import ConnectionProperty
1098
1099        # BigQuery's Python DB API implementation does not support retries, so we have to implement them ourselves.
1100        # So we update the cursor's query job and query data with the results of the new query job. This makes sure
1101        # that other cursor based operations execute correctly.
1102        session_id = self._session_id
1103        connection_properties = (
1104            [
1105                ConnectionProperty(key="session_id", value=session_id),
1106            ]
1107            if session_id
1108            else []
1109        )
1110
1111        # Create job config
1112        job_config = QueryJobConfig(**self._job_params, connection_properties=connection_properties)
1113
1114        self._query_job = self._db_call(
1115            self.client.query,
1116            query=sql,
1117            job_config=job_config,
1118            timeout=self._extra_config.get("job_creation_timeout_seconds"),
1119        )
1120        query_job = self._query_job
1121        assert query_job is not None
1122
1123        logger.debug(
1124            "BigQuery job created: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s",
1125            query_job.project,
1126            query_job.location,
1127            query_job.job_id,
1128        )
1129
1130        results = self._db_call(
1131            query_job.result,
1132            timeout=self._extra_config.get("job_execution_timeout_seconds"),  # type: ignore
1133        )
1134
1135        self._query_data = iter(results) if results.total_rows else iter([])
1136        query_results = query_job._query_results
1137        self.cursor._set_rowcount(query_results)
1138        self.cursor._set_description(query_results.schema)
1139
1140        if (
1141            track_rows_processed
1142            and self._query_execution_tracker
1143            and self._query_execution_tracker.is_tracking()
1144        ):
1145            num_rows = None
1146            if query_job.statement_type == "CREATE_TABLE_AS_SELECT":
1147                # since table was just created, number rows in table == number rows processed
1148                query_table = self.client.get_table(query_job.destination)
1149                num_rows = query_table.num_rows
1150            elif query_job.statement_type in ["INSERT", "DELETE", "MERGE", "UPDATE"]:
1151                num_rows = query_job.num_dml_affected_rows
1152
1153            self._query_execution_tracker.record_execution(
1154                sql, num_rows, query_job.total_bytes_processed
1155            )
1156
1157    def _get_data_objects(
1158        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
1159    ) -> t.List[DataObject]:
1160        """
1161        Returns all the data objects that exist in the given schema and optionally catalog.
1162        """
1163
1164        # The BigQuery Client's list_tables method does not support filtering by table name, so we have to
1165        # resort to using SQL instead.
1166        schema = to_schema(schema_name)
1167        catalog = schema.catalog or self.default_catalog
1168        query = (
1169            exp.select(
1170                exp.column("table_catalog").as_("catalog"),
1171                exp.column("table_name").as_("name"),
1172                exp.column("table_schema").as_("schema_name"),
1173                exp.case()
1174                .when(exp.column("table_type").eq("BASE TABLE"), exp.Literal.string("TABLE"))
1175                .when(exp.column("table_type").eq("CLONE"), exp.Literal.string("TABLE"))
1176                .when(exp.column("table_type").eq("EXTERNAL"), exp.Literal.string("TABLE"))
1177                .when(exp.column("table_type").eq("SNAPSHOT"), exp.Literal.string("TABLE"))
1178                .when(exp.column("table_type").eq("VIEW"), exp.Literal.string("VIEW"))
1179                .when(
1180                    exp.column("table_type").eq("MATERIALIZED VIEW"),
1181                    exp.Literal.string("MATERIALIZED_VIEW"),
1182                )
1183                .else_(exp.column("table_type"))
1184                .as_("type"),
1185                exp.column("clustering_key", "ci").as_("clustering_key"),
1186            )
1187            .with_(
1188                "clustering_info",
1189                as_=exp.select(
1190                    exp.column("table_catalog"),
1191                    exp.column("table_schema"),
1192                    exp.column("table_name"),
1193                    parse_one(
1194                        "string_agg(column_name order by clustering_ordinal_position)",
1195                        dialect=self.dialect,
1196                    ).as_("clustering_key"),
1197                )
1198                .from_(
1199                    exp.to_table(
1200                        f"`{catalog}`.`{schema.db}`.INFORMATION_SCHEMA.COLUMNS",
1201                        dialect=self.dialect,
1202                    )
1203                )
1204                .where(exp.column("clustering_ordinal_position").is_(exp.not_(exp.null())))
1205                .group_by("1", "2", "3"),
1206            )
1207            .from_(
1208                exp.to_table(
1209                    f"`{catalog}`.`{schema.db}`.INFORMATION_SCHEMA.TABLES", dialect=self.dialect
1210                )
1211            )
1212            .join(
1213                "clustering_info",
1214                using=["table_catalog", "table_schema", "table_name"],
1215                join_type="left",
1216                join_alias="ci",
1217            )
1218        )
1219        if object_names:
1220            query = query.where(exp.column("table_name").isin(*object_names))
1221
1222        try:
1223            df = self.fetchdf(query, quote_identifiers=True)
1224        except Exception as e:
1225            if "Not found" in str(e):
1226                return []
1227            raise
1228
1229        if df.empty:
1230            return []
1231        return [
1232            DataObject(
1233                catalog=row.catalog,  # type: ignore
1234                schema=row.schema_name,  # type: ignore
1235                name=row.name,  # type: ignore
1236                type=DataObjectType.from_str(row.type),  # type: ignore
1237                clustering_key=f"({row.clustering_key})" if row.clustering_key else None,  # type: ignore
1238            )
1239            for row in df.itertuples()
1240        ]
1241
1242    def _update_clustering_key(self, operation: TableAlterClusterByOperation) -> None:
1243        cluster_key_expressions = getattr(operation, "cluster_key_expressions", [])
1244        bq_table = self._get_table(operation.target_table)
1245
1246        rendered_columns = [c.sql(dialect=self.dialect) for c in cluster_key_expressions]
1247        bq_table.clustering_fields = (
1248            rendered_columns or None
1249        )  # causes a drop of the key if cluster_by is empty or None
1250
1251        self._db_call(self.client.update_table, table=bq_table, fields=["clustering_fields"])
1252
1253        if cluster_key_expressions:
1254            # BigQuery only applies new clustering going forward, so this rewrites the columns to apply the new clustering to historical data
1255            # ref: https://cloud.google.com/bigquery/docs/creating-clustered-tables#modifying-cluster-spec
1256            self.execute(
1257                exp.update(
1258                    operation.target_table,
1259                    {c: c for c in cluster_key_expressions},
1260                    where=exp.true(),
1261                )
1262            )
1263
1264    def _normalize_decimal_value(self, col: exp.Expr, precision: int) -> exp.Expr:
1265        return exp.func("FORMAT", exp.Literal.string(f"%.{precision}f"), col)
1266
1267    def _normalize_nested_value(self, col: exp.Expr) -> exp.Expr:
1268        return exp.func("TO_JSON_STRING", col, dialect=self.dialect)
1269
1270    @t.overload
1271    def _columns_to_types(
1272        self,
1273        query_or_df: DF,
1274        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1275        source_columns: t.Optional[t.List[str]] = None,
1276    ) -> t.Tuple[t.Dict[str, exp.DataType], t.List[str]]: ...
1277
1278    @t.overload
1279    def _columns_to_types(
1280        self,
1281        query_or_df: Query,
1282        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1283        source_columns: t.Optional[t.List[str]] = None,
1284    ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: ...
1285
1286    def _columns_to_types(
1287        self,
1288        query_or_df: QueryOrDF,
1289        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1290        source_columns: t.Optional[t.List[str]] = None,
1291    ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]:
1292        if (
1293            not target_columns_to_types
1294            and bigframes
1295            and isinstance(query_or_df, bigframes.dataframe.DataFrame)
1296        ):
1297            # using dry_run=True attempts to prevent the DataFrame from being materialized just to read the column types from it
1298            dtypes = query_or_df.to_pandas(dry_run=True).columnDtypes
1299            target_columns_to_types = columns_to_types_from_dtypes(dtypes.items())
1300            return target_columns_to_types, list(source_columns or target_columns_to_types)
1301
1302        return super()._columns_to_types(
1303            query_or_df, target_columns_to_types, source_columns=source_columns
1304        )
1305
1306    def _native_df_to_pandas_df(
1307        self,
1308        query_or_df: QueryOrDF,
1309    ) -> t.Union[Query, pd.DataFrame]:
1310        if bigframes and isinstance(query_or_df, bigframes.dataframe.DataFrame):
1311            return query_or_df.to_pandas()
1312
1313        return super()._native_df_to_pandas_df(query_or_df)
1314
1315    @property
1316    def _query_data(self) -> t.Any:
1317        return self._connection_pool.get_attribute("query_data")
1318
1319    @_query_data.setter
1320    def _query_data(self, value: t.Any) -> None:
1321        self._connection_pool.set_attribute("query_data", value)
1322
1323    @property
1324    def _query_job(self) -> t.Optional[QueryJob]:
1325        return self._connection_pool.get_attribute("query_job")
1326
1327    @_query_job.setter
1328    def _query_job(self, value: t.Any) -> None:
1329        self._connection_pool.set_attribute("query_job", value)
1330
1331    @property
1332    def _session_id(self) -> t.Any:
1333        return self._connection_pool.get_attribute("session_id")
1334
1335    @_session_id.setter
1336    def _session_id(self, value: t.Any) -> None:
1337        self._connection_pool.set_attribute("session_id", value)
1338
1339    def _get_current_schema(self) -> str:
1340        raise NotImplementedError("BigQuery does not support current schema")
1341
1342    def _get_bq_dataset_location(self, project: str, dataset: str) -> str:
1343        return self._db_call(self.client.get_dataset, dataset_ref=f"{project}.{dataset}").location
1344
1345    def _get_grant_expression(self, table: exp.Table) -> exp.Expr:
1346        if not table.db:
1347            raise ValueError(
1348                f"Table {table.sql(dialect=self.dialect)} does not have a schema (dataset)"
1349            )
1350        project = table.catalog or self.get_current_catalog()
1351        if not project:
1352            raise ValueError(
1353                f"Table {table.sql(dialect=self.dialect)} does not have a catalog (project)"
1354            )
1355
1356        dataset = table.db
1357        table_name = table.name
1358        location = self._get_bq_dataset_location(project, dataset)
1359
1360        # https://cloud.google.com/bigquery/docs/information-schema-object-privileges
1361        # OBJECT_PRIVILEGES is a project-level INFORMATION_SCHEMA view with regional qualifier
1362        object_privileges_table = exp.to_table(
1363            f"`{project}`.`region-{location}`.INFORMATION_SCHEMA.{self.GRANT_INFORMATION_SCHEMA_TABLE_NAME}",
1364            dialect=self.dialect,
1365        )
1366        return (
1367            exp.select("privilege_type", "grantee")
1368            .from_(object_privileges_table)
1369            .where(
1370                exp.and_(
1371                    exp.column("object_schema").eq(exp.Literal.string(dataset)),
1372                    exp.column("object_name").eq(exp.Literal.string(table_name)),
1373                    # Filter out current_user
1374                    # BigQuery grantees format: "user:email" or "group:name"
1375                    exp.func("split", exp.column("grantee"), exp.Literal.string(":"))[
1376                        exp.func("OFFSET", exp.Literal.number("1"))
1377                    ].neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
1378                )
1379            )
1380        )
1381
1382    @staticmethod
1383    def _grant_object_kind(table_type: DataObjectType) -> str:
1384        if table_type == DataObjectType.VIEW:
1385            return "VIEW"
1386        if table_type == DataObjectType.MATERIALIZED_VIEW:
1387            # We actually need to use "MATERIALIZED VIEW" here even though it's not listed
1388            # as a supported resource_type in the BigQuery DCL doc:
1389            # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language
1390            return "MATERIALIZED VIEW"
1391        return "TABLE"
1392
1393    def _dcl_grants_config_expr(
1394        self,
1395        dcl_cmd: t.Type[DCL],
1396        table: exp.Table,
1397        grants_config: GrantsConfig,
1398        table_type: DataObjectType = DataObjectType.TABLE,
1399    ) -> t.List[exp.Expr]:
1400        expressions: t.List[exp.Expr] = []
1401        if not grants_config:
1402            return expressions
1403
1404        # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language
1405
1406        def normalize_principal(p: str) -> str:
1407            if ":" not in p:
1408                raise ValueError(f"Principal '{p}' missing a prefix label")
1409
1410            # allUsers and allAuthenticatedUsers special groups that are cas-sensitive and must start with "specialGroup:"
1411            if p.endswith("allUsers") or p.endswith("allAuthenticatedUsers"):
1412                if not p.startswith("specialGroup:"):
1413                    raise ValueError(
1414                        f"Special group principal '{p}' must start with 'specialGroup:' prefix label"
1415                    )
1416                return p
1417
1418            label, principal = p.split(":", 1)
1419            # always lowercase principals
1420            return f"{label}:{principal.lower()}"
1421
1422        object_kind = self._grant_object_kind(table_type)
1423        for privilege, principals in grants_config.items():
1424            if not principals:
1425                continue
1426
1427            noramlized_principals = [exp.Literal.string(normalize_principal(p)) for p in principals]
1428            args: t.Dict[str, t.Any] = {
1429                "privileges": [exp.GrantPrivilege(this=exp.to_identifier(privilege, quoted=True))],
1430                "securable": table.copy(),
1431                "principals": noramlized_principals,
1432            }
1433
1434            if object_kind:
1435                args["kind"] = exp.Var(this=object_kind)
1436
1437            expressions.append(dcl_cmd(**args))  # type: ignore[arg-type]
1438
1439        return expressions
1440
1441
class _ErrorCounter:
    """
    A class that counts errors and determines whether or not to retry based on the number of errors and the error
    type.

    Reference implementation: https://github.com/dbt-labs/dbt-bigquery/blob/8339a034929b12e027f0a143abf46582f3f6ffbc/dbt/adapters/bigquery/connections.py#L672

    TODO: Implement a retry configuration that works across all engines
    """

    def __init__(self, num_retries: int) -> None:
        self.num_retries = num_retries
        self.error_count = 0

    @property
    def retryable_errors(self) -> t.Tuple[t.Type[Exception], ...]:
        """The exception types that are always considered retryable."""
        try:
            from google.cloud.exceptions import ServerError
        except ModuleNotFoundError:
            from google.api_core.exceptions import ServerError
        from requests.exceptions import ConnectionError

        return (ServerError, ConnectionError)

    def _is_retryable(self, error: BaseException) -> bool:
        """
        Returns whether the error is of a kind worth retrying.

        Server and connection errors are retryable, as are `Forbidden` errors caused by rate
        limiting.
        """
        from google.api_core.exceptions import Forbidden

        if isinstance(error, self.retryable_errors):
            return True
        if isinstance(error, Forbidden):
            # Error details are not guaranteed to be dicts carrying a "reason" key. Entries that
            # don't are skipped so that this predicate never raises and masks the original error.
            return any(
                isinstance(detail, dict) and detail.get("reason") == "rateLimitExceeded"
                for detail in error.errors or ()
            )
        return False

    def should_retry(self, error: BaseException) -> bool:
        """
        Records the error and returns whether the failed call should be retried.

        Every error counts towards the retry budget, whether or not it is retryable. With
        `num_retries == 0` nothing is counted and the answer is always False.
        """
        if self.num_retries == 0:
            return False
        self.error_count += 1
        if self._is_retryable(error) and self.error_count <= self.num_retries:
            logger.info(
                "Retry Num %s of %s. Error: %r", self.error_count, self.num_retries, error
            )
            return True
        return False
1485
1486
def select_partitions_expr(
    schema: str,
    table_name: str,
    data_type: t.Union[str, exp.DataType],
    granularity: t.Optional[str] = None,
    agg_func: str = "MAX",
    catalog: t.Optional[str] = None,
) -> str:
    """Builds the SQL text of a query that aggregates a table's partition values.

    The partition ids are read from the dataset's `INFORMATION_SCHEMA.PARTITIONS` view. For DATE,
    DATETIME and TIMESTAMP partition columns the id is parsed with the format that matches the
    granularity; for any other type the id is cast to INT64.

    Args:
        schema: The schema (BigQuery dataset) of the table.
        table_name: The name of the table.
        data_type: The data type of the partition column.
        granularity: The granularity of the partition. Supported values are: 'day', 'month', 'year' and 'hour'.
            Defaults to 'day' when the partition column is a date/time type.
        agg_func: The aggregation function to use.
        catalog: The catalog (BigQuery project ID) of the table.

    Returns:
        A SELECT statement that aggregates partition values for a table.
    """
    source = f"`{schema}`.INFORMATION_SCHEMA.PARTITIONS"
    if catalog:
        source = f"`{catalog}`.{source}"

    type_name = (
        data_type.sql(dialect="bigquery") if isinstance(data_type, exp.DataType) else data_type
    ).upper()

    if type_name in ("DATE", "DATETIME", "TIMESTAMP"):
        # Date/time partition ids are strings such as 20240131, so they have to be parsed.
        id_format = GRANULARITY_TO_PARTITION_FORMAT[(granularity or "day").lower()]
        partition_value = exp.func(
            f"PARSE_{type_name}",
            exp.Literal.string(id_format),
            exp.column("partition_id"),
            dialect="bigquery",
        )
    else:
        partition_value = exp.cast(exp.column("partition_id"), "INT64", dialect="bigquery")

    predicate = f"table_name = '{table_name}' AND partition_id IS NOT NULL AND partition_id != '__NULL__'"
    query = exp.select(exp.func(agg_func, partition_value)).from_(source, dialect="bigquery")
    query = query.where(predicate, copy=False)
    return query.sql(dialect="bigquery")
1538
1539
# Maps a partition granularity to the format string used to parse `partition_id` values
# of that granularity (see `select_partitions_expr`).
GRANULARITY_TO_PARTITION_FORMAT = dict(
    day="%Y%m%d",
    month="%Y%m",
    year="%Y",
    hour="%Y%m%d%H",
)
logger = <Logger sqlmesh.core.engine_adapter.bigquery (WARNING)>
NestedField = typing.Tuple[str, str, typing.List[str]]
NestedFieldsDict = typing.Dict[str, typing.List[typing.Tuple[str, str, typing.List[str]]]]
  59@set_catalog()
  60class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin, GrantsFromInfoSchemaMixin):
  61    """
  62    BigQuery Engine Adapter using the `google-cloud-bigquery` library's DB API.
  63    """
  64
    # SQL dialect name; also used below when building the type tables of SCHEMA_DIFFER_KWARGS.
    DIALECT = "bigquery"
    DEFAULT_BATCH_SIZE = 1000
    SUPPORTS_TRANSACTIONS = False
    SUPPORTS_MATERIALIZED_VIEWS = True
    SUPPORTS_CLONING = True
    SUPPORTS_GRANTS = True
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.func("session_user")
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True
    USE_CATALOG_IN_GRANTS = True
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "OBJECT_PRIVILEGES"
    MAX_TABLE_COMMENT_LENGTH = 1024
    MAX_COLUMN_COMMENT_LENGTH = 1024
    SUPPORTS_QUERY_EXECUTION_TRACKING = True
    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]
    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.MERGE

    # Type-evolution rules for BigQuery columns.
    # NOTE(review): presumably these are passed as keyword arguments to the schema differ by the
    # base adapter - confirm there; this file only declares them.
    SCHEMA_DIFFER_KWARGS = {
        # Source type -> set of types it is listed as compatible with.
        "compatible_types": {
            exp.DataType.build("INT64", dialect=DIALECT): {
                exp.DataType.build("NUMERIC", dialect=DIALECT),
                exp.DataType.build("FLOAT64", dialect=DIALECT),
                exp.DataType.build("BIGNUMERIC", dialect=DIALECT),
            },
            exp.DataType.build("NUMERIC", dialect=DIALECT): {
                exp.DataType.build("FLOAT64", dialect=DIALECT),
                exp.DataType.build("BIGNUMERIC", dialect=DIALECT),
            },
            exp.DataType.build("DATE", dialect=DIALECT): {
                exp.DataType.build("DATETIME", dialect=DIALECT),
            },
        },
        "coerceable_types": {
            exp.DataType.build("FLOAT64", dialect=DIALECT): {
                exp.DataType.build("BIGNUMERIC", dialect=DIALECT),
            },
        },
        "support_coercing_compatible_types": True,
        # Keyed by the type enum (`.this`) rather than by a DataType instance.
        # NOTE(review): the non-integer 76.76 looks intentional (BigQuery documents BIGNUMERIC
        # precision that way) - confirm before "fixing" it.
        "parameterized_type_defaults": {
            exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(38, 9), (0,)],
            exp.DataType.build("BIGDECIMAL", dialect=DIALECT).this: [(76.76, 38), (0,)],
        },
        "types_with_unlimited_length": {
            # parameterized `STRING(n)` can ALTER to unparameterized `STRING`
            exp.DataType.build("STRING", dialect=DIALECT).this: {
                exp.DataType.build("STRING", dialect=DIALECT).this,
            },
            # parameterized `BYTES(n)` can ALTER to unparameterized `BYTES`
            exp.DataType.build("BYTES", dialect=DIALECT).this: {
                exp.DataType.build("BYTES", dialect=DIALECT).this,
            },
        },
        "nested_support": NestedSupport.ALL_BUT_DROP,
    }
 118
 119    @property
 120    def client(self) -> BigQueryClient:
 121        return self.connection._client
 122
 123    @property
 124    def bigframe(self) -> t.Optional[BigframeSession]:
 125        if bigframes:
 126            options = bigframes.BigQueryOptions(
 127                credentials=self.client._credentials,
 128                project=self.client.project,
 129                location=self.client.location,
 130            )
 131            return bigframes.connect(context=options)
 132        return None
 133
 134    @property
 135    def _job_params(self) -> t.Dict[str, t.Any]:
 136        from sqlmesh.core.config.connection import BigQueryPriority
 137
 138        params = {
 139            "use_legacy_sql": False,
 140            "priority": self._extra_config.get(
 141                "priority", BigQueryPriority.INTERACTIVE.bigquery_constant
 142            ),
 143        }
 144        if self._extra_config.get("maximum_bytes_billed") is not None:
 145            params["maximum_bytes_billed"] = self._extra_config.get("maximum_bytes_billed")
 146        if self._extra_config.get("reservation") is not None:
 147            params["reservation"] = self._extra_config.get("reservation")
 148        if self.correlation_id:
 149            # BigQuery label keys must be lowercase
 150            key = self.correlation_id.job_type.value.lower()
 151            params["labels"] = {key: self.correlation_id.job_id}
 152        return params
 153
 154    @property
 155    def catalog_support(self) -> CatalogSupport:
 156        return CatalogSupport.FULL_SUPPORT
 157
 158    def _df_to_source_queries(
 159        self,
 160        df: DF,
 161        target_columns_to_types: t.Dict[str, exp.DataType],
 162        batch_size: int,
 163        target_table: TableName,
 164        source_columns: t.Optional[t.List[str]] = None,
 165    ) -> t.List[SourceQuery]:
 166        import pandas as pd
 167
 168        source_columns_to_types = get_source_columns_to_types(
 169            target_columns_to_types, source_columns
 170        )
 171
 172        temp_bq_table = self.__get_temp_bq_table(
 173            self._get_temp_table(target_table or "pandas"), source_columns_to_types
 174        )
 175        temp_table = exp.table_(
 176            temp_bq_table.table_id,
 177            db=temp_bq_table.dataset_id,
 178            catalog=temp_bq_table.project,
 179        )
 180
 181        def query_factory() -> Query:
 182            ordered_df = df[list(source_columns_to_types)]
 183            if bigframes_pd and isinstance(ordered_df, bigframes_pd.DataFrame):
 184                ordered_df.to_gbq(
 185                    f"{temp_bq_table.project}.{temp_bq_table.dataset_id}.{temp_bq_table.table_id}",
 186                    if_exists="replace",
 187                )
 188            elif not self.table_exists(temp_table):
 189                # Make mypy happy
 190                assert isinstance(ordered_df, pd.DataFrame)
 191                self._db_call(self.client.create_table, table=temp_bq_table, exists_ok=False)
 192                result = self.__load_pandas_to_table(
 193                    temp_bq_table, ordered_df, source_columns_to_types, replace=False
 194                )
 195                if result.errors:
 196                    raise SQLMeshError(result.errors)
 197            return exp.select(
 198                *self._casted_columns(target_columns_to_types, source_columns=source_columns)
 199            ).from_(temp_table)
 200
 201        return [
 202            SourceQuery(
 203                query_factory=query_factory,
 204                cleanup_func=lambda: self.drop_table(temp_table),
 205            )
 206        ]
 207
 208    def close(self) -> t.Any:
 209        # Cancel all pending query jobs across all threads
 210        all_query_jobs = self._connection_pool.get_all_attributes("query_job")
 211        for query_job in all_query_jobs:
 212            if query_job:
 213                try:
 214                    if not self._db_call(query_job.done):
 215                        self._db_call(query_job.cancel)
 216                        logger.debug(
 217                            "Cancelled BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s",
 218                            query_job.project,
 219                            query_job.location,
 220                            query_job.job_id,
 221                        )
 222                except Exception as ex:
 223                    logger.debug(
 224                        "Failed to cancel BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s. %s",
 225                        query_job.project,
 226                        query_job.location,
 227                        query_job.job_id,
 228                        str(ex),
 229                    )
 230
 231        return super().close()
 232
 233    def _begin_session(self, properties: SessionProperties) -> None:
 234        from google.cloud.bigquery import QueryJobConfig
 235
 236        query_label_property = properties.get("query_label")
 237        parsed_query_label: list[tuple[str, str]] = []
 238        if isinstance(query_label_property, (exp.Array, exp.Paren, exp.Tuple)):
 239            label_tuples = (
 240                [query_label_property.unnest()]
 241                if isinstance(query_label_property, exp.Paren)
 242                else query_label_property.expressions
 243            )
 244
 245            # query_label is a Paren, Array or Tuple of 2-tuples and validated at load time
 246            parsed_query_label.extend(
 247                (label_tuple.expressions[0].name, label_tuple.expressions[1].name)
 248                for label_tuple in label_tuples
 249            )
 250        elif query_label_property is not None:
 251            raise SQLMeshError(
 252                "Invalid value for `session_properties.query_label`. Must be an array or tuple."
 253            )
 254
 255        if self.correlation_id:
 256            parsed_query_label.append(
 257                (self.correlation_id.job_type.value.lower(), self.correlation_id.job_id)
 258            )
 259
 260        if parsed_query_label:
 261            query_label_str = ",".join([":".join(label) for label in parsed_query_label])
 262            query = f'SET @@query_label = "{query_label_str}";SELECT 1;'
 263        else:
 264            query = "SELECT 1;"
 265
 266        job = self.client.query(
 267            query,
 268            job_config=QueryJobConfig(create_session=True),
 269        )
 270        session_info = job.session_info
 271        session_id = session_info.session_id if session_info else None
 272        self._session_id = session_id
 273        job.result()
 274
 275    def _end_session(self) -> None:
 276        self._session_id = None
 277
 278    def _is_session_active(self) -> bool:
 279        return self._session_id is not None
 280
 281    def get_current_catalog(self) -> t.Optional[str]:
 282        """Returns the catalog name of the current connection."""
 283        return self.client.project
 284
 285    def set_current_catalog(self, catalog: str) -> None:
 286        """Sets the catalog name of the current connection."""
 287        self.client.project = catalog
 288
 289    def create_schema(
 290        self,
 291        schema_name: SchemaName,
 292        ignore_if_exists: bool = True,
 293        warn_on_error: bool = True,
 294        properties: t.List[exp.Expr] = [],
 295    ) -> None:
 296        """Create a schema from a name or qualified table name."""
 297        from google.api_core.exceptions import Conflict
 298
 299        try:
 300            super().create_schema(
 301                schema_name,
 302                ignore_if_exists=ignore_if_exists,
 303                warn_on_error=False,
 304            )
 305        except Exception as e:
 306            is_already_exists_error = isinstance(e, Conflict) and "Already Exists:" in str(e)
 307            if is_already_exists_error and ignore_if_exists:
 308                return
 309            if not warn_on_error:
 310                raise
 311            logger.warning("Failed to create schema '%s': %s", schema_name, e)
 312
 313    def get_bq_schema(self, table_name: TableName) -> t.List[bigquery.SchemaField]:
 314        table = exp.to_table(table_name)
 315        if len(table.parts) == 3 and "." in table.name:
 316            self.execute(exp.select("*").from_(table).limit(0))
 317            query_job = self._query_job
 318            assert query_job is not None
 319            return query_job._query_results.schema
 320        return self._get_table(table).schema
 321
 322    def columns(
 323        self, table_name: TableName, include_pseudo_columns: bool = False
 324    ) -> t.Dict[str, exp.DataType]:
 325        """Fetches column names and types for the target table."""
 326
 327        def dtype_to_sql(
 328            dtype: t.Optional[StandardSqlDataType], field: bigquery.SchemaField
 329        ) -> str:
 330            assert dtype
 331            assert field
 332
 333            kind = dtype.type_kind
 334            assert kind
 335
 336            # Not using the enum value to preserve compatibility with older versions
 337            # of the BigQuery library.
 338            if kind.name == "ARRAY":
 339                return f"ARRAY<{dtype_to_sql(dtype.array_element_type, field)}>"
 340            if kind.name == "STRUCT":
 341                struct_type = dtype.struct_type
 342                assert struct_type
 343                fields = ", ".join(
 344                    f"{struct_field.name} {dtype_to_sql(struct_field.type, nested_field)}"
 345                    for struct_field, nested_field in zip(struct_type.fields, field.fields)
 346                )
 347                return f"STRUCT<{fields}>"
 348            if kind.name == "TYPE_KIND_UNSPECIFIED":
 349                field_type = field.field_type
 350
 351                if field_type == "RANGE":
 352                    # If the field is a RANGE then `range_element_type` should be set to
 353                    # one of `"DATE"`, `"DATETIME"` or `"TIMESTAMP"`.
 354                    return f"RANGE<{field.range_element_type.element_type}>"
 355
 356                return field_type
 357
 358            return kind.name
 359
 360        def create_mapping_schema(
 361            schema: t.Sequence[bigquery.SchemaField],
 362        ) -> t.Dict[str, exp.DataType]:
 363            return {
 364                field.name: exp.DataType.build(
 365                    dtype_to_sql(field.to_standard_sql().type, field), dialect=self.dialect
 366                )
 367                for field in schema
 368            }
 369
 370        table = exp.to_table(table_name)
 371        if len(table.parts) == 3 and "." in table.name:
 372            # The client's `get_table` method can't handle paths with >3 identifiers
 373            self.execute(exp.select("*").from_(table).limit(0))
 374            query_job = self._query_job
 375            assert query_job is not None
 376
 377            query_results = query_job._query_results
 378            columns = create_mapping_schema(query_results.schema)
 379        else:
 380            bq_table = self._get_table(table)
 381            columns = create_mapping_schema(bq_table.schema)
 382
 383            if include_pseudo_columns:
 384                if bq_table.time_partitioning and not bq_table.time_partitioning.field:
 385                    columns["_PARTITIONTIME"] = exp.DataType.build("TIMESTAMP", dialect="bigquery")
 386                    if bq_table.time_partitioning.type_ == "DAY":
 387                        columns["_PARTITIONDATE"] = exp.DataType.build("DATE")
 388                if bq_table.table_id.endswith("*"):
 389                    columns["_TABLE_SUFFIX"] = exp.DataType.build("STRING", dialect="bigquery")
 390                if (
 391                    bq_table.external_data_configuration is not None
 392                    and bq_table.external_data_configuration.source_format
 393                    in (
 394                        "CSV",
 395                        "NEWLINE_DELIMITED_JSON",
 396                        "AVRO",
 397                        "PARQUET",
 398                        "ORC",
 399                        "DATASTORE_BACKUP",
 400                    )
 401                ):
 402                    columns["_FILE_NAME"] = exp.DataType.build("STRING", dialect="bigquery")
 403
 404        return columns
 405
 406    def alter_table(
 407        self,
 408        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
 409    ) -> None:
 410        """
 411        Performs the alter statements to change the current table into the structure of the target table,
 412        and uses the API to add columns to structs, where SQL is not supported.
 413        """
 414        if not alter_expressions:
 415            return
 416
 417        cluster_by_operations, alter_statements = [], []
 418        for e in alter_expressions:
 419            if isinstance(e, TableAlterClusterByOperation):
 420                cluster_by_operations.append(e)
 421            elif isinstance(e, TableAlterOperation):
 422                alter_statements.append(e.expression)
 423            else:
 424                alter_statements.append(e)
 425
 426        for op in cluster_by_operations:
 427            self._update_clustering_key(op)
 428
 429        nested_fields, non_nested_expressions = self._split_alter_expressions(alter_statements)
 430
 431        if nested_fields:
 432            self._update_table_schema_nested_fields(nested_fields, alter_statements[0].this)
 433
 434        if non_nested_expressions:
 435            super().alter_table(non_nested_expressions)
 436
 437    def fetchone(
 438        self,
 439        query: t.Union[exp.Expr, str],
 440        ignore_unsupported_errors: bool = False,
 441        quote_identifiers: bool = False,
 442    ) -> t.Optional[t.Tuple]:
 443        """
 444        BigQuery's `fetchone` method doesn't call execute and therefore would not benefit from the execute
 445        configuration we have in place. Therefore this implementation calls execute instead.
 446        """
 447        self.execute(
 448            query,
 449            ignore_unsupported_errors=ignore_unsupported_errors,
 450            quote_identifiers=quote_identifiers,
 451        )
 452        try:
 453            return next(self._query_data)
 454        except StopIteration:
 455            return None
 456
 457    def fetchall(
 458        self,
 459        query: t.Union[exp.Expr, str],
 460        ignore_unsupported_errors: bool = False,
 461        quote_identifiers: bool = False,
 462    ) -> t.List[t.Tuple]:
 463        """
 464        BigQuery's `fetchone` method doesn't call execute and therefore would not benefit from the execute
 465        configuration we have in place. Therefore this implementation calls execute instead.
 466        """
 467        self.execute(
 468            query,
 469            ignore_unsupported_errors=ignore_unsupported_errors,
 470            quote_identifiers=quote_identifiers,
 471        )
 472        return list(self._query_data)
 473
 474    def _split_alter_expressions(
 475        self,
 476        alter_expressions: t.List[exp.Alter],
 477    ) -> t.Tuple[NestedFieldsDict, t.List[exp.Alter]]:
 478        """
 479        Returns a dictionary of the nested fields to add and a list of the non-nested alter expressions.
 480        """
 481        nested_fields_to_add: NestedFieldsDict = defaultdict(list)
 482        non_nested_expressions = []
 483
 484        for alter_expression in alter_expressions:
 485            action = alter_expression.args["actions"][0]
 486            if (
 487                isinstance(action, exp.ColumnDef)
 488                and isinstance(action.this, exp.Dot)
 489                and isinstance(action.kind, exp.DataType)
 490            ):
 491                root_field, *leaf_fields = action.this.this.sql(dialect=self.dialect).split(".")
 492                new_field = action.this.expression.sql(dialect=self.dialect)
 493                data_type = action.kind.sql(dialect=self.dialect)
 494                nested_fields_to_add[root_field].append((new_field, data_type, leaf_fields))
 495            else:
 496                non_nested_expressions.append(alter_expression)
 497
 498        return nested_fields_to_add, non_nested_expressions
 499
 500    def _build_nested_fields(
 501        self,
 502        current_fields: t.List[bigquery.SchemaField],
 503        fields_to_add: t.List[NestedField],
 504    ) -> t.List[bigquery.SchemaField]:
 505        """
 506        Recursively builds and updates the schema fields with the new nested fields.
 507        """
 508        from google.cloud import bigquery
 509
 510        new_fields = []
 511        root: t.List[t.Tuple[str, str]] = []
 512        leaves: NestedFieldsDict = defaultdict(list)
 513        for new_field, data_type, leaf_fields in fields_to_add:
 514            if leaf_fields:
 515                leaves[leaf_fields[0]].append((new_field, data_type, leaf_fields[1:]))
 516            else:
 517                root.append((new_field, data_type))
 518
 519        for field in current_fields:
 520            # If the new fields are nested, we need to recursively build them
 521            if field.name in leaves:
 522                subfields = list(field.fields)
 523                subfields = self._build_nested_fields(subfields, leaves[field.name])
 524                new_fields.append(
 525                    bigquery.SchemaField(
 526                        field.name, "RECORD", mode=field.mode, fields=tuple(subfields)
 527                    )
 528                )
 529            else:
 530                new_fields.append(field)
 531
 532        # Build and append the new root-level fields
 533        new_fields.extend(
 534            self.__get_bq_schemafield(
 535                new_field[0], exp.DataType.build(new_field[1], dialect=self.dialect)
 536            )
 537            for new_field in root
 538        )
 539        return new_fields
 540
 541    def _update_table_schema_nested_fields(
 542        self, nested_fields_to_add: NestedFieldsDict, table_name: str
 543    ) -> None:
 544        """
 545        Updates a BigQuery table schema by adding the new nested fields provided.
 546        """
 547        from google.cloud import bigquery
 548
 549        table = self._get_table(table_name)
 550        original_schema = table.schema
 551        new_schema = []
 552        for field in original_schema:
 553            if field.name in nested_fields_to_add:
 554                fields = self._build_nested_fields(
 555                    list(field.fields), nested_fields_to_add[field.name]
 556                )
 557                new_schema.append(
 558                    bigquery.SchemaField(
 559                        field.name,
 560                        "RECORD",
 561                        mode=field.mode,
 562                        fields=tuple(fields),
 563                    )
 564                )
 565            else:
 566                new_schema.append(field)
 567
 568        if new_schema != original_schema:
 569            table.schema = new_schema
 570            self.client.update_table(table, ["schema"])
 571
 572    def __load_pandas_to_table(
 573        self,
 574        table: bigquery.Table,
 575        df: pd.DataFrame,
 576        columns_to_types: t.Dict[str, exp.DataType],
 577        replace: bool = False,
 578    ) -> BigQueryQueryResult:
 579        """
 580        Loads a pandas dataframe into a table in BigQuery. Will do an overwrite if replace is True. Note that
 581        the replace will replace the entire table, not just the rows that are in the dataframe.
 582        """
 583        from google.cloud import bigquery
 584
 585        job_config = bigquery.job.LoadJobConfig(schema=self.__get_bq_schema(columns_to_types))
 586        if replace:
 587            job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
 588        logger.info(f"Loading dataframe to BigQuery. Table Path: {table.path}")
 589        # This client call does not support retry so we don't use the `_db_call` method.
 590        result = self.__retry(
 591            self.__db_load_table_from_dataframe,
 592        )(df=df, table=table, job_config=job_config)
 593        if result.errors:
 594            raise SQLMeshError(result.errors)
 595        return result
 596
 597    def __db_load_table_from_dataframe(
 598        self, df: pd.DataFrame, table: bigquery.Table, job_config: bigquery.LoadJobConfig
 599    ) -> BigQueryQueryResult:
 600        job = self.client.load_table_from_dataframe(
 601            dataframe=df, destination=table, job_config=job_config
 602        )
 603        return self._db_call(job.result)
 604
 605    def __get_bq_schemafield(self, name: str, tpe: exp.DataType) -> bigquery.SchemaField:
 606        from google.cloud import bigquery
 607
 608        mode = "NULLABLE"
 609        if tpe.is_type(exp.DataType.Type.ARRAY):
 610            mode = "REPEATED"
 611            tpe = tpe.expressions[0]
 612
 613        field_type = tpe.sql(dialect=self.dialect)
 614        fields = []
 615        if tpe.is_type(*exp.DataType.NESTED_TYPES):
 616            field_type = "RECORD"
 617            for inner_field in tpe.expressions:
 618                if isinstance(inner_field, exp.ColumnDef):
 619                    inner_name = inner_field.this.sql(dialect=self.dialect)
 620                    inner_type = inner_field.kind
 621                    if inner_type is None:
 622                        raise ValueError(
 623                            f"cannot convert unknown type to BQ schema field {inner_field}"
 624                        )
 625                    fields.append(self.__get_bq_schemafield(name=inner_name, tpe=inner_type))
 626                else:
 627                    raise ValueError(f"unexpected nested expression {inner_field}")
 628
 629        return bigquery.SchemaField(
 630            name=name,
 631            field_type=field_type,
 632            mode=mode,
 633            fields=fields,
 634        )
 635
 636    def __get_bq_schema(
 637        self, columns_to_types: t.Dict[str, exp.DataType]
 638    ) -> t.List[bigquery.SchemaField]:
 639        """
 640        Returns a bigquery schema object from a dictionary of column names to types.
 641        """
 642
 643        precisionless_col_to_types = {
 644            col_name: remove_precision_parameterized_types(col_type)
 645            for col_name, col_type in columns_to_types.items()
 646        }
 647        return [
 648            self.__get_bq_schemafield(name=col_name, tpe=t.cast(exp.DataType, col_type))
 649            for col_name, col_type in precisionless_col_to_types.items()
 650        ]
 651
 652    def __get_temp_bq_table(
 653        self, table: exp.Table, columns_to_type: t.Dict[str, exp.DataType]
 654    ) -> bigquery.Table:
 655        """
 656        Returns a bigquery table object that is temporary and will expire in 3 hours.
 657        """
 658        bq_table = self.__get_bq_table(table, columns_to_type)
 659        bq_table.expires = to_datetime("in 3 hours")
 660        return bq_table
 661
 662    def __get_bq_table(
 663        self, table: TableName, columns_to_type: t.Dict[str, exp.DataType]
 664    ) -> bigquery.Table:
 665        """
 666        Returns a bigquery table object with a schema defines that matches the columns_to_type dictionary.
 667        """
 668        from google.cloud import bigquery
 669
 670        table_ = exp.to_table(table).copy()
 671
 672        if not table_.catalog:
 673            table_.set("catalog", exp.to_identifier(self.default_catalog))
 674
 675        return bigquery.Table(
 676            table_ref=self._table_name(table_),
 677            schema=self.__get_bq_schema(columns_to_type),
 678        )
 679
 680    @property
 681    def __retry(self) -> Retry:
 682        from google.api_core import retry
 683
 684        return retry.Retry(
 685            predicate=_ErrorCounter(self._extra_config["job_retries"]).should_retry,
 686            deadline=self._extra_config.get("job_retry_deadline_seconds"),
 687            initial=1.0,
 688            maximum=3.0,
 689        )
 690
 691    def insert_overwrite_by_partition(
 692        self,
 693        table_name: TableName,
 694        query_or_df: QueryOrDF,
 695        partitioned_by: t.List[exp.Expr],
 696        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 697        source_columns: t.Optional[t.List[str]] = None,
 698    ) -> None:
 699        if len(partitioned_by) != 1:
 700            raise SQLMeshError(
 701                f"Bigquery only supports partitioning by one column, {len(partitioned_by)} were provided."
 702            )
 703
 704        partition_exp = partitioned_by[0]
 705        partition_column = partition_exp.find(exp.Column)
 706
 707        granularity = partition_exp.args.get("unit")
 708        if granularity:
 709            granularity = granularity.name.lower()
 710
 711        if not partition_column:
 712            partition_sql = partition_exp.sql(dialect=self.dialect)
 713            raise SQLMeshError(
 714                f"The partition expression '{partition_sql}' doesn't contain a column."
 715            )
 716        with (
 717            self.session({}),
 718            self.temp_table(
 719                query_or_df,
 720                name=table_name,
 721                partitioned_by=partitioned_by,
 722                source_columns=source_columns,
 723            ) as temp_table_name,
 724        ):
 725            if target_columns_to_types is None or target_columns_to_types[
 726                partition_column.name
 727            ] == exp.DataType.build("unknown"):
 728                target_columns_to_types = self.columns(table_name)
 729
 730            partition_type_sql = target_columns_to_types[partition_column.name].sql(
 731                dialect=self.dialect
 732            )
 733
 734            select_array_agg_partitions = select_partitions_expr(
 735                temp_table_name.db,
 736                temp_table_name.name,
 737                partition_type_sql,
 738                granularity=granularity,
 739                agg_func="ARRAY_AGG",
 740                catalog=temp_table_name.catalog or self.default_catalog,
 741            )
 742
 743            self.execute(
 744                f"DECLARE _sqlmesh_target_partitions_ ARRAY<{partition_type_sql}> DEFAULT ({select_array_agg_partitions});"
 745            )
 746
 747            where = t.cast(exp.Condition, partition_exp).isin(unnest="_sqlmesh_target_partitions_")
 748
 749            self._insert_overwrite_by_condition(
 750                table_name,
 751                [SourceQuery(query_factory=lambda: exp.select("*").from_(temp_table_name))],
 752                target_columns_to_types,
 753                where=where,
 754            )
 755
 756    def table_exists(self, table_name: TableName) -> bool:
 757        table = exp.to_table(table_name)
 758        data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
 759        if data_object_cache_key in self._data_object_cache:
 760            logger.debug("Table existence cache hit: %s", data_object_cache_key)
 761            return self._data_object_cache[data_object_cache_key] is not None
 762
 763        try:
 764            from google.cloud.exceptions import NotFound
 765        except ModuleNotFoundError:
 766            from google.api_core.exceptions import NotFound
 767
 768        try:
 769            self._get_table(table_name)
 770            return True
 771        except NotFound:
 772            return False
 773
 774    def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]:
 775        from sqlmesh.utils.date import to_timestamp
 776
 777        datasets_to_tables: t.DefaultDict[str, t.List[str]] = defaultdict(list)
 778        for table_name in table_names:
 779            table = exp.to_table(table_name)
 780            datasets_to_tables[table.db].append(table.name)
 781
 782        results = []
 783
 784        for dataset, tables in datasets_to_tables.items():
 785            query = (
 786                f"SELECT TIMESTAMP_MILLIS(last_modified_time) FROM `{dataset}.__TABLES__` WHERE "
 787            )
 788            for i, table_name in enumerate(tables):
 789                query += f"TABLE_ID = '{table_name}'"
 790                if i < len(tables) - 1:
 791                    query += " OR "
 792            results.extend(self.fetchall(query))
 793
 794        return [to_timestamp(row[0]) for row in results]
 795
 796    def _get_table(self, table_name: TableName) -> BigQueryTable:
 797        """
 798        Returns a BigQueryTable object for the given table name.
 799
 800        Raises: `google.cloud.exceptions.NotFound` if the table does not exist.
 801        """
 802        return self._db_call(self.client.get_table, table=self._table_name(table_name))
 803
 804    def _table_name(self, table_name: TableName) -> str:
 805        # the api doesn't support backticks, so we can't call exp.table_name or sql
 806        return ".".join(part.name for part in exp.to_table(table_name).parts)
 807
 808    def _fetch_native_df(
 809        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
 810    ) -> DF:
 811        self.execute(query, quote_identifiers=quote_identifiers)
 812        query_job = self._query_job
 813        assert query_job is not None
 814        return query_job.to_dataframe()
 815
 816    def _create_column_comments(
 817        self,
 818        table_name: TableName,
 819        column_comments: t.Dict[str, str],
 820        table_kind: str = "TABLE",
 821        materialized_view: bool = False,
 822    ) -> None:
 823        if not (table_kind == "VIEW" and materialized_view):
 824            table = self._get_table(table_name)
 825
 826            # convert Table object to dict
 827            table_def = table.to_api_repr()
 828
 829            # Set column descriptions, supporting nested fields (e.g. record.field.nested_field)
 830            for column, comment in column_comments.items():
 831                fields = table_def["schema"]["fields"]
 832                field_names = column.split(".")
 833                last_index = len(field_names) - 1
 834
 835                # Traverse the fields with nested fields down to leaf level
 836                for idx, name in enumerate(field_names):
 837                    if field := next((field for field in fields if field["name"] == name), None):
 838                        if idx == last_index:
 839                            field["description"] = self._truncate_comment(
 840                                comment, self.MAX_COLUMN_COMMENT_LENGTH
 841                            )
 842                        else:
 843                            fields = field.get("fields") or []
 844
 845            # An "etag" is BQ versioning metadata that changes when an object is updated/modified. `update_table`
 846            # compares the etags of the table object passed to it and the remote table, erroring if the etags
 847            # don't match. We set the local etag to None to avoid this check.
 848            table_def["etag"] = None
 849
 850            # convert dict back to a Table object
 851            table = table.from_api_repr(table_def)
 852
 853            # update table schema
 854            logger.info(f"Registering column comments for table {table_name}")
 855            self._db_call(self.client.update_table, table=table, fields=["schema"])
 856
 857    def _build_description_property_exp(
 858        self,
 859        description: str,
 860        trunc_method: t.Callable,
 861    ) -> exp.Property:
 862        return exp.Property(
 863            this=exp.to_identifier("description", quoted=True),
 864            value=exp.Literal.string(trunc_method(description)),
 865        )
 866
 867    def _build_partitioned_by_exp(
 868        self,
 869        partitioned_by: t.List[exp.Expr],
 870        *,
 871        partition_interval_unit: t.Optional[IntervalUnit] = None,
 872        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 873        **kwargs: t.Any,
 874    ) -> t.Optional[exp.PartitionedByProperty]:
 875        if len(partitioned_by) > 1:
 876            raise SQLMeshError("BigQuery only supports partitioning by a single column")
 877
 878        this = partitioned_by[0]
 879        if (
 880            isinstance(this, exp.Column)
 881            and partition_interval_unit is not None
 882            and not partition_interval_unit.is_minute
 883        ):
 884            column_type: t.Optional[exp.DataType] = (target_columns_to_types or {}).get(this.name)
 885
 886            if column_type == exp.DataType.build(
 887                "date", dialect=self.dialect
 888            ) and partition_interval_unit in (
 889                IntervalUnit.MONTH,
 890                IntervalUnit.YEAR,
 891            ):
 892                trunc_func = "DATE_TRUNC"
 893            elif column_type == exp.DataType.build("timestamp", dialect=self.dialect):
 894                trunc_func = "TIMESTAMP_TRUNC"
 895            elif column_type == exp.DataType.build("datetime", dialect=self.dialect):
 896                trunc_func = "DATETIME_TRUNC"
 897            else:
 898                trunc_func = ""
 899
 900            if trunc_func:
 901                this = exp.func(
 902                    trunc_func,
 903                    this,
 904                    exp.var(partition_interval_unit.value.upper()),
 905                    dialect=self.dialect,
 906                )
 907
 908        return exp.PartitionedByProperty(this=this)
 909
 910    def _build_table_properties_exp(
 911        self,
 912        catalog_name: t.Optional[str] = None,
 913        table_format: t.Optional[str] = None,
 914        storage_format: t.Optional[str] = None,
 915        partitioned_by: t.Optional[t.List[exp.Expr]] = None,
 916        partition_interval_unit: t.Optional[IntervalUnit] = None,
 917        clustered_by: t.Optional[t.List[exp.Expr]] = None,
 918        table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
 919        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
 920        table_description: t.Optional[str] = None,
 921        table_kind: t.Optional[str] = None,
 922        **kwargs: t.Any,
 923    ) -> t.Optional[exp.Properties]:
 924        properties: t.List[exp.Expr] = []
 925
 926        if partitioned_by and (
 927            partitioned_by_prop := self._build_partitioned_by_exp(
 928                partitioned_by,
 929                partition_interval_unit=partition_interval_unit,
 930                target_columns_to_types=target_columns_to_types,
 931            )
 932        ):
 933            properties.append(partitioned_by_prop)
 934
 935        if clustered_by and (clustered_by_exp := self._build_clustered_by_exp(clustered_by)):
 936            properties.append(clustered_by_exp)
 937
 938        if table_description:
 939            properties.append(
 940                self._build_description_property_exp(
 941                    table_description, self._truncate_table_comment
 942                ),
 943            )
 944
 945        properties.extend(self._table_or_view_properties_to_expressions(table_properties))
 946
 947        if properties:
 948            return exp.Properties(expressions=properties)
 949        return None
 950
 951    def _build_column_def(
 952        self,
 953        col_name: str,
 954        column_descriptions: t.Optional[t.Dict[str, str]] = None,
 955        engine_supports_schema_comments: bool = False,
 956        col_type: t.Optional[exp.DATA_TYPE] = None,
 957        nested_names: t.List[str] = [],
 958    ) -> exp.ColumnDef:
 959        # Helper function to build column definitions with column descriptions
 960        def _build_struct_with_descriptions(
 961            col_type: exp.DataType,
 962            nested_names: t.List[str],
 963        ) -> exp.DataType:
 964            column_expressions = []
 965            for column_def in col_type.expressions:
 966                # This is expected to  be true, but this check is included as a
 967                # precautionary measure in case of an unexpected edge case
 968                if isinstance(column_def, exp.ColumnDef):
 969                    column = self._build_column_def(
 970                        col_name=column_def.name,
 971                        column_descriptions=column_descriptions,
 972                        engine_supports_schema_comments=engine_supports_schema_comments,
 973                        col_type=column_def.kind,
 974                        nested_names=nested_names,
 975                    )
 976                else:
 977                    column = column_def
 978                column_expressions.append(column)
 979            return exp.DataType(this=col_type.this, expressions=column_expressions, nested=True)
 980
 981        # Recursively build column definitions for BigQuery's RECORDs (struct) and REPEATED RECORDs (array of struct)
 982        if isinstance(col_type, exp.DataType) and col_type.expressions:
 983            expressions = col_type.expressions
 984            if col_type.is_type(exp.DataType.Type.STRUCT):
 985                col_type = _build_struct_with_descriptions(col_type, nested_names + [col_name])
 986            elif col_type.is_type(exp.DataType.Type.ARRAY) and expressions[0].is_type(
 987                exp.DataType.Type.STRUCT
 988            ):
 989                col_type = exp.DataType(
 990                    this=exp.DataType.Type.ARRAY,
 991                    expressions=[
 992                        _build_struct_with_descriptions(
 993                            col_type.expressions[0], nested_names + [col_name]
 994                        )
 995                    ],
 996                    nested=True,
 997                )
 998
 999        return exp.ColumnDef(
1000            this=exp.to_identifier(col_name),
1001            kind=col_type,
1002            constraints=(
1003                self._build_col_comment_exp(
1004                    ".".join(nested_names + [col_name]), column_descriptions
1005                )
1006                if engine_supports_schema_comments and self.comments_enabled and column_descriptions
1007                else None
1008            ),
1009        )
1010
1011    def _build_col_comment_exp(
1012        self, col_name: str, column_descriptions: t.Dict[str, str]
1013    ) -> t.List[exp.ColumnConstraint]:
1014        comment = column_descriptions.get(col_name, None)
1015        if comment:
1016            return [
1017                exp.ColumnConstraint(
1018                    kind=exp.Properties(
1019                        expressions=[
1020                            self._build_description_property_exp(
1021                                comment, self._truncate_column_comment
1022                            ),
1023                        ]
1024                    )
1025                )
1026            ]
1027        return []
1028
1029    def _build_view_properties_exp(
1030        self,
1031        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
1032        table_description: t.Optional[str] = None,
1033        **kwargs: t.Any,
1034    ) -> t.Optional[exp.Properties]:
1035        """Creates a SQLGlot table properties expression for view"""
1036        properties: t.List[exp.Expr] = []
1037
1038        if table_description:
1039            properties.append(
1040                self._build_description_property_exp(
1041                    table_description, self._truncate_table_comment
1042                ),
1043            )
1044
1045        properties.extend(self._table_or_view_properties_to_expressions(view_properties))
1046
1047        if properties:
1048            return exp.Properties(expressions=properties)
1049        return None
1050
1051    def _build_create_comment_table_exp(
1052        self, table: exp.Table, table_comment: str, table_kind: str
1053    ) -> exp.Comment | str:
1054        table_sql = table.sql(dialect=self.dialect, identify=True)
1055
1056        truncated_comment = self._truncate_table_comment(table_comment)
1057        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
1058
1059        return f"ALTER {table_kind} {table_sql} SET OPTIONS(description = {comment_sql})"
1060
1061    def _build_create_comment_column_exp(
1062        self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE"
1063    ) -> exp.Comment | str:
1064        table_sql = table.sql(dialect=self.dialect, identify=True)
1065        column_sql = exp.column(column_name).sql(dialect=self.dialect, identify=True)
1066
1067        truncated_comment = self._truncate_column_comment(column_comment)
1068        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
1069
1070        return f"ALTER {table_kind} {table_sql} ALTER COLUMN {column_sql} SET OPTIONS(description = {comment_sql})"
1071
1072    def create_state_table(
1073        self,
1074        table_name: str,
1075        target_columns_to_types: t.Dict[str, exp.DataType],
1076        primary_key: t.Optional[t.Tuple[str, ...]] = None,
1077    ) -> None:
1078        self.create_table(
1079            table_name,
1080            target_columns_to_types,
1081        )
1082
1083    def _db_call(self, func: t.Callable[..., t.Any], *args: t.Any, **kwargs: t.Any) -> t.Any:
1084        return func(
1085            retry=self.__retry,
1086            *args,
1087            **kwargs,
1088        )
1089
1090    def _execute(
1091        self,
1092        sql: str,
1093        track_rows_processed: bool = False,
1094        **kwargs: t.Any,
1095    ) -> None:
1096        """Execute a sql query."""
1097        from google.cloud.bigquery import QueryJobConfig
1098        from google.cloud.bigquery.query import ConnectionProperty
1099
1100        # BigQuery's Python DB API implementation does not support retries, so we have to implement them ourselves.
1101        # So we update the cursor's query job and query data with the results of the new query job. This makes sure
1102        # that other cursor based operations execute correctly.
1103        session_id = self._session_id
1104        connection_properties = (
1105            [
1106                ConnectionProperty(key="session_id", value=session_id),
1107            ]
1108            if session_id
1109            else []
1110        )
1111
1112        # Create job config
1113        job_config = QueryJobConfig(**self._job_params, connection_properties=connection_properties)
1114
1115        self._query_job = self._db_call(
1116            self.client.query,
1117            query=sql,
1118            job_config=job_config,
1119            timeout=self._extra_config.get("job_creation_timeout_seconds"),
1120        )
1121        query_job = self._query_job
1122        assert query_job is not None
1123
1124        logger.debug(
1125            "BigQuery job created: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s",
1126            query_job.project,
1127            query_job.location,
1128            query_job.job_id,
1129        )
1130
1131        results = self._db_call(
1132            query_job.result,
1133            timeout=self._extra_config.get("job_execution_timeout_seconds"),  # type: ignore
1134        )
1135
1136        self._query_data = iter(results) if results.total_rows else iter([])
1137        query_results = query_job._query_results
1138        self.cursor._set_rowcount(query_results)
1139        self.cursor._set_description(query_results.schema)
1140
1141        if (
1142            track_rows_processed
1143            and self._query_execution_tracker
1144            and self._query_execution_tracker.is_tracking()
1145        ):
1146            num_rows = None
1147            if query_job.statement_type == "CREATE_TABLE_AS_SELECT":
1148                # since table was just created, number rows in table == number rows processed
1149                query_table = self.client.get_table(query_job.destination)
1150                num_rows = query_table.num_rows
1151            elif query_job.statement_type in ["INSERT", "DELETE", "MERGE", "UPDATE"]:
1152                num_rows = query_job.num_dml_affected_rows
1153
1154            self._query_execution_tracker.record_execution(
1155                sql, num_rows, query_job.total_bytes_processed
1156            )
1157
1158    def _get_data_objects(
1159        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
1160    ) -> t.List[DataObject]:
1161        """
1162        Returns all the data objects that exist in the given schema and optionally catalog.
1163        """
1164
1165        # The BigQuery Client's list_tables method does not support filtering by table name, so we have to
1166        # resort to using SQL instead.
1167        schema = to_schema(schema_name)
1168        catalog = schema.catalog or self.default_catalog
1169        query = (
1170            exp.select(
1171                exp.column("table_catalog").as_("catalog"),
1172                exp.column("table_name").as_("name"),
1173                exp.column("table_schema").as_("schema_name"),
1174                exp.case()
1175                .when(exp.column("table_type").eq("BASE TABLE"), exp.Literal.string("TABLE"))
1176                .when(exp.column("table_type").eq("CLONE"), exp.Literal.string("TABLE"))
1177                .when(exp.column("table_type").eq("EXTERNAL"), exp.Literal.string("TABLE"))
1178                .when(exp.column("table_type").eq("SNAPSHOT"), exp.Literal.string("TABLE"))
1179                .when(exp.column("table_type").eq("VIEW"), exp.Literal.string("VIEW"))
1180                .when(
1181                    exp.column("table_type").eq("MATERIALIZED VIEW"),
1182                    exp.Literal.string("MATERIALIZED_VIEW"),
1183                )
1184                .else_(exp.column("table_type"))
1185                .as_("type"),
1186                exp.column("clustering_key", "ci").as_("clustering_key"),
1187            )
1188            .with_(
1189                "clustering_info",
1190                as_=exp.select(
1191                    exp.column("table_catalog"),
1192                    exp.column("table_schema"),
1193                    exp.column("table_name"),
1194                    parse_one(
1195                        "string_agg(column_name order by clustering_ordinal_position)",
1196                        dialect=self.dialect,
1197                    ).as_("clustering_key"),
1198                )
1199                .from_(
1200                    exp.to_table(
1201                        f"`{catalog}`.`{schema.db}`.INFORMATION_SCHEMA.COLUMNS",
1202                        dialect=self.dialect,
1203                    )
1204                )
1205                .where(exp.column("clustering_ordinal_position").is_(exp.not_(exp.null())))
1206                .group_by("1", "2", "3"),
1207            )
1208            .from_(
1209                exp.to_table(
1210                    f"`{catalog}`.`{schema.db}`.INFORMATION_SCHEMA.TABLES", dialect=self.dialect
1211                )
1212            )
1213            .join(
1214                "clustering_info",
1215                using=["table_catalog", "table_schema", "table_name"],
1216                join_type="left",
1217                join_alias="ci",
1218            )
1219        )
1220        if object_names:
1221            query = query.where(exp.column("table_name").isin(*object_names))
1222
1223        try:
1224            df = self.fetchdf(query, quote_identifiers=True)
1225        except Exception as e:
1226            if "Not found" in str(e):
1227                return []
1228            raise
1229
1230        if df.empty:
1231            return []
1232        return [
1233            DataObject(
1234                catalog=row.catalog,  # type: ignore
1235                schema=row.schema_name,  # type: ignore
1236                name=row.name,  # type: ignore
1237                type=DataObjectType.from_str(row.type),  # type: ignore
1238                clustering_key=f"({row.clustering_key})" if row.clustering_key else None,  # type: ignore
1239            )
1240            for row in df.itertuples()
1241        ]
1242
1243    def _update_clustering_key(self, operation: TableAlterClusterByOperation) -> None:
1244        cluster_key_expressions = getattr(operation, "cluster_key_expressions", [])
1245        bq_table = self._get_table(operation.target_table)
1246
1247        rendered_columns = [c.sql(dialect=self.dialect) for c in cluster_key_expressions]
1248        bq_table.clustering_fields = (
1249            rendered_columns or None
1250        )  # causes a drop of the key if cluster_by is empty or None
1251
1252        self._db_call(self.client.update_table, table=bq_table, fields=["clustering_fields"])
1253
1254        if cluster_key_expressions:
1255            # BigQuery only applies new clustering going forward, so this rewrites the columns to apply the new clustering to historical data
1256            # ref: https://cloud.google.com/bigquery/docs/creating-clustered-tables#modifying-cluster-spec
1257            self.execute(
1258                exp.update(
1259                    operation.target_table,
1260                    {c: c for c in cluster_key_expressions},
1261                    where=exp.true(),
1262                )
1263            )
1264
1265    def _normalize_decimal_value(self, col: exp.Expr, precision: int) -> exp.Expr:
1266        return exp.func("FORMAT", exp.Literal.string(f"%.{precision}f"), col)
1267
1268    def _normalize_nested_value(self, col: exp.Expr) -> exp.Expr:
1269        return exp.func("TO_JSON_STRING", col, dialect=self.dialect)
1270
1271    @t.overload
1272    def _columns_to_types(
1273        self,
1274        query_or_df: DF,
1275        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1276        source_columns: t.Optional[t.List[str]] = None,
1277    ) -> t.Tuple[t.Dict[str, exp.DataType], t.List[str]]: ...
1278
1279    @t.overload
1280    def _columns_to_types(
1281        self,
1282        query_or_df: Query,
1283        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1284        source_columns: t.Optional[t.List[str]] = None,
1285    ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]: ...
1286
1287    def _columns_to_types(
1288        self,
1289        query_or_df: QueryOrDF,
1290        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1291        source_columns: t.Optional[t.List[str]] = None,
1292    ) -> t.Tuple[t.Optional[t.Dict[str, exp.DataType]], t.Optional[t.List[str]]]:
1293        if (
1294            not target_columns_to_types
1295            and bigframes
1296            and isinstance(query_or_df, bigframes.dataframe.DataFrame)
1297        ):
1298            # using dry_run=True attempts to prevent the DataFrame from being materialized just to read the column types from it
1299            dtypes = query_or_df.to_pandas(dry_run=True).columnDtypes
1300            target_columns_to_types = columns_to_types_from_dtypes(dtypes.items())
1301            return target_columns_to_types, list(source_columns or target_columns_to_types)
1302
1303        return super()._columns_to_types(
1304            query_or_df, target_columns_to_types, source_columns=source_columns
1305        )
1306
1307    def _native_df_to_pandas_df(
1308        self,
1309        query_or_df: QueryOrDF,
1310    ) -> t.Union[Query, pd.DataFrame]:
1311        if bigframes and isinstance(query_or_df, bigframes.dataframe.DataFrame):
1312            return query_or_df.to_pandas()
1313
1314        return super()._native_df_to_pandas_df(query_or_df)
1315
1316    @property
1317    def _query_data(self) -> t.Any:
1318        return self._connection_pool.get_attribute("query_data")
1319
1320    @_query_data.setter
1321    def _query_data(self, value: t.Any) -> None:
1322        self._connection_pool.set_attribute("query_data", value)
1323
1324    @property
1325    def _query_job(self) -> t.Optional[QueryJob]:
1326        return self._connection_pool.get_attribute("query_job")
1327
1328    @_query_job.setter
1329    def _query_job(self, value: t.Any) -> None:
1330        self._connection_pool.set_attribute("query_job", value)
1331
1332    @property
1333    def _session_id(self) -> t.Any:
1334        return self._connection_pool.get_attribute("session_id")
1335
1336    @_session_id.setter
1337    def _session_id(self, value: t.Any) -> None:
1338        self._connection_pool.set_attribute("session_id", value)
1339
1340    def _get_current_schema(self) -> str:
1341        raise NotImplementedError("BigQuery does not support current schema")
1342
1343    def _get_bq_dataset_location(self, project: str, dataset: str) -> str:
1344        return self._db_call(self.client.get_dataset, dataset_ref=f"{project}.{dataset}").location
1345
1346    def _get_grant_expression(self, table: exp.Table) -> exp.Expr:
1347        if not table.db:
1348            raise ValueError(
1349                f"Table {table.sql(dialect=self.dialect)} does not have a schema (dataset)"
1350            )
1351        project = table.catalog or self.get_current_catalog()
1352        if not project:
1353            raise ValueError(
1354                f"Table {table.sql(dialect=self.dialect)} does not have a catalog (project)"
1355            )
1356
1357        dataset = table.db
1358        table_name = table.name
1359        location = self._get_bq_dataset_location(project, dataset)
1360
1361        # https://cloud.google.com/bigquery/docs/information-schema-object-privileges
1362        # OBJECT_PRIVILEGES is a project-level INFORMATION_SCHEMA view with regional qualifier
1363        object_privileges_table = exp.to_table(
1364            f"`{project}`.`region-{location}`.INFORMATION_SCHEMA.{self.GRANT_INFORMATION_SCHEMA_TABLE_NAME}",
1365            dialect=self.dialect,
1366        )
1367        return (
1368            exp.select("privilege_type", "grantee")
1369            .from_(object_privileges_table)
1370            .where(
1371                exp.and_(
1372                    exp.column("object_schema").eq(exp.Literal.string(dataset)),
1373                    exp.column("object_name").eq(exp.Literal.string(table_name)),
1374                    # Filter out current_user
1375                    # BigQuery grantees format: "user:email" or "group:name"
1376                    exp.func("split", exp.column("grantee"), exp.Literal.string(":"))[
1377                        exp.func("OFFSET", exp.Literal.number("1"))
1378                    ].neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
1379                )
1380            )
1381        )
1382
1383    @staticmethod
1384    def _grant_object_kind(table_type: DataObjectType) -> str:
1385        if table_type == DataObjectType.VIEW:
1386            return "VIEW"
1387        if table_type == DataObjectType.MATERIALIZED_VIEW:
1388            # We actually need to use "MATERIALIZED VIEW" here even though it's not listed
1389            # as a supported resource_type in the BigQuery DCL doc:
1390            # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language
1391            return "MATERIALIZED VIEW"
1392        return "TABLE"
1393
1394    def _dcl_grants_config_expr(
1395        self,
1396        dcl_cmd: t.Type[DCL],
1397        table: exp.Table,
1398        grants_config: GrantsConfig,
1399        table_type: DataObjectType = DataObjectType.TABLE,
1400    ) -> t.List[exp.Expr]:
1401        expressions: t.List[exp.Expr] = []
1402        if not grants_config:
1403            return expressions
1404
1405        # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language
1406
1407        def normalize_principal(p: str) -> str:
1408            if ":" not in p:
1409                raise ValueError(f"Principal '{p}' missing a prefix label")
1410
1411            # allUsers and allAuthenticatedUsers special groups that are cas-sensitive and must start with "specialGroup:"
1412            if p.endswith("allUsers") or p.endswith("allAuthenticatedUsers"):
1413                if not p.startswith("specialGroup:"):
1414                    raise ValueError(
1415                        f"Special group principal '{p}' must start with 'specialGroup:' prefix label"
1416                    )
1417                return p
1418
1419            label, principal = p.split(":", 1)
1420            # always lowercase principals
1421            return f"{label}:{principal.lower()}"
1422
1423        object_kind = self._grant_object_kind(table_type)
1424        for privilege, principals in grants_config.items():
1425            if not principals:
1426                continue
1427
1428            noramlized_principals = [exp.Literal.string(normalize_principal(p)) for p in principals]
1429            args: t.Dict[str, t.Any] = {
1430                "privileges": [exp.GrantPrivilege(this=exp.to_identifier(privilege, quoted=True))],
1431                "securable": table.copy(),
1432                "principals": noramlized_principals,
1433            }
1434
1435            if object_kind:
1436                args["kind"] = exp.Var(this=object_kind)
1437
1438            expressions.append(dcl_cmd(**args))  # type: ignore[arg-type]
1439
1440        return expressions

BigQuery Engine Adapter using the google-cloud-bigquery library's DB API.

DIALECT = 'bigquery'
DEFAULT_BATCH_SIZE = 1000
SUPPORTS_TRANSACTIONS = False
SUPPORTS_MATERIALIZED_VIEWS = True
SUPPORTS_CLONING = True
SUPPORTS_GRANTS = True
CURRENT_USER_OR_ROLE_EXPRESSION: sqlglot.expressions.core.Expr = SessionUser()
SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True
USE_CATALOG_IN_GRANTS = True
GRANT_INFORMATION_SCHEMA_TABLE_NAME = 'OBJECT_PRIVILEGES'
MAX_TABLE_COMMENT_LENGTH = 1024
MAX_COLUMN_COMMENT_LENGTH = 1024
SUPPORTS_QUERY_EXECUTION_TRACKING = True
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ['SCHEMA']
INSERT_OVERWRITE_STRATEGY = <InsertOverwriteStrategy.MERGE: 5>
SCHEMA_DIFFER_KWARGS = {'compatible_types': {DataType(this=DType.BIGINT, nested=False): {DataType(this=DType.BIGDECIMAL, nested=False), DataType(this=DType.DOUBLE, nested=False), DataType(this=DType.DECIMAL, nested=False)}, DataType(this=DType.DECIMAL, nested=False): {DataType(this=DType.BIGDECIMAL, nested=False), DataType(this=DType.DOUBLE, nested=False)}, DataType(this=DType.DATE, nested=False): {DataType(this=DType.TIMESTAMP, nested=False)}}, 'coerceable_types': {DataType(this=DType.DOUBLE, nested=False): {DataType(this=DType.BIGDECIMAL, nested=False)}}, 'support_coercing_compatible_types': True, 'parameterized_type_defaults': {<DType.DECIMAL: 'DECIMAL'>: [(38, 9), (0,)], <DType.BIGDECIMAL: 'BIGDECIMAL'>: [(76.76, 38), (0,)]}, 'types_with_unlimited_length': {<DType.TEXT: 'TEXT'>: {<DType.TEXT: 'TEXT'>}, <DType.BINARY: 'BINARY'>: {<DType.BINARY: 'BINARY'>}}, 'nested_support': <NestedSupport.ALL_BUT_DROP: 'ALL_BUT_DROP'>}
client: google.cloud.bigquery.client.Client
119    @property
120    def client(self) -> BigQueryClient:
121        return self.connection._client
bigframe: Optional[BigframeSession]
123    @property
124    def bigframe(self) -> t.Optional[BigframeSession]:
125        if bigframes:
126            options = bigframes.BigQueryOptions(
127                credentials=self.client._credentials,
128                project=self.client.project,
129                location=self.client.location,
130            )
131            return bigframes.connect(context=options)
132        return None
154    @property
155    def catalog_support(self) -> CatalogSupport:
156        return CatalogSupport.FULL_SUPPORT
def close(self) -> Any:
208    def close(self) -> t.Any:
209        # Cancel all pending query jobs across all threads
210        all_query_jobs = self._connection_pool.get_all_attributes("query_job")
211        for query_job in all_query_jobs:
212            if query_job:
213                try:
214                    if not self._db_call(query_job.done):
215                        self._db_call(query_job.cancel)
216                        logger.debug(
217                            "Cancelled BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s",
218                            query_job.project,
219                            query_job.location,
220                            query_job.job_id,
221                        )
222                except Exception as ex:
223                    logger.debug(
224                        "Failed to cancel BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s. %s",
225                        query_job.project,
226                        query_job.location,
227                        query_job.job_id,
228                        str(ex),
229                    )
230
231        return super().close()

Closes all open connections and releases all allocated resources.

def get_current_catalog(self) -> Optional[str]:
281    def get_current_catalog(self) -> t.Optional[str]:
282        """Returns the catalog name of the current connection."""
283        return self.client.project

Returns the catalog name of the current connection.

def set_current_catalog(self, catalog: str) -> None:
285    def set_current_catalog(self, catalog: str) -> None:
286        """Sets the catalog name of the current connection."""
287        self.client.project = catalog

Sets the catalog name of the current connection.

def create_schema( self, schema_name: Union[str, sqlglot.expressions.query.Table], ignore_if_exists: bool = True, warn_on_error: bool = True, properties: List[sqlglot.expressions.core.Expr] = []) -> None:
289    def create_schema(
290        self,
291        schema_name: SchemaName,
292        ignore_if_exists: bool = True,
293        warn_on_error: bool = True,
294        properties: t.List[exp.Expr] = [],
295    ) -> None:
296        """Create a schema from a name or qualified table name."""
297        from google.api_core.exceptions import Conflict
298
299        try:
300            super().create_schema(
301                schema_name,
302                ignore_if_exists=ignore_if_exists,
303                warn_on_error=False,
304            )
305        except Exception as e:
306            is_already_exists_error = isinstance(e, Conflict) and "Already Exists:" in str(e)
307            if is_already_exists_error and ignore_if_exists:
308                return
309            if not warn_on_error:
310                raise
311            logger.warning("Failed to create schema '%s': %s", schema_name, e)

Create a schema from a name or qualified table name.

def get_bq_schema( self, table_name: Union[str, sqlglot.expressions.query.Table]) -> List[google.cloud.bigquery.schema.SchemaField]:
313    def get_bq_schema(self, table_name: TableName) -> t.List[bigquery.SchemaField]:
314        table = exp.to_table(table_name)
315        if len(table.parts) == 3 and "." in table.name:
316            self.execute(exp.select("*").from_(table).limit(0))
317            query_job = self._query_job
318            assert query_job is not None
319            return query_job._query_results.schema
320        return self._get_table(table).schema
def columns( self, table_name: Union[str, sqlglot.expressions.query.Table], include_pseudo_columns: bool = False) -> Dict[str, sqlglot.expressions.datatypes.DataType]:
322    def columns(
323        self, table_name: TableName, include_pseudo_columns: bool = False
324    ) -> t.Dict[str, exp.DataType]:
325        """Fetches column names and types for the target table."""
326
327        def dtype_to_sql(
328            dtype: t.Optional[StandardSqlDataType], field: bigquery.SchemaField
329        ) -> str:
330            assert dtype
331            assert field
332
333            kind = dtype.type_kind
334            assert kind
335
336            # Not using the enum value to preserve compatibility with older versions
337            # of the BigQuery library.
338            if kind.name == "ARRAY":
339                return f"ARRAY<{dtype_to_sql(dtype.array_element_type, field)}>"
340            if kind.name == "STRUCT":
341                struct_type = dtype.struct_type
342                assert struct_type
343                fields = ", ".join(
344                    f"{struct_field.name} {dtype_to_sql(struct_field.type, nested_field)}"
345                    for struct_field, nested_field in zip(struct_type.fields, field.fields)
346                )
347                return f"STRUCT<{fields}>"
348            if kind.name == "TYPE_KIND_UNSPECIFIED":
349                field_type = field.field_type
350
351                if field_type == "RANGE":
352                    # If the field is a RANGE then `range_element_type` should be set to
353                    # one of `"DATE"`, `"DATETIME"` or `"TIMESTAMP"`.
354                    return f"RANGE<{field.range_element_type.element_type}>"
355
356                return field_type
357
358            return kind.name
359
360        def create_mapping_schema(
361            schema: t.Sequence[bigquery.SchemaField],
362        ) -> t.Dict[str, exp.DataType]:
363            return {
364                field.name: exp.DataType.build(
365                    dtype_to_sql(field.to_standard_sql().type, field), dialect=self.dialect
366                )
367                for field in schema
368            }
369
370        table = exp.to_table(table_name)
371        if len(table.parts) == 3 and "." in table.name:
372            # The client's `get_table` method can't handle paths with >3 identifiers
373            self.execute(exp.select("*").from_(table).limit(0))
374            query_job = self._query_job
375            assert query_job is not None
376
377            query_results = query_job._query_results
378            columns = create_mapping_schema(query_results.schema)
379        else:
380            bq_table = self._get_table(table)
381            columns = create_mapping_schema(bq_table.schema)
382
383            if include_pseudo_columns:
384                if bq_table.time_partitioning and not bq_table.time_partitioning.field:
385                    columns["_PARTITIONTIME"] = exp.DataType.build("TIMESTAMP", dialect="bigquery")
386                    if bq_table.time_partitioning.type_ == "DAY":
387                        columns["_PARTITIONDATE"] = exp.DataType.build("DATE")
388                if bq_table.table_id.endswith("*"):
389                    columns["_TABLE_SUFFIX"] = exp.DataType.build("STRING", dialect="bigquery")
390                if (
391                    bq_table.external_data_configuration is not None
392                    and bq_table.external_data_configuration.source_format
393                    in (
394                        "CSV",
395                        "NEWLINE_DELIMITED_JSON",
396                        "AVRO",
397                        "PARQUET",
398                        "ORC",
399                        "DATASTORE_BACKUP",
400                    )
401                ):
402                    columns["_FILE_NAME"] = exp.DataType.build("STRING", dialect="bigquery")
403
404        return columns

Fetches column names and types for the target table.

def alter_table( self, alter_expressions: Union[List[sqlglot.expressions.ddl.Alter], List[sqlmesh.core.schema_diff.TableAlterOperation]]) -> None:
406    def alter_table(
407        self,
408        alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]],
409    ) -> None:
410        """
411        Performs the alter statements to change the current table into the structure of the target table,
412        and uses the API to add columns to structs, where SQL is not supported.
413        """
414        if not alter_expressions:
415            return
416
417        cluster_by_operations, alter_statements = [], []
418        for e in alter_expressions:
419            if isinstance(e, TableAlterClusterByOperation):
420                cluster_by_operations.append(e)
421            elif isinstance(e, TableAlterOperation):
422                alter_statements.append(e.expression)
423            else:
424                alter_statements.append(e)
425
426        for op in cluster_by_operations:
427            self._update_clustering_key(op)
428
429        nested_fields, non_nested_expressions = self._split_alter_expressions(alter_statements)
430
431        if nested_fields:
432            self._update_table_schema_nested_fields(nested_fields, alter_statements[0].this)
433
434        if non_nested_expressions:
435            super().alter_table(non_nested_expressions)

Performs the alter statements to change the current table into the structure of the target table, and uses the API to add columns to structs, where SQL is not supported.

def fetchone( self, query: Union[sqlglot.expressions.core.Expr, str], ignore_unsupported_errors: bool = False, quote_identifiers: bool = False) -> Optional[Tuple]:
437    def fetchone(
438        self,
439        query: t.Union[exp.Expr, str],
440        ignore_unsupported_errors: bool = False,
441        quote_identifiers: bool = False,
442    ) -> t.Optional[t.Tuple]:
443        """
444        BigQuery's `fetchone` method doesn't call execute and therefore would not benefit from the execute
445        configuration we have in place. Therefore this implementation calls execute instead.
446        """
447        self.execute(
448            query,
449            ignore_unsupported_errors=ignore_unsupported_errors,
450            quote_identifiers=quote_identifiers,
451        )
452        try:
453            return next(self._query_data)
454        except StopIteration:
455            return None

BigQuery's fetchone method doesn't call execute and therefore would not benefit from the execute configuration we have in place. Therefore this implementation calls execute instead.

def fetchall( self, query: Union[sqlglot.expressions.core.Expr, str], ignore_unsupported_errors: bool = False, quote_identifiers: bool = False) -> List[Tuple]:
457    def fetchall(
458        self,
459        query: t.Union[exp.Expr, str],
460        ignore_unsupported_errors: bool = False,
461        quote_identifiers: bool = False,
462    ) -> t.List[t.Tuple]:
463        """
464        BigQuery's `fetchone` method doesn't call execute and therefore would not benefit from the execute
465        configuration we have in place. Therefore this implementation calls execute instead.
466        """
467        self.execute(
468            query,
469            ignore_unsupported_errors=ignore_unsupported_errors,
470            quote_identifiers=quote_identifiers,
471        )
472        return list(self._query_data)

BigQuery's fetchall method doesn't call execute and therefore would not benefit from the execute configuration we have in place. Therefore this implementation calls execute instead.

def insert_overwrite_by_partition( self, table_name: Union[str, sqlglot.expressions.query.Table], query_or_df: <MagicMock id='132726902250016'>, partitioned_by: List[sqlglot.expressions.core.Expr], target_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]] = None, source_columns: Optional[List[str]] = None) -> None:
691    def insert_overwrite_by_partition(
692        self,
693        table_name: TableName,
694        query_or_df: QueryOrDF,
695        partitioned_by: t.List[exp.Expr],
696        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
697        source_columns: t.Optional[t.List[str]] = None,
698    ) -> None:
699        if len(partitioned_by) != 1:
700            raise SQLMeshError(
701                f"Bigquery only supports partitioning by one column, {len(partitioned_by)} were provided."
702            )
703
704        partition_exp = partitioned_by[0]
705        partition_column = partition_exp.find(exp.Column)
706
707        granularity = partition_exp.args.get("unit")
708        if granularity:
709            granularity = granularity.name.lower()
710
711        if not partition_column:
712            partition_sql = partition_exp.sql(dialect=self.dialect)
713            raise SQLMeshError(
714                f"The partition expression '{partition_sql}' doesn't contain a column."
715            )
716        with (
717            self.session({}),
718            self.temp_table(
719                query_or_df,
720                name=table_name,
721                partitioned_by=partitioned_by,
722                source_columns=source_columns,
723            ) as temp_table_name,
724        ):
725            if target_columns_to_types is None or target_columns_to_types[
726                partition_column.name
727            ] == exp.DataType.build("unknown"):
728                target_columns_to_types = self.columns(table_name)
729
730            partition_type_sql = target_columns_to_types[partition_column.name].sql(
731                dialect=self.dialect
732            )
733
734            select_array_agg_partitions = select_partitions_expr(
735                temp_table_name.db,
736                temp_table_name.name,
737                partition_type_sql,
738                granularity=granularity,
739                agg_func="ARRAY_AGG",
740                catalog=temp_table_name.catalog or self.default_catalog,
741            )
742
743            self.execute(
744                f"DECLARE _sqlmesh_target_partitions_ ARRAY<{partition_type_sql}> DEFAULT ({select_array_agg_partitions});"
745            )
746
747            where = t.cast(exp.Condition, partition_exp).isin(unnest="_sqlmesh_target_partitions_")
748
749            self._insert_overwrite_by_condition(
750                table_name,
751                [SourceQuery(query_factory=lambda: exp.select("*").from_(temp_table_name))],
752                target_columns_to_types,
753                where=where,
754            )
def table_exists(self, table_name: Union[str, sqlglot.expressions.query.Table]) -> bool:
756    def table_exists(self, table_name: TableName) -> bool:
757        table = exp.to_table(table_name)
758        data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
759        if data_object_cache_key in self._data_object_cache:
760            logger.debug("Table existence cache hit: %s", data_object_cache_key)
761            return self._data_object_cache[data_object_cache_key] is not None
762
763        try:
764            from google.cloud.exceptions import NotFound
765        except ModuleNotFoundError:
766            from google.api_core.exceptions import NotFound
767
768        try:
769            self._get_table(table_name)
770            return True
771        except NotFound:
772            return False
def get_table_last_modified_ts( self, table_names: List[Union[str, sqlglot.expressions.query.Table]]) -> List[int]:
774    def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]:
775        from sqlmesh.utils.date import to_timestamp
776
777        datasets_to_tables: t.DefaultDict[str, t.List[str]] = defaultdict(list)
778        for table_name in table_names:
779            table = exp.to_table(table_name)
780            datasets_to_tables[table.db].append(table.name)
781
782        results = []
783
784        for dataset, tables in datasets_to_tables.items():
785            query = (
786                f"SELECT TIMESTAMP_MILLIS(last_modified_time) FROM `{dataset}.__TABLES__` WHERE "
787            )
788            for i, table_name in enumerate(tables):
789                query += f"TABLE_ID = '{table_name}'"
790                if i < len(tables) - 1:
791                    query += " OR "
792            results.extend(self.fetchall(query))
793
794        return [to_timestamp(row[0]) for row in results]
def create_state_table( self, table_name: str, target_columns_to_types: Dict[str, sqlglot.expressions.datatypes.DataType], primary_key: Optional[Tuple[str, ...]] = None) -> None:
1072    def create_state_table(
1073        self,
1074        table_name: str,
1075        target_columns_to_types: t.Dict[str, exp.DataType],
1076        primary_key: t.Optional[t.Tuple[str, ...]] = None,
1077    ) -> None:
1078        self.create_table(
1079            table_name,
1080            target_columns_to_types,
1081        )

Create a table to store SQLMesh internal state.

Arguments:
  • table_name: The name of the table to create. Can be fully qualified or just table name.
  • target_columns_to_types: A mapping between the column name and its data type.
  • primary_key: Determines the table primary key.
def get_alter_operations( self, current_table_name: Union[str, sqlglot.expressions.query.Table], target_table_name: Union[str, sqlglot.expressions.query.Table], *, ignore_destructive: bool = False, ignore_additive: bool = False) -> List[sqlmesh.core.schema_diff.TableAlterOperation]:
358    def get_alter_operations(
359        self,
360        current_table_name: TableName,
361        target_table_name: TableName,
362        *,
363        ignore_destructive: bool = False,
364        ignore_additive: bool = False,
365    ) -> t.List[TableAlterOperation]:
366        operations = super().get_alter_operations(
367            current_table_name,
368            target_table_name,
369            ignore_destructive=ignore_destructive,
370            ignore_additive=ignore_additive,
371        )
372
373        # check for a change in clustering
374        current_table = exp.to_table(current_table_name)
375        target_table = exp.to_table(target_table_name)
376
377        current_table_schema = schema_(current_table.db, catalog=current_table.catalog)
378        target_table_schema = schema_(target_table.db, catalog=target_table.catalog)
379
380        current_table_info = seq_get(
381            self.get_data_objects(current_table_schema, {current_table.name}), 0
382        )
383        target_table_info = seq_get(
384            self.get_data_objects(target_table_schema, {target_table.name}), 0
385        )
386
387        if current_table_info and target_table_info:
388            if target_table_info.is_clustered:
389                if target_table_info.clustering_key and (
390                    current_table_info.clustering_key != target_table_info.clustering_key
391                ):
392                    operations.append(
393                        TableAlterChangeClusterKeyOperation(
394                            target_table=current_table,
395                            clustering_key=target_table_info.clustering_key,
396                            dialect=self.dialect,
397                        )
398                    )
399            elif current_table_info.is_clustered:
400                operations.append(TableAlterDropClusterKeyOperation(target_table=current_table))
401
402        return operations

Determines the alter statements needed to change the current table into the structure of the target table.

def select_partitions_expr( schema: str, table_name: str, data_type: Union[str, sqlglot.expressions.datatypes.DataType], granularity: Optional[str] = None, agg_func: str = 'MAX', catalog: Optional[str] = None) -> str:
def select_partitions_expr(
    schema: str,
    table_name: str,
    data_type: t.Union[str, exp.DataType],
    granularity: t.Optional[str] = None,
    agg_func: str = "MAX",
    catalog: t.Optional[str] = None,
) -> str:
    """Generates a SQL expression that aggregates partition values for a table.

    Partition ids are read from the dataset's `INFORMATION_SCHEMA.PARTITIONS` view.
    For DATE, DATETIME and TIMESTAMP partition columns they are parsed with the
    matching `PARSE_*` function; for any other type they are cast to INT64.

    Args:
        schema: The schema (BigQuery dataset) of the table.
        table_name: The name of the table.
        data_type: The data type of the partition column.
        granularity: The granularity of the partition. Supported values are: 'day', 'month', 'year' and 'hour'.
            Defaults to 'day' when omitted and the type is DATE, DATETIME or TIMESTAMP.
        agg_func: The aggregation function to use.
        catalog: The catalog (BigQuery project ID) of the table.

    Returns:
        A SELECT statement that aggregates partition values for a table.

    Raises:
        KeyError: If the granularity is not a key of `GRANULARITY_TO_PARTITION_FORMAT`.
    """
    name_parts = [f"`{schema}`", "INFORMATION_SCHEMA", "PARTITIONS"]
    if catalog:
        name_parts.insert(0, f"`{catalog}`")
    partitions_table_name = ".".join(name_parts)

    type_sql = data_type.sql(dialect="bigquery") if isinstance(data_type, exp.DataType) else data_type
    type_name = type_sql.upper()

    partition_id = exp.column("partition_id")
    if type_name in ("DATE", "DATETIME", "TIMESTAMP"):
        # Partition ids of temporal partitions are formatted according to granularity.
        parse_format = GRANULARITY_TO_PARTITION_FORMAT[(granularity or "day").lower()]
        partition_expr = exp.func(
            f"PARSE_{type_name}",
            exp.Literal.string(parse_format),
            partition_id,
            dialect="bigquery",
        )
    else:
        partition_expr = exp.cast(partition_id, "INT64", dialect="bigquery")

    aggregate = exp.func(agg_func, partition_expr)
    select = exp.select(aggregate).from_(partitions_table_name, dialect="bigquery")
    filtered = select.where(
        f"table_name = '{table_name}' AND partition_id IS NOT NULL AND partition_id != '__NULL__'",
        copy=False,
    )
    return filtered.sql(dialect="bigquery")

Generates a SQL expression that aggregates partition values for a table.

Arguments:
  • schema: The schema (BigQuery dataset) of the table.
  • table_name: The name of the table.
  • data_type: The data type of the partition column.
  • granularity: The granularity of the partition. Supported values are: 'day', 'month', 'year' and 'hour'.
  • agg_func: The aggregation function to use.
  • catalog: The catalog (BigQuery project ID) of the table.
Returns:

A SELECT statement that aggregates partition values for a table.

GRANULARITY_TO_PARTITION_FORMAT = {'day': '%Y%m%d', 'month': '%Y%m', 'year': '%Y', 'hour': '%Y%m%d%H'}