sqlmesh.core.config.root
1from __future__ import annotations 2 3import pickle 4import re 5import typing as t 6import zlib 7 8from pydantic import Field 9from sqlglot import exp 10from sqlglot.helper import first 11from sqlglot.optimizer.normalize_identifiers import normalize_identifiers 12 13from sqlmesh.cicd.config import CICDBotConfig 14from sqlmesh.core import constants as c 15from sqlmesh.core.config import EnvironmentSuffixTarget 16from sqlmesh.core.config.base import BaseConfig, UpdateStrategy 17from sqlmesh.core.config.common import variables_validator 18from sqlmesh.core.config.connection import ( 19 ConnectionConfig, 20 DuckDBConnectionConfig, 21 SerializableConnectionConfig, 22 connection_config_validator, 23) 24from sqlmesh.core.config.feature_flag import FeatureFlag 25from sqlmesh.core.config.format import FormatConfig 26from sqlmesh.core.config.gateway import GatewayConfig 27from sqlmesh.core.config.migration import MigrationConfig 28from sqlmesh.core.config.model import ModelDefaultsConfig 29from sqlmesh.core.config.plan import PlanConfig 30from sqlmesh.core.config.run import RunConfig 31from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig, SchedulerConfig 32from sqlmesh.core.config.ui import UIConfig 33from sqlmesh.core.loader import Loader, SqlMeshLoader 34from sqlmesh.core.notification_target import NotificationTarget 35from sqlmesh.core.user import User 36from sqlmesh.utils.errors import ConfigError 37from sqlmesh.utils.pydantic import ( 38 field_validator, 39 model_validator, 40 model_validator_v1_args, 41) 42 43 44class Config(BaseConfig): 45 """An object used by a Context to configure your SQLMesh project. 46 47 Args: 48 gateways: Supported gateways and their configurations. Key represents a unique name of a gateway. 49 default_connection: The default connection to use if one is not specified in a gateway. 50 default_test_connection: The default connection to use for tests if one is not specified in a gateway. 51 default_scheduler: The default scheduler configuration to use if one is not specified in a gateway. 52 default_gateway: The default gateway. 53 notification_targets: The notification targets to use. 54 project: The project name of this config. Used for multi-repo setups. 55 snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted. 56 environment_ttl: The period of time that a development environment should exist before being deleted. 57 ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder. 58 time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d. 59 This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes. 60 users: A list of users that can be used for approvals/notifications. 61 username: Name of a single user who should receive approvals/notification, instead of all users in the `users` list. 62 pinned_environments: A list of development environment names that should not be deleted by the janitor task. 63 loader: Loader class used for loading project files. 64 loader_kwargs: Key-value arguments to pass to the loader instance. 65 env_vars: A dictionary of environmental variable names and values. 66 model_defaults: Default values for model definitions. 67 physical_schema_override: A mapping from model schema names to names of schemas in which physical tables for corresponding models will be placed. 68 environment_suffix_target: Indicates whether to append the environment name to the schema or table name. 69 default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands. 70 log_limit: The default number of logs to keep. 71 format: The formatting options for SQL code. 72 ui: The UI configuration for SQLMesh. 73 feature_flags: Feature flags to enable/disable certain features. 74 plan: The plan configuration. 75 migration: The migration configuration. 76 variables: A dictionary of variables that can be used in models / macros. 77 disable_anonymized_analytics: Whether to disable the anonymized analytics collection. 78 """ 79 80 gateways: t.Dict[str, GatewayConfig] = {"": GatewayConfig()} 81 default_connection: SerializableConnectionConfig = DuckDBConnectionConfig() 82 default_test_connection_: t.Optional[SerializableConnectionConfig] = Field( 83 default=None, alias="default_test_connection" 84 ) 85 default_scheduler: SchedulerConfig = BuiltInSchedulerConfig() 86 default_gateway: str = "" 87 notification_targets: t.List[NotificationTarget] = [] 88 project: str = "" 89 snapshot_ttl: str = c.DEFAULT_SNAPSHOT_TTL 90 environment_ttl: t.Optional[str] = c.DEFAULT_ENVIRONMENT_TTL 91 ignore_patterns: t.List[str] = c.IGNORE_PATTERNS 92 time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT 93 users: t.List[User] = [] 94 model_defaults: ModelDefaultsConfig = ModelDefaultsConfig() 95 pinned_environments: t.Set[str] = set() 96 loader: t.Type[Loader] = SqlMeshLoader 97 loader_kwargs: t.Dict[str, t.Any] = {} 98 env_vars: t.Dict[str, str] = {} 99 username: str = "" 100 physical_schema_override: t.Dict[str, str] = {} 101 environment_suffix_target: EnvironmentSuffixTarget = Field( 102 default=EnvironmentSuffixTarget.default 103 ) 104 environment_catalog_mapping: t.Dict[re.Pattern, str] = {} 105 default_target_environment: str = c.PROD 106 log_limit: int = c.DEFAULT_LOG_LIMIT 107 cicd_bot: t.Optional[CICDBotConfig] = None 108 run: RunConfig = RunConfig() 109 format: FormatConfig = FormatConfig() 110 ui: UIConfig = UIConfig() 111 feature_flags: FeatureFlag = FeatureFlag() 112 plan: PlanConfig = PlanConfig() 113 migration: MigrationConfig = MigrationConfig() 114 variables: t.Dict[str, t.Any] = {} 115 disable_anonymized_analytics: bool = False 116 117 _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { 118 "gateways": UpdateStrategy.KEY_UPDATE, 119 "notification_targets": UpdateStrategy.EXTEND, 120 "ignore_patterns": UpdateStrategy.EXTEND, 121 "users": UpdateStrategy.EXTEND, 122 "model_defaults": UpdateStrategy.NESTED_UPDATE, 123 "auto_categorize_changes": UpdateStrategy.NESTED_UPDATE, 124 "pinned_environments": UpdateStrategy.EXTEND, 125 "physical_schema_override": UpdateStrategy.KEY_UPDATE, 126 "run": UpdateStrategy.NESTED_UPDATE, 127 "format": UpdateStrategy.NESTED_UPDATE, 128 "ui": UpdateStrategy.NESTED_UPDATE, 129 "loader_kwargs": UpdateStrategy.KEY_UPDATE, 130 "plan": UpdateStrategy.NESTED_UPDATE, 131 } 132 133 _connection_config_validator = connection_config_validator 134 _variables_validator = variables_validator 135 136 @field_validator("gateways", mode="before", always=True) 137 @classmethod 138 def _gateways_ensure_dict(cls, value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: 139 try: 140 if not isinstance(value, GatewayConfig): 141 GatewayConfig.parse_obj(value) 142 return {"": value} 143 except Exception: 144 return value 145 146 @field_validator("environment_catalog_mapping", mode="before") 147 @classmethod 148 def _validate_regex_keys( 149 cls, value: t.Dict[str | re.Pattern, t.Any] 150 ) -> t.Dict[re.Pattern, t.Any]: 151 compiled_regexes = {} 152 for k, v in value.items(): 153 try: 154 compiled_regexes[re.compile(k)] = v 155 except re.error: 156 raise ConfigError(f"`{k}` is not a valid regular expression.") 157 return compiled_regexes 158 159 @model_validator(mode="before") 160 @model_validator_v1_args 161 def _normalize_and_validate_fields(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: 162 if "gateways" not in values and "gateway" in values: 163 values["gateways"] = values.pop("gateway") 164 165 for plan_deprecated in ("auto_categorize_changes", "include_unmodified"): 166 if plan_deprecated in values: 167 raise ConfigError( 168 f"The `{plan_deprecated}` config is deprecated. Please use the `plan.{plan_deprecated}` config instead." 169 ) 170 171 return values 172 173 @model_validator(mode="after") 174 @model_validator_v1_args 175 def _normalize_fields_after(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: 176 dialect = values["model_defaults"].dialect 177 values["environment_catalog_mapping"] = { 178 k: normalize_identifiers(v, dialect=dialect).name 179 for k, v in values.get("environment_catalog_mapping", {}).items() 180 } 181 return values 182 183 def get_default_test_connection( 184 self, 185 default_catalog: t.Optional[str] = None, 186 default_catalog_dialect: t.Optional[str] = None, 187 ) -> ConnectionConfig: 188 return self.default_test_connection_ or DuckDBConnectionConfig( 189 catalogs=( 190 None 191 if default_catalog is None 192 else { 193 # transpile catalog name from main connection dialect to DuckDB 194 exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql( 195 dialect="duckdb" 196 ): ":memory:" 197 } 198 ) 199 ) 200 201 def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig: 202 if isinstance(self.gateways, dict): 203 if name is None: 204 if self.default_gateway: 205 if self.default_gateway not in self.gateways: 206 raise ConfigError(f"Missing gateway with name '{self.default_gateway}'") 207 return self.gateways[self.default_gateway] 208 209 if "" in self.gateways: 210 return self.gateways[""] 211 212 return first(self.gateways.values()) 213 214 if name not in self.gateways: 215 raise ConfigError(f"Missing gateway with name '{name}'.") 216 217 return self.gateways[name] 218 else: 219 if name is not None: 220 raise ConfigError( 221 "Gateway name is not supported when only one gateway is configured." 222 ) 223 return self.gateways 224 225 def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig: 226 return self.get_gateway(gateway_name).connection or self.default_connection 227 228 def get_state_connection( 229 self, gateway_name: t.Optional[str] = None 230 ) -> t.Optional[ConnectionConfig]: 231 return self.get_gateway(gateway_name).state_connection 232 233 def get_test_connection( 234 self, 235 gateway_name: t.Optional[str] = None, 236 default_catalog: t.Optional[str] = None, 237 default_catalog_dialect: t.Optional[str] = None, 238 ) -> ConnectionConfig: 239 return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection( 240 default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect 241 ) 242 243 def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig: 244 return self.get_gateway(gateway_name).scheduler or self.default_scheduler 245 246 def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]: 247 return self.get_gateway(gateway_name).state_schema 248 249 @property 250 def default_gateway_name(self) -> str: 251 if self.default_gateway: 252 return self.default_gateway 253 if "" in self.gateways: 254 return "" 255 return first(self.gateways) 256 257 @property 258 def dialect(self) -> t.Optional[str]: 259 return self.model_defaults.dialect 260 261 @property 262 def fingerprint(self) -> str: 263 return str(zlib.crc32(pickle.dumps(self.dict(exclude={"loader", "notification_targets"}))))
45class Config(BaseConfig): 46 """An object used by a Context to configure your SQLMesh project. 47 48 Args: 49 gateways: Supported gateways and their configurations. Key represents a unique name of a gateway. 50 default_connection: The default connection to use if one is not specified in a gateway. 51 default_test_connection: The default connection to use for tests if one is not specified in a gateway. 52 default_scheduler: The default scheduler configuration to use if one is not specified in a gateway. 53 default_gateway: The default gateway. 54 notification_targets: The notification targets to use. 55 project: The project name of this config. Used for multi-repo setups. 56 snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted. 57 environment_ttl: The period of time that a development environment should exist before being deleted. 58 ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder. 59 time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d. 60 This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes. 61 users: A list of users that can be used for approvals/notifications. 62 username: Name of a single user who should receive approvals/notification, instead of all users in the `users` list. 63 pinned_environments: A list of development environment names that should not be deleted by the janitor task. 64 loader: Loader class used for loading project files. 65 loader_kwargs: Key-value arguments to pass to the loader instance. 66 env_vars: A dictionary of environmental variable names and values. 67 model_defaults: Default values for model definitions. 68 physical_schema_override: A mapping from model schema names to names of schemas in which physical tables for corresponding models will be placed. 69 environment_suffix_target: Indicates whether to append the environment name to the schema or table name. 70 default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands. 71 log_limit: The default number of logs to keep. 72 format: The formatting options for SQL code. 73 ui: The UI configuration for SQLMesh. 74 feature_flags: Feature flags to enable/disable certain features. 75 plan: The plan configuration. 76 migration: The migration configuration. 77 variables: A dictionary of variables that can be used in models / macros. 78 disable_anonymized_analytics: Whether to disable the anonymized analytics collection. 79 """ 80 81 gateways: t.Dict[str, GatewayConfig] = {"": GatewayConfig()} 82 default_connection: SerializableConnectionConfig = DuckDBConnectionConfig() 83 default_test_connection_: t.Optional[SerializableConnectionConfig] = Field( 84 default=None, alias="default_test_connection" 85 ) 86 default_scheduler: SchedulerConfig = BuiltInSchedulerConfig() 87 default_gateway: str = "" 88 notification_targets: t.List[NotificationTarget] = [] 89 project: str = "" 90 snapshot_ttl: str = c.DEFAULT_SNAPSHOT_TTL 91 environment_ttl: t.Optional[str] = c.DEFAULT_ENVIRONMENT_TTL 92 ignore_patterns: t.List[str] = c.IGNORE_PATTERNS 93 time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT 94 users: t.List[User] = [] 95 model_defaults: ModelDefaultsConfig = ModelDefaultsConfig() 96 pinned_environments: t.Set[str] = set() 97 loader: t.Type[Loader] = SqlMeshLoader 98 loader_kwargs: t.Dict[str, t.Any] = {} 99 env_vars: t.Dict[str, str] = {} 100 username: str = "" 101 physical_schema_override: t.Dict[str, str] = {} 102 environment_suffix_target: EnvironmentSuffixTarget = Field( 103 default=EnvironmentSuffixTarget.default 104 ) 105 environment_catalog_mapping: t.Dict[re.Pattern, str] = {} 106 default_target_environment: str = c.PROD 107 log_limit: int = c.DEFAULT_LOG_LIMIT 108 cicd_bot: t.Optional[CICDBotConfig] = None 109 run: RunConfig = RunConfig() 110 format: FormatConfig = FormatConfig() 111 ui: UIConfig = UIConfig() 112 feature_flags: FeatureFlag = FeatureFlag() 113 plan: PlanConfig = PlanConfig() 114 migration: MigrationConfig = MigrationConfig() 115 variables: t.Dict[str, t.Any] = {} 116 disable_anonymized_analytics: bool = False 117 118 _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { 119 "gateways": UpdateStrategy.KEY_UPDATE, 120 "notification_targets": UpdateStrategy.EXTEND, 121 "ignore_patterns": UpdateStrategy.EXTEND, 122 "users": UpdateStrategy.EXTEND, 123 "model_defaults": UpdateStrategy.NESTED_UPDATE, 124 "auto_categorize_changes": UpdateStrategy.NESTED_UPDATE, 125 "pinned_environments": UpdateStrategy.EXTEND, 126 "physical_schema_override": UpdateStrategy.KEY_UPDATE, 127 "run": UpdateStrategy.NESTED_UPDATE, 128 "format": UpdateStrategy.NESTED_UPDATE, 129 "ui": UpdateStrategy.NESTED_UPDATE, 130 "loader_kwargs": UpdateStrategy.KEY_UPDATE, 131 "plan": UpdateStrategy.NESTED_UPDATE, 132 } 133 134 _connection_config_validator = connection_config_validator 135 _variables_validator = variables_validator 136 137 @field_validator("gateways", mode="before", always=True) 138 @classmethod 139 def _gateways_ensure_dict(cls, value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: 140 try: 141 if not isinstance(value, GatewayConfig): 142 GatewayConfig.parse_obj(value) 143 return {"": value} 144 except Exception: 145 return value 146 147 @field_validator("environment_catalog_mapping", mode="before") 148 @classmethod 149 def _validate_regex_keys( 150 cls, value: t.Dict[str | re.Pattern, t.Any] 151 ) -> t.Dict[re.Pattern, t.Any]: 152 compiled_regexes = {} 153 for k, v in value.items(): 154 try: 155 compiled_regexes[re.compile(k)] = v 156 except re.error: 157 raise ConfigError(f"`{k}` is not a valid regular expression.") 158 return compiled_regexes 159 160 @model_validator(mode="before") 161 @model_validator_v1_args 162 def _normalize_and_validate_fields(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: 163 if "gateways" not in values and "gateway" in values: 164 values["gateways"] = values.pop("gateway") 165 166 for plan_deprecated in ("auto_categorize_changes", "include_unmodified"): 167 if plan_deprecated in values: 168 raise ConfigError( 169 f"The `{plan_deprecated}` config is deprecated. Please use the `plan.{plan_deprecated}` config instead." 170 ) 171 172 return values 173 174 @model_validator(mode="after") 175 @model_validator_v1_args 176 def _normalize_fields_after(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: 177 dialect = values["model_defaults"].dialect 178 values["environment_catalog_mapping"] = { 179 k: normalize_identifiers(v, dialect=dialect).name 180 for k, v in values.get("environment_catalog_mapping", {}).items() 181 } 182 return values 183 184 def get_default_test_connection( 185 self, 186 default_catalog: t.Optional[str] = None, 187 default_catalog_dialect: t.Optional[str] = None, 188 ) -> ConnectionConfig: 189 return self.default_test_connection_ or DuckDBConnectionConfig( 190 catalogs=( 191 None 192 if default_catalog is None 193 else { 194 # transpile catalog name from main connection dialect to DuckDB 195 exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql( 196 dialect="duckdb" 197 ): ":memory:" 198 } 199 ) 200 ) 201 202 def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig: 203 if isinstance(self.gateways, dict): 204 if name is None: 205 if self.default_gateway: 206 if self.default_gateway not in self.gateways: 207 raise ConfigError(f"Missing gateway with name '{self.default_gateway}'") 208 return self.gateways[self.default_gateway] 209 210 if "" in self.gateways: 211 return self.gateways[""] 212 213 return first(self.gateways.values()) 214 215 if name not in self.gateways: 216 raise ConfigError(f"Missing gateway with name '{name}'.") 217 218 return self.gateways[name] 219 else: 220 if name is not None: 221 raise ConfigError( 222 "Gateway name is not supported when only one gateway is configured." 223 ) 224 return self.gateways 225 226 def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig: 227 return self.get_gateway(gateway_name).connection or self.default_connection 228 229 def get_state_connection( 230 self, gateway_name: t.Optional[str] = None 231 ) -> t.Optional[ConnectionConfig]: 232 return self.get_gateway(gateway_name).state_connection 233 234 def get_test_connection( 235 self, 236 gateway_name: t.Optional[str] = None, 237 default_catalog: t.Optional[str] = None, 238 default_catalog_dialect: t.Optional[str] = None, 239 ) -> ConnectionConfig: 240 return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection( 241 default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect 242 ) 243 244 def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig: 245 return self.get_gateway(gateway_name).scheduler or self.default_scheduler 246 247 def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]: 248 return self.get_gateway(gateway_name).state_schema 249 250 @property 251 def default_gateway_name(self) -> str: 252 if self.default_gateway: 253 return self.default_gateway 254 if "" in self.gateways: 255 return "" 256 return first(self.gateways) 257 258 @property 259 def dialect(self) -> t.Optional[str]: 260 return self.model_defaults.dialect 261 262 @property 263 def fingerprint(self) -> str: 264 return str(zlib.crc32(pickle.dumps(self.dict(exclude={"loader", "notification_targets"}))))
An object used by a Context to configure your SQLMesh project.
Arguments:
- gateways: Supported gateways and their configurations. Key represents a unique name of a gateway.
- default_connection: The default connection to use if one is not specified in a gateway.
- default_test_connection: The default connection to use for tests if one is not specified in a gateway.
- default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
- default_gateway: The default gateway.
- notification_targets: The notification targets to use.
- project: The project name of this config. Used for multi-repo setups.
- snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
- environment_ttl: The period of time that a development environment should exist before being deleted.
- ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder.
- time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d. This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.
- users: A list of users that can be used for approvals/notifications.
- username: Name of a single user who should receive approvals/notification, instead of all users in the
users
list. - pinned_environments: A list of development environment names that should not be deleted by the janitor task.
- loader: Loader class used for loading project files.
- loader_kwargs: Key-value arguments to pass to the loader instance.
- env_vars: A dictionary of environmental variable names and values.
- model_defaults: Default values for model definitions.
- physical_schema_override: A mapping from model schema names to names of schemas in which physical tables for corresponding models will be placed.
- environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
- default_target_environment: The name of the environment that will be the default target for the
sqlmesh plan
andsqlmesh run
commands. - log_limit: The default number of logs to keep.
- format: The formatting options for SQL code.
- ui: The UI configuration for SQLMesh.
- feature_flags: Feature flags to enable/disable certain features.
- plan: The plan configuration.
- migration: The migration configuration.
- variables: A dictionary of variables that can be used in models / macros.
- disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
def
get_default_test_connection( self, default_catalog: Union[str, NoneType] = None, default_catalog_dialect: Union[str, NoneType] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
184 def get_default_test_connection( 185 self, 186 default_catalog: t.Optional[str] = None, 187 default_catalog_dialect: t.Optional[str] = None, 188 ) -> ConnectionConfig: 189 return self.default_test_connection_ or DuckDBConnectionConfig( 190 catalogs=( 191 None 192 if default_catalog is None 193 else { 194 # transpile catalog name from main connection dialect to DuckDB 195 exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql( 196 dialect="duckdb" 197 ): ":memory:" 198 } 199 ) 200 )
def
get_gateway( self, name: Union[str, NoneType] = None) -> sqlmesh.core.config.gateway.GatewayConfig:
202 def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig: 203 if isinstance(self.gateways, dict): 204 if name is None: 205 if self.default_gateway: 206 if self.default_gateway not in self.gateways: 207 raise ConfigError(f"Missing gateway with name '{self.default_gateway}'") 208 return self.gateways[self.default_gateway] 209 210 if "" in self.gateways: 211 return self.gateways[""] 212 213 return first(self.gateways.values()) 214 215 if name not in self.gateways: 216 raise ConfigError(f"Missing gateway with name '{name}'.") 217 218 return self.gateways[name] 219 else: 220 if name is not None: 221 raise ConfigError( 222 "Gateway name is not supported when only one gateway is configured." 223 ) 224 return self.gateways
def
get_connection( self, gateway_name: Union[str, NoneType] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
def
get_state_connection( self, gateway_name: Union[str, NoneType] = None) -> Union[sqlmesh.core.config.connection.ConnectionConfig, NoneType]:
def
get_test_connection( self, gateway_name: Union[str, NoneType] = None, default_catalog: Union[str, NoneType] = None, default_catalog_dialect: Union[str, NoneType] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
234 def get_test_connection( 235 self, 236 gateway_name: t.Optional[str] = None, 237 default_catalog: t.Optional[str] = None, 238 default_catalog_dialect: t.Optional[str] = None, 239 ) -> ConnectionConfig: 240 return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection( 241 default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect 242 )
def
get_scheduler( self, gateway_name: Union[str, NoneType] = None) -> typing_extensions.Annotated[Union[sqlmesh.core.config.scheduler.BuiltInSchedulerConfig, sqlmesh.core.config.scheduler.AirflowSchedulerConfig, sqlmesh.core.config.scheduler.CloudComposerSchedulerConfig, sqlmesh.core.config.scheduler.MWAASchedulerConfig], FieldInfo(annotation=NoneType, required=True, discriminator='type_')]:
def
model_post_init(self: pydantic.main.BaseModel, _ModelMetaclass__context: Any) -> None:
102 def wrapped_model_post_init(self: BaseModel, __context: Any) -> None: 103 """We need to both initialize private attributes and call the user-defined model_post_init 104 method. 105 """ 106 init_private_attributes(self, __context) 107 original_model_post_init(self, __context)
Override this method to perform additional initialization after __init__
and model_construct
.
This is useful if you want to do some validation that requires the entire model to be initialized.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs