sqlmesh.dbt.target
1from __future__ import annotations 2 3import abc 4import typing as t 5from pathlib import Path 6 7from dbt.adapters.base import BaseRelation, Column 8from pydantic import Field, AliasChoices 9 10from sqlmesh.core.console import get_console 11from sqlmesh.core.config.connection import ( 12 AthenaConnectionConfig, 13 BigQueryConnectionConfig, 14 BigQueryConnectionMethod, 15 BigQueryPriority, 16 ClickhouseConnectionConfig, 17 ConnectionConfig, 18 DatabricksConnectionConfig, 19 DuckDBConnectionConfig, 20 MSSQLConnectionConfig, 21 PostgresConnectionConfig, 22 RedshiftConnectionConfig, 23 SnowflakeConnectionConfig, 24 TrinoAuthenticationMethod, 25 TrinoConnectionConfig, 26) 27from sqlmesh.core.model import ( 28 IncrementalByTimeRangeKind, 29 IncrementalByUniqueKeyKind, 30 IncrementalUnmanagedKind, 31) 32from sqlmesh.core.schema_diff import NestedSupport 33from sqlmesh.dbt.common import DbtConfig 34from sqlmesh.dbt.relation import Policy 35from sqlmesh.dbt.util import DBT_VERSION 36from sqlmesh.utils import AttributeDict, classproperty 37from sqlmesh.utils.errors import ConfigError 38from sqlmesh.utils.pydantic import field_validator, model_validator 39 40IncrementalKind = t.Union[ 41 t.Type[IncrementalByUniqueKeyKind], 42 t.Type[IncrementalByTimeRangeKind], 43 t.Type[IncrementalUnmanagedKind], 44] 45 46# We only serialize a subset of fields in order to avoid persisting sensitive information 47SERIALIZABLE_FIELDS = { 48 # core 49 "name", 50 "schema_", 51 "type", 52 "threads", 53 # snowflake 54 "database", 55 "warehouse", 56 "user", 57 "role", 58 "account", 59 # postgres/redshift 60 "dbname", 61 "host", 62 "port", 63 # bigquery 64 "project", 65 "dataset", 66} 67 68SCHEMA_DIFFER_OVERRIDES = { 69 "schema_differ_overrides": { 70 "treat_alter_data_type_as_destructive": True, 71 "nested_support": NestedSupport.IGNORE, 72 } 73} 74 75 76def with_schema_differ_overrides( 77 func: t.Callable[..., ConnectionConfig], 78) -> t.Callable[..., ConnectionConfig]: 79 """Decorator that 
merges default config with kwargs.""" 80 81 def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig: 82 merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs} 83 return func(self, **merged_kwargs) 84 85 return wrapper 86 87 88class TargetConfig(abc.ABC, DbtConfig): 89 """ 90 Configuration for DBT profile target 91 92 Args: 93 type: The type of the data warehouse 94 name: The name of this target 95 database: Name of the database 96 schema_: Name of the schema 97 threads: The number of threads to run on 98 """ 99 100 # dbt 101 type: str = "none" 102 name: str 103 database: str 104 schema_: str = Field(alias="schema") 105 threads: int = 1 106 profile_name: t.Optional[str] = None 107 108 @classmethod 109 def load(cls, data: t.Dict[str, t.Any]) -> TargetConfig: 110 """ 111 Loads the configuration from the yaml provided for a profile target 112 113 Args: 114 data: The yaml for the project's target output 115 116 Returns: 117 The configuration of the provided profile target 118 """ 119 db_type = data["type"] 120 if db_type == "databricks": 121 return DatabricksConfig(**data) 122 if db_type == "duckdb": 123 return DuckDbConfig(**data) 124 if db_type == "postgres": 125 return PostgresConfig(**data) 126 if db_type == "redshift": 127 return RedshiftConfig(**data) 128 if db_type == "snowflake": 129 return SnowflakeConfig(**data) 130 if db_type == "bigquery": 131 return BigQueryConfig(**data) 132 if db_type == "sqlserver": 133 return MSSQLConfig(**data) 134 if db_type == "trino": 135 return TrinoConfig(**data) 136 if db_type == "clickhouse": 137 return ClickhouseConfig(**data) 138 if db_type == "athena": 139 return AthenaConfig(**data) 140 141 raise ConfigError(f"{db_type} not supported.") 142 143 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 144 """The default incremental strategy for the db""" 145 raise NotImplementedError 146 147 @with_schema_differ_overrides 148 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 149 """Converts 
target config to SQLMesh connection config""" 150 raise NotImplementedError 151 152 def attribute_dict(self) -> AttributeDict: 153 fields = self.dict(include=SERIALIZABLE_FIELDS).copy() 154 fields["target_name"] = self.name 155 return AttributeDict(fields) 156 157 @classproperty 158 def quote_policy(cls) -> Policy: 159 return Policy() 160 161 @property 162 def extra(self) -> t.Set[str]: 163 return self.extra_fields(set(self.dict())) 164 165 @classproperty 166 def relation_class(cls) -> t.Type[BaseRelation]: 167 return BaseRelation 168 169 @classproperty 170 def column_class(cls) -> t.Type[Column]: 171 return Column 172 173 @property 174 def dialect(self) -> str: 175 return self.type 176 177 178DUCKDB_IN_MEMORY = ":memory:" 179 180 181class DuckDbConfig(TargetConfig): 182 """ 183 Connection config for DuckDb target 184 185 Args: 186 path: Location of the database file. If not specified, an in memory database is used. 187 extensions: A list of autoloadable extensions to load. 188 settings: A dictionary of settings to pass into the duckdb connector. 189 secrets: A list of secrets to pass to the secret manager in the duckdb connector. 190 filesystems: A list of `fsspec` filesystems to register in the duckdb connection. 
191 """ 192 193 type: t.Literal["duckdb"] = "duckdb" 194 database: str = "main" 195 schema_: str = Field(default="main", alias="schema") 196 path: str = DUCKDB_IN_MEMORY 197 extensions: t.Optional[t.List[str]] = None 198 settings: t.Optional[t.Dict[str, t.Any]] = None 199 secrets: t.Optional[t.List[t.Dict[str, t.Any]]] = None 200 filesystems: t.Optional[t.List[t.Dict[str, t.Any]]] = None 201 202 @model_validator(mode="before") 203 def validate_authentication(cls, data: t.Any) -> t.Any: 204 if not isinstance(data, dict): 205 return data 206 207 if "database" not in data and DBT_VERSION >= (1, 5, 0): 208 path = data.get("path") 209 data["database"] = ( 210 "memory" 211 if path is None or path == DUCKDB_IN_MEMORY 212 else Path(t.cast(str, path)).stem 213 ) 214 215 if "threads" in data and t.cast(int, data["threads"]) > 1: 216 get_console().log_warning("DuckDB does not support concurrency - setting threads to 1.") 217 218 return data 219 220 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 221 return "delete+insert" 222 223 @classproperty 224 def relation_class(cls) -> t.Type[BaseRelation]: 225 from dbt.adapters.duckdb.relation import DuckDBRelation 226 227 return DuckDBRelation 228 229 @with_schema_differ_overrides 230 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 231 if self.extensions is not None: 232 kwargs["extensions"] = self.extensions 233 if self.settings is not None: 234 kwargs["connector_config"] = self.settings 235 if self.secrets is not None: 236 kwargs["secrets"] = self.secrets 237 if self.filesystems is not None: 238 kwargs["filesystems"] = self.filesystems 239 return DuckDBConnectionConfig( 240 database=self.path, 241 concurrent_tasks=1, 242 **kwargs, 243 ) 244 245 246class SnowflakeConfig(TargetConfig): 247 """ 248 Project connection and operational configuration for the Snowflake target 249 250 Args: 251 account: Snowflake account 252 warehouse: Name of the warehouse 253 user: Name of the user 254 password: User's 
password 255 role: Role of the user 256 client_session_keep_alive: A boolean flag to extend the duration of the Snowflake session beyond 4 hours 257 query_tag: tag for the query in Snowflake 258 connect_retries: Number of times to retry if the Snowflake connector encounters an error 259 connect_timeout: Number of seconds to wait between failed attempts 260 retry_on_database_errors: A boolean flag to retry if a Snowflake connector Database error is encountered 261 retry_all: A boolean flag to retry on all Snowflake connector errors 262 authenticator: SSO authentication: Snowflake authentication method 263 private_key: Key pair authentication: Private key 264 private_key_path: Key pair authentication: Path to the private key, used instead of private_key 265 private_key_passphrase: Key pair authentication: passphrase used to decrypt private key (if encrypted) 266 token: OAuth authentication: The Snowflake OAuth 2.0 access token 267 """ 268 269 type: t.Literal["snowflake"] = "snowflake" 270 account: str 271 user: str 272 273 # User and password authentication 274 password: t.Optional[str] = None 275 276 # SSO authentication 277 authenticator: t.Optional[str] = None 278 279 # Key Pair Auth 280 private_key: t.Optional[str] = None 281 private_key_path: t.Optional[str] = None 282 private_key_passphrase: t.Optional[str] = None 283 284 # TODO add other forms of authentication 285 286 # oauth access token 287 token: t.Optional[str] = None 288 289 # Optional 290 warehouse: t.Optional[str] = None 291 role: t.Optional[str] = None 292 client_session_keep_alive: bool = False 293 query_tag: t.Optional[str] = None 294 connect_retries: int = 0 295 connect_timeout: int = 10 296 retry_on_database_errors: bool = False 297 retry_all: bool = False 298 299 @model_validator(mode="before") 300 @classmethod 301 def validate_authentication(cls, data: t.Any) -> t.Any: 302 if not isinstance(data, dict) or ( 303 data.get("password") 304 or data.get("authenticator") 305 or data.get("private_key") 
306 or data.get("private_key_path") 307 ): 308 return data 309 310 raise ConfigError("No supported Snowflake authentication method found in target profile.") 311 312 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 313 return "merge" 314 315 @classproperty 316 def relation_class(cls) -> t.Type[BaseRelation]: 317 from dbt.adapters.snowflake import SnowflakeRelation 318 319 return SnowflakeRelation 320 321 @classproperty 322 def column_class(cls) -> t.Type[Column]: 323 from dbt.adapters.snowflake import SnowflakeColumn 324 325 return SnowflakeColumn 326 327 @with_schema_differ_overrides 328 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 329 return SnowflakeConnectionConfig( 330 user=self.user, 331 password=self.password, 332 authenticator=self.authenticator, 333 account=self.account, 334 warehouse=self.warehouse, 335 database=self.database, 336 role=self.role, 337 concurrent_tasks=self.threads, 338 token=self.token, 339 private_key=self.private_key, 340 private_key_path=self.private_key_path, 341 private_key_passphrase=self.private_key_passphrase, 342 **kwargs, 343 ) 344 345 @classproperty 346 def quote_policy(cls) -> Policy: 347 return Policy(database=False, schema=False, identifier=False) 348 349 350class PostgresConfig(TargetConfig): 351 """ 352 Project connection and operational configuration for the Postgres target 353 354 Args: 355 host: The Postgres host to connect to 356 user: Name of the user 357 password: User's password 358 port: The port to connect to 359 dbname: Name of the database 360 keepalives_idle: Seconds between TCP keepalive packets 361 connect_timeout: Number of seconds to wait between failed attempts 362 retries: Number of times to retry if the Postgres connector encounters an error 363 search_path: Overrides the default search path 364 role: Role of the user 365 sslmode: SSL Mode used to connect to the database 366 """ 367 368 type: t.Literal["postgres"] = "postgres" 369 host: str 370 user: str 371 password: 
str = Field(validation_alias=AliasChoices("pass", "password")) 372 port: int 373 dbname: str 374 keepalives_idle: t.Optional[int] = None 375 connect_timeout: int = 10 376 retries: int = 1 # Currently Unsupported by SQLMesh 377 search_path: t.Optional[str] = None # Currently Unsupported by SQLMesh 378 role: t.Optional[str] = None 379 sslmode: t.Optional[str] = None 380 381 @model_validator(mode="before") 382 @classmethod 383 def validate_database(cls, data: t.Any) -> t.Any: 384 if not isinstance(data, dict): 385 return data 386 387 data["database"] = data.get("database") or data.get("dbname") 388 if not data["database"]: 389 raise ConfigError("Either database or dbname must be set") 390 391 return data 392 393 @field_validator("port") 394 @classmethod 395 def _validate_port(cls, v: t.Union[int, str]) -> int: 396 return int(v) 397 398 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 399 return "delete+insert" if kind is IncrementalByUniqueKeyKind else "append" 400 401 @with_schema_differ_overrides 402 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 403 return PostgresConnectionConfig( 404 host=self.host, 405 user=self.user, 406 password=self.password, 407 port=self.port, 408 database=self.dbname, 409 keepalives_idle=self.keepalives_idle, 410 concurrent_tasks=self.threads, 411 connect_timeout=self.connect_timeout, 412 role=self.role, 413 sslmode=self.sslmode, 414 **kwargs, 415 ) 416 417 418class RedshiftConfig(TargetConfig): 419 """ 420 Project connection and operational configuration for the Redshift target 421 422 Args: 423 host: The Redshift host to connect to 424 user: Name of the user 425 password: User's password 426 port: The port to connect to 427 dbname: Name of the database 428 connect_timeout: Number of seconds to wait between failed attempts 429 ra3_node: Enables cross-database sources 430 search_path: Overrides the default search path 431 sslmode: SSL Mode used to connect to the database 432 """ 433 434 # TODO add other 
forms of authentication 435 type: t.Literal["redshift"] = "redshift" 436 host: str 437 user: str 438 password: str = Field(validation_alias=AliasChoices("pass", "password")) 439 port: int 440 dbname: str 441 connect_timeout: t.Optional[int] = None 442 ra3_node: bool = True 443 search_path: t.Optional[str] = None 444 sslmode: t.Optional[str] = None 445 446 @model_validator(mode="before") 447 @classmethod 448 def validate_database(cls, data: t.Any) -> t.Any: 449 if not isinstance(data, dict): 450 return data 451 452 data["database"] = data.get("database") or data.get("dbname") 453 if not data["database"]: 454 raise ConfigError("Either database or dbname must be set") 455 456 return data 457 458 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 459 return "append" 460 461 @classproperty 462 def relation_class(cls) -> t.Type[BaseRelation]: 463 from dbt.adapters.redshift import RedshiftRelation 464 465 return RedshiftRelation 466 467 @classproperty 468 def column_class(cls) -> t.Type[Column]: 469 if DBT_VERSION < (1, 6, 0): 470 from dbt.adapters.redshift import RedshiftColumn # type: ignore 471 472 return RedshiftColumn 473 return super(RedshiftConfig, cls).column_class 474 475 @with_schema_differ_overrides 476 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 477 return RedshiftConnectionConfig( 478 user=self.user, 479 password=self.password, 480 database=self.database, 481 host=self.host, 482 port=self.port, 483 sslmode=self.sslmode, 484 timeout=self.connect_timeout, 485 concurrent_tasks=self.threads, 486 **kwargs, 487 ) 488 489 490class DatabricksConfig(TargetConfig): 491 """ 492 Project connection and operational configuration for the Databricks target 493 494 Args: 495 catalog: Catalog name to use for Unity Catalog 496 host: The Databricks host to connect to 497 http_path: The Databricks compute resources URL 498 token: Personal access token 499 database: Name of the database. 
Not applicable for Databricks and ignored 500 """ 501 502 type: t.Literal["databricks"] = "databricks" 503 host: str 504 http_path: str 505 token: t.Optional[str] = None # only required if auth_type is not set to 'oauth' 506 database: t.Optional[str] = Field(alias="catalog") # type: ignore 507 auth_type: t.Optional[str] = None 508 client_id: t.Optional[str] = None 509 client_secret: t.Optional[str] = None 510 511 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 512 return "merge" 513 514 @classproperty 515 def relation_class(cls) -> t.Type[BaseRelation]: 516 from dbt.adapters.databricks.relation import DatabricksRelation 517 518 return DatabricksRelation 519 520 @classproperty 521 def column_class(cls) -> t.Type[Column]: 522 from dbt.adapters.databricks.column import DatabricksColumn 523 524 return DatabricksColumn 525 526 @with_schema_differ_overrides 527 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 528 return DatabricksConnectionConfig( 529 server_hostname=self.host, 530 http_path=self.http_path, 531 access_token=self.token, 532 concurrent_tasks=self.threads, 533 catalog=self.database, 534 auth_type="databricks-oauth" if self.auth_type == "oauth" else self.auth_type, 535 oauth_client_id=self.client_id, 536 oauth_client_secret=self.client_secret, 537 **kwargs, 538 ) 539 540 541class BigQueryConfig(TargetConfig): 542 """ 543 Project connection and operational configuration for the BigQuery target 544 545 Args: 546 type: The type of the target (bigquery) 547 method: The BigQuery authentication method to use 548 project: The BigQuery project to connect to 549 location: The BigQuery location to connect to 550 keyfile: The path to the BigQuery keyfile 551 keyfile_json: The BigQuery keyfile as a JSON string 552 token: The BigQuery token 553 refresh_token: The BigQuery refresh token 554 client_id: The BigQuery client ID 555 client_secret: The BigQuery client secret 556 token_uri: The BigQuery token URI 557 scopes: The BigQuery scopes 
558 impersonated_service_account: The service account to impersonate 559 job_creation_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to be created 560 job_execution_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to complete 561 timeout_seconds: Alias for job_execution_timeout_seconds 562 job_retries: The number of times to retry the underlying job if it fails 563 retries: Alias for job_retries 564 job_retry_deadline_seconds: Total number of seconds to wait while retrying the same query 565 priority: The priority of the underlying job 566 maximum_bytes_billed: The maximum number of bytes to be billed for the underlying job 567 """ 568 569 type: t.Literal["bigquery"] = "bigquery" 570 method: t.Optional[str] = BigQueryConnectionMethod.OAUTH 571 dataset: t.Optional[str] = None 572 project: t.Optional[str] = None 573 execution_project: t.Optional[str] = None 574 quota_project: t.Optional[str] = None 575 location: t.Optional[str] = None 576 keyfile: t.Optional[str] = None 577 keyfile_json: t.Optional[t.Dict[str, t.Any]] = None 578 token: t.Optional[str] = None 579 refresh_token: t.Optional[str] = None 580 client_id: t.Optional[str] = None 581 client_secret: t.Optional[str] = None 582 token_uri: t.Optional[str] = None 583 scopes: t.Tuple[str, ...] 
= ( 584 "https://www.googleapis.com/auth/bigquery", 585 "https://www.googleapis.com/auth/cloud-platform", 586 "https://www.googleapis.com/auth/drive", 587 ) 588 impersonated_service_account: t.Optional[str] = None 589 job_creation_timeout_seconds: t.Optional[int] = None 590 job_execution_timeout_seconds: t.Optional[int] = None 591 timeout_seconds: t.Optional[int] = None # To support legacy config 592 job_retries: t.Optional[int] = None 593 retries: int = 1 # To support legacy config 594 job_retry_deadline_seconds: t.Optional[int] = None 595 priority: BigQueryPriority = BigQueryPriority.INTERACTIVE 596 maximum_bytes_billed: t.Optional[int] = None 597 598 @model_validator(mode="before") 599 @classmethod 600 def validate_fields(cls, data: t.Any) -> t.Any: 601 if not isinstance(data, dict): 602 return data 603 604 # dbt treats schema and dataset interchangeably 605 schema = data.get("schema") or data.get("dataset") 606 if not schema: 607 raise ConfigError("Either schema or dataset must be set") 608 data["dataset"] = data["schema"] = schema 609 610 # dbt treats database and project interchangeably 611 database = data.get("database") or data.get("project") 612 if not database: 613 raise ConfigError("Either database or project must be set") 614 data["database"] = data["project"] = database 615 616 return data 617 618 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 619 return "merge" 620 621 @classproperty 622 def relation_class(cls) -> t.Type[BaseRelation]: 623 from dbt.adapters.bigquery.relation import BigQueryRelation 624 625 return BigQueryRelation 626 627 @classproperty 628 def column_class(cls) -> t.Type[Column]: 629 from dbt.adapters.bigquery import BigQueryColumn 630 631 return BigQueryColumn 632 633 @with_schema_differ_overrides 634 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 635 job_retries = self.job_retries if self.job_retries is not None else self.retries 636 job_execution_timeout_seconds = ( 637 
self.job_execution_timeout_seconds 638 if self.job_execution_timeout_seconds is not None 639 else self.timeout_seconds 640 ) 641 return BigQueryConnectionConfig( 642 method=self.method, 643 project=self.database, 644 execution_project=self.execution_project, 645 quota_project=self.quota_project, 646 location=self.location, 647 concurrent_tasks=self.threads, 648 keyfile=self.keyfile, 649 keyfile_json=self.keyfile_json, 650 token=self.token, 651 refresh_token=self.refresh_token, 652 client_id=self.client_id, 653 client_secret=self.client_secret, 654 token_uri=self.token_uri, 655 scopes=self.scopes, 656 impersonated_service_account=self.impersonated_service_account, 657 job_creation_timeout_seconds=self.job_creation_timeout_seconds, 658 job_execution_timeout_seconds=job_execution_timeout_seconds, 659 job_retries=job_retries, 660 job_retry_deadline_seconds=self.job_retry_deadline_seconds, 661 priority=self.priority, 662 maximum_bytes_billed=self.maximum_bytes_billed, 663 **kwargs, 664 ) 665 666 667class MSSQLConfig(TargetConfig): 668 """ 669 Project connection and operational configuration for the SQL Server (MSSQL) target 670 671 Args: 672 host: The MSSQL server host to connect to 673 server: Alias for host 674 port: The MSSQL server port to connect to 675 user: User name for authentication 676 username: Alias for user 677 UID: Alias for user 678 password: User password for authentication 679 PWD: Alias for password 680 login_timeout: The number of seconds to wait for a login to complete 681 query_timeout: The number of seconds to wait for a query to complete 682 authentication: The authentication method to use (only "sql" is supported) 683 schema_authorization: The principal who should own created schemas, not supported by SQLMesh 684 driver: ODBC driver to use, not used by SQLMesh 685 encrypt: A boolean flag to enable server connection encryption, not used by SQLMesh 686 trust_cert: A boolean flag to trust the server certificate, not used by SQLMesh 687 retries: 
Number of times to retry if the SQL Server connector encounters an error, not used by SQLMesh 688 windows_login: A boolean flag to use Windows Authentication, not used by SQLMesh 689 tenant_id: The tenant ID of the Azure Active Directory instance, not used by SQLMesh 690 client_id: The client ID of the Azure Active Directory service principal, not used by SQLMesh 691 client_secret: The client secret of the Azure Active Directory service principal, not used by SQLMesh 692 """ 693 694 type: t.Literal["sqlserver"] = "sqlserver" 695 host: t.Optional[str] = None 696 server: t.Optional[str] = None 697 port: int = 1433 698 database: str = Field(default="master") 699 schema_: str = Field(default="dbo", alias="schema") 700 user: t.Optional[str] = None 701 username: t.Optional[str] = None 702 UID: t.Optional[str] = None 703 password: t.Optional[str] = None 704 PWD: t.Optional[str] = None 705 threads: int = 4 706 login_timeout: t.Optional[int] = None 707 query_timeout: t.Optional[int] = None 708 authentication: t.Optional[str] = "sql" 709 schema_authorization: t.Optional[str] = None # Not supported by SQLMesh 710 711 # Unused ODBC parameters (SQLMesh uses pymssql instead of ODBC) 712 driver: t.Optional[str] = None 713 encrypt: t.Optional[bool] = None 714 trust_cert: t.Optional[bool] = None 715 retries: t.Optional[int] = None 716 717 # Unused authentication parameters (not supported by pymssql) 718 windows_login: t.Optional[bool] = None # pymssql doesn't require this flag for Windows Auth 719 tenant_id: t.Optional[str] = None # Azure Active Directory auth 720 client_id: t.Optional[str] = None # Azure Active Directory auth 721 client_secret: t.Optional[str] = None # Azure Active Directory auth 722 723 @model_validator(mode="before") 724 @classmethod 725 def validate_alias_fields(cls, data: t.Any) -> t.Any: 726 if not isinstance(data, dict): 727 return data 728 729 data["host"] = data.get("host") or data.get("server") 730 if not data["host"]: 731 raise ConfigError("Either host 
or server must be set") 732 733 data["user"] = data.get("user") or data.get("username") or data.get("UID") 734 if not data["user"]: 735 raise ConfigError("One of user, username, or UID must be set") 736 737 data["password"] = data.get("password") or data.get("PWD") 738 if not data["password"]: 739 raise ConfigError("Either password or PWD must be set") 740 741 return data 742 743 @field_validator("authentication") 744 @classmethod 745 def _validate_authentication(cls, v: str) -> str: 746 if v != "sql": 747 raise ConfigError("Only SQL and Windows Authentication are supported for SQL Server") 748 return v 749 750 @field_validator("port") 751 @classmethod 752 def _validate_port(cls, v: t.Union[int, str]) -> int: 753 return int(v) 754 755 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 756 # https://github.com/microsoft/dbt-fabric/blob/main/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql 757 return "delete+insert" if kind is IncrementalByUniqueKeyKind else "append" 758 759 @classproperty 760 def column_class(cls) -> t.Type[Column]: 761 try: 762 # 1.8.0+ 763 from dbt.adapters.sqlserver.sqlserver_column import SQLServerColumn 764 except ImportError: 765 # <1.8.0 766 from dbt.adapters.sqlserver.sql_server_column import SQLServerColumn # type: ignore 767 768 return SQLServerColumn 769 770 @property 771 def dialect(self) -> str: 772 return "tsql" 773 774 @with_schema_differ_overrides 775 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 776 return MSSQLConnectionConfig( 777 host=self.host, 778 user=self.user, 779 password=self.password, 780 port=self.port, 781 database=self.database, 782 timeout=self.query_timeout, 783 login_timeout=self.login_timeout, 784 concurrent_tasks=self.threads, 785 **kwargs, 786 ) 787 788 789class TrinoConfig(TargetConfig): 790 """ 791 Project connection and operational configuration for the Trino target. 
792 793 Args: 794 method: The Trino authentication method to use 795 host: The server host to connect to 796 port: The MSSQL server port to connect to 797 database: Name of the Trino database/catalog 798 schema: Name of the Trino schema 799 user: User name for authentication 800 password: User password for authentication 801 roles: Trino catalog roles 802 session_properties: Trino session properties 803 retries: Number of times to retry if the Trino connector encounters an error 804 timezone: The timezone to use for the Trino session 805 http_headers: HTTP Headers to send alongside requests to Trino 806 http_scheme: The HTTP scheme to use for requests to Trino (default: http, or https if kerberos, ldap or jwt auth) 807 threads: The number of threads to run on 808 impersonation_user: LDAP authentication: override the provided username 809 keytab: Kerberos authentication: Path to keytab 810 krb5_config: Kerberos authentication: Path to config 811 principal: Kerberos authentication: Principal 812 service_name: Kerberos authentication: Service name 813 hostname_override: Kerberos authentication: hostname for a host whose DNS name doesn't match 814 mutual_authentication: Kerberos authentication: Boolean flag for mutual authentication. 815 force_preemptive: Kerberos authentication: Boolean flag to preemptively initiate the GSS exchange. 816 sanitize_mutual_error_response: Kerberos authentication: Boolean flag to strip content and headers from error responses. 
817 delegate: Kerberos authentication: Boolean flag for credential delegation (`GSS_C_DELEG_FLAG`) 818 jwt_token: JWT authentication: JWT string 819 client_certificate: Certification authentication: Path to client certificate 820 client_private_key: Certification authentication: Path to client private key 821 cert: Certification authentication: Full path to a certificate file 822 """ 823 824 _method_to_auth_enum: t.ClassVar[t.Dict[str, TrinoAuthenticationMethod]] = { 825 "none": TrinoAuthenticationMethod.NO_AUTH, 826 "ldap": TrinoAuthenticationMethod.LDAP, 827 "kerberos": TrinoAuthenticationMethod.KERBEROS, 828 "jwt": TrinoAuthenticationMethod.JWT, 829 "certificate": TrinoAuthenticationMethod.CERTIFICATE, 830 "oauth": TrinoAuthenticationMethod.OAUTH, 831 "oauth_console": TrinoAuthenticationMethod.OAUTH, 832 } 833 834 type: t.Literal["trino"] = "trino" 835 host: str 836 database: str 837 schema_: str = Field(alias="schema") 838 port: int = 443 839 method: str 840 user: t.Optional[str] = None 841 842 threads: int = 1 843 roles: t.Optional[t.Dict[str, str]] = None 844 session_properties: t.Optional[t.Dict[str, str]] = None 845 retries: int = 3 846 timezone: t.Optional[str] = None 847 http_headers: t.Optional[t.Dict[str, str]] = None 848 http_scheme: t.Optional[str] = None 849 prepared_statements_enabled: bool = True # not used by SQLMesh 850 851 # ldap authentication 852 password: t.Optional[str] = None 853 impersonation_user: t.Optional[str] = None 854 855 # kerberos authentication 856 keytab: t.Optional[str] = None 857 krb5_config: t.Optional[str] = None 858 principal: t.Optional[str] = None 859 service_name: str = "trino" 860 hostname_override: t.Optional[str] = None 861 mutual_authentication: bool = False 862 force_preemptive: bool = False 863 sanitize_mutual_error_response: bool = True 864 delegate: bool = False 865 866 # jwt authentication 867 jwt_token: t.Optional[str] = None 868 869 # certificate authentication 870 client_certificate: t.Optional[str] = None 
871 client_private_key: t.Optional[str] = None 872 cert: t.Optional[str] = None 873 874 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 875 return "append" 876 877 @classproperty 878 def relation_class(cls) -> t.Type[BaseRelation]: 879 from dbt.adapters.trino.relation import TrinoRelation 880 881 return TrinoRelation 882 883 @classproperty 884 def column_class(cls) -> t.Type[Column]: 885 from dbt.adapters.trino.column import TrinoColumn 886 887 return TrinoColumn 888 889 @with_schema_differ_overrides 890 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 891 return TrinoConnectionConfig( 892 method=self._method_to_auth_enum[self.method], 893 host=self.host, 894 user=self.user, 895 catalog=self.database, 896 port=self.port, 897 http_scheme=self.http_scheme, 898 roles=self.roles, 899 http_headers=self.http_headers, 900 session_properties=self.session_properties, 901 retries=self.retries, 902 timezone=self.timezone, 903 password=self.password, 904 impersonation_user=self.impersonation_user, 905 keytab=self.keytab, 906 krb5_config=self.krb5_config, 907 principal=self.principal, 908 service_name=self.service_name, 909 hostname_override=self.hostname_override, 910 mutual_authentication=self.mutual_authentication, 911 force_preemptive=self.force_preemptive, 912 sanitize_mutual_error_response=self.sanitize_mutual_error_response, 913 delegate=self.delegate, 914 jwt_token=self.jwt_token, 915 client_certificate=self.client_certificate, 916 client_private_key=self.client_private_key, 917 cert=self.cert, 918 concurrent_tasks=self.threads, 919 **kwargs, 920 ) 921 922 923class ClickhouseConfig(TargetConfig): 924 """ 925 Project connection and operational configuration for the Clickhouse target 926 927 Args: 928 host: [localhost] 929 user: [default] # User for all database operations 930 password: [<empty string>] # Password for the user 931 secure: [False] # Use TLS (native protocol) or HTTPS (http protocol) 932 port: [8123] # If not set, defaults to 
8123, 8443 depending on the secure and driver settings 933 connect_timeout: [10] # Timeout in seconds to establish a connection to ClickHouse 934 send_receive_timeout: [300] # Timeout in seconds to receive data from the ClickHouse server 935 verify: [True] # Validate TLS certificate if using TLS/SSL 936 cluster: [<empty string>] # If set, certain DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster. 937 custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty. 938 schema: [default] # ClickHouse database for dbt models, not used by SQLMesh 939 driver: [http] # http or native. If not set this will be autodetermined based on port setting, not used by SQLMesh 940 retries: [1] # Number of times to retry a "retriable" database exception (such as a 503 'Service Unavailable' error), not used by SQLMesh 941 compression: [<empty string>] # Use gzip compression if truthy (http), or compression type for a native connection, not used by SQLMesh 942 cluster_mode: [False] # Use specific settings designed to improve operation on Replicated databases (recommended for ClickHouse Cloud), not used by SQLMesh 943 use_lw_deletes: [False] # Use the strategy `delete+insert` as the default incremental strategy, not used by SQLMesh 944 check_exchange: [True] # Validate that clickhouse support the atomic EXCHANGE TABLES command. Not used by SQLMesh. 945 local_suffix: [_local] # Table suffix of local tables on shards for distributed materializations, not used by SQLMesh 946 local_db_prefix: [<empty string>] # Database prefix of local tables on shards for distributed materializations, not used by SQLMesh 947 allow_automatic_deduplication: [False] # Enable ClickHouse automatic deduplication for Replicated tables, not used by SQLMesh 948 tcp_keepalive: [False] # Native client only, specify TCP keepalive configuration. 
Specify custom keepalive settings as [idle_time_sec, interval_sec, probes], not used by SQLMesh 949 sync_request_timeout: [5] # Timeout for server ping, not used by SQLMesh 950 compress_block_size: [1048576] # Compression block size if compression is enabled, not used by SQLMesh 951 """ 952 953 host: str = "localhost" 954 user: str = Field(default="default", alias="username") 955 password: str = "" 956 port: t.Optional[int] = None 957 cluster: t.Optional[str] = None 958 schema_: str = Field(default="default", alias="schema") 959 connect_timeout: int = 10 960 send_receive_timeout: int = 300 961 verify: bool = True 962 compression: str = "" 963 custom_settings: t.Optional[t.Dict[str, t.Any]] = None 964 965 # Not used by SQLMesh 966 driver: t.Optional[str] = None 967 secure: bool = False 968 retries: int = 1 969 database_engine: t.Optional[str] = None 970 cluster_mode: bool = False 971 sync_request_timeout: int = 5 972 compress_block_size: int = 1048576 973 check_exchange: bool = True 974 use_lw_deletes: bool = False 975 allow_automatic_deduplication: bool = False 976 tcp_keepalive: t.Union[bool, t.Tuple[int, ...], t.List[int]] = False 977 database: str = "" 978 local_suffix: str = "local" 979 local_db_prefix: str = "" 980 981 type: t.Literal["clickhouse"] = "clickhouse" 982 983 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 984 # dbt-clickhouse name for temp table swap. That is sqlmesh's default 985 # strategy so doesn't require special handling during conversion. 
986 return "legacy" 987 988 @classproperty 989 def relation_class(cls) -> t.Type[BaseRelation]: 990 from dbt.adapters.clickhouse.relation import ClickHouseRelation 991 992 return ClickHouseRelation 993 994 @classproperty 995 def column_class(cls) -> t.Type[Column]: 996 from dbt.adapters.clickhouse.column import ClickHouseColumn 997 998 return ClickHouseColumn 999 1000 @with_schema_differ_overrides 1001 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 1002 return ClickhouseConnectionConfig( 1003 host=self.host, 1004 username=self.user, 1005 password=self.password, 1006 port=self.port, 1007 cluster=self.cluster, 1008 connect_timeout=self.connect_timeout, 1009 send_receive_timeout=self.send_receive_timeout, 1010 verify=self.verify, 1011 compression_method=self.compression, 1012 connection_settings=self.custom_settings, 1013 **kwargs, 1014 ) 1015 1016 1017class AthenaConfig(TargetConfig): 1018 """ 1019 Project connection and operational configuration for the Athena target. 1020 1021 Args: 1022 s3_staging_dir: S3 location to store Athena query results and metadata 1023 s3_data_dir: Prefix for storing tables, if different from the connection's s3_staging_dir 1024 s3_data_naming: How to generate table paths in s3_data_dir 1025 s3_tmp_table_dir: Prefix for storing temporary tables, if different from the connection's s3_data_dir 1026 region_name: AWS region of your Athena instance 1027 schema: Specify the schema (Athena database) to build models into (lowercase only) 1028 database: Specify the database (Data catalog) to build models into (lowercase only) 1029 poll_interval: Interval in seconds to use for polling the status of query results in Athena 1030 debug_query_state: Flag if debug message with Athena query state is needed 1031 aws_access_key_id: Access key ID of the user performing requests 1032 aws_secret_access_key: Secret access key of the user performing requests 1033 aws_profile_name: Profile to use from your AWS shared credentials file 1034 work_group: 
Identifier of Athena workgroup 1035 skip_workgroup_check: Indicates if the WorkGroup check (additional AWS call) can be skipped 1036 num_retries: Number of times to retry a failing query 1037 num_boto3_retries: Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables) 1038 num_iceberg_retries: Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR 1039 spark_work_group: Identifier of Athena Spark workgroup for running Python models 1040 seed_s3_upload_args: Dictionary containing boto3 ExtraArgs when uploading to S3 1041 lf_tags_database: Default LF tags for new database if it's created by dbt 1042 """ 1043 1044 type: t.Literal["athena"] = "athena" 1045 threads: int = 4 1046 1047 s3_staging_dir: t.Optional[str] = None 1048 s3_data_dir: t.Optional[str] = None 1049 s3_data_naming: t.Optional[str] = None 1050 s3_tmp_table_dir: t.Optional[str] = None 1051 poll_interval: t.Optional[int] = None 1052 debug_query_state: bool = False 1053 work_group: t.Optional[str] = None 1054 skip_workgroup_check: t.Optional[bool] = None 1055 spark_work_group: t.Optional[str] = None 1056 1057 aws_access_key_id: t.Optional[str] = None 1058 aws_secret_access_key: t.Optional[str] = None 1059 aws_profile_name: t.Optional[str] = None 1060 region_name: t.Optional[str] = None 1061 1062 num_retries: t.Optional[int] = None 1063 num_boto3_retries: t.Optional[int] = None 1064 num_iceberg_retries: t.Optional[int] = None 1065 1066 seed_s3_upload_args: t.Dict[str, str] = {} 1067 lf_tags_database: t.Dict[str, str] = {} 1068 1069 @classproperty 1070 def relation_class(cls) -> t.Type[BaseRelation]: 1071 from dbt.adapters.athena.relation import AthenaRelation 1072 1073 return AthenaRelation 1074 1075 @classproperty 1076 def column_class(cls) -> t.Type[Column]: 1077 from dbt.adapters.athena.column import AthenaColumn 1078 1079 return AthenaColumn 1080 1081 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 1082 return "insert_overwrite" 
1083 1084 @with_schema_differ_overrides 1085 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 1086 return AthenaConnectionConfig( 1087 type="athena", 1088 aws_access_key_id=self.aws_access_key_id, 1089 aws_secret_access_key=self.aws_secret_access_key, 1090 region_name=self.region_name, 1091 work_group=self.work_group, 1092 s3_staging_dir=self.s3_staging_dir, 1093 s3_warehouse_location=self.s3_data_dir, 1094 schema_name=self.schema_, 1095 catalog_name=self.database, 1096 concurrent_tasks=self.threads, 1097 **kwargs, 1098 ) 1099 1100 1101TARGET_TYPE_TO_CONFIG_CLASS: t.Dict[str, t.Type[TargetConfig]] = { 1102 "databricks": DatabricksConfig, 1103 "duckdb": DuckDbConfig, 1104 "postgres": PostgresConfig, 1105 "redshift": RedshiftConfig, 1106 "snowflake": SnowflakeConfig, 1107 "bigquery": BigQueryConfig, 1108 "sqlserver": MSSQLConfig, 1109 "tsql": MSSQLConfig, 1110 "trino": TrinoConfig, 1111 "athena": AthenaConfig, 1112 "clickhouse": ClickhouseConfig, 1113}
def with_schema_differ_overrides(
    func: t.Callable[..., ConnectionConfig],
) -> t.Callable[..., ConnectionConfig]:
    """Decorator that merges the default schema differ overrides into kwargs.

    The decorated callable is invoked as ``func(self, **merged_kwargs)`` where
    ``merged_kwargs`` starts from ``SCHEMA_DIFFER_OVERRIDES`` and is then updated
    with the caller's keyword arguments, so an explicitly passed key always
    replaces the default stored under the same key (the merge is shallow).

    Args:
        func: The ``to_sqlmesh``-style method to wrap.

    Returns:
        A wrapper with the same name, docstring and signature metadata as ``func``.
    """
    # Imported locally so the decorator does not rely on a new module-level import.
    from functools import wraps

    # Without ``wraps`` every decorated method would be reported as ``wrapper``
    # and lose its own docstring.
    @wraps(func)
    def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
        merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs}
        return func(self, **merged_kwargs)

    return wrapper
Decorator that merges default config with kwargs.
class TargetConfig(abc.ABC, DbtConfig):
    """
    Base configuration for a dbt profile target.

    Warehouse-specific subclasses override the strategy, relation/column class
    and ``to_sqlmesh`` hooks defined here.

    Args:
        type: The type of the data warehouse
        name: The name of this target
        database: Name of the database
        schema_: Name of the schema (populated from the ``schema`` key)
        threads: The number of threads to run on
        profile_name: Optional; defaults to None
    """

    # dbt
    type: str = "none"
    name: str
    database: str
    schema_: str = Field(alias="schema")
    threads: int = 1
    profile_name: t.Optional[str] = None

    @classmethod
    def load(cls, data: t.Dict[str, t.Any]) -> TargetConfig:
        """
        Builds the target configuration matching the ``type`` entry of a profile target.

        Args:
            data: The yaml for the project's target output

        Returns:
            The configuration of the provided profile target

        Raises:
            ConfigError: If the target type is not one of the supported types.
        """
        db_type = data["type"]
        # Walked in order and compared with ``==`` so the lookup behaves exactly
        # like a sequential chain of equality checks.
        supported_targets = (
            ("databricks", DatabricksConfig),
            ("duckdb", DuckDbConfig),
            ("postgres", PostgresConfig),
            ("redshift", RedshiftConfig),
            ("snowflake", SnowflakeConfig),
            ("bigquery", BigQueryConfig),
            ("sqlserver", MSSQLConfig),
            ("trino", TrinoConfig),
            ("clickhouse", ClickhouseConfig),
            ("athena", AthenaConfig),
        )
        for type_name, config_class in supported_targets:
            if db_type == type_name:
                return config_class(**data)

        raise ConfigError(f"{db_type} not supported.")

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """The default incremental strategy for the db"""
        raise NotImplementedError()

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Converts target config to SQLMesh connection config"""
        raise NotImplementedError()

    def attribute_dict(self) -> AttributeDict:
        """Returns the serializable subset of fields plus ``target_name``."""
        # Only SERIALIZABLE_FIELDS are included; ``target_name`` mirrors ``name``.
        serialized = {
            **self.dict(include=SERIALIZABLE_FIELDS),
            "target_name": self.name,
        }
        return AttributeDict(serialized)

    @classproperty
    def quote_policy(cls) -> Policy:
        """The quoting policy for this target; a default ``Policy()`` here."""
        return Policy()

    @property
    def extra(self) -> t.Set[str]:
        """The result of ``extra_fields`` applied to this config's field names."""
        field_names = set(self.dict())
        return self.extra_fields(field_names)

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt relation class for this target; ``BaseRelation`` here."""
        return BaseRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt column class for this target; ``Column`` here."""
        return Column

    @property
    def dialect(self) -> str:
        """The dialect name, which is the target ``type``."""
        return self.type
Configuration for DBT profile target
Arguments:
- type: The type of the data warehouse
- name: The name of this target
- database: Name of the database
- schema_: Name of the schema
- threads: The number of threads to run on
@classmethod
def load(cls, data: t.Dict[str, t.Any]) -> TargetConfig:
    """
    Builds the target configuration matching the ``type`` entry of a profile target.

    Args:
        data: The yaml for the project's target output

    Returns:
        The configuration of the provided profile target

    Raises:
        ConfigError: If the target type is not one of the supported types.
    """
    db_type = data["type"]
    # Walked in order and compared with ``==`` so the lookup behaves exactly
    # like a sequential chain of equality checks.
    supported_targets = (
        ("databricks", DatabricksConfig),
        ("duckdb", DuckDbConfig),
        ("postgres", PostgresConfig),
        ("redshift", RedshiftConfig),
        ("snowflake", SnowflakeConfig),
        ("bigquery", BigQueryConfig),
        ("sqlserver", MSSQLConfig),
        ("trino", TrinoConfig),
        ("clickhouse", ClickhouseConfig),
        ("athena", AthenaConfig),
    )
    for type_name, config_class in supported_targets:
        if db_type == type_name:
            return config_class(**data)

    raise ConfigError(f"{db_type} not supported.")
Loads the configuration from the yaml provided for a profile target
Arguments:
- data: The yaml for the project's target output
Returns:
The configuration of the provided profile target
def default_incremental_strategy(self, kind: IncrementalKind) -> str:
    """The default incremental strategy for the db.

    The base implementation always raises; concrete targets override it.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()
The default incremental strategy for the db
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    # Start from the module-level schema differ defaults, then let any
    # caller-supplied keyword replace the default stored under the same key.
    effective_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    effective_kwargs.update(kwargs)
    return func(self, **effective_kwargs)
Converts target config to SQLMesh connection config
Policy(database: bool = True, schema: bool = True, identifier: bool = True)
BaseRelation(path: dbt.adapters.contracts.relation.Path, type: Optional[dbt.adapters.contracts.relation.RelationType] = None, quote_character: str = '"', include_policy: dbt.adapters.contracts.relation.Policy =
Column(column: str, dtype: str, char_size: Optional[int] = None, numeric_precision: Optional[Any] = None, numeric_scale: Optional[Any] = None)
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class DuckDbConfig(TargetConfig):
    """
    Connection config for DuckDb target

    Args:
        path: Location of the database file. If not specified, an in memory database is used.
        extensions: A list of autoloadable extensions to load.
        settings: A dictionary of settings to pass into the duckdb connector.
        secrets: A list of secrets to pass to the secret manager in the duckdb connector.
        filesystems: A list of `fsspec` filesystems to register in the duckdb connection.
    """

    type: t.Literal["duckdb"] = "duckdb"
    database: str = "main"
    schema_: str = Field(default="main", alias="schema")
    path: str = DUCKDB_IN_MEMORY
    extensions: t.Optional[t.List[str]] = None
    settings: t.Optional[t.Dict[str, t.Any]] = None
    secrets: t.Optional[t.List[t.Dict[str, t.Any]]] = None
    filesystems: t.Optional[t.List[t.Dict[str, t.Any]]] = None

    @model_validator(mode="before")
    @classmethod
    def validate_authentication(cls, data: t.Any) -> t.Any:
        """Derives ``database`` from ``path`` and warns about multi-threaded configs.

        Despite its name, this hook performs no authentication checks.

        Args:
            data: The raw, not yet validated input; returned untouched unless it is a dict.

        Returns:
            The same ``data`` object, with ``database`` filled in when it was
            missing and DBT_VERSION is at least 1.5.0.
        """
        if not isinstance(data, dict):
            return data

        if "database" not in data and DBT_VERSION >= (1, 5, 0):
            path = data.get("path")
            data["database"] = (
                "memory"
                if path is None or path == DUCKDB_IN_MEMORY
                else Path(t.cast(str, path)).stem
            )

        if "threads" in data:
            # This runs before field validation, so the value may still be a
            # string (or anything else). Coerce it for the comparison and leave
            # reporting of invalid values to the regular ``threads`` validation.
            try:
                requests_concurrency = int(data["threads"]) > 1
            except (TypeError, ValueError):
                requests_concurrency = False

            if requests_concurrency:
                # Only a warning is emitted here; ``to_sqlmesh`` is what pins
                # concurrent_tasks to 1.
                get_console().log_warning(
                    "DuckDB does not support concurrency - setting threads to 1."
                )

        return data

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """The default incremental strategy for the db; always "delete+insert"."""
        return "delete+insert"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt-duckdb relation class, imported lazily."""
        from dbt.adapters.duckdb.relation import DuckDBRelation

        return DuckDBRelation

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Converts target config to SQLMesh connection config.

        Optional settings are forwarded only when set; ``settings`` is passed
        as ``connector_config``. ``concurrent_tasks`` is always 1.
        """
        if self.extensions is not None:
            kwargs["extensions"] = self.extensions
        if self.settings is not None:
            kwargs["connector_config"] = self.settings
        if self.secrets is not None:
            kwargs["secrets"] = self.secrets
        if self.filesystems is not None:
            kwargs["filesystems"] = self.filesystems
        return DuckDBConnectionConfig(
            database=self.path,
            concurrent_tasks=1,
            **kwargs,
        )
Connection config for DuckDb target
Arguments:
- path: Location of the database file. If not specified, an in memory database is used.
- extensions: A list of autoloadable extensions to load.
- settings: A dictionary of settings to pass into the duckdb connector.
- secrets: A list of secrets to pass to the secret manager in the duckdb connector.
- filesystems: A list of `fsspec` filesystems to register in the duckdb connection.
@model_validator(mode="before")
@classmethod
def validate_authentication(cls, data: t.Any) -> t.Any:
    """Derives ``database`` from ``path`` and warns about multi-threaded configs.

    Despite its name, this hook performs no authentication checks.

    Args:
        data: The raw, not yet validated input; returned untouched unless it is a dict.

    Returns:
        The same ``data`` object, with ``database`` filled in when it was
        missing and DBT_VERSION is at least 1.5.0.
    """
    if not isinstance(data, dict):
        return data

    if "database" not in data and DBT_VERSION >= (1, 5, 0):
        path = data.get("path")
        data["database"] = (
            "memory"
            if path is None or path == DUCKDB_IN_MEMORY
            else Path(t.cast(str, path)).stem
        )

    if "threads" in data:
        # This runs before field validation, so the value may still be a
        # string (or anything else). Coerce it for the comparison and leave
        # reporting of invalid values to the regular ``threads`` validation.
        try:
            requests_concurrency = int(data["threads"]) > 1
        except (TypeError, ValueError):
            requests_concurrency = False

        if requests_concurrency:
            # Only a warning is emitted here; the stored value is not modified.
            get_console().log_warning(
                "DuckDB does not support concurrency - setting threads to 1."
            )

    return data
def default_incremental_strategy(self, kind: IncrementalKind) -> str:
    """The default incremental strategy for the db.

    The same strategy is returned regardless of ``kind``.
    """
    strategy = "delete+insert"
    return strategy
The default incremental strategy for the db
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    # Start from the module-level schema differ defaults, then let any
    # caller-supplied keyword replace the default stored under the same key.
    effective_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    effective_kwargs.update(kwargs)
    return func(self, **effective_kwargs)
Converts target config to SQLMesh connection config
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class SnowflakeConfig(TargetConfig):
    """
    Project connection and operational configuration for the Snowflake target

    At least one of password, authenticator, private_key or private_key_path
    must be provided, otherwise validation fails with a ConfigError.

    Args:
        account: Snowflake account
        warehouse: Name of the warehouse
        user: Name of the user
        password: User's password
        role: Role of the user
        client_session_keep_alive: A boolean flag to extend the duration of the Snowflake session beyond 4 hours
        query_tag: tag for the query in Snowflake
        connect_retries: Number of times to retry if the Snowflake connector encounters an error
        connect_timeout: Number of seconds to wait between failed attempts
        retry_on_database_errors: A boolean flag to retry if a Snowflake connector Database error is encountered
        retry_all: A boolean flag to retry on all Snowflake connector errors
        authenticator: SSO authentication: Snowflake authentication method
        private_key: Key pair authentication: Private key
        private_key_path: Key pair authentication: Path to the private key, used instead of private_key
        private_key_passphrase: Key pair authentication: passphrase used to decrypt private key (if encrypted)
        token: OAuth authentication: The Snowflake OAuth 2.0 access token
    """

    type: t.Literal["snowflake"] = "snowflake"
    account: str
    user: str

    # User and password authentication
    password: t.Optional[str] = None

    # SSO authentication
    authenticator: t.Optional[str] = None

    # Key Pair Auth
    private_key: t.Optional[str] = None
    private_key_path: t.Optional[str] = None
    private_key_passphrase: t.Optional[str] = None

    # TODO add other forms of authentication

    # oauth access token
    token: t.Optional[str] = None

    # Optional
    warehouse: t.Optional[str] = None
    role: t.Optional[str] = None
    client_session_keep_alive: bool = False
    query_tag: t.Optional[str] = None
    connect_retries: int = 0
    connect_timeout: int = 10
    retry_on_database_errors: bool = False
    retry_all: bool = False

    @model_validator(mode="before")
    @classmethod
    def validate_authentication(cls, data: t.Any) -> t.Any:
        """Ensures the raw target data names at least one authentication method.

        Non-dict input is passed through unchanged.

        Raises:
            ConfigError: If none of the recognised authentication keys has a truthy value.
        """
        if not isinstance(data, dict):
            return data

        # Same keys, in the same order, that are accepted as proof of an auth method.
        auth_keys = ("password", "authenticator", "private_key", "private_key_path")
        if any(data.get(key) for key in auth_keys):
            return data

        raise ConfigError("No supported Snowflake authentication method found in target profile.")

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """The default incremental strategy for the db; always "merge"."""
        return "merge"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt-snowflake relation class, imported lazily."""
        from dbt.adapters.snowflake import SnowflakeRelation

        return SnowflakeRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt-snowflake column class, imported lazily."""
        from dbt.adapters.snowflake import SnowflakeColumn

        return SnowflakeColumn

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Converts target config to SQLMesh connection config.

        ``threads`` is forwarded as ``concurrent_tasks``.
        """
        connection = SnowflakeConnectionConfig(
            user=self.user,
            password=self.password,
            authenticator=self.authenticator,
            account=self.account,
            warehouse=self.warehouse,
            database=self.database,
            role=self.role,
            concurrent_tasks=self.threads,
            token=self.token,
            private_key=self.private_key,
            private_key_path=self.private_key_path,
            private_key_passphrase=self.private_key_passphrase,
            **kwargs,
        )
        return connection

    @classproperty
    def quote_policy(cls) -> Policy:
        """A policy with database, schema and identifier all set to False."""
        return Policy(database=False, schema=False, identifier=False)
Project connection and operational configuration for the Snowflake target
Arguments:
- account: Snowflake account
- warehouse: Name of the warehouse
- user: Name of the user
- password: User's password
- role: Role of the user
- client_session_keep_alive: A boolean flag to extend the duration of the Snowflake session beyond 4 hours
- query_tag: tag for the query in Snowflake
- connect_retries: Number of times to retry if the Snowflake connector encounters an error
- connect_timeout: Number of seconds to wait between failed attempts
- retry_on_database_errors: A boolean flag to retry if a Snowflake connector Database error is encountered
- retry_all: A boolean flag to retry on all Snowflake connector errors
- authenticator: SSO authentication: Snowflake authentication method
- private_key: Key pair authentication: Private key
- private_key_path: Key pair authentication: Path to the private key, used instead of private_key
- private_key_passphrase: Key pair authentication: passphrase used to decrypt private key (if encrypted)
- token: OAuth authentication: The Snowflake OAuth 2.0 access token
@model_validator(mode="before")
@classmethod
def validate_authentication(cls, data: t.Any) -> t.Any:
    """Ensures the raw target data names at least one authentication method.

    Non-dict input is passed through unchanged.

    Raises:
        ConfigError: If none of the recognised authentication keys has a truthy value.
    """
    if not isinstance(data, dict):
        return data

    # Same keys, in the same order, that are accepted as proof of an auth method.
    auth_keys = ("password", "authenticator", "private_key", "private_key_path")
    if any(data.get(key) for key in auth_keys):
        return data

    raise ConfigError("No supported Snowflake authentication method found in target profile.")
The default incremental strategy for the db
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    # Start from the module-level schema differ defaults, then let any
    # caller-supplied keyword replace the default stored under the same key.
    effective_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    effective_kwargs.update(kwargs)
    return func(self, **effective_kwargs)
Converts target config to SQLMesh connection config
@classproperty
def quote_policy(cls) -> Policy:
    """A policy with database, schema and identifier all set to False."""
    unquoted = Policy(database=False, schema=False, identifier=False)
    return unquoted
Policy(database: bool = True, schema: bool = True, identifier: bool = True)
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class PostgresConfig(TargetConfig):
    """
    Project connection and operational configuration for the Postgres target

    ``password`` may be supplied under either the ``pass`` or ``password`` key,
    and ``database`` falls back to ``dbname`` when it is not given.

    Args:
        host: The Postgres host to connect to
        user: Name of the user
        password: User's password
        port: The port to connect to
        dbname: Name of the database
        keepalives_idle: Seconds between TCP keepalive packets
        connect_timeout: Number of seconds to wait between failed attempts
        retries: Number of times to retry if the Postgres connector encounters an error
        search_path: Overrides the default search path
        role: Role of the user
        sslmode: SSL Mode used to connect to the database
    """

    type: t.Literal["postgres"] = "postgres"
    host: str
    user: str
    password: str = Field(validation_alias=AliasChoices("pass", "password"))
    port: int
    dbname: str
    keepalives_idle: t.Optional[int] = None
    connect_timeout: int = 10
    retries: int = 1  # Currently Unsupported by SQLMesh
    search_path: t.Optional[str] = None  # Currently Unsupported by SQLMesh
    role: t.Optional[str] = None
    sslmode: t.Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def validate_database(cls, data: t.Any) -> t.Any:
        """Fills ``database`` from ``dbname`` when no database was provided.

        Non-dict input is passed through unchanged; dict input is updated in place.

        Raises:
            ConfigError: If neither ``database`` nor ``dbname`` has a truthy value.
        """
        if not isinstance(data, dict):
            return data

        resolved_database = data.get("database") or data.get("dbname")
        # The key is written even when nothing was resolved, before the check below.
        data["database"] = resolved_database
        if not resolved_database:
            raise ConfigError("Either database or dbname must be set")

        return data

    @field_validator("port")
    @classmethod
    def _validate_port(cls, v: t.Union[int, str]) -> int:
        """Coerces the configured port to an int."""
        return int(v)

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """The default incremental strategy for the db.

        "delete+insert" for IncrementalByUniqueKeyKind, "append" for any other kind.
        """
        if kind is IncrementalByUniqueKeyKind:
            return "delete+insert"
        return "append"

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Converts target config to SQLMesh connection config.

        ``dbname`` is forwarded as ``database`` and ``threads`` as ``concurrent_tasks``.
        """
        connection = PostgresConnectionConfig(
            host=self.host,
            user=self.user,
            password=self.password,
            port=self.port,
            database=self.dbname,
            keepalives_idle=self.keepalives_idle,
            concurrent_tasks=self.threads,
            connect_timeout=self.connect_timeout,
            role=self.role,
            sslmode=self.sslmode,
            **kwargs,
        )
        return connection
Project connection and operational configuration for the Postgres target
Arguments:
- host: The Postgres host to connect to
- user: Name of the user
- password: User's password
- port: The port to connect to
- dbname: Name of the database
- keepalives_idle: Seconds between TCP keepalive packets
- connect_timeout: Number of seconds to wait between failed attempts
- retries: Number of times to retry if the Postgres connector encounters an error
- search_path: Overrides the default search path
- role: Role of the user
- sslmode: SSL Mode used to connect to the database
@model_validator(mode="before")
@classmethod
def validate_database(cls, data: t.Any) -> t.Any:
    """Fills ``database`` from ``dbname`` when no database was provided.

    Non-dict input is passed through unchanged; dict input is updated in place.

    Raises:
        ConfigError: If neither ``database`` nor ``dbname`` has a truthy value.
    """
    if not isinstance(data, dict):
        return data

    resolved_database = data.get("database") or data.get("dbname")
    # The key is written even when nothing was resolved, before the check below.
    data["database"] = resolved_database
    if not resolved_database:
        raise ConfigError("Either database or dbname must be set")

    return data
def default_incremental_strategy(self, kind: IncrementalKind) -> str:
    """The default incremental strategy for the db.

    "delete+insert" for IncrementalByUniqueKeyKind, "append" for any other kind.
    """
    if kind is IncrementalByUniqueKeyKind:
        return "delete+insert"
    return "append"
The default incremental strategy for the db
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    # Start from the module-level schema differ defaults, then let any
    # caller-supplied keyword replace the default stored under the same key.
    effective_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    effective_kwargs.update(kwargs)
    return func(self, **effective_kwargs)
Converts target config to SQLMesh connection config
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class RedshiftConfig(TargetConfig):
    """
    Project connection and operational configuration for the Redshift target

    ``password`` may be supplied under either the ``pass`` or ``password`` key,
    and ``database`` falls back to ``dbname`` when it is not given.

    Args:
        host: The Redshift host to connect to
        user: Name of the user
        password: User's password
        port: The port to connect to
        dbname: Name of the database
        connect_timeout: Number of seconds to wait between failed attempts
        ra3_node: Enables cross-database sources
        search_path: Overrides the default search path
        sslmode: SSL Mode used to connect to the database
    """

    # TODO add other forms of authentication
    type: t.Literal["redshift"] = "redshift"
    host: str
    user: str
    password: str = Field(validation_alias=AliasChoices("pass", "password"))
    port: int
    dbname: str
    connect_timeout: t.Optional[int] = None
    ra3_node: bool = True
    search_path: t.Optional[str] = None
    sslmode: t.Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def validate_database(cls, data: t.Any) -> t.Any:
        """Fills ``database`` from ``dbname`` when no database was provided.

        Non-dict input is passed through unchanged; dict input is updated in place.

        Raises:
            ConfigError: If neither ``database`` nor ``dbname`` has a truthy value.
        """
        if not isinstance(data, dict):
            return data

        resolved_database = data.get("database") or data.get("dbname")
        # The key is written even when nothing was resolved, before the check below.
        data["database"] = resolved_database
        if not resolved_database:
            raise ConfigError("Either database or dbname must be set")

        return data

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """The default incremental strategy for the db; always "append"."""
        return "append"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt-redshift relation class, imported lazily."""
        from dbt.adapters.redshift import RedshiftRelation

        return RedshiftRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The column class: RedshiftColumn before dbt 1.6.0, otherwise the parent's."""
        if DBT_VERSION >= (1, 6, 0):
            return super(RedshiftConfig, cls).column_class

        from dbt.adapters.redshift import RedshiftColumn  # type: ignore

        return RedshiftColumn

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Converts target config to SQLMesh connection config.

        ``connect_timeout`` is forwarded as ``timeout`` and ``threads`` as ``concurrent_tasks``.
        """
        connection = RedshiftConnectionConfig(
            user=self.user,
            password=self.password,
            database=self.database,
            host=self.host,
            port=self.port,
            sslmode=self.sslmode,
            timeout=self.connect_timeout,
            concurrent_tasks=self.threads,
            **kwargs,
        )
        return connection
Project connection and operational configuration for the Redshift target
Arguments:
- host: The Redshift host to connect to
- user: Name of the user
- password: User's password
- port: The port to connect to
- dbname: Name of the database
- connect_timeout: Number of seconds to wait between failed attempts
- ra3_node: Enables cross-database sources
- search_path: Overrides the default search path
- sslmode: SSL Mode used to connect to the database
@model_validator(mode="before")
@classmethod
def validate_database(cls, data: t.Any) -> t.Any:
    """Fills ``database`` from ``dbname`` when no database was provided.

    Non-dict input is passed through unchanged; dict input is updated in place.

    Raises:
        ConfigError: If neither ``database`` nor ``dbname`` has a truthy value.
    """
    if not isinstance(data, dict):
        return data

    resolved_database = data.get("database") or data.get("dbname")
    # The key is written even when nothing was resolved, before the check below.
    data["database"] = resolved_database
    if not resolved_database:
        raise ConfigError("Either database or dbname must be set")

    return data
The default incremental strategy for the db
@classproperty
def column_class(cls) -> t.Type[Column]:
    """The column class: RedshiftColumn before dbt 1.6.0, otherwise the parent's."""
    if DBT_VERSION >= (1, 6, 0):
        return super(RedshiftConfig, cls).column_class

    from dbt.adapters.redshift import RedshiftColumn  # type: ignore

    return RedshiftColumn
Column(column: str, dtype: str, char_size: Optional[int] = None, numeric_precision: Optional[Any] = None, numeric_scale: Optional[Any] = None)
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    # Start from the module-level schema differ defaults, then let any
    # caller-supplied keyword replace the default stored under the same key.
    effective_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    effective_kwargs.update(kwargs)
    return func(self, **effective_kwargs)
Converts target config to SQLMesh connection config
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class DatabricksConfig(TargetConfig):
    """
    Project connection and operational configuration for the Databricks target

    Args:
        catalog: Catalog name to use for Unity Catalog
        host: The Databricks host to connect to
        http_path: The Databricks compute resources URL
        token: Personal access token
        database: Populated from the `catalog` key and forwarded to SQLMesh as the catalog
        auth_type: Authentication type; "oauth" is forwarded to SQLMesh as "databricks-oauth"
        client_id: Forwarded to SQLMesh as the OAuth client id
        client_secret: Forwarded to SQLMesh as the OAuth client secret
    """

    type: t.Literal["databricks"] = "databricks"
    host: str
    http_path: str
    token: t.Optional[str] = None  # only required if auth_type is not set to 'oauth'
    database: t.Optional[str] = Field(alias="catalog")  # type: ignore
    auth_type: t.Optional[str] = None
    client_id: t.Optional[str] = None
    client_secret: t.Optional[str] = None

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """The default incremental strategy for the db; always "merge"."""
        return "merge"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt-databricks relation class, imported lazily."""
        from dbt.adapters.databricks.relation import DatabricksRelation

        return DatabricksRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt-databricks column class, imported lazily."""
        from dbt.adapters.databricks.column import DatabricksColumn

        return DatabricksColumn

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """Converts target config to SQLMesh connection config.

        ``host`` becomes ``server_hostname``, ``token`` becomes ``access_token``
        and ``threads`` becomes ``concurrent_tasks``.
        """
        # The "oauth" auth type is renamed; every other value passes through.
        auth_type = self.auth_type
        if auth_type == "oauth":
            auth_type = "databricks-oauth"

        return DatabricksConnectionConfig(
            server_hostname=self.host,
            http_path=self.http_path,
            access_token=self.token,
            concurrent_tasks=self.threads,
            catalog=self.database,
            auth_type=auth_type,
            oauth_client_id=self.client_id,
            oauth_client_secret=self.client_secret,
            **kwargs,
        )
Project connection and operational configuration for the Databricks target
Arguments:
- catalog: Catalog name to use for Unity Catalog
- host: The Databricks host to connect to
- http_path: The Databricks compute resources URL
- token: Personal access token
- database: Name of the database. Not applicable for Databricks and ignored
The default incremental strategy for the db
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    """Call the wrapped function with SCHEMA_DIFFER_OVERRIDES as default keyword arguments."""
    # Start from the overrides, then let explicitly passed kwargs replace them key by key.
    call_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    call_kwargs.update(kwargs)
    return func(self, **call_kwargs)
Converts target config to SQLMesh connection config
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class BigQueryConfig(TargetConfig):
    """
    Project connection and operational configuration for the BigQuery target

    Args:
        type: The type of the target (bigquery)
        method: The BigQuery authentication method to use
        dataset: The BigQuery dataset, interchangeable with schema
        project: The BigQuery project to connect to, interchangeable with database
        execution_project: Forwarded to the SQLMesh connection config as execution_project
        quota_project: Forwarded to the SQLMesh connection config as quota_project
        location: The BigQuery location to connect to
        keyfile: The path to the BigQuery keyfile
        keyfile_json: The BigQuery keyfile as a JSON string
        token: The BigQuery token
        refresh_token: The BigQuery refresh token
        client_id: The BigQuery client ID
        client_secret: The BigQuery client secret
        token_uri: The BigQuery token URI
        scopes: The BigQuery scopes
        impersonated_service_account: The service account to impersonate
        job_creation_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to be created
        job_execution_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to complete
        timeout_seconds: Alias for job_execution_timeout_seconds
        job_retries: The number of times to retry the underlying job if it fails
        retries: Alias for job_retries
        job_retry_deadline_seconds: Total number of seconds to wait while retrying the same query
        priority: The priority of the underlying job
        maximum_bytes_billed: The maximum number of bytes to be billed for the underlying job
    """

    type: t.Literal["bigquery"] = "bigquery"
    method: t.Optional[str] = BigQueryConnectionMethod.OAUTH
    dataset: t.Optional[str] = None
    project: t.Optional[str] = None
    execution_project: t.Optional[str] = None
    quota_project: t.Optional[str] = None
    location: t.Optional[str] = None
    keyfile: t.Optional[str] = None
    keyfile_json: t.Optional[t.Dict[str, t.Any]] = None
    token: t.Optional[str] = None
    refresh_token: t.Optional[str] = None
    client_id: t.Optional[str] = None
    client_secret: t.Optional[str] = None
    token_uri: t.Optional[str] = None
    scopes: t.Tuple[str, ...] = (
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/drive",
    )
    impersonated_service_account: t.Optional[str] = None
    job_creation_timeout_seconds: t.Optional[int] = None
    job_execution_timeout_seconds: t.Optional[int] = None
    timeout_seconds: t.Optional[int] = None  # To support legacy config
    job_retries: t.Optional[int] = None
    retries: int = 1  # To support legacy config
    job_retry_deadline_seconds: t.Optional[int] = None
    priority: BigQueryPriority = BigQueryPriority.INTERACTIVE
    maximum_bytes_billed: t.Optional[int] = None

    @model_validator(mode="before")
    @classmethod
    def validate_fields(cls, data: t.Any) -> t.Any:
        """
        Resolve the schema/dataset and database/project aliases before field validation.

        Args:
            data: The raw input; anything that is not a dict is returned untouched.

        Returns:
            A copy of the input dict in which `schema` and `dataset` hold the same value,
            and `database` and `project` hold the same value.

        Raises:
            ConfigError: If neither key of an alias pair has a truthy value.
        """
        if not isinstance(data, dict):
            return data

        # Validate a shallow copy so the mapping passed in by the caller is not modified.
        data = dict(data)

        # dbt treats schema and dataset interchangeably
        schema = data.get("schema") or data.get("dataset")
        if not schema:
            raise ConfigError("Either schema or dataset must be set")
        data["dataset"] = data["schema"] = schema

        # dbt treats database and project interchangeably
        database = data.get("database") or data.get("project")
        if not database:
            raise ConfigError("Either database or project must be set")
        data["database"] = data["project"] = database

        return data

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "merge" for every incremental kind."""
        return "merge"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt relation class of the BigQuery adapter (imported on access)."""
        from dbt.adapters.bigquery.relation import BigQueryRelation

        return BigQueryRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt column class of the BigQuery adapter (imported on access)."""
        from dbt.adapters.bigquery import BigQueryColumn

        return BigQueryColumn

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """
        Build the SQLMesh BigQuery connection config from this target.

        Args:
            kwargs: Extra keyword arguments forwarded to BigQueryConnectionConfig.

        Returns:
            The resulting BigQueryConnectionConfig.
        """
        # The legacy `retries` / `timeout_seconds` settings only apply when their
        # `job_*` counterparts were not provided.
        job_retries = self.job_retries if self.job_retries is not None else self.retries
        job_execution_timeout_seconds = (
            self.job_execution_timeout_seconds
            if self.job_execution_timeout_seconds is not None
            else self.timeout_seconds
        )
        return BigQueryConnectionConfig(
            method=self.method,
            project=self.database,
            execution_project=self.execution_project,
            quota_project=self.quota_project,
            location=self.location,
            concurrent_tasks=self.threads,
            keyfile=self.keyfile,
            keyfile_json=self.keyfile_json,
            token=self.token,
            refresh_token=self.refresh_token,
            client_id=self.client_id,
            client_secret=self.client_secret,
            token_uri=self.token_uri,
            scopes=self.scopes,
            impersonated_service_account=self.impersonated_service_account,
            job_creation_timeout_seconds=self.job_creation_timeout_seconds,
            job_execution_timeout_seconds=job_execution_timeout_seconds,
            job_retries=job_retries,
            job_retry_deadline_seconds=self.job_retry_deadline_seconds,
            priority=self.priority,
            maximum_bytes_billed=self.maximum_bytes_billed,
            **kwargs,
        )
Project connection and operational configuration for the BigQuery target
Arguments:
- type: The type of the target (bigquery)
- method: The BigQuery authentication method to use
- project: The BigQuery project to connect to
- location: The BigQuery location to connect to
- keyfile: The path to the BigQuery keyfile
- keyfile_json: The BigQuery keyfile as a JSON string
- token: The BigQuery token
- refresh_token: The BigQuery refresh token
- client_id: The BigQuery client ID
- client_secret: The BigQuery client secret
- token_uri: The BigQuery token URI
- scopes: The BigQuery scopes
- impersonated_service_account: The service account to impersonate
- job_creation_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to be created
- job_execution_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to complete
- timeout_seconds: Alias for job_execution_timeout_seconds
- job_retries: The number of times to retry the underlying job if it fails
- retries: Alias for job_retries
- job_retry_deadline_seconds: Total number of seconds to wait while retrying the same query
- priority: The priority of the underlying job
- maximum_bytes_billed: The maximum number of bytes to be billed for the underlying job
@model_validator(mode="before")
@classmethod
def validate_fields(cls, data: t.Any) -> t.Any:
    """
    Resolve the schema/dataset and database/project aliases before field validation.

    Args:
        data: The raw input; anything that is not a dict is returned untouched.

    Returns:
        A copy of the input dict in which `schema` and `dataset` hold the same value,
        and `database` and `project` hold the same value.

    Raises:
        ConfigError: If neither key of an alias pair has a truthy value.
    """
    if not isinstance(data, dict):
        return data

    # Validate a shallow copy so the mapping passed in by the caller is not modified.
    data = dict(data)

    # dbt treats schema and dataset interchangeably
    schema = data.get("schema") or data.get("dataset")
    if not schema:
        raise ConfigError("Either schema or dataset must be set")
    data["dataset"] = data["schema"] = schema

    # dbt treats database and project interchangeably
    database = data.get("database") or data.get("project")
    if not database:
        raise ConfigError("Either database or project must be set")
    data["database"] = data["project"] = database

    return data
The default incremental strategy for the db
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    """Call the wrapped function with SCHEMA_DIFFER_OVERRIDES as default keyword arguments."""
    # Start from the overrides, then let explicitly passed kwargs replace them key by key.
    call_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    call_kwargs.update(kwargs)
    return func(self, **call_kwargs)
Converts target config to SQLMesh connection config
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class MSSQLConfig(TargetConfig):
    """
    Project connection and operational configuration for the SQL Server (MSSQL) target

    Args:
        host: The MSSQL server host to connect to
        server: Alias for host
        port: The MSSQL server port to connect to
        database: The database to connect to (defaults to "master")
        schema: The schema to use (defaults to "dbo")
        user: User name for authentication
        username: Alias for user
        UID: Alias for user
        password: User password for authentication
        PWD: Alias for password
        threads: The number of threads to run on (defaults to 4)
        login_timeout: The number of seconds to wait for a login to complete
        query_timeout: The number of seconds to wait for a query to complete
        authentication: The authentication method to use (only "sql" is supported)
        schema_authorization: The principal who should own created schemas, not supported by SQLMesh
        driver: ODBC driver to use, not used by SQLMesh
        encrypt: A boolean flag to enable server connection encryption, not used by SQLMesh
        trust_cert: A boolean flag to trust the server certificate, not used by SQLMesh
        retries: Number of times to retry if the SQL Server connector encounters an error, not used by SQLMesh
        windows_login: A boolean flag to use Windows Authentication, not used by SQLMesh
        tenant_id: The tenant ID of the Azure Active Directory instance, not used by SQLMesh
        client_id: The client ID of the Azure Active Directory service principal, not used by SQLMesh
        client_secret: The client secret of the Azure Active Directory service principal, not used by SQLMesh
    """

    type: t.Literal["sqlserver"] = "sqlserver"
    host: t.Optional[str] = None
    server: t.Optional[str] = None
    port: int = 1433
    database: str = Field(default="master")
    schema_: str = Field(default="dbo", alias="schema")
    user: t.Optional[str] = None
    username: t.Optional[str] = None
    UID: t.Optional[str] = None
    password: t.Optional[str] = None
    PWD: t.Optional[str] = None
    threads: int = 4
    login_timeout: t.Optional[int] = None
    query_timeout: t.Optional[int] = None
    authentication: t.Optional[str] = "sql"
    schema_authorization: t.Optional[str] = None  # Not supported by SQLMesh

    # Unused ODBC parameters (SQLMesh uses pymssql instead of ODBC)
    driver: t.Optional[str] = None
    encrypt: t.Optional[bool] = None
    trust_cert: t.Optional[bool] = None
    retries: t.Optional[int] = None

    # Unused authentication parameters (not supported by pymssql)
    windows_login: t.Optional[bool] = None  # pymssql doesn't require this flag for Windows Auth
    tenant_id: t.Optional[str] = None  # Azure Active Directory auth
    client_id: t.Optional[str] = None  # Azure Active Directory auth
    client_secret: t.Optional[str] = None  # Azure Active Directory auth

    @model_validator(mode="before")
    @classmethod
    def validate_alias_fields(cls, data: t.Any) -> t.Any:
        """
        Resolve the host, user and password aliases before field validation.

        Args:
            data: The raw input; anything that is not a dict is returned untouched.

        Returns:
            A copy of the input dict with `host`, `user` and `password` filled from
            the first truthy value among each key and its aliases.

        Raises:
            ConfigError: If none of the keys of an alias group has a truthy value.
        """
        if not isinstance(data, dict):
            return data

        # Validate a shallow copy so the mapping passed in by the caller is not
        # modified, including when one of the checks below raises.
        data = dict(data)

        data["host"] = data.get("host") or data.get("server")
        if not data["host"]:
            raise ConfigError("Either host or server must be set")

        data["user"] = data.get("user") or data.get("username") or data.get("UID")
        if not data["user"]:
            raise ConfigError("One of user, username, or UID must be set")

        data["password"] = data.get("password") or data.get("PWD")
        if not data["password"]:
            raise ConfigError("Either password or PWD must be set")

        return data

    @field_validator("authentication")
    @classmethod
    def _validate_authentication(cls, v: str) -> str:
        """Reject every `authentication` value other than "sql"."""
        if v != "sql":
            raise ConfigError("Only SQL and Windows Authentication are supported for SQL Server")
        return v

    @field_validator("port")
    @classmethod
    def _validate_port(cls, v: t.Union[int, str]) -> int:
        """Convert the port to an int."""
        return int(v)

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "delete+insert" for IncrementalByUniqueKeyKind and "append" otherwise."""
        # https://github.com/microsoft/dbt-fabric/blob/main/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql
        return "delete+insert" if kind is IncrementalByUniqueKeyKind else "append"

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt column class of the SQL Server adapter (imported on access)."""
        try:
            # 1.8.0+
            from dbt.adapters.sqlserver.sqlserver_column import SQLServerColumn
        except ImportError:
            # <1.8.0
            from dbt.adapters.sqlserver.sql_server_column import SQLServerColumn  # type: ignore

        return SQLServerColumn

    @property
    def dialect(self) -> str:
        """The SQL dialect name used for this target ("tsql")."""
        return "tsql"

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """
        Build the SQLMesh MSSQL connection config from this target.

        Args:
            kwargs: Extra keyword arguments forwarded to MSSQLConnectionConfig.

        Returns:
            The resulting MSSQLConnectionConfig.
        """
        return MSSQLConnectionConfig(
            host=self.host,
            user=self.user,
            password=self.password,
            port=self.port,
            database=self.database,
            timeout=self.query_timeout,
            login_timeout=self.login_timeout,
            concurrent_tasks=self.threads,
            **kwargs,
        )
Project connection and operational configuration for the SQL Server (MSSQL) target
Arguments:
- host: The MSSQL server host to connect to
- server: Alias for host
- port: The MSSQL server port to connect to
- user: User name for authentication
- username: Alias for user
- UID: Alias for user
- password: User password for authentication
- PWD: Alias for password
- login_timeout: The number of seconds to wait for a login to complete
- query_timeout: The number of seconds to wait for a query to complete
- authentication: The authentication method to use (only "sql" is supported)
- schema_authorization: The principal who should own created schemas, not supported by SQLMesh
- driver: ODBC driver to use, not used by SQLMesh
- encrypt: A boolean flag to enable server connection encryption, not used by SQLMesh
- trust_cert: A boolean flag to trust the server certificate, not used by SQLMesh
- retries: Number of times to retry if the SQL Server connector encounters an error, not used by SQLMesh
- windows_login: A boolean flag to use Windows Authentication, not used by SQLMesh
- tenant_id: The tenant ID of the Azure Active Directory instance, not used by SQLMesh
- client_id: The client ID of the Azure Active Directory service principal, not used by SQLMesh
- client_secret: The client secret of the Azure Active Directory service principal, not used by SQLMesh
@model_validator(mode="before")
@classmethod
def validate_alias_fields(cls, data: t.Any) -> t.Any:
    """
    Resolve the host, user and password aliases before field validation.

    Args:
        data: The raw input; anything that is not a dict is returned untouched.

    Returns:
        A copy of the input dict with `host`, `user` and `password` filled from
        the first truthy value among each key and its aliases.

    Raises:
        ConfigError: If none of the keys of an alias group has a truthy value.
    """
    if not isinstance(data, dict):
        return data

    # Validate a shallow copy so the mapping passed in by the caller is not
    # modified, including when one of the checks below raises.
    data = dict(data)

    data["host"] = data.get("host") or data.get("server")
    if not data["host"]:
        raise ConfigError("Either host or server must be set")

    data["user"] = data.get("user") or data.get("username") or data.get("UID")
    if not data["user"]:
        raise ConfigError("One of user, username, or UID must be set")

    data["password"] = data.get("password") or data.get("PWD")
    if not data["password"]:
        raise ConfigError("Either password or PWD must be set")

    return data
def default_incremental_strategy(self, kind: IncrementalKind) -> str:
    """Return "delete+insert" for IncrementalByUniqueKeyKind and "append" for any other kind."""
    # https://github.com/microsoft/dbt-fabric/blob/main/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql
    if kind is IncrementalByUniqueKeyKind:
        return "delete+insert"
    return "append"
The default incremental strategy for the db
@classproperty
def column_class(cls) -> t.Type[Column]:
    """The dbt column class of the SQL Server adapter."""
    # The module holding SQLServerColumn is named differently across adapter
    # versions, so the newer path is tried first and the older one is the fallback.
    try:
        # 1.8.0+
        from dbt.adapters.sqlserver.sqlserver_column import SQLServerColumn
    except ImportError:
        # <1.8.0
        from dbt.adapters.sqlserver.sql_server_column import SQLServerColumn  # type: ignore

    return SQLServerColumn
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    """Call the wrapped function with SCHEMA_DIFFER_OVERRIDES as default keyword arguments."""
    # Start from the overrides, then let explicitly passed kwargs replace them key by key.
    call_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    call_kwargs.update(kwargs)
    return func(self, **call_kwargs)
Converts target config to SQLMesh connection config
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class TrinoConfig(TargetConfig):
    """
    Project connection and operational configuration for the Trino target.

    Args:
        method: The Trino authentication method to use
        host: The server host to connect to
        port: The Trino server port to connect to
        database: Name of the Trino database/catalog
        schema: Name of the Trino schema
        user: User name for authentication
        password: User password for authentication
        roles: Trino catalog roles
        session_properties: Trino session properties
        retries: Number of times to retry if the Trino connector encounters an error
        timezone: The timezone to use for the Trino session
        http_headers: HTTP Headers to send alongside requests to Trino
        http_scheme: The HTTP scheme to use for requests to Trino (default: http, or https if kerberos, ldap or jwt auth)
        prepared_statements_enabled: Accepted for compatibility, not used by SQLMesh
        threads: The number of threads to run on
        impersonation_user: LDAP authentication: override the provided username
        keytab: Kerberos authentication: Path to keytab
        krb5_config: Kerberos authentication: Path to config
        principal: Kerberos authentication: Principal
        service_name: Kerberos authentication: Service name
        hostname_override: Kerberos authentication: hostname for a host whose DNS name doesn't match
        mutual_authentication: Kerberos authentication: Boolean flag for mutual authentication.
        force_preemptive: Kerberos authentication: Boolean flag to preemptively initiate the GSS exchange.
        sanitize_mutual_error_response: Kerberos authentication: Boolean flag to strip content and headers from error responses.
        delegate: Kerberos authentication: Boolean flag for credential delegation (`GSS_C_DELEG_FLAG`)
        jwt_token: JWT authentication: JWT string
        client_certificate: Certification authentication: Path to client certificate
        client_private_key: Certification authentication: Path to client private key
        cert: Certification authentication: Full path to a certificate file
    """

    # Maps the `method` value of the dbt profile to the SQLMesh authentication enum.
    _method_to_auth_enum: t.ClassVar[t.Dict[str, TrinoAuthenticationMethod]] = {
        "none": TrinoAuthenticationMethod.NO_AUTH,
        "ldap": TrinoAuthenticationMethod.LDAP,
        "kerberos": TrinoAuthenticationMethod.KERBEROS,
        "jwt": TrinoAuthenticationMethod.JWT,
        "certificate": TrinoAuthenticationMethod.CERTIFICATE,
        "oauth": TrinoAuthenticationMethod.OAUTH,
        "oauth_console": TrinoAuthenticationMethod.OAUTH,
    }

    type: t.Literal["trino"] = "trino"
    host: str
    database: str
    schema_: str = Field(alias="schema")
    port: int = 443
    method: str
    user: t.Optional[str] = None

    threads: int = 1
    roles: t.Optional[t.Dict[str, str]] = None
    session_properties: t.Optional[t.Dict[str, str]] = None
    retries: int = 3
    timezone: t.Optional[str] = None
    http_headers: t.Optional[t.Dict[str, str]] = None
    http_scheme: t.Optional[str] = None
    prepared_statements_enabled: bool = True  # not used by SQLMesh

    # ldap authentication
    password: t.Optional[str] = None
    impersonation_user: t.Optional[str] = None

    # kerberos authentication
    keytab: t.Optional[str] = None
    krb5_config: t.Optional[str] = None
    principal: t.Optional[str] = None
    service_name: str = "trino"
    hostname_override: t.Optional[str] = None
    mutual_authentication: bool = False
    force_preemptive: bool = False
    sanitize_mutual_error_response: bool = True
    delegate: bool = False

    # jwt authentication
    jwt_token: t.Optional[str] = None

    # certificate authentication
    client_certificate: t.Optional[str] = None
    client_private_key: t.Optional[str] = None
    cert: t.Optional[str] = None

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "append" for every incremental kind."""
        return "append"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt relation class of the Trino adapter (imported on access)."""
        from dbt.adapters.trino.relation import TrinoRelation

        return TrinoRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt column class of the Trino adapter (imported on access)."""
        from dbt.adapters.trino.column import TrinoColumn

        return TrinoColumn

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """
        Build the SQLMesh Trino connection config from this target.

        Args:
            kwargs: Extra keyword arguments forwarded to TrinoConnectionConfig.

        Returns:
            The resulting TrinoConnectionConfig.

        Raises:
            ConfigError: If `method` is not one of the supported authentication methods.
        """
        # `method` is a free-form string, so report an unknown value as a
        # configuration problem instead of letting a bare KeyError escape.
        auth_method = self._method_to_auth_enum.get(self.method)
        if auth_method is None:
            supported_methods = ", ".join(sorted(self._method_to_auth_enum))
            raise ConfigError(
                f"Unsupported Trino authentication method '{self.method}'. "
                f"Supported methods: {supported_methods}."
            )

        return TrinoConnectionConfig(
            method=auth_method,
            host=self.host,
            user=self.user,
            catalog=self.database,
            port=self.port,
            http_scheme=self.http_scheme,
            roles=self.roles,
            http_headers=self.http_headers,
            session_properties=self.session_properties,
            retries=self.retries,
            timezone=self.timezone,
            password=self.password,
            impersonation_user=self.impersonation_user,
            keytab=self.keytab,
            krb5_config=self.krb5_config,
            principal=self.principal,
            service_name=self.service_name,
            hostname_override=self.hostname_override,
            mutual_authentication=self.mutual_authentication,
            force_preemptive=self.force_preemptive,
            sanitize_mutual_error_response=self.sanitize_mutual_error_response,
            delegate=self.delegate,
            jwt_token=self.jwt_token,
            client_certificate=self.client_certificate,
            client_private_key=self.client_private_key,
            cert=self.cert,
            concurrent_tasks=self.threads,
            **kwargs,
        )
Project connection and operational configuration for the Trino target.
Arguments:
- method: The Trino authentication method to use
- host: The server host to connect to
- port: The Trino server port to connect to
- database: Name of the Trino database/catalog
- schema: Name of the Trino schema
- user: User name for authentication
- password: User password for authentication
- roles: Trino catalog roles
- session_properties: Trino session properties
- retries: Number of times to retry if the Trino connector encounters an error
- timezone: The timezone to use for the Trino session
- http_headers: HTTP Headers to send alongside requests to Trino
- http_scheme: The HTTP scheme to use for requests to Trino (default: http, or https if kerberos, ldap or jwt auth)
- threads: The number of threads to run on
- impersonation_user: LDAP authentication: override the provided username
- keytab: Kerberos authentication: Path to keytab
- krb5_config: Kerberos authentication: Path to config
- principal: Kerberos authentication: Principal
- service_name: Kerberos authentication: Service name
- hostname_override: Kerberos authentication: hostname for a host whose DNS name doesn't match
- mutual_authentication: Kerberos authentication: Boolean flag for mutual authentication.
- force_preemptive: Kerberos authentication: Boolean flag to preemptively initiate the GSS exchange.
- sanitize_mutual_error_response: Kerberos authentication: Boolean flag to strip content and headers from error responses.
- delegate: Kerberos authentication: Boolean flag for credential delegation (`GSS_C_DELEG_FLAG`)
- jwt_token: JWT authentication: JWT string
- client_certificate: Certification authentication: Path to client certificate
- client_private_key: Certification authentication: Path to client private key
- cert: Certification authentication: Full path to a certificate file
The default incremental strategy for the db
def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig:
    """Call the wrapped function with SCHEMA_DIFFER_OVERRIDES as default keyword arguments."""
    # Start from the overrides, then let explicitly passed kwargs replace them key by key.
    call_kwargs = dict(SCHEMA_DIFFER_OVERRIDES)
    call_kwargs.update(kwargs)
    return func(self, **call_kwargs)
Converts target config to SQLMesh connection config
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class ClickhouseConfig(TargetConfig):
    """
    Connection settings of a dbt profile that targets ClickHouse.

    Each entry reads `name: [default] # description`.

    Args (forwarded to the SQLMesh connection config):
        host: [localhost]
        user: [default] # User for all database operations, read from the `username` key
        password: [<empty string>] # Password for the user
        port: [None]
        cluster: [None] # If set, certain DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster.
        connect_timeout: [10] # Timeout in seconds to establish a connection to ClickHouse
        send_receive_timeout: [300] # Timeout in seconds to receive data from the ClickHouse server
        verify: [True] # Validate TLS certificate if using TLS/SSL
        compression: [<empty string>] # Forwarded as `compression_method`
        custom_settings: [None] # A dictionary/mapping of custom ClickHouse settings, forwarded as `connection_settings`

    Args (accepted but not used by SQLMesh):
        schema: [default] # ClickHouse database for dbt models
        database: [<empty string>]
        driver: [None] # http or native
        secure: [False] # Use TLS (native protocol) or HTTPS (http protocol)
        retries: [1] # Number of times to retry a "retriable" database exception (such as a 503 'Service Unavailable' error)
        database_engine: [None]
        cluster_mode: [False] # Use specific settings designed to improve operation on Replicated databases (recommended for ClickHouse Cloud)
        use_lw_deletes: [False] # Use the strategy `delete+insert` as the default incremental strategy
        check_exchange: [True] # Validate that clickhouse support the atomic EXCHANGE TABLES command
        local_suffix: [local] # Table suffix of local tables on shards for distributed materializations
        local_db_prefix: [<empty string>] # Database prefix of local tables on shards for distributed materializations
        allow_automatic_deduplication: [False] # Enable ClickHouse automatic deduplication for Replicated tables
        tcp_keepalive: [False] # Native client only, specify TCP keepalive configuration. Specify custom keepalive settings as [idle_time_sec, interval_sec, probes]
        sync_request_timeout: [5] # Timeout for server ping
        compress_block_size: [1048576] # Compression block size if compression is enabled
    """

    host: str = "localhost"
    user: str = Field(default="default", alias="username")
    password: str = ""
    port: t.Optional[int] = None
    cluster: t.Optional[str] = None
    schema_: str = Field(default="default", alias="schema")
    connect_timeout: int = 10
    send_receive_timeout: int = 300
    verify: bool = True
    compression: str = ""
    custom_settings: t.Optional[t.Dict[str, t.Any]] = None

    # Not used by SQLMesh
    driver: t.Optional[str] = None
    secure: bool = False
    retries: int = 1
    database_engine: t.Optional[str] = None
    cluster_mode: bool = False
    sync_request_timeout: int = 5
    compress_block_size: int = 1048576
    check_exchange: bool = True
    use_lw_deletes: bool = False
    allow_automatic_deduplication: bool = False
    tcp_keepalive: t.Union[bool, t.Tuple[int, ...], t.List[int]] = False
    database: str = ""
    local_suffix: str = "local"
    local_db_prefix: str = ""

    type: t.Literal["clickhouse"] = "clickhouse"

    def default_incremental_strategy(self, kind: IncrementalKind) -> str:
        """Return "legacy" for every incremental kind."""
        # dbt-clickhouse name for temp table swap. That is sqlmesh's default
        # strategy so doesn't require special handling during conversion.
        return "legacy"

    @classproperty
    def relation_class(cls) -> t.Type[BaseRelation]:
        """The dbt relation class of the ClickHouse adapter (imported on access)."""
        from dbt.adapters.clickhouse.relation import ClickHouseRelation

        return ClickHouseRelation

    @classproperty
    def column_class(cls) -> t.Type[Column]:
        """The dbt column class of the ClickHouse adapter (imported on access)."""
        from dbt.adapters.clickhouse.column import ClickHouseColumn

        return ClickHouseColumn

    @with_schema_differ_overrides
    def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
        """
        Build the SQLMesh ClickHouse connection config from this target.

        Args:
            kwargs: Extra keyword arguments forwarded to ClickhouseConnectionConfig.

        Returns:
            The resulting ClickhouseConnectionConfig.
        """
        # NOTE(review): unlike the other targets in this module, `threads` is not
        # forwarded as `concurrent_tasks` here - confirm this is intentional.
        forwarded = {
            "host": self.host,
            "username": self.user,
            "password": self.password,
            "port": self.port,
            "cluster": self.cluster,
            "connect_timeout": self.connect_timeout,
            "send_receive_timeout": self.send_receive_timeout,
            "verify": self.verify,
            "compression_method": self.compression,
            "connection_settings": self.custom_settings,
        }
        return ClickhouseConnectionConfig(**forwarded, **kwargs)
Project connection and operational configuration for the Clickhouse target
Arguments:
- host: [localhost]
- user: [default] # User for all database operations
- password: [<empty string>] # Password for the user
- secure: [False] # Use TLS (native protocol) or HTTPS (http protocol)
- port: [8123] # If not set, defaults to 8123, 8443 depending on the secure and driver settings
- connect_timeout: [10] # Timeout in seconds to establish a connection to ClickHouse
- send_receive_timeout: [300] # Timeout in seconds to receive data from the ClickHouse server
- verify: [True] # Validate TLS certificate if using TLS/SSL
- cluster: [<empty string>] # If set, certain DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster.
- custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty.
- schema: [default] # ClickHouse database for dbt models, not used by SQLMesh
- driver: [http] # http or native. If not set this will be autodetermined based on port setting, not used by SQLMesh
- retries: [1] # Number of times to retry a "retriable" database exception (such as a 503 'Service Unavailable' error), not used by SQLMesh
- compression: [<empty string>] # Use gzip compression if truthy (http), or compression type for a native connection, not used by SQLMesh
- cluster_mode: [False] # Use specific settings designed to improve operation on Replicated databases (recommended for ClickHouse Cloud), not used by SQLMesh
- use_lw_deletes: [False] # Use the strategy `delete+insert` as the default incremental strategy, not used by SQLMesh
- check_exchange: [True] # Validate that clickhouse support the atomic EXCHANGE TABLES command. Not used by SQLMesh.
- local_suffix: [_local] # Table suffix of local tables on shards for distributed materializations, not used by SQLMesh
- local_db_prefix: [<empty string>] # Database prefix of local tables on shards for distributed materializations, not used by SQLMesh
- allow_automatic_deduplication: [False] # Enable ClickHouse automatic deduplication for Replicated tables, not used by SQLMesh
- tcp_keepalive: [False] # Native client only, specify TCP keepalive configuration. Specify custom keepalive settings as [idle_time_sec, interval_sec, probes], not used by SQLMesh
- sync_request_timeout: [5] # Timeout for server ping, not used by SQLMesh
- compress_block_size: [1048576] # Compression block size if compression is enabled, not used by SQLMesh
984 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 985 # dbt-clickhouse name for temp table swap. That is sqlmesh's default 986 # strategy so doesn't require special handling during conversion. 987 return "legacy"
The default incremental strategy for the db
82 def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig: 83 merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs} 84 return func(self, **merged_kwargs)
Converts target config to SQLMesh connection config
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
1018class AthenaConfig(TargetConfig): 1019 """ 1020 Project connection and operational configuration for the Athena target. 1021 1022 Args: 1023 s3_staging_dir: S3 location to store Athena query results and metadata 1024 s3_data_dir: Prefix for storing tables, if different from the connection's s3_staging_dir 1025 s3_data_naming: How to generate table paths in s3_data_dir 1026 s3_tmp_table_dir: Prefix for storing temporary tables, if different from the connection's s3_data_dir 1027 region_name: AWS region of your Athena instance 1028 schema: Specify the schema (Athena database) to build models into (lowercase only) 1029 database: Specify the database (Data catalog) to build models into (lowercase only) 1030 poll_interval: Interval in seconds to use for polling the status of query results in Athena 1031 debug_query_state: Flag if debug message with Athena query state is needed 1032 aws_access_key_id: Access key ID of the user performing requests 1033 aws_secret_access_key: Secret access key of the user performing requests 1034 aws_profile_name: Profile to use from your AWS shared credentials file 1035 work_group: Identifier of Athena workgroup 1036 skip_workgroup_check: Indicates if the WorkGroup check (additional AWS call) can be skipped 1037 num_retries: Number of times to retry a failing query 1038 num_boto3_retries: Number of times to retry boto3 requests (e.g. 
deleting S3 files for materialized tables) 1039 num_iceberg_retries: Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR 1040 spark_work_group: Identifier of Athena Spark workgroup for running Python models 1041 seed_s3_upload_args: Dictionary containing boto3 ExtraArgs when uploading to S3 1042 lf_tags_database: Default LF tags for new database if it's created by dbt 1043 """ 1044 1045 type: t.Literal["athena"] = "athena" 1046 threads: int = 4 1047 1048 s3_staging_dir: t.Optional[str] = None 1049 s3_data_dir: t.Optional[str] = None 1050 s3_data_naming: t.Optional[str] = None 1051 s3_tmp_table_dir: t.Optional[str] = None 1052 poll_interval: t.Optional[int] = None 1053 debug_query_state: bool = False 1054 work_group: t.Optional[str] = None 1055 skip_workgroup_check: t.Optional[bool] = None 1056 spark_work_group: t.Optional[str] = None 1057 1058 aws_access_key_id: t.Optional[str] = None 1059 aws_secret_access_key: t.Optional[str] = None 1060 aws_profile_name: t.Optional[str] = None 1061 region_name: t.Optional[str] = None 1062 1063 num_retries: t.Optional[int] = None 1064 num_boto3_retries: t.Optional[int] = None 1065 num_iceberg_retries: t.Optional[int] = None 1066 1067 seed_s3_upload_args: t.Dict[str, str] = {} 1068 lf_tags_database: t.Dict[str, str] = {} 1069 1070 @classproperty 1071 def relation_class(cls) -> t.Type[BaseRelation]: 1072 from dbt.adapters.athena.relation import AthenaRelation 1073 1074 return AthenaRelation 1075 1076 @classproperty 1077 def column_class(cls) -> t.Type[Column]: 1078 from dbt.adapters.athena.column import AthenaColumn 1079 1080 return AthenaColumn 1081 1082 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 1083 return "insert_overwrite" 1084 1085 @with_schema_differ_overrides 1086 def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: 1087 return AthenaConnectionConfig( 1088 type="athena", 1089 aws_access_key_id=self.aws_access_key_id, 1090 
aws_secret_access_key=self.aws_secret_access_key, 1091 region_name=self.region_name, 1092 work_group=self.work_group, 1093 s3_staging_dir=self.s3_staging_dir, 1094 s3_warehouse_location=self.s3_data_dir, 1095 schema_name=self.schema_, 1096 catalog_name=self.database, 1097 concurrent_tasks=self.threads, 1098 **kwargs, 1099 )
Project connection and operational configuration for the Athena target.
Arguments:
- s3_staging_dir: S3 location to store Athena query results and metadata
- s3_data_dir: Prefix for storing tables, if different from the connection's s3_staging_dir
- s3_data_naming: How to generate table paths in s3_data_dir
- s3_tmp_table_dir: Prefix for storing temporary tables, if different from the connection's s3_data_dir
- region_name: AWS region of your Athena instance
- schema: Specify the schema (Athena database) to build models into (lowercase only)
- database: Specify the database (Data catalog) to build models into (lowercase only)
- poll_interval: Interval in seconds to use for polling the status of query results in Athena
- debug_query_state: Flag if debug message with Athena query state is needed
- aws_access_key_id: Access key ID of the user performing requests
- aws_secret_access_key: Secret access key of the user performing requests
- aws_profile_name: Profile to use from your AWS shared credentials file
- work_group: Identifier of Athena workgroup
- skip_workgroup_check: Indicates if the WorkGroup check (additional AWS call) can be skipped
- num_retries: Number of times to retry a failing query
- num_boto3_retries: Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables)
- num_iceberg_retries: Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR
- spark_work_group: Identifier of Athena Spark workgroup for running Python models
- seed_s3_upload_args: Dictionary containing boto3 ExtraArgs when uploading to S3
- lf_tags_database: Default LF tags for new database if it's created by dbt
1082 def default_incremental_strategy(self, kind: IncrementalKind) -> str: 1083 return "insert_overwrite"
The default incremental strategy for the db
82 def wrapper(self: TargetConfig, **kwargs: t.Any) -> ConnectionConfig: 83 merged_kwargs = {**SCHEMA_DIFFER_OVERRIDES, **kwargs} 84 return func(self, **merged_kwargs)
Converts target config to SQLMesh connection config
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs