sqlmesh.core.config.root
1from __future__ import annotations 2 3import pickle 4import re 5import typing as t 6import zlib 7 8from pydantic import Field 9from pydantic.functional_validators import BeforeValidator 10from sqlglot import exp 11from sqlglot.helper import first 12from sqlglot.optimizer.normalize_identifiers import normalize_identifiers 13 14from sqlmesh.cicd.config import CICDBotConfig 15from sqlmesh.core import constants as c 16from sqlmesh.core.console import get_console 17from sqlmesh.core.config.common import ( 18 EnvironmentSuffixTarget, 19 TableNamingConvention, 20 VirtualEnvironmentMode, 21) 22from sqlmesh.core.config.base import BaseConfig, UpdateStrategy 23from sqlmesh.core.config.common import variables_validator, compile_regex_mapping 24from sqlmesh.core.config.connection import ( 25 ConnectionConfig, 26 DuckDBConnectionConfig, 27 SerializableConnectionConfig, 28 connection_config_validator, 29) 30from sqlmesh.core.config.format import FormatConfig 31from sqlmesh.core.config.gateway import GatewayConfig 32from sqlmesh.core.config.janitor import JanitorConfig 33from sqlmesh.core.config.migration import MigrationConfig 34from sqlmesh.core.config.model import ModelDefaultsConfig 35from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig 36from sqlmesh.core.config.linter import LinterConfig as LinterConfig 37from sqlmesh.core.config.plan import PlanConfig 38from sqlmesh.core.config.run import RunConfig 39from sqlmesh.core.config.dbt import DbtConfig 40from sqlmesh.core.config.scheduler import ( 41 BuiltInSchedulerConfig, 42 SchedulerConfig, 43 scheduler_config_validator, 44) 45from sqlmesh.core.config.ui import UIConfig 46from sqlmesh.core.loader import Loader, SqlMeshLoader 47from sqlmesh.core.notification_target import NotificationTarget 48from sqlmesh.core.user import User 49from sqlmesh.utils.date import to_timestamp, now 50from sqlmesh.utils.errors import ConfigError 51from sqlmesh.utils.pydantic import model_validator 52 53 54def 
validate_no_past_ttl(v: str) -> str: 55 current_time = now() 56 if to_timestamp(v, relative_base=current_time) < to_timestamp(current_time): 57 raise ValueError( 58 f"TTL '{v}' is in the past. Please specify a relative time in the future. Ex: `in 1 week` instead of `1 week`." 59 ) 60 return v 61 62 63def gateways_ensure_dict(value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: 64 try: 65 if not isinstance(value, GatewayConfig): 66 GatewayConfig.parse_obj(value) 67 return {"": value} 68 except Exception: 69 # Normalize all gateway keys to lowercase for case-insensitive matching 70 if isinstance(value, dict): 71 return {k.lower(): v for k, v in value.items()} 72 return value 73 74 75def validate_regex_key_dict(value: t.Dict[str | re.Pattern, t.Any]) -> t.Dict[re.Pattern, t.Any]: 76 return compile_regex_mapping(value) 77 78 79if t.TYPE_CHECKING: 80 from sqlmesh.core._typing import Self 81 82 NoPastTTLString = str 83 GatewayDict = t.Dict[str, GatewayConfig] 84 RegexKeyDict = t.Dict[re.Pattern, str] 85else: 86 NoPastTTLString = t.Annotated[str, BeforeValidator(validate_no_past_ttl)] 87 GatewayDict = t.Annotated[t.Dict[str, GatewayConfig], BeforeValidator(gateways_ensure_dict)] 88 RegexKeyDict = t.Annotated[t.Dict[re.Pattern, str], BeforeValidator(validate_regex_key_dict)] 89 90 91class Config(BaseConfig): 92 """An object used by a Context to configure your SQLMesh project. 93 94 Args: 95 gateways: Supported gateways and their configurations. Key represents a unique name of a gateway. 96 default_connection: The default connection to use if one is not specified in a gateway. 97 default_test_connection: The default connection to use for tests if one is not specified in a gateway. 98 default_scheduler: The default scheduler configuration to use if one is not specified in a gateway. 99 default_gateway: The default gateway. 100 notification_targets: The notification targets to use. 101 project: The project name of this config. Used for multi-repo setups. 
102 snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted. 103 environment_ttl: The period of time that a development environment should exist before being deleted. 104 ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder. 105 time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d. 106 This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes. 107 users: A list of users that can be used for approvals/notifications. 108 username: Name of a single user who should receive approvals/notification, instead of all users in the `users` list. 109 pinned_environments: A list of development environment names that should not be deleted by the janitor task. 110 loader: Loader class used for loading project files. 111 loader_kwargs: Key-value arguments to pass to the loader instance. 112 env_vars: A dictionary of environmental variable names and values. 113 model_defaults: Default values for model definitions. 114 physical_schema_mapping: A mapping from regular expressions to names of schemas in which physical tables for corresponding models will be placed. 115 environment_suffix_target: Indicates whether to append the environment name to the schema or table name. 116 physical_table_naming_convention: Indicates how tables should be named at the physical layer 117 virtual_environment_mode: Indicates how environments should be handled. 118 gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway. 119 infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements. 120 environment_catalog_mapping: A mapping from regular expressions to catalog names. 
The catalog name is used to determine the target catalog for a given environment. 121 default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands. 122 log_limit: The default number of logs to keep. 123 format: The formatting options for SQL code. 124 ui: The UI configuration for SQLMesh. 125 plan: The plan configuration. 126 migration: The migration configuration. 127 variables: A dictionary of variables that can be used in models / macros. 128 disable_anonymized_analytics: Whether to disable the anonymized analytics collection. 129 before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands. 130 after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands. 131 cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder. 132 """ 133 134 gateways: GatewayDict = {"": GatewayConfig()} 135 default_connection: t.Optional[SerializableConnectionConfig] = None 136 default_test_connection_: t.Optional[SerializableConnectionConfig] = Field( 137 default=None, alias="default_test_connection" 138 ) 139 default_scheduler: SchedulerConfig = BuiltInSchedulerConfig() 140 default_gateway: str = "" 141 notification_targets: t.List[NotificationTarget] = [] 142 project: str = "" 143 snapshot_ttl: NoPastTTLString = c.DEFAULT_SNAPSHOT_TTL 144 environment_ttl: t.Optional[NoPastTTLString] = c.DEFAULT_ENVIRONMENT_TTL 145 ignore_patterns: t.List[str] = c.IGNORE_PATTERNS 146 time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT 147 users: t.List[User] = [] 148 model_defaults: ModelDefaultsConfig = ModelDefaultsConfig() 149 pinned_environments: t.Set[str] = set() 150 loader: t.Type[Loader] = SqlMeshLoader 151 loader_kwargs: t.Dict[str, t.Any] = {} 152 env_vars: t.Dict[str, str] = {} 153 username: str = "" 154 physical_schema_mapping: RegexKeyDict = {} 155 
environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default 156 physical_table_naming_convention: TableNamingConvention = TableNamingConvention.default 157 virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default 158 gateway_managed_virtual_layer: bool = False 159 infer_python_dependencies: bool = True 160 environment_catalog_mapping: RegexKeyDict = {} 161 default_target_environment: str = c.PROD 162 log_limit: int = c.DEFAULT_LOG_LIMIT 163 cicd_bot: t.Optional[CICDBotConfig] = None 164 run: RunConfig = RunConfig() 165 format: FormatConfig = FormatConfig() 166 ui: UIConfig = UIConfig() 167 plan: PlanConfig = PlanConfig() 168 migration: MigrationConfig = MigrationConfig() 169 model_naming: NameInferenceConfig = NameInferenceConfig() 170 variables: t.Dict[str, t.Any] = {} 171 disable_anonymized_analytics: bool = False 172 before_all: t.Optional[t.List[str]] = None 173 after_all: t.Optional[t.List[str]] = None 174 linter: LinterConfig = LinterConfig() 175 janitor: JanitorConfig = JanitorConfig() 176 cache_dir: t.Optional[str] = None 177 dbt: t.Optional[DbtConfig] = None 178 179 _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { 180 "gateways": UpdateStrategy.NESTED_UPDATE, 181 "notification_targets": UpdateStrategy.EXTEND, 182 "ignore_patterns": UpdateStrategy.EXTEND, 183 "users": UpdateStrategy.EXTEND, 184 "model_defaults": UpdateStrategy.NESTED_UPDATE, 185 "auto_categorize_changes": UpdateStrategy.NESTED_UPDATE, 186 "pinned_environments": UpdateStrategy.EXTEND, 187 "physical_schema_override": UpdateStrategy.KEY_UPDATE, 188 "run": UpdateStrategy.NESTED_UPDATE, 189 "format": UpdateStrategy.NESTED_UPDATE, 190 "ui": UpdateStrategy.NESTED_UPDATE, 191 "loader_kwargs": UpdateStrategy.KEY_UPDATE, 192 "plan": UpdateStrategy.NESTED_UPDATE, 193 "before_all": UpdateStrategy.EXTEND, 194 "after_all": UpdateStrategy.EXTEND, 195 "linter": UpdateStrategy.NESTED_UPDATE, 196 "dbt": UpdateStrategy.NESTED_UPDATE, 197 } 
198 199 _connection_config_validator = connection_config_validator 200 _scheduler_config_validator = scheduler_config_validator # type: ignore 201 _variables_validator = variables_validator 202 203 @model_validator(mode="before") 204 def _normalize_and_validate_fields(cls, data: t.Any) -> t.Any: 205 if not isinstance(data, dict): 206 return data 207 208 if "gateways" not in data and "gateway" in data: 209 data["gateways"] = data.pop("gateway") 210 211 for plan_deprecated in ("auto_categorize_changes", "include_unmodified"): 212 if plan_deprecated in data: 213 raise ConfigError( 214 f"The `{plan_deprecated}` config is deprecated. Please use the `plan.{plan_deprecated}` config instead." 215 ) 216 217 if "physical_schema_override" in data: 218 get_console().log_warning( 219 "`physical_schema_override` is deprecated. Please use `physical_schema_mapping` instead." 220 ) 221 222 if "physical_schema_mapping" in data: 223 raise ConfigError( 224 "Only one of `physical_schema_override` and `physical_schema_mapping` can be specified." 
225 ) 226 227 physical_schema_override: t.Dict[str, str] = data.pop("physical_schema_override") 228 # translate physical_schema_override to physical_schema_mapping 229 data["physical_schema_mapping"] = { 230 f"^{k}$": v for k, v in physical_schema_override.items() 231 } 232 233 return data 234 235 @model_validator(mode="after") 236 def _normalize_fields_after(self) -> Self: 237 dialect = self.model_defaults.dialect 238 239 def _normalize_identifiers(key: str) -> None: 240 setattr( 241 self, 242 key, 243 { 244 k: normalize_identifiers(v, dialect=dialect).name 245 for k, v in getattr(self, key, {}).items() 246 }, 247 ) 248 249 if ( 250 self.environment_suffix_target == EnvironmentSuffixTarget.CATALOG 251 and self.environment_catalog_mapping 252 ): 253 raise ConfigError( 254 f"'environment_suffix_target: catalog' is mutually exclusive with 'environment_catalog_mapping'.\n" 255 "Please specify one or the other" 256 ) 257 258 if self.plan.use_finalized_state and not self.virtual_environment_mode.is_full: 259 raise ConfigError( 260 "Using the finalized state is only supported when `virtual_environment_mode` is set to `full`." 
261 ) 262 263 if self.environment_catalog_mapping: 264 _normalize_identifiers("environment_catalog_mapping") 265 if self.physical_schema_mapping: 266 _normalize_identifiers("physical_schema_mapping") 267 268 return self 269 270 @model_validator(mode="after") 271 def _inherit_project_config_in_cicd_bot(self) -> Self: 272 if self.cicd_bot: 273 # inherit the project-level settings into the CICD bot if they have not been explicitly overridden 274 if self.cicd_bot.auto_categorize_changes_ is None: 275 self.cicd_bot.auto_categorize_changes_ = self.plan.auto_categorize_changes 276 277 if self.cicd_bot.pr_include_unmodified_ is None: 278 self.cicd_bot.pr_include_unmodified_ = self.plan.include_unmodified 279 280 return self 281 282 def get_default_test_connection( 283 self, 284 default_catalog: t.Optional[str] = None, 285 default_catalog_dialect: t.Optional[str] = None, 286 ) -> ConnectionConfig: 287 return self.default_test_connection_ or DuckDBConnectionConfig( 288 catalogs=( 289 None 290 if default_catalog is None 291 else { 292 # transpile catalog name from main connection dialect to DuckDB 293 exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql( 294 dialect="duckdb" 295 ): ":memory:" 296 } 297 ) 298 ) 299 300 def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig: 301 if isinstance(self.gateways, dict): 302 if name is None: 303 if self.default_gateway: 304 # Normalize default_gateway name to lowercase for lookup 305 default_key = self.default_gateway.lower() 306 if default_key not in self.gateways: 307 raise ConfigError(f"Missing gateway with name '{self.default_gateway}'") 308 return self.gateways[default_key] 309 310 if "" in self.gateways: 311 return self.gateways[""] 312 313 return first(self.gateways.values()) 314 315 # Normalize lookup name to lowercase since gateway keys are already lowercase 316 lookup_key = name.lower() 317 if lookup_key not in self.gateways: 318 raise ConfigError(f"Missing gateway with name '{name}'.") 
319 320 return self.gateways[lookup_key] 321 if name is not None: 322 raise ConfigError("Gateway name is not supported when only one gateway is configured.") 323 return self.gateways 324 325 def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig: 326 connection = self.get_gateway(gateway_name).connection or self.default_connection 327 if connection is None: 328 msg = f" for gateway '{gateway_name}'" if gateway_name else "" 329 raise ConfigError(f"No connection configured{msg}.") 330 return connection 331 332 def get_state_connection( 333 self, gateway_name: t.Optional[str] = None 334 ) -> t.Optional[ConnectionConfig]: 335 return self.get_gateway(gateway_name).state_connection 336 337 def get_test_connection( 338 self, 339 gateway_name: t.Optional[str] = None, 340 default_catalog: t.Optional[str] = None, 341 default_catalog_dialect: t.Optional[str] = None, 342 ) -> ConnectionConfig: 343 return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection( 344 default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect 345 ) 346 347 def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig: 348 return self.get_gateway(gateway_name).scheduler or self.default_scheduler 349 350 def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]: 351 return self.get_gateway(gateway_name).state_schema 352 353 @property 354 def default_gateway_name(self) -> str: 355 if self.default_gateway: 356 return self.default_gateway 357 if "" in self.gateways: 358 return "" 359 return first(self.gateways) 360 361 @property 362 def dialect(self) -> t.Optional[str]: 363 return self.model_defaults.dialect 364 365 @property 366 def fingerprint(self) -> str: 367 return str(zlib.crc32(pickle.dumps(self.dict(exclude={"loader", "notification_targets"}))))
def
validate_no_past_ttl(v: str) -> str:
def
gateways_ensure_dict(value: Dict[str, Any]) -> Dict[str, Any]:
def gateways_ensure_dict(value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
    """Coerce the raw `gateways` input into a mapping of gateway name to gateway config.

    If the input is a `GatewayConfig` instance, or parses as one, it is treated
    as a single unnamed gateway and wrapped under the empty-string key.
    Otherwise it is assumed to already be a mapping keyed by gateway name and
    its string keys are lowercased.

    Args:
        value: The raw value supplied for the `gateways` field.

    Returns:
        `{"": value}` for a single gateway; a copy of the mapping with
        lowercased string keys for a dict; the value itself for anything else.
    """
    try:
        if not isinstance(value, GatewayConfig):
            GatewayConfig.parse_obj(value)
        return {"": value}
    except Exception:
        # Deliberately broad: any failure to parse as a single gateway means the
        # input is treated as a mapping of gateways (or passed through untouched).
        if isinstance(value, dict):
            # Normalize all gateway keys to lowercase for case-insensitive matching.
            # Non-string keys are left alone so that field validation can report
            # them, rather than this validator failing with an AttributeError.
            return {k.lower() if isinstance(k, str) else k: v for k, v in value.items()}
        return value
def
validate_regex_key_dict(value: Dict[str | re.Pattern, Any]) -> Dict[re.Pattern, Any]:
class Config(BaseConfig):
    """An object used by a Context to configure your SQLMesh project.

    Args:
        gateways: Supported gateways and their configurations. Key represents a unique name of a gateway.
        default_connection: The default connection to use if one is not specified in a gateway.
        default_test_connection: The default connection to use for tests if one is not specified in a gateway.
        default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
        default_gateway: The default gateway.
        notification_targets: The notification targets to use.
        project: The project name of this config. Used for multi-repo setups.
        snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
        environment_ttl: The period of time that a development environment should exist before being deleted.
        ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder.
        time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d.
            This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.
        users: A list of users that can be used for approvals/notifications.
        username: Name of a single user who should receive approvals/notification, instead of all users in the `users` list.
        pinned_environments: A list of development environment names that should not be deleted by the janitor task.
        loader: Loader class used for loading project files.
        loader_kwargs: Key-value arguments to pass to the loader instance.
        env_vars: A dictionary of environmental variable names and values.
        model_defaults: Default values for model definitions.
        physical_schema_mapping: A mapping from regular expressions to names of schemas in which physical tables for corresponding models will be placed.
        environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
        physical_table_naming_convention: Indicates how tables should be named at the physical layer.
        virtual_environment_mode: Indicates how environments should be handled.
        gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
        infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
        environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
        default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands.
        log_limit: The default number of logs to keep.
        cicd_bot: The CI/CD bot configuration, if any.
        run: The run configuration.
        format: The formatting options for SQL code.
        ui: The UI configuration for SQLMesh.
        plan: The plan configuration.
        migration: The migration configuration.
        model_naming: The model name inference configuration.
        variables: A dictionary of variables that can be used in models / macros.
        disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
        before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands.
        after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands.
        linter: The linter configuration.
        janitor: The janitor configuration.
        cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder.
        dbt: The dbt configuration, if any.
    """

    gateways: GatewayDict = {"": GatewayConfig()}
    default_connection: t.Optional[SerializableConnectionConfig] = None
    default_test_connection_: t.Optional[SerializableConnectionConfig] = Field(
        default=None, alias="default_test_connection"
    )
    default_scheduler: SchedulerConfig = BuiltInSchedulerConfig()
    default_gateway: str = ""
    notification_targets: t.List[NotificationTarget] = []
    project: str = ""
    snapshot_ttl: NoPastTTLString = c.DEFAULT_SNAPSHOT_TTL
    environment_ttl: t.Optional[NoPastTTLString] = c.DEFAULT_ENVIRONMENT_TTL
    ignore_patterns: t.List[str] = c.IGNORE_PATTERNS
    time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT
    users: t.List[User] = []
    model_defaults: ModelDefaultsConfig = ModelDefaultsConfig()
    pinned_environments: t.Set[str] = set()
    loader: t.Type[Loader] = SqlMeshLoader
    loader_kwargs: t.Dict[str, t.Any] = {}
    env_vars: t.Dict[str, str] = {}
    username: str = ""
    physical_schema_mapping: RegexKeyDict = {}
    environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default
    physical_table_naming_convention: TableNamingConvention = TableNamingConvention.default
    virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default
    gateway_managed_virtual_layer: bool = False
    infer_python_dependencies: bool = True
    environment_catalog_mapping: RegexKeyDict = {}
    default_target_environment: str = c.PROD
    log_limit: int = c.DEFAULT_LOG_LIMIT
    cicd_bot: t.Optional[CICDBotConfig] = None
    run: RunConfig = RunConfig()
    format: FormatConfig = FormatConfig()
    ui: UIConfig = UIConfig()
    plan: PlanConfig = PlanConfig()
    migration: MigrationConfig = MigrationConfig()
    model_naming: NameInferenceConfig = NameInferenceConfig()
    variables: t.Dict[str, t.Any] = {}
    disable_anonymized_analytics: bool = False
    before_all: t.Optional[t.List[str]] = None
    after_all: t.Optional[t.List[str]] = None
    linter: LinterConfig = LinterConfig()
    janitor: JanitorConfig = JanitorConfig()
    cache_dir: t.Optional[str] = None
    dbt: t.Optional[DbtConfig] = None

    # Per-field strategy table; how it is consumed is defined outside this class
    # (presumably by BaseConfig when configs are combined -- TODO confirm).
    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
        "gateways": UpdateStrategy.NESTED_UPDATE,
        "notification_targets": UpdateStrategy.EXTEND,
        "ignore_patterns": UpdateStrategy.EXTEND,
        "users": UpdateStrategy.EXTEND,
        "model_defaults": UpdateStrategy.NESTED_UPDATE,
        "auto_categorize_changes": UpdateStrategy.NESTED_UPDATE,
        "pinned_environments": UpdateStrategy.EXTEND,
        "physical_schema_override": UpdateStrategy.KEY_UPDATE,
        "run": UpdateStrategy.NESTED_UPDATE,
        "format": UpdateStrategy.NESTED_UPDATE,
        "ui": UpdateStrategy.NESTED_UPDATE,
        "loader_kwargs": UpdateStrategy.KEY_UPDATE,
        "plan": UpdateStrategy.NESTED_UPDATE,
        "before_all": UpdateStrategy.EXTEND,
        "after_all": UpdateStrategy.EXTEND,
        "linter": UpdateStrategy.NESTED_UPDATE,
        "dbt": UpdateStrategy.NESTED_UPDATE,
    }

    _connection_config_validator = connection_config_validator
    _scheduler_config_validator = scheduler_config_validator  # type: ignore
    _variables_validator = variables_validator

    @model_validator(mode="before")
    def _normalize_and_validate_fields(cls, data: t.Any) -> t.Any:
        """Normalize legacy keys in the raw config mapping before field validation.

        Renames `gateway` to `gateways`, rejects the deprecated top-level plan
        options, and translates `physical_schema_override` into
        `physical_schema_mapping` (logging a deprecation warning).

        Args:
            data: The raw input. Anything that is not a dict is returned untouched.

        Returns:
            A normalized copy of the mapping; the caller's dict is not modified.

        Raises:
            ConfigError: If a deprecated plan option is present, or if both
                `physical_schema_override` and `physical_schema_mapping` are given.
        """
        if not isinstance(data, dict):
            return data

        # Work on a shallow copy so the pops/assignments below never alter the
        # mapping owned by the caller.
        data = dict(data)

        if "gateways" not in data and "gateway" in data:
            data["gateways"] = data.pop("gateway")

        for plan_deprecated in ("auto_categorize_changes", "include_unmodified"):
            if plan_deprecated in data:
                raise ConfigError(
                    f"The `{plan_deprecated}` config is deprecated. Please use the `plan.{plan_deprecated}` config instead."
                )

        if "physical_schema_override" in data:
            get_console().log_warning(
                "`physical_schema_override` is deprecated. Please use `physical_schema_mapping` instead."
            )

            if "physical_schema_mapping" in data:
                raise ConfigError(
                    "Only one of `physical_schema_override` and `physical_schema_mapping` can be specified."
                )

            physical_schema_override: t.Dict[str, str] = data.pop("physical_schema_override")
            # translate physical_schema_override to physical_schema_mapping
            data["physical_schema_mapping"] = {
                f"^{k}$": v for k, v in physical_schema_override.items()
            }

        return data

    @model_validator(mode="after")
    def _normalize_fields_after(self) -> Self:
        """Check cross-field constraints and normalize mapped identifiers.

        The values of `environment_catalog_mapping` and `physical_schema_mapping`
        are passed through `normalize_identifiers` using the `model_defaults`
        dialect.

        Raises:
            ConfigError: If `environment_suffix_target` is `CATALOG` while
                `environment_catalog_mapping` is set, or if
                `plan.use_finalized_state` is enabled while
                `virtual_environment_mode.is_full` is false.
        """
        dialect = self.model_defaults.dialect

        def _normalize_identifiers(key: str) -> None:
            # Replace the mapping stored under `key` with one whose values are
            # normalized for the configured dialect; keys are kept as they are.
            setattr(
                self,
                key,
                {
                    k: normalize_identifiers(v, dialect=dialect).name
                    for k, v in getattr(self, key, {}).items()
                },
            )

        if (
            self.environment_suffix_target == EnvironmentSuffixTarget.CATALOG
            and self.environment_catalog_mapping
        ):
            raise ConfigError(
                "'environment_suffix_target: catalog' is mutually exclusive with 'environment_catalog_mapping'.\n"
                "Please specify one or the other"
            )

        if self.plan.use_finalized_state and not self.virtual_environment_mode.is_full:
            raise ConfigError(
                "Using the finalized state is only supported when `virtual_environment_mode` is set to `full`."
            )

        if self.environment_catalog_mapping:
            _normalize_identifiers("environment_catalog_mapping")
        if self.physical_schema_mapping:
            _normalize_identifiers("physical_schema_mapping")

        return self

    @model_validator(mode="after")
    def _inherit_project_config_in_cicd_bot(self) -> Self:
        """Fill unset CI/CD bot options from the project-level plan configuration."""
        if self.cicd_bot:
            # inherit the project-level settings into the CICD bot if they have not been explicitly overridden
            if self.cicd_bot.auto_categorize_changes_ is None:
                self.cicd_bot.auto_categorize_changes_ = self.plan.auto_categorize_changes

            if self.cicd_bot.pr_include_unmodified_ is None:
                self.cicd_bot.pr_include_unmodified_ = self.plan.include_unmodified

        return self

    def get_default_test_connection(
        self,
        default_catalog: t.Optional[str] = None,
        default_catalog_dialect: t.Optional[str] = None,
    ) -> ConnectionConfig:
        """Return the default test connection, falling back to a DuckDB connection.

        Args:
            default_catalog: Catalog name to register in the fallback DuckDB
                connection. When None, the fallback is created with `catalogs=None`.
            default_catalog_dialect: Dialect used to parse `default_catalog`.

        Returns:
            `default_test_connection_` when it is set; otherwise a
            `DuckDBConnectionConfig` whose only catalog (if any) maps to `:memory:`.
        """
        return self.default_test_connection_ or DuckDBConnectionConfig(
            catalogs=(
                None
                if default_catalog is None
                else {
                    # transpile catalog name from main connection dialect to DuckDB
                    exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql(
                        dialect="duckdb"
                    ): ":memory:"
                }
            )
        )

    def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig:
        """Look up a gateway configuration by name (case-insensitively).

        Args:
            name: The gateway name. When None, the gateway named by
                `default_gateway` is used if set, then the unnamed (`""`)
                gateway, then the first configured gateway.

        Returns:
            The matching gateway configuration.

        Raises:
            ConfigError: If the requested or default gateway does not exist, or
                if a name is given while `gateways` is not a dict.
        """
        if isinstance(self.gateways, dict):
            if name is None:
                if self.default_gateway:
                    # Normalize default_gateway name to lowercase for lookup
                    default_key = self.default_gateway.lower()
                    if default_key not in self.gateways:
                        raise ConfigError(f"Missing gateway with name '{self.default_gateway}'")
                    return self.gateways[default_key]

                if "" in self.gateways:
                    return self.gateways[""]

                return first(self.gateways.values())

            # Normalize lookup name to lowercase since gateway keys are already lowercase
            lookup_key = name.lower()
            if lookup_key not in self.gateways:
                raise ConfigError(f"Missing gateway with name '{name}'.")

            return self.gateways[lookup_key]
        if name is not None:
            raise ConfigError("Gateway name is not supported when only one gateway is configured.")
        return self.gateways

    def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig:
        """Return the gateway's connection, falling back to `default_connection`.

        Raises:
            ConfigError: If neither the gateway nor the config defines a connection.
        """
        connection = self.get_gateway(gateway_name).connection or self.default_connection
        if connection is None:
            msg = f" for gateway '{gateway_name}'" if gateway_name else ""
            raise ConfigError(f"No connection configured{msg}.")
        return connection

    def get_state_connection(
        self, gateway_name: t.Optional[str] = None
    ) -> t.Optional[ConnectionConfig]:
        """Return the `state_connection` of the selected gateway (may be None)."""
        return self.get_gateway(gateway_name).state_connection

    def get_test_connection(
        self,
        gateway_name: t.Optional[str] = None,
        default_catalog: t.Optional[str] = None,
        default_catalog_dialect: t.Optional[str] = None,
    ) -> ConnectionConfig:
        """Return the gateway's test connection, falling back to the default test connection."""
        return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection(
            default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect
        )

    def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig:
        """Return the gateway's scheduler, falling back to `default_scheduler`."""
        return self.get_gateway(gateway_name).scheduler or self.default_scheduler

    def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]:
        """Return the `state_schema` of the selected gateway (may be None)."""
        return self.get_gateway(gateway_name).state_schema

    @property
    def default_gateway_name(self) -> str:
        """The name of the gateway used when none is requested explicitly."""
        if self.default_gateway:
            # NOTE(review): returned exactly as configured (not lowercased), whereas
            # get_gateway lowercases names before lookup -- confirm callers never
            # use this value to index `gateways` directly.
            return self.default_gateway
        if "" in self.gateways:
            return ""
        return first(self.gateways)

    @property
    def dialect(self) -> t.Optional[str]:
        """The SQL dialect taken from `model_defaults`."""
        return self.model_defaults.dialect

    @property
    def fingerprint(self) -> str:
        """A CRC32 checksum of the pickled config, excluding `loader` and `notification_targets`."""
        return str(zlib.crc32(pickle.dumps(self.dict(exclude={"loader", "notification_targets"}))))
An object used by a Context to configure your SQLMesh project.
Arguments:
- gateways: Supported gateways and their configurations. Key represents a unique name of a gateway.
- default_connection: The default connection to use if one is not specified in a gateway.
- default_test_connection: The default connection to use for tests if one is not specified in a gateway.
- default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
- default_gateway: The default gateway.
- notification_targets: The notification targets to use.
- project: The project name of this config. Used for multi-repo setups.
- snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
- environment_ttl: The period of time that a development environment should exist before being deleted.
- ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder.
- time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d. This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.
- users: A list of users that can be used for approvals/notifications.
- username: Name of a single user who should receive approvals/notification, instead of all users in the `users` list.
- pinned_environments: A list of development environment names that should not be deleted by the janitor task.
- loader: Loader class used for loading project files.
- loader_kwargs: Key-value arguments to pass to the loader instance.
- env_vars: A dictionary of environmental variable names and values.
- model_defaults: Default values for model definitions.
- physical_schema_mapping: A mapping from regular expressions to names of schemas in which physical tables for corresponding models will be placed.
- environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
- physical_table_naming_convention: Indicates how tables should be named at the physical layer
- virtual_environment_mode: Indicates how environments should be handled.
- gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
- infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
- environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
- default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands.
- log_limit: The default number of logs to keep.
- format: The formatting options for SQL code.
- ui: The UI configuration for SQLMesh.
- plan: The plan configuration.
- migration: The migration configuration.
- variables: A dictionary of variables that can be used in models / macros.
- disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
- before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands.
- after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands.
- cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder.
gateways: Annotated[Dict[str, sqlmesh.core.config.gateway.GatewayConfig], BeforeValidator(func=<function gateways_ensure_dict at 0x78b6ef1989d0>, json_schema_input_type=PydanticUndefined)]
default_connection: Optional[Annotated[sqlmesh.core.config.connection.ConnectionConfig, SerializeAsAny()]]
default_test_connection_: Optional[Annotated[sqlmesh.core.config.connection.ConnectionConfig, SerializeAsAny()]]
default_scheduler: sqlmesh.core.config.scheduler.SchedulerConfig
notification_targets: List[Annotated[Union[sqlmesh.core.notification_target.BasicSMTPNotificationTarget, sqlmesh.core.notification_target.GenericNotificationTarget, sqlmesh.core.notification_target.ConsoleNotificationTarget, sqlmesh.core.notification_target.SlackApiNotificationTarget, sqlmesh.core.notification_target.SlackWebhookNotificationTarget], FieldInfo(annotation=NoneType, required=True, discriminator='type_')]]
snapshot_ttl: typing.Annotated[str, BeforeValidator(func=<function validate_no_past_ttl at 0x78b6ef11cc10>, json_schema_input_type=PydanticUndefined)]
environment_ttl: Optional[Annotated[str, BeforeValidator(func=<function validate_no_past_ttl at 0x78b6ef11cc10>, json_schema_input_type=PydanticUndefined)]]
users: List[sqlmesh.core.user.User]
model_defaults: sqlmesh.core.config.model.ModelDefaultsConfig
loader: Type[sqlmesh.core.loader.Loader]
physical_schema_mapping: Annotated[Dict[re.Pattern, str], BeforeValidator(func=<function validate_regex_key_dict at 0x78b6ef19a3b0>, json_schema_input_type=PydanticUndefined)]
environment_suffix_target: sqlmesh.core.config.common.EnvironmentSuffixTarget
physical_table_naming_convention: sqlmesh.core.config.common.TableNamingConvention
virtual_environment_mode: sqlmesh.core.config.common.VirtualEnvironmentMode
environment_catalog_mapping: Annotated[Dict[re.Pattern, str], BeforeValidator(func=<function validate_regex_key_dict at 0x78b6ef19a3b0>, json_schema_input_type=PydanticUndefined)]
cicd_bot: Optional[sqlmesh.integrations.github.cicd.config.GithubCICDBotConfig]
model_naming: sqlmesh.core.config.naming.NameInferenceConfig
dbt: Optional[sqlmesh.core.config.dbt.DbtConfig]
def
get_default_test_connection( self, default_catalog: Optional[str] = None, default_catalog_dialect: Optional[str] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
def get_default_test_connection(
    self,
    default_catalog: t.Optional[str] = None,
    default_catalog_dialect: t.Optional[str] = None,
) -> ConnectionConfig:
    """Return the default connection to use for tests.

    The explicitly configured ``default_test_connection_`` wins when it is set.
    Otherwise a DuckDB connection config is built on the fly.

    Args:
        default_catalog: Optional catalog name. When given, it is registered
            in the DuckDB config under the ``":memory:"`` path.
        default_catalog_dialect: Dialect used to parse ``default_catalog``.

    Returns:
        The configured default test connection, or a new DuckDB connection config.
    """
    configured = self.default_test_connection_
    if configured:
        return configured

    if default_catalog is None:
        return DuckDBConnectionConfig(catalogs=None)

    # Transpile the catalog name from the main connection dialect to DuckDB.
    catalog_identifier = exp.parse_identifier(default_catalog, dialect=default_catalog_dialect)
    duckdb_catalog_name = catalog_identifier.sql(dialect="duckdb")
    return DuckDBConnectionConfig(catalogs={duckdb_catalog_name: ":memory:"})
def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig:
    """Look up a gateway configuration by name.

    Args:
        name: The gateway name. Lookup is case-insensitive: the name is
            lowercased before it is matched against the gateway keys.

    Returns:
        When ``gateways`` is a dict: the named gateway, or, if no name is
        given, the default gateway, then the gateway keyed by ``""``, then
        the first configured gateway. When ``gateways`` is not a dict, the
        single configured gateway itself.

    Raises:
        ConfigError: If the requested (or default) gateway is missing, or a
            name is passed while only a single gateway is configured.
    """
    gateways = self.gateways

    # Single-gateway configuration: there is nothing to look up by name.
    if not isinstance(gateways, dict):
        if name is not None:
            raise ConfigError("Gateway name is not supported when only one gateway is configured.")
        return gateways

    if name is not None:
        # Normalize the lookup name to lowercase since gateway keys are already lowercase.
        requested_key = name.lower()
        if requested_key not in gateways:
            raise ConfigError(f"Missing gateway with name '{name}'.")
        return gateways[requested_key]

    if self.default_gateway:
        # Normalize the default gateway name to lowercase for the lookup.
        fallback_key = self.default_gateway.lower()
        if fallback_key not in gateways:
            raise ConfigError(f"Missing gateway with name '{self.default_gateway}'")
        return gateways[fallback_key]

    if "" in gateways:
        return gateways[""]

    return first(gateways.values())
def
get_connection( self, gateway_name: Optional[str] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig:
    """Return the connection for a gateway, falling back to the default connection.

    Args:
        gateway_name: Name of the gateway to resolve; passed through to ``get_gateway``.

    Returns:
        The gateway's own connection if set, otherwise ``default_connection``.

    Raises:
        ConfigError: If neither the gateway nor the config provides a connection.
    """
    gateway = self.get_gateway(gateway_name)
    resolved = gateway.connection or self.default_connection
    if resolved is not None:
        return resolved

    gateway_hint = f" for gateway '{gateway_name}'" if gateway_name else ""
    raise ConfigError(f"No connection configured{gateway_hint}.")
def
get_state_connection( self, gateway_name: Optional[str] = None) -> Optional[sqlmesh.core.config.connection.ConnectionConfig]:
def
get_test_connection( self, gateway_name: Optional[str] = None, default_catalog: Optional[str] = None, default_catalog_dialect: Optional[str] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
def get_test_connection(
    self,
    gateway_name: t.Optional[str] = None,
    default_catalog: t.Optional[str] = None,
    default_catalog_dialect: t.Optional[str] = None,
) -> ConnectionConfig:
    """Return the test connection for a gateway, falling back to the default test connection.

    Args:
        gateway_name: Name of the gateway to resolve; passed through to ``get_gateway``.
        default_catalog: Forwarded to ``get_default_test_connection`` when the
            gateway has no test connection of its own.
        default_catalog_dialect: Forwarded together with ``default_catalog``.

    Returns:
        The gateway's ``test_connection`` if set, otherwise the result of
        ``get_default_test_connection``.
    """
    gateway_test_connection = self.get_gateway(gateway_name).test_connection
    if gateway_test_connection:
        return gateway_test_connection

    return self.get_default_test_connection(
        default_catalog=default_catalog,
        default_catalog_dialect=default_catalog_dialect,
    )
def
get_scheduler( self, gateway_name: Optional[str] = None) -> sqlmesh.core.config.scheduler.SchedulerConfig:
model_config =
{'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs