Edit on GitHub

sqlmesh.core.config.root

  1from __future__ import annotations
  2
  3import pickle
  4import re
  5import typing as t
  6import zlib
  7
  8from pydantic import Field
  9from pydantic.functional_validators import BeforeValidator
 10from sqlglot import exp
 11from sqlglot.helper import first
 12from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
 13
 14from sqlmesh.cicd.config import CICDBotConfig
 15from sqlmesh.core import constants as c
 16from sqlmesh.core.console import get_console
 17from sqlmesh.core.config.common import (
 18    EnvironmentSuffixTarget,
 19    TableNamingConvention,
 20    VirtualEnvironmentMode,
 21)
 22from sqlmesh.core.config.base import BaseConfig, UpdateStrategy
 23from sqlmesh.core.config.common import variables_validator, compile_regex_mapping
 24from sqlmesh.core.config.connection import (
 25    ConnectionConfig,
 26    DuckDBConnectionConfig,
 27    SerializableConnectionConfig,
 28    connection_config_validator,
 29)
 30from sqlmesh.core.config.format import FormatConfig
 31from sqlmesh.core.config.gateway import GatewayConfig
 32from sqlmesh.core.config.janitor import JanitorConfig
 33from sqlmesh.core.config.migration import MigrationConfig
 34from sqlmesh.core.config.model import ModelDefaultsConfig
 35from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig
 36from sqlmesh.core.config.linter import LinterConfig as LinterConfig
 37from sqlmesh.core.config.plan import PlanConfig
 38from sqlmesh.core.config.run import RunConfig
 39from sqlmesh.core.config.dbt import DbtConfig
 40from sqlmesh.core.config.scheduler import (
 41    BuiltInSchedulerConfig,
 42    SchedulerConfig,
 43    scheduler_config_validator,
 44)
 45from sqlmesh.core.config.ui import UIConfig
 46from sqlmesh.core.loader import Loader, SqlMeshLoader
 47from sqlmesh.core.notification_target import NotificationTarget
 48from sqlmesh.core.user import User
 49from sqlmesh.utils.date import to_timestamp, now
 50from sqlmesh.utils.errors import ConfigError
 51from sqlmesh.utils.pydantic import model_validator
 52
 53
 54def validate_no_past_ttl(v: str) -> str:
 55    current_time = now()
 56    if to_timestamp(v, relative_base=current_time) < to_timestamp(current_time):
 57        raise ValueError(
 58            f"TTL '{v}' is in the past. Please specify a relative time in the future. Ex: `in 1 week` instead of `1 week`."
 59        )
 60    return v
 61
 62
 63def gateways_ensure_dict(value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
 64    try:
 65        if not isinstance(value, GatewayConfig):
 66            GatewayConfig.parse_obj(value)
 67        return {"": value}
 68    except Exception:
 69        # Normalize all gateway keys to lowercase for case-insensitive matching
 70        if isinstance(value, dict):
 71            return {k.lower(): v for k, v in value.items()}
 72        return value
 73
 74
 75def validate_regex_key_dict(value: t.Dict[str | re.Pattern, t.Any]) -> t.Dict[re.Pattern, t.Any]:
 76    return compile_regex_mapping(value)
 77
 78
# Field type aliases used by `Config`. Static type checkers see the plain types;
# at runtime the same names are `Annotated` with a pydantic `BeforeValidator`
# that routes the raw input through the matching helper defined above.
if t.TYPE_CHECKING:
    from sqlmesh.core._typing import Self

    NoPastTTLString = str
    GatewayDict = t.Dict[str, GatewayConfig]
    RegexKeyDict = t.Dict[re.Pattern, str]
else:
    # TTL strings are checked by `validate_no_past_ttl`.
    NoPastTTLString = t.Annotated[str, BeforeValidator(validate_no_past_ttl)]
    # Gateway input is reshaped by `gateways_ensure_dict`.
    GatewayDict = t.Annotated[t.Dict[str, GatewayConfig], BeforeValidator(gateways_ensure_dict)]
    # Regex-keyed mappings are passed through `validate_regex_key_dict`.
    RegexKeyDict = t.Annotated[t.Dict[re.Pattern, str], BeforeValidator(validate_regex_key_dict)]
 89
 90
 91class Config(BaseConfig):
 92    """An object used by a Context to configure your SQLMesh project.
 93
 94    Args:
 95        gateways: Supported gateways and their configurations. Key represents a unique name of a gateway.
 96        default_connection: The default connection to use if one is not specified in a gateway.
 97        default_test_connection: The default connection to use for tests if one is not specified in a gateway.
 98        default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
 99        default_gateway: The default gateway.
100        notification_targets: The notification targets to use.
101        project: The project name of this config. Used for multi-repo setups.
102        snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
103        environment_ttl: The period of time that a development environment should exist before being deleted.
104        ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder.
105        time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d.
106            This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.
107        users: A list of users that can be used for approvals/notifications.
108        username: Name of a single user who should receive approvals/notification, instead of all users in the `users` list.
109        pinned_environments: A list of development environment names that should not be deleted by the janitor task.
110        loader: Loader class used for loading project files.
111        loader_kwargs: Key-value arguments to pass to the loader instance.
112        env_vars: A dictionary of environmental variable names and values.
113        model_defaults: Default values for model definitions.
114        physical_schema_mapping: A mapping from regular expressions to names of schemas in which physical tables for corresponding models will be placed.
115        environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
116        physical_table_naming_convention: Indicates how tables should be named at the physical layer
117        virtual_environment_mode: Indicates how environments should be handled.
118        gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
119        infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
120        environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
121        default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands.
122        log_limit: The default number of logs to keep.
123        format: The formatting options for SQL code.
124        ui: The UI configuration for SQLMesh.
125        plan: The plan configuration.
126        migration: The migration configuration.
127        variables: A dictionary of variables that can be used in models / macros.
128        disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
129        before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands.
130        after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands.
131        cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder.
132    """
133
134    gateways: GatewayDict = {"": GatewayConfig()}
135    default_connection: t.Optional[SerializableConnectionConfig] = None
136    default_test_connection_: t.Optional[SerializableConnectionConfig] = Field(
137        default=None, alias="default_test_connection"
138    )
139    default_scheduler: SchedulerConfig = BuiltInSchedulerConfig()
140    default_gateway: str = ""
141    notification_targets: t.List[NotificationTarget] = []
142    project: str = ""
143    snapshot_ttl: NoPastTTLString = c.DEFAULT_SNAPSHOT_TTL
144    environment_ttl: t.Optional[NoPastTTLString] = c.DEFAULT_ENVIRONMENT_TTL
145    ignore_patterns: t.List[str] = c.IGNORE_PATTERNS
146    time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT
147    users: t.List[User] = []
148    model_defaults: ModelDefaultsConfig = ModelDefaultsConfig()
149    pinned_environments: t.Set[str] = set()
150    loader: t.Type[Loader] = SqlMeshLoader
151    loader_kwargs: t.Dict[str, t.Any] = {}
152    env_vars: t.Dict[str, str] = {}
153    username: str = ""
154    physical_schema_mapping: RegexKeyDict = {}
155    environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default
156    physical_table_naming_convention: TableNamingConvention = TableNamingConvention.default
157    virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default
158    gateway_managed_virtual_layer: bool = False
159    infer_python_dependencies: bool = True
160    environment_catalog_mapping: RegexKeyDict = {}
161    default_target_environment: str = c.PROD
162    log_limit: int = c.DEFAULT_LOG_LIMIT
163    cicd_bot: t.Optional[CICDBotConfig] = None
164    run: RunConfig = RunConfig()
165    format: FormatConfig = FormatConfig()
166    ui: UIConfig = UIConfig()
167    plan: PlanConfig = PlanConfig()
168    migration: MigrationConfig = MigrationConfig()
169    model_naming: NameInferenceConfig = NameInferenceConfig()
170    variables: t.Dict[str, t.Any] = {}
171    disable_anonymized_analytics: bool = False
172    before_all: t.Optional[t.List[str]] = None
173    after_all: t.Optional[t.List[str]] = None
174    linter: LinterConfig = LinterConfig()
175    janitor: JanitorConfig = JanitorConfig()
176    cache_dir: t.Optional[str] = None
177    dbt: t.Optional[DbtConfig] = None
178
179    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
180        "gateways": UpdateStrategy.NESTED_UPDATE,
181        "notification_targets": UpdateStrategy.EXTEND,
182        "ignore_patterns": UpdateStrategy.EXTEND,
183        "users": UpdateStrategy.EXTEND,
184        "model_defaults": UpdateStrategy.NESTED_UPDATE,
185        "auto_categorize_changes": UpdateStrategy.NESTED_UPDATE,
186        "pinned_environments": UpdateStrategy.EXTEND,
187        "physical_schema_override": UpdateStrategy.KEY_UPDATE,
188        "run": UpdateStrategy.NESTED_UPDATE,
189        "format": UpdateStrategy.NESTED_UPDATE,
190        "ui": UpdateStrategy.NESTED_UPDATE,
191        "loader_kwargs": UpdateStrategy.KEY_UPDATE,
192        "plan": UpdateStrategy.NESTED_UPDATE,
193        "before_all": UpdateStrategy.EXTEND,
194        "after_all": UpdateStrategy.EXTEND,
195        "linter": UpdateStrategy.NESTED_UPDATE,
196        "dbt": UpdateStrategy.NESTED_UPDATE,
197    }
198
199    _connection_config_validator = connection_config_validator
200    _scheduler_config_validator = scheduler_config_validator  # type: ignore
201    _variables_validator = variables_validator
202
203    @model_validator(mode="before")
204    def _normalize_and_validate_fields(cls, data: t.Any) -> t.Any:
205        if not isinstance(data, dict):
206            return data
207
208        if "gateways" not in data and "gateway" in data:
209            data["gateways"] = data.pop("gateway")
210
211        for plan_deprecated in ("auto_categorize_changes", "include_unmodified"):
212            if plan_deprecated in data:
213                raise ConfigError(
214                    f"The `{plan_deprecated}` config is deprecated. Please use the `plan.{plan_deprecated}` config instead."
215                )
216
217        if "physical_schema_override" in data:
218            get_console().log_warning(
219                "`physical_schema_override` is deprecated. Please use `physical_schema_mapping` instead."
220            )
221
222            if "physical_schema_mapping" in data:
223                raise ConfigError(
224                    "Only one of `physical_schema_override` and `physical_schema_mapping` can be specified."
225                )
226
227            physical_schema_override: t.Dict[str, str] = data.pop("physical_schema_override")
228            # translate physical_schema_override to physical_schema_mapping
229            data["physical_schema_mapping"] = {
230                f"^{k}$": v for k, v in physical_schema_override.items()
231            }
232
233        return data
234
235    @model_validator(mode="after")
236    def _normalize_fields_after(self) -> Self:
237        dialect = self.model_defaults.dialect
238
239        def _normalize_identifiers(key: str) -> None:
240            setattr(
241                self,
242                key,
243                {
244                    k: normalize_identifiers(v, dialect=dialect).name
245                    for k, v in getattr(self, key, {}).items()
246                },
247            )
248
249        if (
250            self.environment_suffix_target == EnvironmentSuffixTarget.CATALOG
251            and self.environment_catalog_mapping
252        ):
253            raise ConfigError(
254                f"'environment_suffix_target: catalog' is mutually exclusive with 'environment_catalog_mapping'.\n"
255                "Please specify one or the other"
256            )
257
258        if self.plan.use_finalized_state and not self.virtual_environment_mode.is_full:
259            raise ConfigError(
260                "Using the finalized state is only supported when `virtual_environment_mode` is set to `full`."
261            )
262
263        if self.environment_catalog_mapping:
264            _normalize_identifiers("environment_catalog_mapping")
265        if self.physical_schema_mapping:
266            _normalize_identifiers("physical_schema_mapping")
267
268        return self
269
270    @model_validator(mode="after")
271    def _inherit_project_config_in_cicd_bot(self) -> Self:
272        if self.cicd_bot:
273            # inherit the project-level settings into the CICD bot if they have not been explicitly overridden
274            if self.cicd_bot.auto_categorize_changes_ is None:
275                self.cicd_bot.auto_categorize_changes_ = self.plan.auto_categorize_changes
276
277            if self.cicd_bot.pr_include_unmodified_ is None:
278                self.cicd_bot.pr_include_unmodified_ = self.plan.include_unmodified
279
280        return self
281
282    def get_default_test_connection(
283        self,
284        default_catalog: t.Optional[str] = None,
285        default_catalog_dialect: t.Optional[str] = None,
286    ) -> ConnectionConfig:
287        return self.default_test_connection_ or DuckDBConnectionConfig(
288            catalogs=(
289                None
290                if default_catalog is None
291                else {
292                    # transpile catalog name from main connection dialect to DuckDB
293                    exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql(
294                        dialect="duckdb"
295                    ): ":memory:"
296                }
297            )
298        )
299
300    def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig:
301        if isinstance(self.gateways, dict):
302            if name is None:
303                if self.default_gateway:
304                    # Normalize default_gateway name to lowercase for lookup
305                    default_key = self.default_gateway.lower()
306                    if default_key not in self.gateways:
307                        raise ConfigError(f"Missing gateway with name '{self.default_gateway}'")
308                    return self.gateways[default_key]
309
310                if "" in self.gateways:
311                    return self.gateways[""]
312
313                return first(self.gateways.values())
314
315            # Normalize lookup name to lowercase since gateway keys are already lowercase
316            lookup_key = name.lower()
317            if lookup_key not in self.gateways:
318                raise ConfigError(f"Missing gateway with name '{name}'.")
319
320            return self.gateways[lookup_key]
321        if name is not None:
322            raise ConfigError("Gateway name is not supported when only one gateway is configured.")
323        return self.gateways
324
325    def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig:
326        connection = self.get_gateway(gateway_name).connection or self.default_connection
327        if connection is None:
328            msg = f" for gateway '{gateway_name}'" if gateway_name else ""
329            raise ConfigError(f"No connection configured{msg}.")
330        return connection
331
332    def get_state_connection(
333        self, gateway_name: t.Optional[str] = None
334    ) -> t.Optional[ConnectionConfig]:
335        return self.get_gateway(gateway_name).state_connection
336
337    def get_test_connection(
338        self,
339        gateway_name: t.Optional[str] = None,
340        default_catalog: t.Optional[str] = None,
341        default_catalog_dialect: t.Optional[str] = None,
342    ) -> ConnectionConfig:
343        return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection(
344            default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect
345        )
346
347    def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig:
348        return self.get_gateway(gateway_name).scheduler or self.default_scheduler
349
350    def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]:
351        return self.get_gateway(gateway_name).state_schema
352
353    @property
354    def default_gateway_name(self) -> str:
355        if self.default_gateway:
356            return self.default_gateway
357        if "" in self.gateways:
358            return ""
359        return first(self.gateways)
360
361    @property
362    def dialect(self) -> t.Optional[str]:
363        return self.model_defaults.dialect
364
365    @property
366    def fingerprint(self) -> str:
367        return str(zlib.crc32(pickle.dumps(self.dict(exclude={"loader", "notification_targets"}))))
def validate_no_past_ttl(v: str) -> str:
def validate_no_past_ttl(v: str) -> str:
    """Reject a TTL expression that resolves to a moment earlier than now.

    Args:
        v: The TTL expression. It is resolved relative to the current time.

    Returns:
        The input, unchanged, when it does not resolve to the past.

    Raises:
        ValueError: If the TTL resolves to a timestamp before the current time.
    """
    reference = now()
    expires_at = to_timestamp(v, relative_base=reference)
    if expires_at >= to_timestamp(reference):
        return v

    raise ValueError(
        f"TTL '{v}' is in the past. Please specify a relative time in the future. Ex: `in 1 week` instead of `1 week`."
    )
def gateways_ensure_dict(value: Dict[str, Any]) -> Dict[str, Any]:
def gateways_ensure_dict(value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
    """Coerce the raw `gateways` input into a mapping keyed by gateway name.

    A single gateway (a `GatewayConfig` instance, or anything that
    `GatewayConfig.parse_obj` accepts) is wrapped under the empty-string name.
    Any other dict is treated as a mapping of named gateways and its string keys
    are lowercased so that lookups can be case-insensitive.

    Args:
        value: The raw `gateways` value.

    Returns:
        `{"": value}` for a single gateway, a copy of the dict with lowercased
        string keys for a mapping of gateways, or `value` itself when it is
        neither.
    """
    try:
        if not isinstance(value, GatewayConfig):
            # Probe only: the parsed result is discarded. A failure here means
            # `value` is not a single gateway definition.
            GatewayConfig.parse_obj(value)
        return {"": value}
    except Exception:
        # Deliberately broad: any failure above means "not a single gateway".
        if not isinstance(value, dict):
            return value
        # Normalize all gateway keys to lowercase for case-insensitive matching.
        # Non-string keys are left untouched instead of raising AttributeError here.
        return {(k.lower() if isinstance(k, str) else k): v for k, v in value.items()}
def validate_regex_key_dict(value: Dict[str | re.Pattern, Any]) -> Dict[re.Pattern, Any]:
def validate_regex_key_dict(value: t.Dict[str | re.Pattern, t.Any]) -> t.Dict[re.Pattern, t.Any]:
    """Pass a regex-keyed mapping through `compile_regex_mapping`.

    Per the annotations, keys may be strings or compiled patterns on input and
    are compiled patterns on output; the conversion itself lives in
    `compile_regex_mapping`.

    Args:
        value: The raw mapping.

    Returns:
        Whatever `compile_regex_mapping` returns for `value`.
    """
    compiled_mapping = compile_regex_mapping(value)
    return compiled_mapping
class Config(sqlmesh.core.config.base.BaseConfig):
 92class Config(BaseConfig):
 93    """An object used by a Context to configure your SQLMesh project.
 94
 95    Args:
 96        gateways: Supported gateways and their configurations. Key represents a unique name of a gateway.
 97        default_connection: The default connection to use if one is not specified in a gateway.
 98        default_test_connection: The default connection to use for tests if one is not specified in a gateway.
 99        default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
100        default_gateway: The default gateway.
101        notification_targets: The notification targets to use.
102        project: The project name of this config. Used for multi-repo setups.
103        snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
104        environment_ttl: The period of time that a development environment should exist before being deleted.
105        ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder.
106        time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d.
107            This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.
108        users: A list of users that can be used for approvals/notifications.
109        username: Name of a single user who should receive approvals/notification, instead of all users in the `users` list.
110        pinned_environments: A list of development environment names that should not be deleted by the janitor task.
111        loader: Loader class used for loading project files.
112        loader_kwargs: Key-value arguments to pass to the loader instance.
113        env_vars: A dictionary of environmental variable names and values.
114        model_defaults: Default values for model definitions.
115        physical_schema_mapping: A mapping from regular expressions to names of schemas in which physical tables for corresponding models will be placed.
116        environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
117        physical_table_naming_convention: Indicates how tables should be named at the physical layer
118        virtual_environment_mode: Indicates how environments should be handled.
119        gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
120        infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
121        environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
122        default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands.
123        log_limit: The default number of logs to keep.
124        format: The formatting options for SQL code.
125        ui: The UI configuration for SQLMesh.
126        plan: The plan configuration.
127        migration: The migration configuration.
128        variables: A dictionary of variables that can be used in models / macros.
129        disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
130        before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands.
131        after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands.
132        cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder.
133    """
134
135    gateways: GatewayDict = {"": GatewayConfig()}
136    default_connection: t.Optional[SerializableConnectionConfig] = None
137    default_test_connection_: t.Optional[SerializableConnectionConfig] = Field(
138        default=None, alias="default_test_connection"
139    )
140    default_scheduler: SchedulerConfig = BuiltInSchedulerConfig()
141    default_gateway: str = ""
142    notification_targets: t.List[NotificationTarget] = []
143    project: str = ""
144    snapshot_ttl: NoPastTTLString = c.DEFAULT_SNAPSHOT_TTL
145    environment_ttl: t.Optional[NoPastTTLString] = c.DEFAULT_ENVIRONMENT_TTL
146    ignore_patterns: t.List[str] = c.IGNORE_PATTERNS
147    time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT
148    users: t.List[User] = []
149    model_defaults: ModelDefaultsConfig = ModelDefaultsConfig()
150    pinned_environments: t.Set[str] = set()
151    loader: t.Type[Loader] = SqlMeshLoader
152    loader_kwargs: t.Dict[str, t.Any] = {}
153    env_vars: t.Dict[str, str] = {}
154    username: str = ""
155    physical_schema_mapping: RegexKeyDict = {}
156    environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default
157    physical_table_naming_convention: TableNamingConvention = TableNamingConvention.default
158    virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default
159    gateway_managed_virtual_layer: bool = False
160    infer_python_dependencies: bool = True
161    environment_catalog_mapping: RegexKeyDict = {}
162    default_target_environment: str = c.PROD
163    log_limit: int = c.DEFAULT_LOG_LIMIT
164    cicd_bot: t.Optional[CICDBotConfig] = None
165    run: RunConfig = RunConfig()
166    format: FormatConfig = FormatConfig()
167    ui: UIConfig = UIConfig()
168    plan: PlanConfig = PlanConfig()
169    migration: MigrationConfig = MigrationConfig()
170    model_naming: NameInferenceConfig = NameInferenceConfig()
171    variables: t.Dict[str, t.Any] = {}
172    disable_anonymized_analytics: bool = False
173    before_all: t.Optional[t.List[str]] = None
174    after_all: t.Optional[t.List[str]] = None
175    linter: LinterConfig = LinterConfig()
176    janitor: JanitorConfig = JanitorConfig()
177    cache_dir: t.Optional[str] = None
178    dbt: t.Optional[DbtConfig] = None
179
180    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
181        "gateways": UpdateStrategy.NESTED_UPDATE,
182        "notification_targets": UpdateStrategy.EXTEND,
183        "ignore_patterns": UpdateStrategy.EXTEND,
184        "users": UpdateStrategy.EXTEND,
185        "model_defaults": UpdateStrategy.NESTED_UPDATE,
186        "auto_categorize_changes": UpdateStrategy.NESTED_UPDATE,
187        "pinned_environments": UpdateStrategy.EXTEND,
188        "physical_schema_override": UpdateStrategy.KEY_UPDATE,
189        "run": UpdateStrategy.NESTED_UPDATE,
190        "format": UpdateStrategy.NESTED_UPDATE,
191        "ui": UpdateStrategy.NESTED_UPDATE,
192        "loader_kwargs": UpdateStrategy.KEY_UPDATE,
193        "plan": UpdateStrategy.NESTED_UPDATE,
194        "before_all": UpdateStrategy.EXTEND,
195        "after_all": UpdateStrategy.EXTEND,
196        "linter": UpdateStrategy.NESTED_UPDATE,
197        "dbt": UpdateStrategy.NESTED_UPDATE,
198    }
199
200    _connection_config_validator = connection_config_validator
201    _scheduler_config_validator = scheduler_config_validator  # type: ignore
202    _variables_validator = variables_validator
203
204    @model_validator(mode="before")
205    def _normalize_and_validate_fields(cls, data: t.Any) -> t.Any:
206        if not isinstance(data, dict):
207            return data
208
209        if "gateways" not in data and "gateway" in data:
210            data["gateways"] = data.pop("gateway")
211
212        for plan_deprecated in ("auto_categorize_changes", "include_unmodified"):
213            if plan_deprecated in data:
214                raise ConfigError(
215                    f"The `{plan_deprecated}` config is deprecated. Please use the `plan.{plan_deprecated}` config instead."
216                )
217
218        if "physical_schema_override" in data:
219            get_console().log_warning(
220                "`physical_schema_override` is deprecated. Please use `physical_schema_mapping` instead."
221            )
222
223            if "physical_schema_mapping" in data:
224                raise ConfigError(
225                    "Only one of `physical_schema_override` and `physical_schema_mapping` can be specified."
226                )
227
228            physical_schema_override: t.Dict[str, str] = data.pop("physical_schema_override")
229            # translate physical_schema_override to physical_schema_mapping
230            data["physical_schema_mapping"] = {
231                f"^{k}$": v for k, v in physical_schema_override.items()
232            }
233
234        return data
235
236    @model_validator(mode="after")
237    def _normalize_fields_after(self) -> Self:
238        dialect = self.model_defaults.dialect
239
240        def _normalize_identifiers(key: str) -> None:
241            setattr(
242                self,
243                key,
244                {
245                    k: normalize_identifiers(v, dialect=dialect).name
246                    for k, v in getattr(self, key, {}).items()
247                },
248            )
249
250        if (
251            self.environment_suffix_target == EnvironmentSuffixTarget.CATALOG
252            and self.environment_catalog_mapping
253        ):
254            raise ConfigError(
255                f"'environment_suffix_target: catalog' is mutually exclusive with 'environment_catalog_mapping'.\n"
256                "Please specify one or the other"
257            )
258
259        if self.plan.use_finalized_state and not self.virtual_environment_mode.is_full:
260            raise ConfigError(
261                "Using the finalized state is only supported when `virtual_environment_mode` is set to `full`."
262            )
263
264        if self.environment_catalog_mapping:
265            _normalize_identifiers("environment_catalog_mapping")
266        if self.physical_schema_mapping:
267            _normalize_identifiers("physical_schema_mapping")
268
269        return self
270
271    @model_validator(mode="after")
272    def _inherit_project_config_in_cicd_bot(self) -> Self:
273        if self.cicd_bot:
274            # inherit the project-level settings into the CICD bot if they have not been explicitly overridden
275            if self.cicd_bot.auto_categorize_changes_ is None:
276                self.cicd_bot.auto_categorize_changes_ = self.plan.auto_categorize_changes
277
278            if self.cicd_bot.pr_include_unmodified_ is None:
279                self.cicd_bot.pr_include_unmodified_ = self.plan.include_unmodified
280
281        return self
282
283    def get_default_test_connection(
284        self,
285        default_catalog: t.Optional[str] = None,
286        default_catalog_dialect: t.Optional[str] = None,
287    ) -> ConnectionConfig:
288        return self.default_test_connection_ or DuckDBConnectionConfig(
289            catalogs=(
290                None
291                if default_catalog is None
292                else {
293                    # transpile catalog name from main connection dialect to DuckDB
294                    exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql(
295                        dialect="duckdb"
296                    ): ":memory:"
297                }
298            )
299        )
300
301    def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig:
302        if isinstance(self.gateways, dict):
303            if name is None:
304                if self.default_gateway:
305                    # Normalize default_gateway name to lowercase for lookup
306                    default_key = self.default_gateway.lower()
307                    if default_key not in self.gateways:
308                        raise ConfigError(f"Missing gateway with name '{self.default_gateway}'")
309                    return self.gateways[default_key]
310
311                if "" in self.gateways:
312                    return self.gateways[""]
313
314                return first(self.gateways.values())
315
316            # Normalize lookup name to lowercase since gateway keys are already lowercase
317            lookup_key = name.lower()
318            if lookup_key not in self.gateways:
319                raise ConfigError(f"Missing gateway with name '{name}'.")
320
321            return self.gateways[lookup_key]
322        if name is not None:
323            raise ConfigError("Gateway name is not supported when only one gateway is configured.")
324        return self.gateways
325
326    def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig:
327        connection = self.get_gateway(gateway_name).connection or self.default_connection
328        if connection is None:
329            msg = f" for gateway '{gateway_name}'" if gateway_name else ""
330            raise ConfigError(f"No connection configured{msg}.")
331        return connection
332
333    def get_state_connection(
334        self, gateway_name: t.Optional[str] = None
335    ) -> t.Optional[ConnectionConfig]:
336        return self.get_gateway(gateway_name).state_connection
337
338    def get_test_connection(
339        self,
340        gateway_name: t.Optional[str] = None,
341        default_catalog: t.Optional[str] = None,
342        default_catalog_dialect: t.Optional[str] = None,
343    ) -> ConnectionConfig:
344        return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection(
345            default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect
346        )
347
348    def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig:
349        return self.get_gateway(gateway_name).scheduler or self.default_scheduler
350
351    def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]:
352        return self.get_gateway(gateway_name).state_schema
353
354    @property
355    def default_gateway_name(self) -> str:
356        if self.default_gateway:
357            return self.default_gateway
358        if "" in self.gateways:
359            return ""
360        return first(self.gateways)
361
362    @property
363    def dialect(self) -> t.Optional[str]:
364        return self.model_defaults.dialect
365
366    @property
367    def fingerprint(self) -> str:
368        return str(zlib.crc32(pickle.dumps(self.dict(exclude={"loader", "notification_targets"}))))

An object used by a Context to configure your SQLMesh project.

Arguments:
  • gateways: Supported gateways and their configurations. Key represents a unique name of a gateway.
  • default_connection: The default connection to use if one is not specified in a gateway.
  • default_test_connection: The default connection to use for tests if one is not specified in a gateway.
  • default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
  • default_gateway: The default gateway.
  • notification_targets: The notification targets to use.
  • project: The project name of this config. Used for multi-repo setups.
  • snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
  • environment_ttl: The period of time that a development environment should exist before being deleted.
  • ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder.
  • time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d. This time format uses Python format codes (see https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).
  • users: A list of users that can be used for approvals/notifications.
  • username: Name of a single user who should receive approvals/notifications, instead of all users in the users list.
  • pinned_environments: A list of development environment names that should not be deleted by the janitor task.
  • loader: Loader class used for loading project files.
  • loader_kwargs: Key-value arguments to pass to the loader instance.
  • env_vars: A dictionary of environment variable names and values.
  • model_defaults: Default values for model definitions.
  • physical_schema_mapping: A mapping from regular expressions to names of schemas in which physical tables for corresponding models will be placed.
  • environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
  • physical_table_naming_convention: Indicates how tables should be named at the physical layer.
  • virtual_environment_mode: Indicates how environments should be handled.
  • gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
  • infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
  • environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
  • default_target_environment: The name of the environment that will be the default target for the sqlmesh plan and sqlmesh run commands.
  • log_limit: The default number of logs to keep.
  • format: The formatting options for SQL code.
  • ui: The UI configuration for SQLMesh.
  • plan: The plan configuration.
  • migration: The migration configuration.
  • variables: A dictionary of variables that can be used in models / macros.
  • disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
  • before_all: SQL statements or macros to be executed at the start of the sqlmesh plan and sqlmesh run commands.
  • after_all: SQL statements or macros to be executed at the end of the sqlmesh plan and sqlmesh run commands.
  • cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder.
gateways: Annotated[Dict[str, sqlmesh.core.config.gateway.GatewayConfig], BeforeValidator(func=<function gateways_ensure_dict at 0x78b6ef1989d0>, json_schema_input_type=PydanticUndefined)]
default_connection: Optional[Annotated[sqlmesh.core.config.connection.ConnectionConfig, SerializeAsAny()]]
default_test_connection_: Optional[Annotated[sqlmesh.core.config.connection.ConnectionConfig, SerializeAsAny()]]
default_gateway: str
project: str
snapshot_ttl: typing.Annotated[str, BeforeValidator(func=<function validate_no_past_ttl at 0x78b6ef11cc10>, json_schema_input_type=PydanticUndefined)]
environment_ttl: Optional[Annotated[str, BeforeValidator(func=<function validate_no_past_ttl at 0x78b6ef11cc10>, json_schema_input_type=PydanticUndefined)]]
ignore_patterns: List[str]
time_column_format: str
users: List[sqlmesh.core.user.User]
pinned_environments: Set[str]
loader_kwargs: Dict[str, Any]
env_vars: Dict[str, str]
username: str
physical_schema_mapping: Annotated[Dict[re.Pattern, str], BeforeValidator(func=<function validate_regex_key_dict at 0x78b6ef19a3b0>, json_schema_input_type=PydanticUndefined)]
physical_table_naming_convention: sqlmesh.core.config.common.TableNamingConvention
gateway_managed_virtual_layer: bool
infer_python_dependencies: bool
environment_catalog_mapping: Annotated[Dict[re.Pattern, str], BeforeValidator(func=<function validate_regex_key_dict at 0x78b6ef19a3b0>, json_schema_input_type=PydanticUndefined)]
default_target_environment: str
log_limit: int
variables: Dict[str, Any]
disable_anonymized_analytics: bool
before_all: Optional[List[str]]
after_all: Optional[List[str]]
cache_dir: Optional[str]
def get_default_test_connection( self, default_catalog: Optional[str] = None, default_catalog_dialect: Optional[str] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
283    def get_default_test_connection(
284        self,
285        default_catalog: t.Optional[str] = None,
286        default_catalog_dialect: t.Optional[str] = None,
287    ) -> ConnectionConfig:
288        return self.default_test_connection_ or DuckDBConnectionConfig(
289            catalogs=(
290                None
291                if default_catalog is None
292                else {
293                    # transpile catalog name from main connection dialect to DuckDB
294                    exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql(
295                        dialect="duckdb"
296                    ): ":memory:"
297                }
298            )
299        )
def get_gateway( self, name: Optional[str] = None) -> sqlmesh.core.config.gateway.GatewayConfig:
301    def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig:
302        if isinstance(self.gateways, dict):
303            if name is None:
304                if self.default_gateway:
305                    # Normalize default_gateway name to lowercase for lookup
306                    default_key = self.default_gateway.lower()
307                    if default_key not in self.gateways:
308                        raise ConfigError(f"Missing gateway with name '{self.default_gateway}'")
309                    return self.gateways[default_key]
310
311                if "" in self.gateways:
312                    return self.gateways[""]
313
314                return first(self.gateways.values())
315
316            # Normalize lookup name to lowercase since gateway keys are already lowercase
317            lookup_key = name.lower()
318            if lookup_key not in self.gateways:
319                raise ConfigError(f"Missing gateway with name '{name}'.")
320
321            return self.gateways[lookup_key]
322        if name is not None:
323            raise ConfigError("Gateway name is not supported when only one gateway is configured.")
324        return self.gateways
def get_connection( self, gateway_name: Optional[str] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
326    def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig:
327        connection = self.get_gateway(gateway_name).connection or self.default_connection
328        if connection is None:
329            msg = f" for gateway '{gateway_name}'" if gateway_name else ""
330            raise ConfigError(f"No connection configured{msg}.")
331        return connection
def get_state_connection( self, gateway_name: Optional[str] = None) -> Optional[sqlmesh.core.config.connection.ConnectionConfig]:
333    def get_state_connection(
334        self, gateway_name: t.Optional[str] = None
335    ) -> t.Optional[ConnectionConfig]:
336        return self.get_gateway(gateway_name).state_connection
def get_test_connection( self, gateway_name: Optional[str] = None, default_catalog: Optional[str] = None, default_catalog_dialect: Optional[str] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
338    def get_test_connection(
339        self,
340        gateway_name: t.Optional[str] = None,
341        default_catalog: t.Optional[str] = None,
342        default_catalog_dialect: t.Optional[str] = None,
343    ) -> ConnectionConfig:
344        return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection(
345            default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect
346        )
def get_scheduler( self, gateway_name: Optional[str] = None) -> sqlmesh.core.config.scheduler.SchedulerConfig:
348    def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig:
349        return self.get_gateway(gateway_name).scheduler or self.default_scheduler
def get_state_schema(self, gateway_name: Optional[str] = None) -> Optional[str]:
351    def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]:
352        return self.get_gateway(gateway_name).state_schema
default_gateway_name: str
354    @property
355    def default_gateway_name(self) -> str:
356        if self.default_gateway:
357            return self.default_gateway
358        if "" in self.gateways:
359            return ""
360        return first(self.gateways)
dialect: Optional[str]
362    @property
363    def dialect(self) -> t.Optional[str]:
364        return self.model_defaults.dialect
fingerprint: str
366    @property
367    def fingerprint(self) -> str:
368        return str(zlib.crc32(pickle.dumps(self.dict(exclude={"loader", "notification_targets"}))))
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model; it should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields