Edit on GitHub

sqlmesh.dbt.target

   1from __future__ import annotations
   2
   3import abc
   4import typing as t
   5from pathlib import Path
   6
   7from dbt.adapters.base import BaseRelation, Column
   8from pydantic import Field, AliasChoices
   9
  10from sqlmesh.core.console import get_console
  11from sqlmesh.core.config.connection import (
  12    AthenaConnectionConfig,
  13    BigQueryConnectionConfig,
  14    BigQueryConnectionMethod,
  15    BigQueryPriority,
  16    ClickhouseConnectionConfig,
  17    ConnectionConfig,
  18    DatabricksConnectionConfig,
  19    DuckDBConnectionConfig,
  20    MSSQLConnectionConfig,
  21    PostgresConnectionConfig,
  22    RedshiftConnectionConfig,
  23    SnowflakeConnectionConfig,
  24    TrinoAuthenticationMethod,
  25    TrinoConnectionConfig,
  26)
  27from sqlmesh.core.model import (
  28    IncrementalByTimeRangeKind,
  29    IncrementalByUniqueKeyKind,
  30    IncrementalUnmanagedKind,
  31)
  32from sqlmesh.core.schema_diff import NestedSupport
  33from sqlmesh.dbt.common import DbtConfig
  34from sqlmesh.dbt.relation import Policy
  35from sqlmesh.dbt.util import DBT_VERSION
  36from sqlmesh.utils import AttributeDict, classproperty
  37from sqlmesh.utils.errors import ConfigError
  38from sqlmesh.utils.pydantic import field_validator, model_validator
  39
# The incremental model kind *classes* (not instances) that can be passed to
# `default_incremental_strategy`.
IncrementalKind = t.Union[
    t.Type[IncrementalByUniqueKeyKind],
    t.Type[IncrementalByTimeRangeKind],
    t.Type[IncrementalUnmanagedKind],
]

# We only serialize a subset of fields in order to avoid persisting sensitive information
SERIALIZABLE_FIELDS = {
    # core
    "name",
    "schema_",
    "type",
    "threads",
    # snowflake
    "database",
    "warehouse",
    "user",
    "role",
    "account",
    # postgres/redshift
    "dbname",
    "host",
    "port",
    # bigquery
    "project",
    "dataset",
}

# Default keyword arguments merged into the kwargs of `to_sqlmesh` methods decorated
# with `with_schema_differ_overrides`; explicitly passed kwargs take precedence.
SCHEMA_DIFFER_OVERRIDES = {
    "schema_differ_overrides": {
        "treat_alter_data_type_as_destructive": True,
        "nested_support": NestedSupport.IGNORE,
    }
}
  74
  75
  76def with_schema_differ_overrides(
  77    func: t.Callable[..., ConnectionConfig],
  78) -> t.Callable[..., ConnectionConfig]:
  79    """Decorator that merges default config with kwargs."""
  80
  81    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
  82        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
  83        return func(self, **merged_kwargs)
  84
  85    return wrapper
  86
  87
  88class TargetConfig(abc.ABC, DbtConfig):
  89    """
  90    Configuration for DBT profile target
  91
  92    Args:
  93        type: The type of the data warehouse
  94        name: The name of this target
  95        database: Name of the database
  96        schema_: Name of the schema
  97        threads: The number of threads to run on
  98    """
  99
 100    # dbt
 101    type: str = "none"
 102    name: str
 103    database: str
 104    schema_: str = Field(alias="schema")
 105    threads: int = 1
 106    profile_name: t.Optional[str] = None
 107
 108    @classmethod
 109    def load(cls, data: t.Dict[str, t.Any]) -> TargetConfig:
 110        """
 111        Loads the configuration from the yaml provided for a profile target
 112
 113        Args:
 114            data: The yaml for the project's target output
 115
 116        Returns:
 117            The configuration of the provided profile target
 118        """
 119        db_type = data["type"]
 120        if db_type == "databricks":
 121            return DatabricksConfig(**data)
 122        if db_type == "duckdb":
 123            return DuckDbConfig(**data)
 124        if db_type == "postgres":
 125            return PostgresConfig(**data)
 126        if db_type == "redshift":
 127            return RedshiftConfig(**data)
 128        if db_type == "snowflake":
 129            return SnowflakeConfig(**data)
 130        if db_type == "bigquery":
 131            return BigQueryConfig(**data)
 132        if db_type == "sqlserver":
 133            return MSSQLConfig(**data)
 134        if db_type == "trino":
 135            return TrinoConfig(**data)
 136        if db_type == "clickhouse":
 137            return ClickhouseConfig(**data)
 138        if db_type == "athena":
 139            return AthenaConfig(**data)
 140
 141        raise ConfigError(f"{db_type} not supported.")
 142
 143    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
 144        """The default incremental strategy for the db"""
 145        raise NotImplementedError
 146
 147    @with_schema_differ_overrides
 148    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
 149        """Converts target config to SQLMesh connection config"""
 150        raise NotImplementedError
 151
 152    def attribute_dict(self) -> AttributeDict:
 153        fields = self.dict(include=SERIALIZABLE_FIELDS).copy()
 154        fields["target_name"] = self.name
 155        return AttributeDict(fields)
 156
 157    @classproperty
 158    def quote_policy(cls) -> Policy:
 159        return Policy()
 160
 161    @property
 162    def extra(self) -> t.Set[str]:
 163        return self.extra_fields(set(self.dict()))
 164
 165    @classproperty
 166    def relation_class(cls) -> t.Type[BaseRelation]:
 167        return BaseRelation
 168
 169    @classproperty
 170    def column_class(cls) -> t.Type[Column]:
 171        return Column
 172
 173    @property
 174    def dialect(self) -> str:
 175        return self.type
 176
 177
 178DUCKDB_IN_MEMORY = ":memory:"
 179
 180
 181class DuckDbConfig(TargetConfig):
 182    """
 183    Connection config for DuckDb target
 184
 185    Args:
 186        path: Location of the database file. If not specified, an in memory database is used.
 187        extensions: A list of autoloadable extensions to load.
 188        settings: A dictionary of settings to pass into the duckdb connector.
 189        secrets: A list of secrets to pass to the secret manager in the duckdb connector.
 190        filesystems: A list of `fsspec` filesystems to register in the duckdb connection.
 191    """
 192
 193    type: t.Literal["duckdb"] = "duckdb"
 194    database: str = "main"
 195    schema_: str = Field(default="main", alias="schema")
 196    path: str = DUCKDB_IN_MEMORY
 197    extensions: t.Optional[t.List[str]] = None
 198    settings: t.Optional[t.Dict[str, t.Any]] = None
 199    secrets: t.Optional[t.List[t.Dict[str, t.Any]]] = None
 200    filesystems: t.Optional[t.List[t.Dict[str, t.Any]]] = None
 201
 202    @model_validator(mode="before")
 203    def validate_authentication(cls, data: t.Any) -> t.Any:
 204        if not isinstance(data, dict):
 205            return data
 206
 207        if "database" not in data and DBT_VERSION >= (1, 5, 0):
 208            path = data.get("path")
 209            data["database"] = (
 210                "memory"
 211                if path is None or path == DUCKDB_IN_MEMORY
 212                else Path(t.cast(str, path)).stem
 213            )
 214
 215        if "threads" in data and t.cast(int, data["threads"]) > 1:
 216            get_console().log_warning("DuckDB does not support concurrency - setting threads to 1.")
 217
 218        return data
 219
 220    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
 221        return "delete+insert"
 222
 223    @classproperty
 224    def relation_class(cls) -> t.Type[BaseRelation]:
 225        from dbt.adapters.duckdb.relation import DuckDBRelation
 226
 227        return DuckDBRelation
 228
 229    @with_schema_differ_overrides
 230    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
 231        if self.extensions is not None:
 232            kwargs["extensions"] = self.extensions
 233        if self.settings is not None:
 234            kwargs["connector_config"] = self.settings
 235        if self.secrets is not None:
 236            kwargs["secrets"] = self.secrets
 237        if self.filesystems is not None:
 238            kwargs["filesystems"] = self.filesystems
 239        return DuckDBConnectionConfig(
 240            database=self.path,
 241            concurrent_tasks=1,
 242            **kwargs,
 243        )
 244
 245
 246class SnowflakeConfig(TargetConfig):
 247    """
 248    Project connection and operational configuration for the Snowflake target
 249
 250    Args:
 251        account: Snowflake account
 252        warehouse: Name of the warehouse
 253        user: Name of the user
 254        password: User's password
 255        role: Role of the user
 256        client_session_keep_alive: A boolean flag to extend the duration of the Snowflake session beyond 4 hours
 257        query_tag: tag for the query in Snowflake
 258        connect_retries: Number of times to retry if the Snowflake connector encounters an error
 259        connect_timeout: Number of seconds to wait between failed attempts
 260        retry_on_database_errors: A boolean flag to retry if a Snowflake connector Database error is encountered
 261        retry_all: A boolean flag to retry on all Snowflake connector errors
 262        authenticator: SSO authentication: Snowflake authentication method
 263        private_key: Key pair authentication: Private key
 264        private_key_path: Key pair authentication: Path to the private key, used instead of private_key
 265        private_key_passphrase: Key pair authentication: passphrase used to decrypt private key (if encrypted)
 266        token: OAuth authentication: The Snowflake OAuth 2.0 access token
 267    """
 268
 269    type: t.Literal["snowflake"] = "snowflake"
 270    account: str
 271    user: str
 272
 273    # User and password authentication
 274    password: t.Optional[str] = None
 275
 276    # SSO authentication
 277    authenticator: t.Optional[str] = None
 278
 279    # Key Pair Auth
 280    private_key: t.Optional[str] = None
 281    private_key_path: t.Optional[str] = None
 282    private_key_passphrase: t.Optional[str] = None
 283
 284    # TODO add other forms of authentication
 285
 286    # oauth access token
 287    token: t.Optional[str] = None
 288
 289    # Optional
 290    warehouse: t.Optional[str] = None
 291    role: t.Optional[str] = None
 292    client_session_keep_alive: bool = False
 293    query_tag: t.Optional[str] = None
 294    connect_retries: int = 0
 295    connect_timeout: int = 10
 296    retry_on_database_errors: bool = False
 297    retry_all: bool = False
 298
 299    @model_validator(mode="before")
 300    @classmethod
 301    def validate_authentication(cls, data: t.Any) -> t.Any:
 302        if not isinstance(data, dict) or (
 303            data.get("password")
 304            or data.get("authenticator")
 305            or data.get("private_key")
 306            or data.get("private_key_path")
 307        ):
 308            return data
 309
 310        raise ConfigError("No supported Snowflake authentication method found in target profile.")
 311
 312    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
 313        return "merge"
 314
 315    @classproperty
 316    def relation_class(cls) -> t.Type[BaseRelation]:
 317        from dbt.adapters.snowflake import SnowflakeRelation
 318
 319        return SnowflakeRelation
 320
 321    @classproperty
 322    def column_class(cls) -> t.Type[Column]:
 323        from dbt.adapters.snowflake import SnowflakeColumn
 324
 325        return SnowflakeColumn
 326
 327    @with_schema_differ_overrides
 328    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
 329        return SnowflakeConnectionConfig(
 330            user=self.user,
 331            password=self.password,
 332            authenticator=self.authenticator,
 333            account=self.account,
 334            warehouse=self.warehouse,
 335            database=self.database,
 336            role=self.role,
 337            concurrent_tasks=self.threads,
 338            token=self.token,
 339            private_key=self.private_key,
 340            private_key_path=self.private_key_path,
 341            private_key_passphrase=self.private_key_passphrase,
 342            **kwargs,
 343        )
 344
 345    @classproperty
 346    def quote_policy(cls) -> Policy:
 347        return Policy(database=False, schema=False, identifier=False)
 348
 349
 350class PostgresConfig(TargetConfig):
 351    """
 352    Project connection and operational configuration for the Postgres target
 353
 354    Args:
 355        host: The Postgres host to connect to
 356        user: Name of the user
 357        password: User's password
 358        port: The port to connect to
 359        dbname: Name of the database
 360        keepalives_idle: Seconds between TCP keepalive packets
 361        connect_timeout: Number of seconds to wait between failed attempts
 362        retries: Number of times to retry if the Postgres connector encounters an error
 363        search_path: Overrides the default search path
 364        role: Role of the user
 365        sslmode: SSL Mode used to connect to the database
 366    """
 367
 368    type: t.Literal["postgres"] = "postgres"
 369    host: str
 370    user: str
 371    password: str = Field(validation_alias=AliasChoices("pass", "password"))
 372    port: int
 373    dbname: str
 374    keepalives_idle: t.Optional[int] = None
 375    connect_timeout: int = 10
 376    retries: int = 1  # Currently Unsupported by SQLMesh
 377    search_path: t.Optional[str] = None  # Currently Unsupported by SQLMesh
 378    role: t.Optional[str] = None
 379    sslmode: t.Optional[str] = None
 380
 381    @model_validator(mode="before")
 382    @classmethod
 383    def validate_database(cls, data: t.Any) -> t.Any:
 384        if not isinstance(data, dict):
 385            return data
 386
 387        data["database"] = data.get("database") or data.get("dbname")
 388        if not data["database"]:
 389            raise ConfigError("Either database or dbname must be set")
 390
 391        return data
 392
 393    @field_validator("port")
 394    @classmethod
 395    def _validate_port(cls, v: t.Union[int, str]) -> int:
 396        return int(v)
 397
 398    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
 399        return "delete+insert" if kind is IncrementalByUniqueKeyKind else "append"
 400
 401    @with_schema_differ_overrides
 402    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
 403        return PostgresConnectionConfig(
 404            host=self.host,
 405            user=self.user,
 406            password=self.password,
 407            port=self.port,
 408            database=self.dbname,
 409            keepalives_idle=self.keepalives_idle,
 410            concurrent_tasks=self.threads,
 411            connect_timeout=self.connect_timeout,
 412            role=self.role,
 413            sslmode=self.sslmode,
 414            **kwargs,
 415        )
 416
 417
 418class RedshiftConfig(TargetConfig):
 419    """
 420    Project connection and operational configuration for the Redshift target
 421
 422    Args:
 423        host: The Redshift host to connect to
 424        user: Name of the user
 425        password: User's password
 426        port: The port to connect to
 427        dbname: Name of the database
 428        connect_timeout: Number of seconds to wait between failed attempts
 429        ra3_node: Enables cross-database sources
 430        search_path: Overrides the default search path
 431        sslmode: SSL Mode used to connect to the database
 432    """
 433
 434    # TODO add other forms of authentication
 435    type: t.Literal["redshift"] = "redshift"
 436    host: str
 437    user: str
 438    password: str = Field(validation_alias=AliasChoices("pass", "password"))
 439    port: int
 440    dbname: str
 441    connect_timeout: t.Optional[int] = None
 442    ra3_node: bool = True
 443    search_path: t.Optional[str] = None
 444    sslmode: t.Optional[str] = None
 445
 446    @model_validator(mode="before")
 447    @classmethod
 448    def validate_database(cls, data: t.Any) -> t.Any:
 449        if not isinstance(data, dict):
 450            return data
 451
 452        data["database"] = data.get("database") or data.get("dbname")
 453        if not data["database"]:
 454            raise ConfigError("Either database or dbname must be set")
 455
 456        return data
 457
 458    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
 459        return "append"
 460
 461    @classproperty
 462    def relation_class(cls) -> t.Type[BaseRelation]:
 463        from dbt.adapters.redshift import RedshiftRelation
 464
 465        return RedshiftRelation
 466
 467    @classproperty
 468    def column_class(cls) -> t.Type[Column]:
 469        if DBT_VERSION < (1, 6, 0):
 470            from dbt.adapters.redshift import RedshiftColumn  # type: ignore
 471
 472            return RedshiftColumn
 473        return super(RedshiftConfig, cls).column_class
 474
 475    @with_schema_differ_overrides
 476    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
 477        return RedshiftConnectionConfig(
 478            user=self.user,
 479            password=self.password,
 480            database=self.database,
 481            host=self.host,
 482            port=self.port,
 483            sslmode=self.sslmode,
 484            timeout=self.connect_timeout,
 485            concurrent_tasks=self.threads,
 486            **kwargs,
 487        )
 488
 489
 490class DatabricksConfig(TargetConfig):
 491    """
 492    Project connection and operational configuration for the Databricks target
 493
 494    Args:
 495        catalog: Catalog name to use for Unity Catalog
 496        host: The Databricks host to connect to
 497        http_path: The Databricks compute resources URL
 498        token: Personal access token
 499        database: Name of the database. Not applicable for Databricks and ignored
 500    """
 501
 502    type: t.Literal["databricks"] = "databricks"
 503    host: str
 504    http_path: str
 505    token: t.Optional[str] = None  # only required if auth_type is not set to 'oauth'
 506    database: t.Optional[str] = Field(alias="catalog")  # type: ignore
 507    auth_type: t.Optional[str] = None
 508    client_id: t.Optional[str] = None
 509    client_secret: t.Optional[str] = None
 510
 511    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
 512        return "merge"
 513
 514    @classproperty
 515    def relation_class(cls) -> t.Type[BaseRelation]:
 516        from dbt.adapters.databricks.relation import DatabricksRelation
 517
 518        return DatabricksRelation
 519
 520    @classproperty
 521    def column_class(cls) -> t.Type[Column]:
 522        from dbt.adapters.databricks.column import DatabricksColumn
 523
 524        return DatabricksColumn
 525
 526    @with_schema_differ_overrides
 527    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
 528        return DatabricksConnectionConfig(
 529            server_hostname=self.host,
 530            http_path=self.http_path,
 531            access_token=self.token,
 532            concurrent_tasks=self.threads,
 533            catalog=self.database,
 534            auth_type="databricks-oauth" if self.auth_type == "oauth" else self.auth_type,
 535            oauth_client_id=self.client_id,
 536            oauth_client_secret=self.client_secret,
 537            **kwargs,
 538        )
 539
 540
 541class BigQueryConfig(TargetConfig):
 542    """
 543    Project connection and operational configuration for the BigQuery target
 544
 545    Args:
 546        type: The type of the target (bigquery)
 547        method: The BigQuery authentication method to use
 548        project: The BigQuery project to connect to
 549        location: The BigQuery location to connect to
 550        keyfile: The path to the BigQuery keyfile
 551        keyfile_json: The BigQuery keyfile as a JSON string
 552        token: The BigQuery token
 553        refresh_token: The BigQuery refresh token
 554        client_id: The BigQuery client ID
 555        client_secret: The BigQuery client secret
 556        token_uri: The BigQuery token URI
 557        scopes: The BigQuery scopes
 558        impersonated_service_account: The service account to impersonate
 559        job_creation_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to be created
 560        job_execution_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to complete
 561        timeout_seconds: Alias for job_execution_timeout_seconds
 562        job_retries: The number of times to retry the underlying job if it fails
 563        retries: Alias for job_retries
 564        job_retry_deadline_seconds: Total number of seconds to wait while retrying the same query
 565        priority: The priority of the underlying job
 566        maximum_bytes_billed: The maximum number of bytes to be billed for the underlying job
 567    """
 568
 569    type: t.Literal["bigquery"] = "bigquery"
 570    method: t.Optional[str] = BigQueryConnectionMethod.OAUTH
 571    dataset: t.Optional[str] = None
 572    project: t.Optional[str] = None
 573    execution_project: t.Optional[str] = None
 574    quota_project: t.Optional[str] = None
 575    location: t.Optional[str] = None
 576    keyfile: t.Optional[str] = None
 577    keyfile_json: t.Optional[t.Dict[str, t.Any]] = None
 578    token: t.Optional[str] = None
 579    refresh_token: t.Optional[str] = None
 580    client_id: t.Optional[str] = None
 581    client_secret: t.Optional[str] = None
 582    token_uri: t.Optional[str] = None
 583    scopes: t.Tuple[str, ...] = (
 584        "https://www.googleapis.com/auth/bigquery",
 585        "https://www.googleapis.com/auth/cloud-platform",
 586        "https://www.googleapis.com/auth/drive",
 587    )
 588    impersonated_service_account: t.Optional[str] = None
 589    job_creation_timeout_seconds: t.Optional[int] = None
 590    job_execution_timeout_seconds: t.Optional[int] = None
 591    timeout_seconds: t.Optional[int] = None  # To support legacy config
 592    job_retries: t.Optional[int] = None
 593    retries: int = 1  # To support legacy config
 594    job_retry_deadline_seconds: t.Optional[int] = None
 595    priority: BigQueryPriority = BigQueryPriority.INTERACTIVE
 596    maximum_bytes_billed: t.Optional[int] = None
 597
 598    @model_validator(mode="before")
 599    @classmethod
 600    def validate_fields(cls, data: t.Any) -> t.Any:
 601        if not isinstance(data, dict):
 602            return data
 603
 604        # dbt treats schema and dataset interchangeably
 605        schema = data.get("schema") or data.get("dataset")
 606        if not schema:
 607            raise ConfigError("Either schema or dataset must be set")
 608        data["dataset"] = data["schema"] = schema
 609
 610        # dbt treats database and project interchangeably
 611        database = data.get("database") or data.get("project")
 612        if not database:
 613            raise ConfigError("Either database or project must be set")
 614        data["database"] = data["project"] = database
 615
 616        return data
 617
 618    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
 619        return "merge"
 620
 621    @classproperty
 622    def relation_class(cls) -> t.Type[BaseRelation]:
 623        from dbt.adapters.bigquery.relation import BigQueryRelation
 624
 625        return BigQueryRelation
 626
 627    @classproperty
 628    def column_class(cls) -> t.Type[Column]:
 629        from dbt.adapters.bigquery import BigQueryColumn
 630
 631        return BigQueryColumn
 632
 633    @with_schema_differ_overrides
 634    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
 635        job_retries = self.job_retries if self.job_retries is not None else self.retries
 636        job_execution_timeout_seconds = (
 637            self.job_execution_timeout_seconds
 638            if self.job_execution_timeout_seconds is not None
 639            else self.timeout_seconds
 640        )
 641        return BigQueryConnectionConfig(
 642            method=self.method,
 643            project=self.database,
 644            execution_project=self.execution_project,
 645            quota_project=self.quota_project,
 646            location=self.location,
 647            concurrent_tasks=self.threads,
 648            keyfile=self.keyfile,
 649            keyfile_json=self.keyfile_json,
 650            token=self.token,
 651            refresh_token=self.refresh_token,
 652            client_id=self.client_id,
 653            client_secret=self.client_secret,
 654            token_uri=self.token_uri,
 655            scopes=self.scopes,
 656            impersonated_service_account=self.impersonated_service_account,
 657            job_creation_timeout_seconds=self.job_creation_timeout_seconds,
 658            job_execution_timeout_seconds=job_execution_timeout_seconds,
 659            job_retries=job_retries,
 660            job_retry_deadline_seconds=self.job_retry_deadline_seconds,
 661            priority=self.priority,
 662            maximum_bytes_billed=self.maximum_bytes_billed,
 663            **kwargs,
 664        )
 665
 666
 667class MSSQLConfig(TargetConfig):
 668    """
 669    Project connection and operational configuration for the SQL Server (MSSQL) target
 670
 671    Args:
 672        host: The MSSQL server host to connect to
 673        server: Alias for host
 674        port: The MSSQL server port to connect to
 675        user: User name for authentication
 676        username: Alias for user
 677        UID: Alias for user
 678        password: User password for authentication
 679        PWD: Alias for password
 680        login_timeout: The number of seconds to wait for a login to complete
 681        query_timeout: The number of seconds to wait for a query to complete
 682        authentication: The authentication method to use (only "sql" is supported)
 683        schema_authorization: The principal who should own created schemas, not supported by SQLMesh
 684        driver: ODBC driver to use, not used by SQLMesh
 685        encrypt: A boolean flag to enable server connection encryption, not used by SQLMesh
 686        trust_cert: A boolean flag to trust the server certificate, not used by SQLMesh
 687        retries: Number of times to retry if the SQL Server connector encounters an error, not used by SQLMesh
 688        windows_login: A boolean flag to use Windows Authentication, not used by SQLMesh
 689        tenant_id: The tenant ID of the Azure Active Directory instance, not used by SQLMesh
 690        client_id: The client ID of the Azure Active Directory service principal, not used by SQLMesh
 691        client_secret: The client secret of the Azure Active Directory service principal, not used by SQLMesh
 692    """
 693
 694    type: t.Literal["sqlserver"] = "sqlserver"
 695    host: t.Optional[str] = None
 696    server: t.Optional[str] = None
 697    port: int = 1433
 698    database: str = Field(default="master")
 699    schema_: str = Field(default="dbo", alias="schema")
 700    user: t.Optional[str] = None
 701    username: t.Optional[str] = None
 702    UID: t.Optional[str] = None
 703    password: t.Optional[str] = None
 704    PWD: t.Optional[str] = None
 705    threads: int = 4
 706    login_timeout: t.Optional[int] = None
 707    query_timeout: t.Optional[int] = None
 708    authentication: t.Optional[str] = "sql"
 709    schema_authorization: t.Optional[str] = None  # Not supported by SQLMesh
 710
 711    # Unused ODBC parameters (SQLMesh uses pymssql instead of ODBC)
 712    driver: t.Optional[str] = None
 713    encrypt: t.Optional[bool] = None
 714    trust_cert: t.Optional[bool] = None
 715    retries: t.Optional[int] = None
 716
 717    # Unused authentication parameters (not supported by pymssql)
 718    windows_login: t.Optional[bool] = None  # pymssql doesn't require this flag for Windows Auth
 719    tenant_id: t.Optional[str] = None  # Azure Active Directory auth
 720    client_id: t.Optional[str] = None  # Azure Active Directory auth
 721    client_secret: t.Optional[str] = None  # Azure Active Directory auth
 722
 723    @model_validator(mode="before")
 724    @classmethod
 725    def validate_alias_fields(cls, data: t.Any) -> t.Any:
 726        if not isinstance(data, dict):
 727            return data
 728
 729        data["host"] = data.get("host") or data.get("server")
 730        if not data["host"]:
 731            raise ConfigError("Either host or server must be set")
 732
 733        data["user"] = data.get("user") or data.get("username") or data.get("UID")
 734        if not data["user"]:
 735            raise ConfigError("One of user, username, or UID must be set")
 736
 737        data["password"] = data.get("password") or data.get("PWD")
 738        if not data["password"]:
 739            raise ConfigError("Either password or PWD must be set")
 740
 741        return data
 742
 743    @field_validator("authentication")
 744    @classmethod
 745    def _validate_authentication(cls, v: str) -> str:
 746        if v != "sql":
 747            raise ConfigError("Only SQL and Windows Authentication are supported for SQL Server")
 748        return v
 749
 750    @field_validator("port")
 751    @classmethod
 752    def _validate_port(cls, v: t.Union[int, str]) -> int:
 753        return int(v)
 754
 755    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
 756        # https://github.com/microsoft/dbt-fabric/blob/main/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql
 757        return "delete+insert" if kind is IncrementalByUniqueKeyKind else "append"
 758
 759    @classproperty
 760    def column_class(cls) -> t.Type[Column]:
 761        try:
 762            # 1.8.0+
 763            from dbt.adapters.sqlserver.sqlserver_column import SQLServerColumn
 764        except ImportError:
 765            # <1.8.0
 766            from dbt.adapters.sqlserver.sql_server_column import SQLServerColumn  # type: ignore
 767
 768        return SQLServerColumn
 769
 770    @property
 771    def dialect(self) -> str:
 772        return "tsql"
 773
 774    @with_schema_differ_overrides
 775    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
 776        return MSSQLConnectionConfig(
 777            host=self.host,
 778            user=self.user,
 779            password=self.password,
 780            port=self.port,
 781            database=self.database,
 782            timeout=self.query_timeout,
 783            login_timeout=self.login_timeout,
 784            concurrent_tasks=self.threads,
 785            **kwargs,
 786        )
 787
 788
 789class TrinoConfig(TargetConfig):
 790    """
 791    Project connection and operational configuration for the Trino target.
 792
 793    Args:
 794        method: The Trino authentication method to use
 795        host: The server host to connect to
 796        port: The MSSQL server port to connect to
 797        database: Name of the Trino database/catalog
 798        schema: Name of the Trino schema
 799        user: User name for authentication
 800        password: User password for authentication
 801        roles: Trino catalog roles
 802        session_properties: Trino session properties
 803        retries: Number of times to retry if the Trino connector encounters an error
 804        timezone: The timezone to use for the Trino session
 805        http_headers: HTTP Headers to send alongside requests to Trino
 806        http_scheme: The HTTP scheme to use for requests to Trino (default: http, or https if kerberos, ldap or jwt auth)
 807        threads: The number of threads to run on
 808        impersonation_user:  LDAP authentication: override the provided username
 809        keytab: Kerberos authentication: Path to keytab
 810        krb5_config: Kerberos authentication: Path to config
 811        principal: Kerberos authentication: Principal
 812        service_name: Kerberos authentication: Service name
 813        hostname_override: Kerberos authentication: hostname for a host whose DNS name doesn't match
 814        mutual_authentication: Kerberos authentication: Boolean flag for mutual authentication.
 815        force_preemptive: Kerberos authentication: Boolean flag to preemptively initiate the GSS exchange.
 816        sanitize_mutual_error_response: Kerberos authentication: Boolean flag to strip content and headers from error responses.
 817        delegate: Kerberos authentication: Boolean flag for credential delegation (`GSS_C_DELEG_FLAG`)
 818        jwt_token: JWT authentication: JWT string
 819        client_certificate: Certification authentication: Path to client certificate
 820        client_private_key: Certification authentication: Path to client private key
 821        cert: Certification authentication: Full path to a certificate file
 822    """
 823
 824    _method_to_auth_enum: t.ClassVar[t.Dict[str, TrinoAuthenticationMethod]] = {
 825        "none": TrinoAuthenticationMethod.NO_AUTH,
 826        "ldap": TrinoAuthenticationMethod.LDAP,
 827        "kerberos": TrinoAuthenticationMethod.KERBEROS,
 828        "jwt": TrinoAuthenticationMethod.JWT,
 829        "certificate": TrinoAuthenticationMethod.CERTIFICATE,
 830        "oauth": TrinoAuthenticationMethod.OAUTH,
 831        "oauth_console": TrinoAuthenticationMethod.OAUTH,
 832    }
 833
 834    type: t.Literal["trino"] = "trino"
 835    host: str
 836    database: str
 837    schema_: str = Field(alias="schema")
 838    port: int = 443
 839    method: str
 840    user: t.Optional[str] = None
 841
 842    threads: int = 1
 843    roles: t.Optional[t.Dict[str, str]] = None
 844    session_properties: t.Optional[t.Dict[str, str]] = None
 845    retries: int = 3
 846    timezone: t.Optional[str] = None
 847    http_headers: t.Optional[t.Dict[str, str]] = None
 848    http_scheme: t.Optional[str] = None
 849    prepared_statements_enabled: bool = True  # not used by SQLMesh
 850
 851    # ldap authentication
 852    password: t.Optional[str] = None
 853    impersonation_user: t.Optional[str] = None
 854
 855    # kerberos authentication
 856    keytab: t.Optional[str] = None
 857    krb5_config: t.Optional[str] = None
 858    principal: t.Optional[str] = None
 859    service_name: str = "trino"
 860    hostname_override: t.Optional[str] = None
 861    mutual_authentication: bool = False
 862    force_preemptive: bool = False
 863    sanitize_mutual_error_response: bool = True
 864    delegate: bool = False
 865
 866    # jwt authentication
 867    jwt_token: t.Optional[str] = None
 868
 869    # certificate authentication
 870    client_certificate: t.Optional[str] = None
 871    client_private_key: t.Optional[str] = None
 872    cert: t.Optional[str] = None
 873
 874    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
 875        return "append"
 876
 877    @classproperty
 878    def relation_class(cls) -> t.Type[BaseRelation]:
 879        from dbt.adapters.trino.relation import TrinoRelation
 880
 881        return TrinoRelation
 882
 883    @classproperty
 884    def column_class(cls) -> t.Type[Column]:
 885        from dbt.adapters.trino.column import TrinoColumn
 886
 887        return TrinoColumn
 888
 889    @with_schema_differ_overrides
 890    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
 891        return TrinoConnectionConfig(
 892            method=self._method_to_auth_enum[self.method],
 893            host=self.host,
 894            user=self.user,
 895            catalog=self.database,
 896            port=self.port,
 897            http_scheme=self.http_scheme,
 898            roles=self.roles,
 899            http_headers=self.http_headers,
 900            session_properties=self.session_properties,
 901            retries=self.retries,
 902            timezone=self.timezone,
 903            password=self.password,
 904            impersonation_user=self.impersonation_user,
 905            keytab=self.keytab,
 906            krb5_config=self.krb5_config,
 907            principal=self.principal,
 908            service_name=self.service_name,
 909            hostname_override=self.hostname_override,
 910            mutual_authentication=self.mutual_authentication,
 911            force_preemptive=self.force_preemptive,
 912            sanitize_mutual_error_response=self.sanitize_mutual_error_response,
 913            delegate=self.delegate,
 914            jwt_token=self.jwt_token,
 915            client_certificate=self.client_certificate,
 916            client_private_key=self.client_private_key,
 917            cert=self.cert,
 918            concurrent_tasks=self.threads,
 919            **kwargs,
 920        )
 921
 922
class ClickhouseConfig(TargetConfig):
    """
    Project connection and operational configuration for the Clickhouse target

    Only host, user, password, port, cluster, connect_timeout, send_receive_timeout,
    verify, compression and custom_settings are forwarded by `to_sqlmesh`; every
    other field is accepted but not passed to the SQLMesh connection config.

    Args:
      host: [localhost]
      user: [default] # User for all database operations (alias: username)
      password: [<empty string>] # Password for the user
      secure: [False] # Use TLS (native protocol) or HTTPS (http protocol), not used by SQLMesh
      port: [None] # dbt documents 8123/8443 depending on the secure and driver settings; this model leaves it unset by default
      connect_timeout: [10] # Timeout in seconds to establish a connection to ClickHouse
      send_receive_timeout: [300] # Timeout in seconds to receive data from the ClickHouse server
      verify: [True] # Validate TLS certificate if using TLS/SSL
      cluster: [None] # If set, certain DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster.
      custom_settings: [None] # A dictionary/mapping of custom ClickHouse settings for the connection, forwarded as `connection_settings`
      compression: [<empty string>] # Use gzip compression if truthy (http), or compression type for a native connection, forwarded as `compression_method`
      schema: [default] # ClickHouse database for dbt models, not used by SQLMesh
      driver: [None] # http or native.  If not set this will be autodetermined based on port setting, not used by SQLMesh
      retries: [1] # Number of times to retry a "retriable" database exception (such as a 503 'Service Unavailable' error), not used by SQLMesh
      database_engine: [None] # Not used by SQLMesh
      database: [<empty string>] # Not used by SQLMesh
      cluster_mode: [False] # Use specific settings designed to improve operation on Replicated databases (recommended for ClickHouse Cloud), not used by SQLMesh
      use_lw_deletes: [False] # Use the strategy `delete+insert` as the default incremental strategy, not used by SQLMesh
      check_exchange: [True] # Validate that clickhouse support the atomic EXCHANGE TABLES command. Not used by SQLMesh.
      local_suffix: [local] # Table suffix of local tables on shards for distributed materializations, not used by SQLMesh
      local_db_prefix: [<empty string>] # Database prefix of local tables on shards for distributed materializations, not used by SQLMesh
      allow_automatic_deduplication: [False] # Enable ClickHouse automatic deduplication for Replicated tables, not used by SQLMesh
      tcp_keepalive: [False] # Native client only, specify TCP keepalive configuration. Specify custom keepalive settings as [idle_time_sec, interval_sec, probes], not used by SQLMesh
      sync_request_timeout: [5] # Timeout for server ping, not used by SQLMesh
      compress_block_size: [1048576] # Compression block size if compression is enabled, not used by SQLMesh
    """

    # Forwarded to SQLMesh by `to_sqlmesh` (except `schema_`, which is not passed).
    host: str = "localhost"
    user: str = Field(default="default", alias="username")
    password: str = ""
    port: t.Optional[int] = None
    cluster: t.Optional[str] = None
    schema_: str = Field(default="default", alias="schema")
    connect_timeout: int = 10
    send_receive_timeout: int = 300
    verify: bool = True
    compression: str = ""
    custom_settings: t.Optional[t.Dict[str, t.Any]] = None

    # Not used by SQLMesh
    driver: t.Optional[str] = None
    secure: bool = False
    retries: int = 1
    database_engine: t.Optional[str] = None
    cluster_mode: bool = False
    sync_request_timeout: int = 5
    compress_block_size: int = 1048576
    check_exchange: bool = True
    use_lw_deletes: bool = False
    allow_automatic_deduplication: bool = False
    tcp_keepalive: t.Union[bool, t.Tuple[int, ...], t.List[int]] = False
    database: str = ""
    local_suffix: str = "local"
    local_db_prefix: str = ""

    type: t.Literal["clickhouse"] = "clickhouse"

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "legacy" as the default incremental strategy for every kind."""
        # dbt-clickhouse name for temp table swap. That is sqlmesh's default
        #   strategy so doesn't require special handling during conversion.
        return "legacy"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt relation class for ClickHouse, imported on first access."""
        from dbt.adapters.clickhouse.relation import ClickHouseRelation

        return ClickHouseRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt column class for ClickHouse, imported on first access."""
        from dbt.adapters.clickhouse.column import ClickHouseColumn

        return ClickHouseColumn

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """
        Converts this target config to a SQLMesh ClickHouse connection config.

        Args:
            kwargs: Extra keyword arguments passed through to ClickhouseConnectionConfig.

        Returns:
            A ClickhouseConnectionConfig populated from this target's fields.
        """
        # NOTE(review): unlike the other targets visible here, `threads` is not
        # forwarded as `concurrent_tasks` — confirm whether that is intentional.
        return ClickhouseConnectionConfig(
            host=self.host,
            username=self.user,
            password=self.password,
            port=self.port,
            cluster=self.cluster,
            connect_timeout=self.connect_timeout,
            send_receive_timeout=self.send_receive_timeout,
            verify=self.verify,
            compression_method=self.compression,
            connection_settings=self.custom_settings,
            **kwargs,
        )
1015
1016
class AthenaConfig(TargetConfig):
    """
    Project connection and operational configuration for the Athena target.

    Only aws_access_key_id, aws_secret_access_key, region_name, work_group,
    s3_staging_dir, s3_data_dir, schema, database and threads are forwarded by
    `to_sqlmesh`; the remaining fields are accepted but not passed to the
    SQLMesh connection config.

    Args:
        s3_staging_dir: S3 location to store Athena query results and metadata
        s3_data_dir: Prefix for storing tables, if different from the connection's s3_staging_dir
        s3_data_naming: How to generate table paths in s3_data_dir
        s3_tmp_table_dir: Prefix for storing temporary tables, if different from the connection's s3_data_dir
        region_name: AWS region of your Athena instance
        schema: Specify the schema (Athena database) to build models into (lowercase only)
        database: Specify the database (Data catalog) to build models into (lowercase only)
        poll_interval: Interval in seconds to use for polling the status of query results in Athena
        debug_query_state: Flag if debug message with Athena query state is needed
        aws_access_key_id: Access key ID of the user performing requests
        aws_secret_access_key: Secret access key of the user performing requests
        aws_profile_name: Profile to use from your AWS shared credentials file
        work_group: Identifier of Athena workgroup
        skip_workgroup_check: Indicates if the WorkGroup check (additional AWS call) can be skipped
        num_retries: Number of times to retry a failing query
        num_boto3_retries: Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables)
        num_iceberg_retries: Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR
        spark_work_group: Identifier of Athena Spark workgroup for running Python models
        seed_s3_upload_args: Dictionary containing boto3 ExtraArgs when uploading to S3
        lf_tags_database: Default LF tags for new database if it's created by dbt
        threads: The number of threads to run on (defaults to 4 for this target)
    """

    type: t.Literal["athena"] = "athena"
    threads: int = 4

    s3_staging_dir: t.Optional[str] = None
    s3_data_dir: t.Optional[str] = None
    s3_data_naming: t.Optional[str] = None
    s3_tmp_table_dir: t.Optional[str] = None
    poll_interval: t.Optional[int] = None
    debug_query_state: bool = False
    work_group: t.Optional[str] = None
    skip_workgroup_check: t.Optional[bool] = None
    spark_work_group: t.Optional[str] = None

    aws_access_key_id: t.Optional[str] = None
    aws_secret_access_key: t.Optional[str] = None
    aws_profile_name: t.Optional[str] = None
    region_name: t.Optional[str] = None

    num_retries: t.Optional[int] = None
    num_boto3_retries: t.Optional[int] = None
    num_iceberg_retries: t.Optional[int] = None

    # NOTE(review): mutable `{}` defaults; presumably the pydantic base copies
    # field defaults per instance — confirm before sharing these across models.
    seed_s3_upload_args: t.Dict[str, str] = {}
    lf_tags_database: t.Dict[str, str] = {}

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt relation class for Athena, imported on first access."""
        from dbt.adapters.athena.relation import AthenaRelation

        return AthenaRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt column class for Athena, imported on first access."""
        from dbt.adapters.athena.column import AthenaColumn

        return AthenaColumn

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "insert_overwrite" as the default incremental strategy for every kind."""
        return "insert_overwrite"

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """
        Converts this target config to a SQLMesh Athena connection config.

        Args:
            kwargs: Extra keyword arguments passed through to AthenaConnectionConfig.

        Returns:
            An AthenaConnectionConfig populated from this target's fields.
        """
        return AthenaConnectionConfig(
            type="athena",
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            region_name=self.region_name,
            work_group=self.work_group,
            s3_staging_dir=self.s3_staging_dir,
            # The dbt `s3_data_dir` is passed as SQLMesh's `s3_warehouse_location`.
            s3_warehouse_location=self.s3_data_dir,
            schema_name=self.schema_,
            catalog_name=self.database,
            concurrent_tasks=self.threads,
            **kwargs,
        )
1099
1100
# Maps a dbt target `type` string to the TargetConfig subclass that models it.
# Both "sqlserver" and "tsql" resolve to MSSQLConfig.
TARGET_TYPE_TO_CONFIG_CLASS: t.Dict[str, t.Type[TargetConfig]] = {
    "databricks": DatabricksConfig,
    "duckdb": DuckDbConfig,
    "postgres": PostgresConfig,
    "redshift": RedshiftConfig,
    "snowflake": SnowflakeConfig,
    "bigquery": BigQueryConfig,
    "sqlserver": MSSQLConfig,
    "tsql": MSSQLConfig,
    "trino": TrinoConfig,
    "athena": AthenaConfig,
    "clickhouse": ClickhouseConfig,
}
SERIALIZABLE_FIELDS = {'name', 'schema_', 'database', 'project', 'role', 'user', 'warehouse', 'dbname', 'port', 'host', 'dataset', 'account', 'type', 'threads'}
SCHEMA_DIFFER_OVERRIDES = {'schema_differ_overrides': {'treat_alter_data_type_as_destructive': True, 'nested_support': <NestedSupport.IGNORE: 'IGNORE'>}}
def with_schema_differ_overrides( func: Callable[..., sqlmesh.core.config.connection.ConnectionConfig]) -> Callable[..., sqlmesh.core.config.connection.ConnectionConfig]:
def with_schema_differ_overrides(
    func: t.Callable[..., ConnectionConfig],
) -> t.Callable[..., ConnectionConfig]:
    """Decorator that merges the default schema differ overrides into kwargs.

    The wrapped method is called with `SCHEMA_DIFFER_OVERRIDES` merged under the
    caller's keyword arguments, so an explicitly passed key always wins.

    Args:
        func: The `to_sqlmesh`-style method to wrap. It is called as
            `func(self, **merged_kwargs)`.

    Returns:
        A wrapper with the same call contract that keeps the wrapped method's
        name, docstring and other metadata.
    """
    # Imported locally so this change does not touch the module's import block.
    import functools

    # Without `wraps`, every decorated method would report itself as `wrapper`
    # and lose its docstring.
    @functools.wraps(func)
    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
        # Defaults are spread first so caller-supplied kwargs take precedence.
        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
        return func(self, **merged_kwargs)

    return wrapper

Decorator that merges default config with kwargs.

class TargetConfig(abc.ABC, sqlmesh.dbt.common.DbtConfig):
 89class TargetConfig(abc.ABC, DbtConfig):
 90    """
 91    Configuration for DBT profile target
 92
 93    Args:
 94        type: The type of the data warehouse
 95        name: The name of this target
 96        database: Name of the database
 97        schema_: Name of the schema
 98        threads: The number of threads to run on
 99    """
100
101    # dbt
102    type: str = "none"
103    name: str
104    database: str
105    schema_: str = Field(alias="schema")
106    threads: int = 1
107    profile_name: t.Optional[str] = None
108
109    @classmethod
110    def load(cls, data: t.Dict[str, t.Any]) -> TargetConfig:
111        """
112        Loads the configuration from the yaml provided for a profile target
113
114        Args:
115            data: The yaml for the project's target output
116
117        Returns:
118            The configuration of the provided profile target
119        """
120        db_type = data["type"]
121        if db_type == "databricks":
122            return DatabricksConfig(**data)
123        if db_type == "duckdb":
124            return DuckDbConfig(**data)
125        if db_type == "postgres":
126            return PostgresConfig(**data)
127        if db_type == "redshift":
128            return RedshiftConfig(**data)
129        if db_type == "snowflake":
130            return SnowflakeConfig(**data)
131        if db_type == "bigquery":
132            return BigQueryConfig(**data)
133        if db_type == "sqlserver":
134            return MSSQLConfig(**data)
135        if db_type == "trino":
136            return TrinoConfig(**data)
137        if db_type == "clickhouse":
138            return ClickhouseConfig(**data)
139        if db_type == "athena":
140            return AthenaConfig(**data)
141
142        raise ConfigError(f"{db_type} not supported.")
143
144    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
145        """The default incremental strategy for the db"""
146        raise NotImplementedError
147
148    @with_schema_differ_overrides
149    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
150        """Converts target config to SQLMesh connection config"""
151        raise NotImplementedError
152
153    def attribute_dict(self) -> AttributeDict:
154        fields = self.dict(include=SERIALIZABLE_FIELDS).copy()
155        fields["target_name"] = self.name
156        return AttributeDict(fields)
157
158    @classproperty
159    def quote_policy(cls) -> Policy:
160        return Policy()
161
162    @property
163    def extra(self) -> t.Set[str]:
164        return self.extra_fields(set(self.dict()))
165
166    @classproperty
167    def relation_class(cls) -> t.Type[BaseRelation]:
168        return BaseRelation
169
170    @classproperty
171    def column_class(cls) -> t.Type[Column]:
172        return Column
173
174    @property
175    def dialect(self) -> str:
176        return self.type

Configuration for DBT profile target

Arguments:
  • type: The type of the data warehouse
  • name: The name of this target
  • database: Name of the database
  • schema_: Name of the schema
  • threads: The number of threads to run on
type: str
name: str
database: str
schema_: str
threads: int
profile_name: Optional[str]
@classmethod
def load(cls, data: Dict[str, Any]) -> TargetConfig:
109    @classmethod
110    def load(cls, data: t.Dict[str, t.Any]) -> TargetConfig:
111        """
112        Loads the configuration from the yaml provided for a profile target
113
114        Args:
115            data: The yaml for the project's target output
116
117        Returns:
118            The configuration of the provided profile target
119        """
120        db_type = data["type"]
121        if db_type == "databricks":
122            return DatabricksConfig(**data)
123        if db_type == "duckdb":
124            return DuckDbConfig(**data)
125        if db_type == "postgres":
126            return PostgresConfig(**data)
127        if db_type == "redshift":
128            return RedshiftConfig(**data)
129        if db_type == "snowflake":
130            return SnowflakeConfig(**data)
131        if db_type == "bigquery":
132            return BigQueryConfig(**data)
133        if db_type == "sqlserver":
134            return MSSQLConfig(**data)
135        if db_type == "trino":
136            return TrinoConfig(**data)
137        if db_type == "clickhouse":
138            return ClickhouseConfig(**data)
139        if db_type == "athena":
140            return AthenaConfig(**data)
141
142        raise ConfigError(f"{db_type} not supported.")

Loads the configuration from the yaml provided for a profile target

Arguments:
  • data: The yaml for the project's target output
Returns:

The configuration of the provided profile target

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """
        The default incremental strategy for the db

        Args:
            kind: The incremental model kind class the strategy is requested for.

        Raises:
            NotImplementedError: Always; this implementation provides no default.
        """
        raise NotImplementedError

The default incremental strategy for the db

def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
        # Defaults are spread first so caller-supplied kwargs take precedence.
        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
        return func(self, **merged_kwargs)

Converts target config to SQLMesh connection config

def attribute_dict(self) -> sqlmesh.utils.AttributeDict:
153    def attribute_dict(self) -> AttributeDict:
154        fields = self.dict(include=SERIALIZABLE_FIELDS).copy()
155        fields["target_name"] = self.name
156        return AttributeDict(fields)
quote_policy: dbt.adapters.contracts.relation.Policy
    @classproperty
    def quote_policy(cls) -> Policy:
        """A Policy built with its default arguments."""
        return Policy()

Policy(database: bool = True, schema: bool = True, identifier: bool = True)

extra: Set[str]
    @property
    def extra(self) -> t.Set[str]:
        """The result of `extra_fields` for the keys present in this config's dict."""
        return self.extra_fields(set(self.dict()))
relation_class: Type[dbt.adapters.base.relation.BaseRelation]
    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt relation class for this target; BaseRelation in this implementation."""
        return BaseRelation

BaseRelation(path: dbt.adapters.contracts.relation.Path, type: Optional[dbt.adapters.contracts.relation.RelationType] = None, quote_character: str = '"', include_policy: dbt.adapters.contracts.relation.Policy = , quote_policy: dbt.adapters.contracts.relation.Policy = , dbt_created: bool = False, limit: Optional[int] = None, event_time_filter: Optional[dbt.adapters.base.relation.EventTimeFilter] = None, require_alias: bool = True, catalog: Optional[str] = None, renameable_relations: Union[Tuple, FrozenSet] = , replaceable_relations: Union[Tuple, FrozenSet] = )

column_class: Type[dbt.adapters.base.column.Column]
    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt column class for this target; Column in this implementation."""
        return Column

Column(column: str, dtype: str, char_size: Optional[int] = None, numeric_precision: Optional[Any] = None, numeric_scale: Optional[Any] = None)

dialect: str
    @property
    def dialect(self) -> str:
        """The SQL dialect name, which is the target `type` in this implementation."""
        return self.type
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
DUCKDB_IN_MEMORY = ':memory:'
class DuckDbConfig(TargetConfig):
class DuckDbConfig(TargetConfig):
    """
    Connection config for DuckDb target

    Args:
        path: Location of the database file. If not specified, an in memory database is used.
        extensions: A list of autoloadable extensions to load.
        settings: A dictionary of settings to pass into the duckdb connector.
        secrets: A list of secrets to pass to the secret manager in the duckdb connector.
        filesystems: A list of `fsspec` filesystems to register in the duckdb connection.
    """

    type: t.Literal["duckdb"] = "duckdb"
    database: str = "main"
    schema_: str = Field(default="main", alias="schema")
    path: str = DUCKDB_IN_MEMORY
    extensions: t.Optional[t.List[str]] = None
    settings: t.Optional[t.Dict[str, t.Any]] = None
    secrets: t.Optional[t.List[t.Dict[str, t.Any]]] = None
    filesystems: t.Optional[t.List[t.Dict[str, t.Any]]] = None

    @model_validator(mode="before")
    def validate_authentication(cls, data: t.Any) -> t.Any:
        """
        Fills in a default `database` and warns when more than one thread is requested.

        Args:
            data: The raw input for the model. Non-dict input is returned untouched.

        Returns:
            A shallow copy of `data` with `database` derived from `path` when it was
            missing and dbt is at least 1.5.0, or `data` itself when it is not a dict.
        """
        if not isinstance(data, dict):
            return data

        # Work on a shallow copy so the caller's mapping is never mutated.
        data = dict(data)

        if "database" not in data and DBT_VERSION >= (1, 5, 0):
            path = data.get("path")
            data["database"] = (
                "memory"
                if path is None or path == DUCKDB_IN_MEMORY
                else Path(t.cast(str, path)).stem
            )

        if "threads" in data:
            # This runs before field coercion (mode="before"), so `threads` may
            # still be a numeric string or another non-int value. Unparseable
            # values are left for field validation to report.
            try:
                exceeds_one = float(data["threads"]) > 1
            except (TypeError, ValueError):
                exceeds_one = False
            if exceeds_one:
                get_console().log_warning(
                    "DuckDB does not support concurrency - setting threads to 1."
                )

        return data

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "delete+insert" as the default incremental strategy for every kind."""
        return "delete+insert"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt relation class for DuckDB, imported on first access."""
        from dbt.adapters.duckdb.relation import DuckDBRelation

        return DuckDBRelation

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """
        Converts this target config to a SQLMesh DuckDB connection config.

        Optional settings are only forwarded when they were set, and
        `concurrent_tasks` is always 1 regardless of `threads`.

        Args:
            kwargs: Extra keyword arguments passed through to DuckDBConnectionConfig.

        Returns:
            A DuckDBConnectionConfig whose `database` is this target's `path`.
        """
        if self.extensions is not None:
            kwargs["extensions"] = self.extensions
        if self.settings is not None:
            # The dbt `settings` are passed as SQLMesh's `connector_config`.
            kwargs["connector_config"] = self.settings
        if self.secrets is not None:
            kwargs["secrets"] = self.secrets
        if self.filesystems is not None:
            kwargs["filesystems"] = self.filesystems
        return DuckDBConnectionConfig(
            database=self.path,
            concurrent_tasks=1,
            **kwargs,
        )

Connection config for DuckDb target

Arguments:
  • path: Location of the database file. If not specified, an in memory database is used.
  • extensions: A list of autoloadable extensions to load.
  • settings: A dictionary of settings to pass into the duckdb connector.
  • secrets: A list of secrets to pass to the secret manager in the duckdb connector.
  • filesystems: A list of fsspec filesystems to register in the duckdb connection.
type: Literal['duckdb']
database: str
schema_: str
path: str
extensions: Optional[List[str]]
settings: Optional[Dict[str, Any]]
secrets: Optional[List[Dict[str, Any]]]
filesystems: Optional[List[Dict[str, Any]]]
@model_validator(mode='before')
def validate_authentication(cls, data: Any) -> Any:
203    @model_validator(mode="before")
204    def validate_authentication(cls, data: t.Any) -> t.Any:
205        if not isinstance(data, dict):
206            return data
207
208        if "database" not in data and DBT_VERSION >= (1, 5, 0):
209            path = data.get("path")
210            data["database"] = (
211                "memory"
212                if path is None or path == DUCKDB_IN_MEMORY
213                else Path(t.cast(str, path)).stem
214            )
215
216        if "threads" in data and t.cast(int, data["threads"]) > 1:
217            get_console().log_warning("DuckDB does not support concurrency - setting threads to 1.")
218
219        return data
    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "delete+insert" as the default incremental strategy for every kind."""
        return "delete+insert"

The default incremental strategy for the db

relation_class: Type[dbt.adapters.base.relation.BaseRelation]
    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt relation class for DuckDB."""
        # Imported inside the property rather than at module level; presumably so
        # the duckdb adapter is only needed when this target is used — TODO confirm.
        from dbt.adapters.duckdb.relation import DuckDBRelation

        return DuckDBRelation
def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
        # Defaults are spread first so caller-supplied kwargs take precedence.
        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
        return func(self, **merged_kwargs)

Converts target config to SQLMesh connection config

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
TargetConfig
name
threads
profile_name
load
attribute_dict
quote_policy
extra
column_class
dialect
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SnowflakeConfig(TargetConfig):
class SnowflakeConfig(TargetConfig):
    """
    dbt target profile settings for Snowflake, convertible to a SQLMesh connection config

    Validation requires at least one of password, authenticator, private_key or
    private_key_path to be present in the raw profile data.

    Args:
        account: Snowflake account
        warehouse: Name of the warehouse
        user: Name of the user
        password: User's password
        role: Role of the user
        client_session_keep_alive: A boolean flag to extend the duration of the Snowflake session beyond 4 hours
        query_tag: tag for the query in Snowflake
        connect_retries: Number of times to retry if the Snowflake connector encounters an error
        connect_timeout: Number of seconds to wait between failed attempts
        retry_on_database_errors: A boolean flag to retry if a Snowflake connector Database error is encountered
        retry_all: A boolean flag to retry on all Snowflake connector errors
        authenticator: SSO authentication: Snowflake authentication method
        private_key: Key pair authentication: Private key
        private_key_path: Key pair authentication: Path to the private key, used instead of private_key
        private_key_passphrase: Key pair authentication: passphrase used to decrypt private key (if encrypted)
        token: OAuth authentication: The Snowflake OAuth 2.0 access token
    """

    type: t.Literal["snowflake"] = "snowflake"
    account: str
    user: str

    # Credentials for user/password login
    password: t.Optional[str] = None

    # Credentials for SSO login
    authenticator: t.Optional[str] = None

    # Credentials for key pair login
    private_key: t.Optional[str] = None
    private_key_path: t.Optional[str] = None
    private_key_passphrase: t.Optional[str] = None

    # TODO add other forms of authentication

    # Access token for OAuth login
    token: t.Optional[str] = None

    # Optional settings. Only warehouse and role are forwarded by to_sqlmesh; the
    # remaining ones are accepted from the profile but not passed to SQLMesh.
    warehouse: t.Optional[str] = None
    role: t.Optional[str] = None
    client_session_keep_alive: bool = False
    query_tag: t.Optional[str] = None
    connect_retries: int = 0
    connect_timeout: int = 10
    retry_on_database_errors: bool = False
    retry_all: bool = False

    @model_validator(mode="before")
    @classmethod
    def validate_authentication(cls, data: t.Any) -> t.Any:
        """
        Ensure the raw profile carries at least one supported credential.

        Non-dict input is returned untouched. For dict input a ConfigError is raised
        unless one of the credential keys below holds a truthy value.
        """
        if not isinstance(data, dict):
            return data

        credential_keys = ("password", "authenticator", "private_key", "private_key_path")
        if any(data.get(key) for key in credential_keys):
            return data

        raise ConfigError("No supported Snowflake authentication method found in target profile.")

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "merge" for every incremental kind."""
        return "merge"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt Snowflake relation class; the adapter import happens inside the property."""
        from dbt.adapters.snowflake import SnowflakeRelation as relation_type

        return relation_type

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt Snowflake column class; the adapter import happens inside the property."""
        from dbt.adapters.snowflake import SnowflakeColumn as column_type

        return column_type

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Build a SnowflakeConnectionConfig from this target; extra kwargs are passed through."""
        return SnowflakeConnectionConfig(
            user=self.user,
            password=self.password,
            authenticator=self.authenticator,
            account=self.account,
            warehouse=self.warehouse,
            database=self.database,
            role=self.role,
            # dbt's thread count becomes SQLMesh's task concurrency
            concurrent_tasks=self.threads,
            token=self.token,
            private_key=self.private_key,
            private_key_path=self.private_key_path,
            private_key_passphrase=self.private_key_passphrase,
            **kwargs,
        )

    @classproperty
    def quote_policy(cls) -> Policy:
        """Quoting is disabled for database, schema and identifier."""
        return Policy(database=False, schema=False, identifier=False)

Project connection and operational configuration for the Snowflake target

Arguments:
  • account: Snowflake account
  • warehouse: Name of the warehouse
  • user: Name of the user
  • password: User's password
  • role: Role of the user
  • client_session_keep_alive: A boolean flag to extend the duration of the Snowflake session beyond 4 hours
  • query_tag: tag for the query in Snowflake
  • connect_retries: Number of times to retry if the Snowflake connector encounters an error
  • connect_timeout: Number of seconds to wait between failed attempts
  • retry_on_database_errors: A boolean flag to retry if a Snowflake connector Database error is encountered
  • retry_all: A boolean flag to retry on all Snowflake connector errors
  • authenticator: SSO authentication: Snowflake authentication method
  • private_key: Key pair authentication: Private key
  • private_key_path: Key pair authentication: Path to the private key, used instead of private_key
  • private_key_passphrase: Key pair authentication: passphrase used to decrypt private key (if encrypted)
  • token: OAuth authentication: The Snowflake OAuth 2.0 access token
type: Literal['snowflake']
account: str
user: str
password: Optional[str]
authenticator: Optional[str]
private_key: Optional[str]
private_key_path: Optional[str]
private_key_passphrase: Optional[str]
token: Optional[str]
warehouse: Optional[str]
role: Optional[str]
client_session_keep_alive: bool
query_tag: Optional[str]
connect_retries: int
connect_timeout: int
retry_on_database_errors: bool
retry_all: bool
@model_validator(mode='before')
@classmethod
def validate_authentication(cls, data: Any) -> Any:
300    @model_validator(mode="before")
301    @classmethod
302    def validate_authentication(cls, data: t.Any) -> t.Any:
303        if not isinstance(data, dict) or (
304            data.get("password")
305            or data.get("authenticator")
306            or data.get("private_key")
307            or data.get("private_key_path")
308        ):
309            return data
310
311        raise ConfigError("No supported Snowflake authentication method found in target profile.")
313    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
314        return "merge"

The default incremental strategy for the db

relation_class: Type[dbt.adapters.base.relation.BaseRelation]
316    @classproperty
317    def relation_class(cls) -> t.Type[BaseRelation]:
318        from dbt.adapters.snowflake import SnowflakeRelation
319
320        return SnowflakeRelation
column_class: Type[dbt.adapters.base.column.Column]
322    @classproperty
323    def column_class(cls) -> t.Type[Column]:
324        from dbt.adapters.snowflake import SnowflakeColumn
325
326        return SnowflakeColumn
def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
82    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
83        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
84        return func(self, **merged_kwargs)

Converts target config to SQLMesh connection config

quote_policy: dbt.adapters.contracts.relation.Policy
346    @classproperty
347    def quote_policy(cls) -> Policy:
348        return Policy(database=False, schema=False, identifier=False)

Policy(database: bool = True, schema: bool = True, identifier: bool = True)

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
TargetConfig
name
database
schema_
threads
profile_name
load
attribute_dict
extra
dialect
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class PostgresConfig(TargetConfig):
class PostgresConfig(TargetConfig):
    """
    Project connection and operational configuration for the Postgres target

    The database name may be supplied as either `database` or `dbname`; whichever
    is present is used to fill in the other before field validation.

    Args:
        host: The Postgres host to connect to
        user: Name of the user
        password: User's password (also accepted under the key `pass`)
        port: The port to connect to
        dbname: Name of the database
        keepalives_idle: Seconds between TCP keepalive packets
        connect_timeout: Number of seconds to wait between failed attempts
        retries: Number of times to retry if the Postgres connector encounters an error
        search_path: Overrides the default search path
        role: Role of the user
        sslmode: SSL Mode used to connect to the database
    """

    type: t.Literal["postgres"] = "postgres"
    host: str
    user: str
    password: str = Field(validation_alias=AliasChoices("pass", "password"))
    port: int
    dbname: str
    keepalives_idle: t.Optional[int] = None
    connect_timeout: int = 10
    retries: int = 1  # Currently Unsupported by SQLMesh
    search_path: t.Optional[str] = None  # Currently Unsupported by SQLMesh
    role: t.Optional[str] = None
    sslmode: t.Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def validate_database(cls, data: t.Any) -> t.Any:
        """
        Reconcile the `database` and `dbname` keys of the raw profile.

        Non-dict input is returned untouched. Otherwise `database` is set to the
        first truthy value of `database` / `dbname`, and `dbname` is backfilled
        from it when absent.

        Raises:
            ConfigError: If neither `database` nor `dbname` is set.
        """
        if not isinstance(data, dict):
            return data

        database = data.get("database") or data.get("dbname")
        if not database:
            raise ConfigError("Either database or dbname must be set")

        data["database"] = database
        # `dbname` is a required field, so a profile that only sets `database` would
        # otherwise fail with a missing-field error despite the message above.
        data["dbname"] = data.get("dbname") or database

        return data

    @field_validator("port")
    @classmethod
    def _validate_port(cls, v: t.Union[int, str]) -> int:
        """Coerce the port to an int."""
        return int(v)

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "delete+insert" for unique-key incrementals and "append" otherwise."""
        return "delete+insert" if kind is IncrementalByUniqueKeyKind else "append"

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Build a PostgresConnectionConfig from this target; extra kwargs are passed through."""
        return PostgresConnectionConfig(
            host=self.host,
            user=self.user,
            password=self.password,
            port=self.port,
            database=self.dbname,
            keepalives_idle=self.keepalives_idle,
            # dbt's thread count becomes SQLMesh's task concurrency
            concurrent_tasks=self.threads,
            connect_timeout=self.connect_timeout,
            role=self.role,
            sslmode=self.sslmode,
            **kwargs,
        )

Project connection and operational configuration for the Postgres target

Arguments:
  • host: The Postgres host to connect to
  • user: Name of the user
  • password: User's password
  • port: The port to connect to
  • dbname: Name of the database
  • keepalives_idle: Seconds between TCP keepalive packets
  • connect_timeout: Number of seconds to wait between failed attempts
  • retries: Number of times to retry if the Postgres connector encounters an error
  • search_path: Overrides the default search path
  • role: Role of the user
  • sslmode: SSL Mode used to connect to the database
type: Literal['postgres']
host: str
user: str
password: str
port: int
dbname: str
keepalives_idle: Optional[int]
connect_timeout: int
retries: int
search_path: Optional[str]
role: Optional[str]
sslmode: Optional[str]
@model_validator(mode='before')
@classmethod
def validate_database(cls, data: Any) -> Any:
382    @model_validator(mode="before")
383    @classmethod
384    def validate_database(cls, data: t.Any) -> t.Any:
385        if not isinstance(data, dict):
386            return data
387
388        data["database"] = data.get("database") or data.get("dbname")
389        if not data["database"]:
390            raise ConfigError("Either database or dbname must be set")
391
392        return data
399    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
400        return "delete+insert" if kind is IncrementalByUniqueKeyKind else "append"

The default incremental strategy for the db

def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
82    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
83        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
84        return func(self, **merged_kwargs)

Converts target config to SQLMesh connection config

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
TargetConfig
name
database
schema_
threads
profile_name
load
attribute_dict
quote_policy
extra
relation_class
column_class
dialect
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class RedshiftConfig(TargetConfig):
class RedshiftConfig(TargetConfig):
    """
    Project connection and operational configuration for the Redshift target

    The database name may be supplied as either `database` or `dbname`; whichever
    is present is used to fill in the other before field validation.

    Args:
        host: The Redshift host to connect to
        user: Name of the user
        password: User's password (also accepted under the key `pass`)
        port: The port to connect to
        dbname: Name of the database
        connect_timeout: Number of seconds to wait between failed attempts
        ra3_node: Enables cross-database sources
        search_path: Overrides the default search path
        sslmode: SSL Mode used to connect to the database
    """

    # TODO add other forms of authentication
    type: t.Literal["redshift"] = "redshift"
    host: str
    user: str
    password: str = Field(validation_alias=AliasChoices("pass", "password"))
    port: int
    dbname: str
    connect_timeout: t.Optional[int] = None
    ra3_node: bool = True
    search_path: t.Optional[str] = None
    sslmode: t.Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def validate_database(cls, data: t.Any) -> t.Any:
        """
        Reconcile the `database` and `dbname` keys of the raw profile.

        Non-dict input is returned untouched. Otherwise `database` is set to the
        first truthy value of `database` / `dbname`, and `dbname` is backfilled
        from it when absent.

        Raises:
            ConfigError: If neither `database` nor `dbname` is set.
        """
        if not isinstance(data, dict):
            return data

        database = data.get("database") or data.get("dbname")
        if not database:
            raise ConfigError("Either database or dbname must be set")

        data["database"] = database
        # `dbname` is a required field, so a profile that only sets `database` would
        # otherwise fail with a missing-field error despite the message above.
        data["dbname"] = data.get("dbname") or database

        return data

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "append" for every incremental kind."""
        return "append"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt Redshift relation class; the adapter import happens inside the property."""
        from dbt.adapters.redshift import RedshiftRelation

        return RedshiftRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """
        The column class for Redshift.

        For dbt versions below 1.6.0 this is the adapter's RedshiftColumn; from
        1.6.0 on it defers to the parent class's column_class.
        """
        if DBT_VERSION < (1, 6, 0):
            from dbt.adapters.redshift import RedshiftColumn  # type: ignore

            return RedshiftColumn
        return super(RedshiftConfig, cls).column_class

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Build a RedshiftConnectionConfig from this target; extra kwargs are passed through."""
        return RedshiftConnectionConfig(
            user=self.user,
            password=self.password,
            database=self.database,
            host=self.host,
            port=self.port,
            sslmode=self.sslmode,
            # dbt's connect_timeout is handed over as `timeout`
            timeout=self.connect_timeout,
            # dbt's thread count becomes SQLMesh's task concurrency
            concurrent_tasks=self.threads,
            **kwargs,
        )

Project connection and operational configuration for the Redshift target

Arguments:
  • host: The Redshift host to connect to
  • user: Name of the user
  • password: User's password
  • port: The port to connect to
  • dbname: Name of the database
  • connect_timeout: Number of seconds to wait between failed attempts
  • ra3_node: Enables cross-database sources
  • search_path: Overrides the default search path
  • sslmode: SSL Mode used to connect to the database
type: Literal['redshift']
host: str
user: str
password: str
port: int
dbname: str
connect_timeout: Optional[int]
ra3_node: bool
search_path: Optional[str]
sslmode: Optional[str]
@model_validator(mode='before')
@classmethod
def validate_database(cls, data: Any) -> Any:
447    @model_validator(mode="before")
448    @classmethod
449    def validate_database(cls, data: t.Any) -> t.Any:
450        if not isinstance(data, dict):
451            return data
452
453        data["database"] = data.get("database") or data.get("dbname")
454        if not data["database"]:
455            raise ConfigError("Either database or dbname must be set")
456
457        return data
459    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
460        return "append"

The default incremental strategy for the db

relation_class: Type[dbt.adapters.base.relation.BaseRelation]
462    @classproperty
463    def relation_class(cls) -> t.Type[BaseRelation]:
464        from dbt.adapters.redshift import RedshiftRelation
465
466        return RedshiftRelation
column_class: Type[dbt.adapters.base.column.Column]
468    @classproperty
469    def column_class(cls) -> t.Type[Column]:
470        if DBT_VERSION < (1, 6, 0):
471            from dbt.adapters.redshift import RedshiftColumn  # type: ignore
472
473            return RedshiftColumn
474        return super(RedshiftConfig, cls).column_class

Column(column: str, dtype: str, char_size: Optional[int] = None, numeric_precision: Optional[Any] = None, numeric_scale: Optional[Any] = None)

def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
82    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
83        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
84        return func(self, **merged_kwargs)

Converts target config to SQLMesh connection config

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
TargetConfig
name
database
schema_
threads
profile_name
load
attribute_dict
quote_policy
extra
dialect
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class DatabricksConfig(TargetConfig):
class DatabricksConfig(TargetConfig):
    """
    dbt target profile settings for Databricks, convertible to a SQLMesh connection config

    Args:
        catalog: Catalog name to use for Unity Catalog; stored on the `database` field via its alias
        host: The Databricks host to connect to
        http_path: The Databricks compute resources URL
        token: Personal access token
        database: Populated from `catalog` and forwarded to SQLMesh as the catalog
        auth_type: Authentication type; "oauth" is translated to "databricks-oauth" for SQLMesh
        client_id: Forwarded to SQLMesh as oauth_client_id
        client_secret: Forwarded to SQLMesh as oauth_client_secret
    """

    type: t.Literal["databricks"] = "databricks"
    host: str
    http_path: str
    token: t.Optional[str] = None  # only required if auth_type is not set to 'oauth'
    database: t.Optional[str] = Field(alias="catalog")  # type: ignore
    auth_type: t.Optional[str] = None
    client_id: t.Optional[str] = None
    client_secret: t.Optional[str] = None

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "merge" for every incremental kind."""
        return "merge"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt Databricks relation class; the adapter import happens inside the property."""
        from dbt.adapters.databricks.relation import DatabricksRelation as relation_type

        return relation_type

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt Databricks column class; the adapter import happens inside the property."""
        from dbt.adapters.databricks.column import DatabricksColumn as column_type

        return column_type

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Build a DatabricksConnectionConfig from this target; extra kwargs are passed through."""
        # dbt spells the OAuth auth type "oauth"; it is handed to SQLMesh as "databricks-oauth"
        sqlmesh_auth_type = self.auth_type
        if sqlmesh_auth_type == "oauth":
            sqlmesh_auth_type = "databricks-oauth"

        return DatabricksConnectionConfig(
            server_hostname=self.host,
            http_path=self.http_path,
            access_token=self.token,
            concurrent_tasks=self.threads,
            catalog=self.database,
            auth_type=sqlmesh_auth_type,
            oauth_client_id=self.client_id,
            oauth_client_secret=self.client_secret,
            **kwargs,
        )

Project connection and operational configuration for the Databricks target

Arguments:
  • catalog: Catalog name to use for Unity Catalog
  • host: The Databricks host to connect to
  • http_path: The Databricks compute resources URL
  • token: Personal access token
  • database: Name of the database. For Databricks this is populated from catalog and forwarded to SQLMesh as the catalog
type: Literal['databricks']
host: str
http_path: str
token: Optional[str]
database: Optional[str]
auth_type: Optional[str]
client_id: Optional[str]
client_secret: Optional[str]
512    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
513        return "merge"

The default incremental strategy for the db

relation_class: Type[dbt.adapters.base.relation.BaseRelation]
515    @classproperty
516    def relation_class(cls) -> t.Type[BaseRelation]:
517        from dbt.adapters.databricks.relation import DatabricksRelation
518
519        return DatabricksRelation
column_class: Type[dbt.adapters.base.column.Column]
521    @classproperty
522    def column_class(cls) -> t.Type[Column]:
523        from dbt.adapters.databricks.column import DatabricksColumn
524
525        return DatabricksColumn
def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
82    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
83        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
84        return func(self, **merged_kwargs)

Converts target config to SQLMesh connection config

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
TargetConfig
name
schema_
threads
profile_name
load
attribute_dict
quote_policy
extra
dialect
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class BigQueryConfig(TargetConfig):
class BigQueryConfig(TargetConfig):
    """
    dbt target profile settings for BigQuery, convertible to a SQLMesh connection config

    Before field validation, `schema`/`dataset` and `database`/`project` are each
    reconciled so that both keys of a pair hold the same value.

    Args:
        type: The type of the target (bigquery)
        method: The BigQuery authentication method to use
        dataset: Interchangeable with schema
        project: The BigQuery project to connect to; interchangeable with database
        execution_project: Forwarded unchanged to the SQLMesh connection config
        quota_project: Forwarded unchanged to the SQLMesh connection config
        location: The BigQuery location to connect to
        keyfile: The path to the BigQuery keyfile
        keyfile_json: The BigQuery keyfile contents as a dictionary
        token: The BigQuery token
        refresh_token: The BigQuery refresh token
        client_id: The BigQuery client ID
        client_secret: The BigQuery client secret
        token_uri: The BigQuery token URI
        scopes: The BigQuery scopes
        impersonated_service_account: The service account to impersonate
        job_creation_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to be created
        job_execution_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to complete
        timeout_seconds: Legacy fallback used when job_execution_timeout_seconds is not set
        job_retries: The number of times to retry the underlying job if it fails
        retries: Legacy fallback used when job_retries is not set
        job_retry_deadline_seconds: Total number of seconds to wait while retrying the same query
        priority: The priority of the underlying job
        maximum_bytes_billed: The maximum number of bytes to be billed for the underlying job
    """

    type: t.Literal["bigquery"] = "bigquery"
    method: t.Optional[str] = BigQueryConnectionMethod.OAUTH
    dataset: t.Optional[str] = None
    project: t.Optional[str] = None
    execution_project: t.Optional[str] = None
    quota_project: t.Optional[str] = None
    location: t.Optional[str] = None
    keyfile: t.Optional[str] = None
    keyfile_json: t.Optional[t.Dict[str, t.Any]] = None
    token: t.Optional[str] = None
    refresh_token: t.Optional[str] = None
    client_id: t.Optional[str] = None
    client_secret: t.Optional[str] = None
    token_uri: t.Optional[str] = None
    scopes: t.Tuple[str, ...] = (
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/drive",
    )
    impersonated_service_account: t.Optional[str] = None
    job_creation_timeout_seconds: t.Optional[int] = None
    job_execution_timeout_seconds: t.Optional[int] = None
    timeout_seconds: t.Optional[int] = None  # To support legacy config
    job_retries: t.Optional[int] = None
    retries: int = 1  # To support legacy config
    job_retry_deadline_seconds: t.Optional[int] = None
    priority: BigQueryPriority = BigQueryPriority.INTERACTIVE
    maximum_bytes_billed: t.Optional[int] = None

    @model_validator(mode="before")
    @classmethod
    def validate_fields(cls, data: t.Any) -> t.Any:
        """
        Reconcile the interchangeable key pairs of the raw profile.

        Non-dict input is returned untouched. For each pair the first truthy value
        (primary key first, then its alias) is written back to both keys.

        Raises:
            ConfigError: If both keys of a pair are missing or falsy.
        """
        if not isinstance(data, dict):
            return data

        # dbt treats schema/dataset and database/project interchangeably
        interchangeable = (
            ("schema", "dataset", "Either schema or dataset must be set"),
            ("database", "project", "Either database or project must be set"),
        )
        for primary, alias, error_message in interchangeable:
            resolved = data.get(primary) or data.get(alias)
            if not resolved:
                raise ConfigError(error_message)
            data[alias] = resolved
            data[primary] = resolved

        return data

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "merge" for every incremental kind."""
        return "merge"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt BigQuery relation class; the adapter import happens inside the property."""
        from dbt.adapters.bigquery.relation import BigQueryRelation as relation_type

        return relation_type

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt BigQuery column class; the adapter import happens inside the property."""
        from dbt.adapters.bigquery import BigQueryColumn as column_type

        return column_type

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Build a BigQueryConnectionConfig from this target; extra kwargs are passed through."""
        # The legacy settings only apply when their modern counterparts are unset (None)
        effective_retries = self.retries if self.job_retries is None else self.job_retries
        effective_execution_timeout = (
            self.timeout_seconds
            if self.job_execution_timeout_seconds is None
            else self.job_execution_timeout_seconds
        )

        return BigQueryConnectionConfig(
            method=self.method,
            project=self.database,
            execution_project=self.execution_project,
            quota_project=self.quota_project,
            location=self.location,
            concurrent_tasks=self.threads,
            keyfile=self.keyfile,
            keyfile_json=self.keyfile_json,
            token=self.token,
            refresh_token=self.refresh_token,
            client_id=self.client_id,
            client_secret=self.client_secret,
            token_uri=self.token_uri,
            scopes=self.scopes,
            impersonated_service_account=self.impersonated_service_account,
            job_creation_timeout_seconds=self.job_creation_timeout_seconds,
            job_execution_timeout_seconds=effective_execution_timeout,
            job_retries=effective_retries,
            job_retry_deadline_seconds=self.job_retry_deadline_seconds,
            priority=self.priority,
            maximum_bytes_billed=self.maximum_bytes_billed,
            **kwargs,
        )

Project connection and operational configuration for the BigQuery target

Arguments:
  • type: The type of the target (bigquery)
  • method: The BigQuery authentication method to use
  • project: The BigQuery project to connect to
  • location: The BigQuery location to connect to
  • keyfile: The path to the BigQuery keyfile
  • keyfile_json: The BigQuery keyfile contents as a parsed JSON object (dictionary)
  • token: The BigQuery token
  • refresh_token: The BigQuery refresh token
  • client_id: The BigQuery client ID
  • client_secret: The BigQuery client secret
  • token_uri: The BigQuery token URI
  • scopes: The BigQuery scopes
  • impersonated_service_account: The service account to impersonate
  • job_creation_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to be created
  • job_execution_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to complete
  • timeout_seconds: Alias for job_execution_timeout_seconds
  • job_retries: The number of times to retry the underlying job if it fails
  • retries: Alias for job_retries
  • job_retry_deadline_seconds: Total number of seconds to wait while retrying the same query
  • priority: The priority of the underlying job
  • maximum_bytes_billed: The maximum number of bytes to be billed for the underlying job
type: Literal['bigquery']
method: Optional[str]
dataset: Optional[str]
project: Optional[str]
execution_project: Optional[str]
quota_project: Optional[str]
location: Optional[str]
keyfile: Optional[str]
keyfile_json: Optional[Dict[str, Any]]
token: Optional[str]
refresh_token: Optional[str]
client_id: Optional[str]
client_secret: Optional[str]
token_uri: Optional[str]
scopes: Tuple[str, ...]
impersonated_service_account: Optional[str]
job_creation_timeout_seconds: Optional[int]
job_execution_timeout_seconds: Optional[int]
timeout_seconds: Optional[int]
job_retries: Optional[int]
retries: int
job_retry_deadline_seconds: Optional[int]
maximum_bytes_billed: Optional[int]
@model_validator(mode='before')
@classmethod
def validate_fields(cls, data: Any) -> Any:
599    @model_validator(mode="before")
600    @classmethod
601    def validate_fields(cls, data: t.Any) -> t.Any:
602        if not isinstance(data, dict):
603            return data
604
605        # dbt treats schema and dataset interchangeably
606        schema = data.get("schema") or data.get("dataset")
607        if not schema:
608            raise ConfigError("Either schema or dataset must be set")
609        data["dataset"] = data["schema"] = schema
610
611        # dbt treats database and project interchangeably
612        database = data.get("database") or data.get("project")
613        if not database:
614            raise ConfigError("Either database or project must be set")
615        data["database"] = data["project"] = database
616
617        return data
619    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
620        return "merge"

The default incremental strategy for the db

relation_class: Type[dbt.adapters.base.relation.BaseRelation]
622    @classproperty
623    def relation_class(cls) -> t.Type[BaseRelation]:
624        from dbt.adapters.bigquery.relation import BigQueryRelation
625
626        return BigQueryRelation
column_class: Type[dbt.adapters.base.column.Column]
628    @classproperty
629    def column_class(cls) -> t.Type[Column]:
630        from dbt.adapters.bigquery import BigQueryColumn
631
632        return BigQueryColumn
def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
82    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
83        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
84        return func(self, **merged_kwargs)

Converts target config to SQLMesh connection config

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
TargetConfig
name
database
schema_
threads
profile_name
load
attribute_dict
quote_policy
extra
dialect
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class MSSQLConfig(TargetConfig):
class MSSQLConfig(TargetConfig):
    """
    Project connection and operational configuration for the SQL Server (MSSQL) target

    Args:
        type: The type of the target (sqlserver)
        host: The MSSQL server host to connect to
        server: Alias for host
        port: The MSSQL server port to connect to
        database: The database to connect to
        schema: The default schema
        user: User name for authentication
        username: Alias for user
        UID: Alias for user
        password: User password for authentication
        PWD: Alias for password
        threads: The number of threads to run on
        login_timeout: The number of seconds to wait for a login to complete
        query_timeout: The number of seconds to wait for a query to complete
        authentication: The authentication method to use (only "sql" is supported)
        schema_authorization: The principal who should own created schemas, not supported by SQLMesh
        driver: ODBC driver to use, not used by SQLMesh
        encrypt: A boolean flag to enable server connection encryption, not used by SQLMesh
        trust_cert: A boolean flag to trust the server certificate, not used by SQLMesh
        retries: Number of times to retry if the SQL Server connector encounters an error, not used by SQLMesh
        windows_login: A boolean flag to use Windows Authentication, not used by SQLMesh
        tenant_id: The tenant ID of the Azure Active Directory instance, not used by SQLMesh
        client_id: The client ID of the Azure Active Directory service principal, not used by SQLMesh
        client_secret: The client secret of the Azure Active Directory service principal, not used by SQLMesh
    """

    type: t.Literal["sqlserver"] = "sqlserver"
    host: t.Optional[str] = None
    server: t.Optional[str] = None
    port: int = 1433
    database: str = Field(default="master")
    schema_: str = Field(default="dbo", alias="schema")
    user: t.Optional[str] = None
    username: t.Optional[str] = None
    UID: t.Optional[str] = None
    password: t.Optional[str] = None
    PWD: t.Optional[str] = None
    threads: int = 4
    login_timeout: t.Optional[int] = None
    query_timeout: t.Optional[int] = None
    authentication: t.Optional[str] = "sql"
    schema_authorization: t.Optional[str] = None  # Not supported by SQLMesh

    # Unused ODBC parameters (SQLMesh uses pymssql instead of ODBC)
    driver: t.Optional[str] = None
    encrypt: t.Optional[bool] = None
    trust_cert: t.Optional[bool] = None
    retries: t.Optional[int] = None

    # Unused authentication parameters (not supported by pymssql)
    windows_login: t.Optional[bool] = None  # pymssql doesn't require this flag for Windows Auth
    tenant_id: t.Optional[str] = None  # Azure Active Directory auth
    client_id: t.Optional[str] = None  # Azure Active Directory auth
    client_secret: t.Optional[str] = None  # Azure Active Directory auth

    @model_validator(mode="before")
    @classmethod
    def validate_alias_fields(cls, data: t.Any) -> t.Any:
        """Collapse the alias keys for host, user and password onto their canonical keys.

        Non-dict input is returned untouched. For dict input, each canonical key
        is set to the first truthy value among its aliases, checked in order.

        Raises:
            ConfigError: If no alias of a group holds a truthy value.
        """
        if not isinstance(data, dict):
            return data

        alias_groups = (
            ("host", ("host", "server"), "Either host or server must be set"),
            ("user", ("user", "username", "UID"), "One of user, username, or UID must be set"),
            ("password", ("password", "PWD"), "Either password or PWD must be set"),
        )
        for canonical, candidates, message in alias_groups:
            resolved = None
            for candidate in candidates:
                resolved = data.get(candidate)
                if resolved:
                    break
            # The canonical key is written even when nothing was found, before raising.
            data[canonical] = resolved
            if not resolved:
                raise ConfigError(message)

        return data

    @field_validator("authentication")
    @classmethod
    def _validate_authentication(cls, v: str) -> str:
        """Accept only the "sql" authentication method; raise ConfigError otherwise."""
        if v == "sql":
            return v
        raise ConfigError("Only SQL and Windows Authentication are supported for SQL Server")

    @field_validator("port")
    @classmethod
    def _validate_port(cls, v: t.Union[int, str]) -> int:
        """Convert the port to an int."""
        port_number = int(v)
        return port_number

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "delete+insert" for unique-key incremental models and "append" otherwise."""
        # https://github.com/microsoft/dbt-fabric/blob/main/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql
        if kind is IncrementalByUniqueKeyKind:
            return "delete+insert"
        return "append"

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """Return the dbt-sqlserver column class from whichever module path is importable."""
        try:
            # 1.8.0+
            from dbt.adapters.sqlserver.sqlserver_column import SQLServerColumn as column_cls
        except ImportError:
            # <1.8.0
            from dbt.adapters.sqlserver.sql_server_column import (  # type: ignore
                SQLServerColumn as column_cls,
            )

        return column_cls

    @property
    def dialect(self) -> str:
        """The SQL dialect name for this target ("tsql")."""
        return "tsql"

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Build the SQLMesh MSSQL connection config from this target.

        ``query_timeout`` is passed as ``timeout`` and ``threads`` as
        ``concurrent_tasks``; any extra ``kwargs`` are forwarded unchanged.
        """
        connection = MSSQLConnectionConfig(
            host=self.host,
            user=self.user,
            password=self.password,
            port=self.port,
            database=self.database,
            timeout=self.query_timeout,
            login_timeout=self.login_timeout,
            concurrent_tasks=self.threads,
            **kwargs,
        )
        return connection

Project connection and operational configuration for the SQL Server (MSSQL) target

Arguments:
  • host: The MSSQL server host to connect to
  • server: Alias for host
  • port: The MSSQL server port to connect to
  • user: User name for authentication
  • username: Alias for user
  • UID: Alias for user
  • password: User password for authentication
  • PWD: Alias for password
  • login_timeout: The number of seconds to wait for a login to complete
  • query_timeout: The number of seconds to wait for a query to complete
  • authentication: The authentication method to use (only "sql" is supported)
  • schema_authorization: The principal who should own created schemas, not supported by SQLMesh
  • driver: ODBC driver to use, not used by SQLMesh
  • encrypt: A boolean flag to enable server connection encryption, not used by SQLMesh
  • trust_cert: A boolean flag to trust the server certificate, not used by SQLMesh
  • retries: Number of times to retry if the SQL Server connector encounters an error, not used by SQLMesh
  • windows_login: A boolean flag to use Windows Authentication, not used by SQLMesh
  • tenant_id: The tenant ID of the Azure Active Directory instance, not used by SQLMesh
  • client_id: The client ID of the Azure Active Directory service principal, not used by SQLMesh
  • client_secret: The client secret of the Azure Active Directory service principal, not used by SQLMesh
type: Literal['sqlserver']
host: Optional[str]
server: Optional[str]
port: int
database: str
schema_: str
user: Optional[str]
username: Optional[str]
UID: Optional[str]
password: Optional[str]
PWD: Optional[str]
threads: int
login_timeout: Optional[int]
query_timeout: Optional[int]
authentication: Optional[str]
schema_authorization: Optional[str]
driver: Optional[str]
encrypt: Optional[bool]
trust_cert: Optional[bool]
retries: Optional[int]
windows_login: Optional[bool]
tenant_id: Optional[str]
client_id: Optional[str]
client_secret: Optional[str]
@model_validator(mode='before')
@classmethod
def validate_alias_fields(cls, data: Any) -> Any:
724    @model_validator(mode="before")
725    @classmethod
726    def validate_alias_fields(cls, data: t.Any) -> t.Any:
727        if not isinstance(data, dict):
728            return data
729
730        data["host"] = data.get("host") or data.get("server")
731        if not data["host"]:
732            raise ConfigError("Either host or server must be set")
733
734        data["user"] = data.get("user") or data.get("username") or data.get("UID")
735        if not data["user"]:
736            raise ConfigError("One of user, username, or UID must be set")
737
738        data["password"] = data.get("password") or data.get("PWD")
739        if not data["password"]:
740            raise ConfigError("Either password or PWD must be set")
741
742        return data
756    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
757        # https://github.com/microsoft/dbt-fabric/blob/main/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql
758        return "delete+insert" if kind is IncrementalByUniqueKeyKind else "append"

The default incremental strategy for the db

column_class: Type[dbt.adapters.base.column.Column]
760    @classproperty
761    def column_class(cls) -> t.Type[Column]:
762        try:
763            # 1.8.0+
764            from dbt.adapters.sqlserver.sqlserver_column import SQLServerColumn
765        except ImportError:
766            # <1.8.0
767            from dbt.adapters.sqlserver.sql_server_column import SQLServerColumn  # type: ignore
768
769        return SQLServerColumn
dialect: str
771    @property
772    def dialect(self) -> str:
773        return "tsql"
def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
82    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
83        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
84        return func(self, **merged_kwargs)

Converts target config to SQLMesh connection config

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
TargetConfig
name
profile_name
load
attribute_dict
quote_policy
extra
relation_class
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class TrinoConfig(TargetConfig):
class TrinoConfig(TargetConfig):
    """
    Project connection and operational configuration for the Trino target.

    Args:
        type: The type of the target (trino)
        method: The Trino authentication method to use
        host: The server host to connect to
        port: The Trino server port to connect to
        database: Name of the Trino database/catalog
        schema: Name of the Trino schema
        user: User name for authentication
        password: User password for authentication
        roles: Trino catalog roles
        session_properties: Trino session properties
        retries: Number of times to retry if the Trino connector encounters an error
        timezone: The timezone to use for the Trino session
        http_headers: HTTP Headers to send alongside requests to Trino
        http_scheme: The HTTP scheme to use for requests to Trino (default: http, or https if kerberos, ldap or jwt auth)
        threads: The number of threads to run on
        prepared_statements_enabled: dbt-trino prepared statements flag, not used by SQLMesh
        impersonation_user:  LDAP authentication: override the provided username
        keytab: Kerberos authentication: Path to keytab
        krb5_config: Kerberos authentication: Path to config
        principal: Kerberos authentication: Principal
        service_name: Kerberos authentication: Service name
        hostname_override: Kerberos authentication: hostname for a host whose DNS name doesn't match
        mutual_authentication: Kerberos authentication: Boolean flag for mutual authentication.
        force_preemptive: Kerberos authentication: Boolean flag to preemptively initiate the GSS exchange.
        sanitize_mutual_error_response: Kerberos authentication: Boolean flag to strip content and headers from error responses.
        delegate: Kerberos authentication: Boolean flag for credential delegation (`GSS_C_DELEG_FLAG`)
        jwt_token: JWT authentication: JWT string
        client_certificate: Certification authentication: Path to client certificate
        client_private_key: Certification authentication: Path to client private key
        cert: Certification authentication: Full path to a certificate file
    """

    # Maps the dbt `method` value to the SQLMesh authentication enum.
    # Both "oauth" and "oauth_console" map to the same OAUTH member.
    _method_to_auth_enum: t.ClassVar[t.Dict[str, TrinoAuthenticationMethod]] = {
        "none": TrinoAuthenticationMethod.NO_AUTH,
        "ldap": TrinoAuthenticationMethod.LDAP,
        "kerberos": TrinoAuthenticationMethod.KERBEROS,
        "jwt": TrinoAuthenticationMethod.JWT,
        "certificate": TrinoAuthenticationMethod.CERTIFICATE,
        "oauth": TrinoAuthenticationMethod.OAUTH,
        "oauth_console": TrinoAuthenticationMethod.OAUTH,
    }

    type: t.Literal["trino"] = "trino"
    host: str
    database: str
    schema_: str = Field(alias="schema")
    port: int = 443
    method: str
    user: t.Optional[str] = None

    threads: int = 1
    roles: t.Optional[t.Dict[str, str]] = None
    session_properties: t.Optional[t.Dict[str, str]] = None
    retries: int = 3
    timezone: t.Optional[str] = None
    http_headers: t.Optional[t.Dict[str, str]] = None
    http_scheme: t.Optional[str] = None
    prepared_statements_enabled: bool = True  # not used by SQLMesh

    # ldap authentication
    password: t.Optional[str] = None
    impersonation_user: t.Optional[str] = None

    # kerberos authentication
    keytab: t.Optional[str] = None
    krb5_config: t.Optional[str] = None
    principal: t.Optional[str] = None
    service_name: str = "trino"
    hostname_override: t.Optional[str] = None
    mutual_authentication: bool = False
    force_preemptive: bool = False
    sanitize_mutual_error_response: bool = True
    delegate: bool = False

    # jwt authentication
    jwt_token: t.Optional[str] = None

    # certificate authentication
    client_certificate: t.Optional[str] = None
    client_private_key: t.Optional[str] = None
    cert: t.Optional[str] = None

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return the default incremental strategy, which is always "append"."""
        return "append"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """Return the dbt Trino relation class."""
        from dbt.adapters.trino.relation import TrinoRelation

        return TrinoRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """Return the dbt Trino column class."""
        from dbt.adapters.trino.column import TrinoColumn

        return TrinoColumn

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Build the SQLMesh Trino connection config from this target.

        ``database`` is passed as ``catalog`` and ``threads`` as
        ``concurrent_tasks``; any extra ``kwargs`` are forwarded unchanged.

        Raises:
            ConfigError: If ``method`` is not one of the supported authentication methods.
        """
        auth_method = self._method_to_auth_enum.get(self.method)
        if auth_method is None:
            # Report an unsupported method as a configuration problem instead of
            # letting a bare KeyError escape from the lookup.
            supported = ", ".join(sorted(self._method_to_auth_enum))
            raise ConfigError(
                f"Unsupported Trino authentication method '{self.method}'. "
                f"Supported methods: {supported}"
            )

        return TrinoConnectionConfig(
            method=auth_method,
            host=self.host,
            user=self.user,
            catalog=self.database,
            port=self.port,
            http_scheme=self.http_scheme,
            roles=self.roles,
            http_headers=self.http_headers,
            session_properties=self.session_properties,
            retries=self.retries,
            timezone=self.timezone,
            password=self.password,
            impersonation_user=self.impersonation_user,
            keytab=self.keytab,
            krb5_config=self.krb5_config,
            principal=self.principal,
            service_name=self.service_name,
            hostname_override=self.hostname_override,
            mutual_authentication=self.mutual_authentication,
            force_preemptive=self.force_preemptive,
            sanitize_mutual_error_response=self.sanitize_mutual_error_response,
            delegate=self.delegate,
            jwt_token=self.jwt_token,
            client_certificate=self.client_certificate,
            client_private_key=self.client_private_key,
            cert=self.cert,
            concurrent_tasks=self.threads,
            **kwargs,
        )

Project connection and operational configuration for the Trino target.

Arguments:
  • method: The Trino authentication method to use
  • host: The server host to connect to
  • port: The Trino server port to connect to
  • database: Name of the Trino database/catalog
  • schema: Name of the Trino schema
  • user: User name for authentication
  • password: User password for authentication
  • roles: Trino catalog roles
  • session_properties: Trino session properties
  • retries: Number of times to retry if the Trino connector encounters an error
  • timezone: The timezone to use for the Trino session
  • http_headers: HTTP Headers to send alongside requests to Trino
  • http_scheme: The HTTP scheme to use for requests to Trino (default: http, or https if kerberos, ldap or jwt auth)
  • threads: The number of threads to run on
  • impersonation_user: LDAP authentication: override the provided username
  • keytab: Kerberos authentication: Path to keytab
  • krb5_config: Kerberos authentication: Path to config
  • principal: Kerberos authentication: Principal
  • service_name: Kerberos authentication: Service name
  • hostname_override: Kerberos authentication: hostname for a host whose DNS name doesn't match
  • mutual_authentication: Kerberos authentication: Boolean flag for mutual authentication.
  • force_preemptive: Kerberos authentication: Boolean flag to preemptively initiate the GSS exchange.
  • sanitize_mutual_error_response: Kerberos authentication: Boolean flag to strip content and headers from error responses.
  • delegate: Kerberos authentication: Boolean flag for credential delegation (GSS_C_DELEG_FLAG)
  • jwt_token: JWT authentication: JWT string
  • client_certificate: Certification authentication: Path to client certificate
  • client_private_key: Certification authentication: Path to client private key
  • cert: Certification authentication: Full path to a certificate file
type: Literal['trino']
host: str
database: str
schema_: str
port: int
method: str
user: Optional[str]
threads: int
roles: Optional[Dict[str, str]]
session_properties: Optional[Dict[str, str]]
retries: int
timezone: Optional[str]
http_headers: Optional[Dict[str, str]]
http_scheme: Optional[str]
prepared_statements_enabled: bool
password: Optional[str]
impersonation_user: Optional[str]
keytab: Optional[str]
krb5_config: Optional[str]
principal: Optional[str]
service_name: str
hostname_override: Optional[str]
mutual_authentication: bool
force_preemptive: bool
sanitize_mutual_error_response: bool
delegate: bool
jwt_token: Optional[str]
client_certificate: Optional[str]
client_private_key: Optional[str]
cert: Optional[str]
875    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
876        return "append"

The default incremental strategy for the db

relation_class: Type[dbt.adapters.base.relation.BaseRelation]
878    @classproperty
879    def relation_class(cls) -> t.Type[BaseRelation]:
880        from dbt.adapters.trino.relation import TrinoRelation
881
882        return TrinoRelation
column_class: Type[dbt.adapters.base.column.Column]
884    @classproperty
885    def column_class(cls) -> t.Type[Column]:
886        from dbt.adapters.trino.column import TrinoColumn
887
888        return TrinoColumn
def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
82    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
83        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
84        return func(self, **merged_kwargs)

Converts target config to SQLMesh connection config

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
TargetConfig
name
profile_name
load
attribute_dict
quote_policy
extra
dialect
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class ClickhouseConfig(TargetConfig):
 924class ClickhouseConfig(TargetConfig):
 925    """
 926    Project connection and operational configuration for the Clickhouse target
 927
 928    Args:
 929      host: [localhost]
 930      user: [default] # User for all database operations
 931      password: [<empty string>] # Password for the user
 932      secure: [False] # Use TLS (native protocol) or HTTPS (http protocol)
 933      port: [8123]  # If not set, defaults to 8123, 8443 depending on the secure and driver settings
 934      connect_timeout: [10] # Timeout in seconds to establish a connection to ClickHouse
 935      send_receive_timeout: [300] # Timeout in seconds to receive data from the ClickHouse server
 936      verify: [True] # Validate TLS certificate if using TLS/SSL
 937      cluster: [<empty string>] # If set, certain DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster.
 938      custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty.
 939      schema: [default] # ClickHouse database for dbt models, not used by SQLMesh
 940      driver: [http] # http or native.  If not set this will be autodetermined based on port setting, not used by SQLMesh
 941      retries: [1] # Number of times to retry a "retriable" database exception (such as a 503 'Service Unavailable' error), not used by SQLMesh
 942      compression: [<empty string>] # Use gzip compression if truthy (http), or compression type for a native connection, not used by SQLMesh
 943      cluster_mode: [False] # Use specific settings designed to improve operation on Replicated databases (recommended for ClickHouse Cloud), not used by SQLMesh
 944      use_lw_deletes: [False] # Use the strategy `delete+insert` as the default incremental strategy, not used by SQLMesh
 945      check_exchange: [True] # Validate that clickhouse support the atomic EXCHANGE TABLES command. Not used by SQLMesh.
 946      local_suffix: [_local] # Table suffix of local tables on shards for distributed materializations, not used by SQLMesh
 947      local_db_prefix: [<empty string>] # Database prefix of local tables on shards for distributed materializations, not used by SQLMesh
 948      allow_automatic_deduplication: [False] # Enable ClickHouse automatic deduplication for Replicated tables, not used by SQLMesh
 949      tcp_keepalive: [False] # Native client only, specify TCP keepalive configuration. Specify custom keepalive settings as [idle_time_sec, interval_sec, probes], not used by SQLMesh
 950      sync_request_timeout: [5] # Timeout for server ping, not used by SQLMesh
 951      compress_block_size: [1048576] # Compression block size if compression is enabled, not used by SQLMesh
 952    """
 953
 954    host: str = "localhost"
 955    user: str = Field(default="default", alias="username")
 956    password: str = ""
 957    port: t.Optional[int] = None
 958    cluster: t.Optional[str] = None
 959    schema_: str = Field(default="default", alias="schema")
 960    connect_timeout: int = 10
 961    send_receive_timeout: int = 300
 962    verify: bool = True
 963    compression: str = ""
 964    custom_settings: t.Optional[t.Dict[str, t.Any]] = None
 965
 966    # Not used by SQLMesh
 967    driver: t.Optional[str] = None
 968    secure: bool = False
 969    retries: int = 1
 970    database_engine: t.Optional[str] = None
 971    cluster_mode: bool = False
 972    sync_request_timeout: int = 5
 973    compress_block_size: int = 1048576
 974    check_exchange: bool = True
 975    use_lw_deletes: bool = False
 976    allow_automatic_deduplication: bool = False
 977    tcp_keepalive: t.Union[bool, t.Tuple[int, ...], t.List[int]] = False
 978    database: str = ""
 979    local_suffix: str = "local"
 980    local_db_prefix: str = ""
 981
 982    type: t.Literal["clickhouse"] = "clickhouse"
 983
 984    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
 985        # dbt-clickhouse name for temp table swap. That is sqlmesh's default
 986        #   strategy so doesn't require special handling during conversion.
 987        return "legacy"
 988
 989    @classproperty
 990    def relation_class(cls) -> t.Type[BaseRelation]:
 991        from dbt.adapters.clickhouse.relation import ClickHouseRelation
 992
 993        return ClickHouseRelation
 994
 995    @classproperty
 996    def column_class(cls) -> t.Type[Column]:
 997        from dbt.adapters.clickhouse.column import ClickHouseColumn
 998
 999        return ClickHouseColumn
1000
1001    @with_schema_differ_overrides
1002    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
1003        return ClickhouseConnectionConfig(
1004            host=self.host,
1005            username=self.user,
1006            password=self.password,
1007            port=self.port,
1008            cluster=self.cluster,
1009            connect_timeout=self.connect_timeout,
1010            send_receive_timeout=self.send_receive_timeout,
1011            verify=self.verify,
1012            compression_method=self.compression,
1013            connection_settings=self.custom_settings,
1014            **kwargs,
1015        )

Project connection and operational configuration for the Clickhouse target

Arguments:
  • host: [localhost]
  • user: [default] # User for all database operations
  • password: [<empty string>] # Password for the user
  • secure: [False] # Use TLS (native protocol) or HTTPS (http protocol)
  • port: [8123] # If not set, defaults to 8123, 8443 depending on the secure and driver settings
  • connect_timeout: [10] # Timeout in seconds to establish a connection to ClickHouse
  • send_receive_timeout: [300] # Timeout in seconds to receive data from the ClickHouse server
  • verify: [True] # Validate TLS certificate if using TLS/SSL
  • cluster: [<empty string>] # If set, certain DDL/table operations will be executed with the ON CLUSTER clause using this cluster.
  • custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty.
  • schema: [default] # ClickHouse database for dbt models, not used by SQLMesh
  • driver: [http] # http or native. If not set this will be autodetermined based on port setting, not used by SQLMesh
  • retries: [1] # Number of times to retry a "retriable" database exception (such as a 503 'Service Unavailable' error), not used by SQLMesh
  • compression: [<empty string>] # Use gzip compression if truthy (http), or compression type for a native connection; passed to SQLMesh as compression_method
  • cluster_mode: [False] # Use specific settings designed to improve operation on Replicated databases (recommended for ClickHouse Cloud), not used by SQLMesh
  • use_lw_deletes: [False] # Use the strategy delete+insert as the default incremental strategy, not used by SQLMesh
  • check_exchange: [True] # Validate that clickhouse support the atomic EXCHANGE TABLES command. Not used by SQLMesh.
  • local_suffix: [_local] # Table suffix of local tables on shards for distributed materializations, not used by SQLMesh
  • local_db_prefix: [<empty string>] # Database prefix of local tables on shards for distributed materializations, not used by SQLMesh
  • allow_automatic_deduplication: [False] # Enable ClickHouse automatic deduplication for Replicated tables, not used by SQLMesh
  • tcp_keepalive: [False] # Native client only, specify TCP keepalive configuration. Specify custom keepalive settings as [idle_time_sec, interval_sec, probes], not used by SQLMesh
  • sync_request_timeout: [5] # Timeout for server ping, not used by SQLMesh
  • compress_block_size: [1048576] # Compression block size if compression is enabled, not used by SQLMesh
host: str
user: str
password: str
port: Optional[int]
cluster: Optional[str]
schema_: str
connect_timeout: int
send_receive_timeout: int
verify: bool
compression: str
custom_settings: Optional[Dict[str, Any]]
driver: Optional[str]
secure: bool
retries: int
database_engine: Optional[str]
cluster_mode: bool
sync_request_timeout: int
compress_block_size: int
check_exchange: bool
use_lw_deletes: bool
allow_automatic_deduplication: bool
tcp_keepalive: Union[bool, Tuple[int, ...], List[int]]
database: str
local_suffix: str
local_db_prefix: str
type: Literal['clickhouse']
984    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
985        # dbt-clickhouse name for temp table swap. That is sqlmesh's default
986        #   strategy so doesn't require special handling during conversion.
987        return "legacy"

The default incremental strategy for the db

relation_class: Type[dbt.adapters.base.relation.BaseRelation]
989    @classproperty
990    def relation_class(cls) -> t.Type[BaseRelation]:
991        from dbt.adapters.clickhouse.relation import ClickHouseRelation
992
993        return ClickHouseRelation
column_class: Type[dbt.adapters.base.column.Column]
995    @classproperty
996    def column_class(cls) -> t.Type[Column]:
997        from dbt.adapters.clickhouse.column import ClickHouseColumn
998
999        return ClickHouseColumn
def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    # Start from the schema differ overrides, then layer the caller's keyword
    # arguments on top so that explicitly passed values take precedence.
    effective_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    effective_kwargs.update(kwargs)
    return func(self, **effective_kwargs)

Converts target config to SQLMesh connection config

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
TargetConfig
name
threads
profile_name
load
attribute_dict
quote_policy
extra
dialect
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class AthenaConfig(TargetConfig):
class AthenaConfig(TargetConfig):
    """
    Connection and operational settings of a dbt project's Athena target.

    Args:
        s3_staging_dir: S3 location where Athena query results and metadata are stored
        s3_data_dir: Prefix under which tables are stored, if different from the connection's s3_staging_dir
        s3_data_naming: How table paths are generated inside s3_data_dir
        s3_tmp_table_dir: Prefix under which temporary tables are stored, if different from the connection's s3_data_dir
        region_name: AWS region of the Athena instance
        schema: The schema (Athena database) to build models into (lowercase only)
        database: The database (Data catalog) to build models into (lowercase only)
        poll_interval: Interval in seconds between polls for the status of query results in Athena
        debug_query_state: Whether a debug message with the Athena query state should be emitted
        aws_access_key_id: Access key ID of the user performing requests
        aws_secret_access_key: Secret access key of the user performing requests
        aws_profile_name: Profile to use from the AWS shared credentials file
        work_group: Identifier of the Athena workgroup
        skip_workgroup_check: Whether the WorkGroup check (an additional AWS call) can be skipped
        num_retries: Number of times a failing query is retried
        num_boto3_retries: Number of times boto3 requests are retried (e.g. deleting S3 files for materialized tables)
        num_iceberg_retries: Number of times iceberg commit queries are retried to fix ICEBERG_COMMIT_ERROR
        spark_work_group: Identifier of the Athena Spark workgroup used for running Python models
        seed_s3_upload_args: Dictionary of boto3 ExtraArgs used when uploading to S3
        lf_tags_database: Default LF tags for a new database if it is created by dbt
    """

    # Target discriminator and default concurrency.
    type: t.Literal["athena"] = "athena"
    threads: int = 4

    # S3 locations, polling and workgroup options.
    s3_staging_dir: t.Optional[str] = None
    s3_data_dir: t.Optional[str] = None
    s3_data_naming: t.Optional[str] = None
    s3_tmp_table_dir: t.Optional[str] = None
    poll_interval: t.Optional[int] = None
    debug_query_state: bool = False
    work_group: t.Optional[str] = None
    skip_workgroup_check: t.Optional[bool] = None
    spark_work_group: t.Optional[str] = None

    # AWS credentials and region.
    aws_access_key_id: t.Optional[str] = None
    aws_secret_access_key: t.Optional[str] = None
    aws_profile_name: t.Optional[str] = None
    region_name: t.Optional[str] = None

    # Retry budgets.
    num_retries: t.Optional[int] = None
    num_boto3_retries: t.Optional[int] = None
    num_iceberg_retries: t.Optional[int] = None

    # Seed upload arguments and Lake Formation tags.
    seed_s3_upload_args: t.Dict[str, str] = {}
    lf_tags_database: t.Dict[str, str] = {}

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """Return the relation class provided by the dbt Athena adapter."""
        # Deferred import: resolved only when the property is accessed.
        from dbt.adapters.athena.relation import AthenaRelation as relation_type

        return relation_type

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """Return the column class provided by the dbt Athena adapter."""
        # Deferred import: resolved only when the property is accessed.
        from dbt.adapters.athena.column import AthenaColumn as column_type

        return column_type

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "insert_overwrite" regardless of the incremental kind given."""
        strategy = "insert_overwrite"
        return strategy

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Build the SQLMesh Athena connection config from this target.

        Args:
            kwargs: Extra keyword arguments forwarded unchanged to
                ``AthenaConnectionConfig``.

        Returns:
            The ``AthenaConnectionConfig`` populated from this target's fields.
        """
        # dbt target fields mapped onto the SQLMesh connection field names.
        connection_settings: t.Dict[str, t.Any] = dict(
            type="athena",
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            region_name=self.region_name,
            work_group=self.work_group,
            s3_staging_dir=self.s3_staging_dir,
            s3_warehouse_location=self.s3_data_dir,
            schema_name=self.schema_,
            catalog_name=self.database,
            concurrent_tasks=self.threads,
        )
        return AthenaConnectionConfig(**connection_settings, **kwargs)

Project connection and operational configuration for the Athena target.

Arguments:
  • s3_staging_dir: S3 location to store Athena query results and metadata
  • s3_data_dir: Prefix for storing tables, if different from the connection's s3_staging_dir
  • s3_data_naming: How to generate table paths in s3_data_dir
  • s3_tmp_table_dir: Prefix for storing temporary tables, if different from the connection's s3_data_dir
  • region_name: AWS region of your Athena instance
  • schema: Specify the schema (Athena database) to build models into (lowercase only)
  • database: Specify the database (Data catalog) to build models into (lowercase only)
  • poll_interval: Interval in seconds to use for polling the status of query results in Athena
  • debug_query_state: Whether a debug message with the Athena query state should be emitted
  • aws_access_key_id: Access key ID of the user performing requests
  • aws_secret_access_key: Secret access key of the user performing requests
  • aws_profile_name: Profile to use from your AWS shared credentials file
  • work_group: Identifier of Athena workgroup
  • skip_workgroup_check: Indicates if the WorkGroup check (additional AWS call) can be skipped
  • num_retries: Number of times to retry a failing query
  • num_boto3_retries: Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables)
  • num_iceberg_retries: Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR
  • spark_work_group: Identifier of Athena Spark workgroup for running Python models
  • seed_s3_upload_args: Dictionary containing boto3 ExtraArgs when uploading to S3
  • lf_tags_database: Default LF tags for new database if it's created by dbt
type: Literal['athena']
threads: int
s3_staging_dir: Optional[str]
s3_data_dir: Optional[str]
s3_data_naming: Optional[str]
s3_tmp_table_dir: Optional[str]
poll_interval: Optional[int]
debug_query_state: bool
work_group: Optional[str]
skip_workgroup_check: Optional[bool]
spark_work_group: Optional[str]
aws_access_key_id: Optional[str]
aws_secret_access_key: Optional[str]
aws_profile_name: Optional[str]
region_name: Optional[str]
num_retries: Optional[int]
num_boto3_retries: Optional[int]
num_iceberg_retries: Optional[int]
seed_s3_upload_args: Dict[str, str]
lf_tags_database: Dict[str, str]
relation_class: Type[dbt.adapters.base.relation.BaseRelation]
@classproperty
def relation_class(cls) -> t.Type[BaseRelation]:
    """Return the relation class provided by the dbt Athena adapter."""
    # Deferred import: resolved only when the property is accessed
    # (presumably so the adapter package stays optional -- confirm).
    from dbt.adapters.athena.relation import AthenaRelation as relation_type

    return relation_type
column_class: Type[dbt.adapters.base.column.Column]
@classproperty
def column_class(cls) -> t.Type[Column]:
    """Return the column class provided by the dbt Athena adapter."""
    # Deferred import: resolved only when the property is accessed
    # (presumably so the adapter package stays optional -- confirm).
    from dbt.adapters.athena.column import AthenaColumn as column_type

    return column_type
def default_incremental_strategy(self, kind: IncrementalKind) -> str:
    """Return the name of the default incremental strategy for this target.

    The ``kind`` argument is not consulted: the same name is returned for
    every incremental kind.
    """
    strategy = "insert_overwrite"
    return strategy

The default incremental strategy for the db

def to_sqlmesh( self: TargetConfig, **kwargs: Any) -> sqlmesh.core.config.connection.ConnectionConfig:
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    # Start from the schema differ overrides, then layer the caller's keyword
    # arguments on top so that explicitly passed values take precedence.
    effective_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    effective_kwargs.update(kwargs)
    return func(self, **effective_kwargs)

Converts target config to SQLMesh connection config

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
TargetConfig
name
database
schema_
profile_name
load
attribute_dict
quote_policy
extra
dialect
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
TARGET_TYPE_TO_CONFIG_CLASS: Dict[str, Type[TargetConfig]] = {'databricks': <class 'DatabricksConfig'>, 'duckdb': <class 'DuckDbConfig'>, 'postgres': <class 'PostgresConfig'>, 'redshift': <class 'RedshiftConfig'>, 'snowflake': <class 'SnowflakeConfig'>, 'bigquery': <class 'BigQueryConfig'>, 'sqlserver': <class 'MSSQLConfig'>, 'tsql': <class 'MSSQLConfig'>, 'trino': <class 'TrinoConfig'>, 'athena': <class 'AthenaConfig'>, 'clickhouse': <class 'ClickhouseConfig'>}