  1from __future__ import annotations
  3import pickle
  4import re
  5import typing as t
  6import zlib
  8from pydantic import Field
  9from sqlglot import exp
 10from sqlglot.helper import first
 11from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
 13from sqlmesh.cicd.config import CICDBotConfig
 14from sqlmesh.core import constants as c
 15from sqlmesh.core.config import EnvironmentSuffixTarget
 16from sqlmesh.core.config.base import BaseConfig, UpdateStrategy
 17from sqlmesh.core.config.common import variables_validator
 18from sqlmesh.core.config.connection import (
 19    ConnectionConfig,
 20    DuckDBConnectionConfig,
 21    SerializableConnectionConfig,
 22    connection_config_validator,
 24from sqlmesh.core.config.feature_flag import FeatureFlag
 25from sqlmesh.core.config.format import FormatConfig
 26from sqlmesh.core.config.gateway import GatewayConfig
 27from sqlmesh.core.config.migration import MigrationConfig
 28from sqlmesh.core.config.model import ModelDefaultsConfig
 29from sqlmesh.core.config.plan import PlanConfig
 30from sqlmesh.core.config.run import RunConfig
 31from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig, SchedulerConfig
 32from sqlmesh.core.config.ui import UIConfig
 33from sqlmesh.core.loader import Loader, SqlMeshLoader
 34from sqlmesh.core.notification_target import NotificationTarget
 35from sqlmesh.core.user import User
 36from sqlmesh.utils.errors import ConfigError
 37from sqlmesh.utils.pydantic import (
 38    field_validator,
 39    model_validator,
 40    model_validator_v1_args,
 44class Config(BaseConfig):
 45    """An object used by a Context to configure your SQLMesh project.
 47    Args:
 48        gateways: Supported gateways and their configurations. Key represents a unique name of a gateway.
 49        default_connection: The default connection to use if one is not specified in a gateway.
 50        default_test_connection: The default connection to use for tests if one is not specified in a gateway.
 51        default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
 52        default_gateway: The default gateway.
 53        notification_targets: The notification targets to use.
 54        project: The project name of this config. Used for multi-repo setups.
 55        snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
 56        environment_ttl: The period of time that a development environment should exist before being deleted.
 57        ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder.
 58        time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d.
 59            This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.
 60        users: A list of users that can be used for approvals/notifications.
 61        username: Name of a single user who should receive approvals/notification, instead of all users in the `users` list.
 62        pinned_environments: A list of development environment names that should not be deleted by the janitor task.
 63        loader: Loader class used for loading project files.
 64        loader_kwargs: Key-value arguments to pass to the loader instance.
 65        env_vars: A dictionary of environmental variable names and values.
 66        model_defaults: Default values for model definitions.
 67        physical_schema_override: A mapping from model schema names to names of schemas in which physical tables for corresponding models will be placed.
 68        environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
 69        default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands.
 70        log_limit: The default number of logs to keep.
 71        format: The formatting options for SQL code.
 72        ui: The UI configuration for SQLMesh.
 73        feature_flags: Feature flags to enable/disable certain features.
 74        plan: The plan configuration.
 75        migration: The migration configuration.
 76        variables: A dictionary of variables that can be used in models / macros.
 77        disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
 78    """
 80    gateways: t.Dict[str, GatewayConfig] = {"": GatewayConfig()}
 81    default_connection: SerializableConnectionConfig = DuckDBConnectionConfig()
 82    default_test_connection_: t.Optional[SerializableConnectionConfig] = Field(
 83        default=None, alias="default_test_connection"
 84    )
 85    default_scheduler: SchedulerConfig = BuiltInSchedulerConfig()
 86    default_gateway: str = ""
 87    notification_targets: t.List[NotificationTarget] = []
 88    project: str = ""
 89    snapshot_ttl: str = c.DEFAULT_SNAPSHOT_TTL
 90    environment_ttl: t.Optional[str] = c.DEFAULT_ENVIRONMENT_TTL
 91    ignore_patterns: t.List[str] = c.IGNORE_PATTERNS
 92    time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT
 93    users: t.List[User] = []
 94    model_defaults: ModelDefaultsConfig = ModelDefaultsConfig()
 95    pinned_environments: t.Set[str] = set()
 96    loader: t.Type[Loader] = SqlMeshLoader
 97    loader_kwargs: t.Dict[str, t.Any] = {}
 98    env_vars: t.Dict[str, str] = {}
 99    username: str = ""
100    physical_schema_override: t.Dict[str, str] = {}
101    environment_suffix_target: EnvironmentSuffixTarget = Field(
102        default=EnvironmentSuffixTarget.default
103    )
104    environment_catalog_mapping: t.Dict[re.Pattern, str] = {}
105    default_target_environment: str = c.PROD
106    log_limit: int = c.DEFAULT_LOG_LIMIT
107    cicd_bot: t.Optional[CICDBotConfig] = None
108    run: RunConfig = RunConfig()
109    format: FormatConfig = FormatConfig()
110    ui: UIConfig = UIConfig()
111    feature_flags: FeatureFlag = FeatureFlag()
112    plan: PlanConfig = PlanConfig()
113    migration: MigrationConfig = MigrationConfig()
114    variables: t.Dict[str, t.Any] = {}
115    disable_anonymized_analytics: bool = False
117    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
118        "gateways": UpdateStrategy.KEY_UPDATE,
119        "notification_targets": UpdateStrategy.EXTEND,
120        "ignore_patterns": UpdateStrategy.EXTEND,
121        "users": UpdateStrategy.EXTEND,
122        "model_defaults": UpdateStrategy.NESTED_UPDATE,
123        "auto_categorize_changes": UpdateStrategy.NESTED_UPDATE,
124        "pinned_environments": UpdateStrategy.EXTEND,
125        "physical_schema_override": UpdateStrategy.KEY_UPDATE,
126        "run": UpdateStrategy.NESTED_UPDATE,
127        "format": UpdateStrategy.NESTED_UPDATE,
128        "ui": UpdateStrategy.NESTED_UPDATE,
129        "loader_kwargs": UpdateStrategy.KEY_UPDATE,
130        "plan": UpdateStrategy.NESTED_UPDATE,
131    }
133    _connection_config_validator = connection_config_validator
134    _variables_validator = variables_validator
136    @field_validator("gateways", mode="before", always=True)
137    @classmethod
138    def _gateways_ensure_dict(cls, value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
139        try:
140            if not isinstance(value, GatewayConfig):
141                GatewayConfig.parse_obj(value)
142            return {"": value}
143        except Exception:
144            return value
146    @field_validator("environment_catalog_mapping", mode="before")
147    @classmethod
148    def _validate_regex_keys(
149        cls, value: t.Dict[str | re.Pattern, t.Any]
150    ) -> t.Dict[re.Pattern, t.Any]:
151        compiled_regexes = {}
152        for k, v in value.items():
153            try:
154                compiled_regexes[re.compile(k)] = v
155            except re.error:
156                raise ConfigError(f"`{k}` is not a valid regular expression.")
157        return compiled_regexes
159    @model_validator(mode="before")
160    @model_validator_v1_args
161    def _normalize_and_validate_fields(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
162        if "gateways" not in values and "gateway" in values:
163            values["gateways"] = values.pop("gateway")
165        for plan_deprecated in ("auto_categorize_changes", "include_unmodified"):
166            if plan_deprecated in values:
167                raise ConfigError(
168                    f"The `{plan_deprecated}` config is deprecated. Please use the `plan.{plan_deprecated}` config instead."
169                )
171        return values
173    @model_validator(mode="after")
174    @model_validator_v1_args
175    def _normalize_fields_after(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
176        dialect = values["model_defaults"].dialect
177        values["environment_catalog_mapping"] = {
178            k: normalize_identifiers(v, dialect=dialect).name
179            for k, v in values.get("environment_catalog_mapping", {}).items()
180        }
181        return values
183    def get_default_test_connection(
184        self,
185        default_catalog: t.Optional[str] = None,
186        default_catalog_dialect: t.Optional[str] = None,
187    ) -> ConnectionConfig:
188        return self.default_test_connection_ or DuckDBConnectionConfig(
189            catalogs=(
190                None
191                if default_catalog is None
192                else {
193                    # transpile catalog name from main connection dialect to DuckDB
194                    exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql(
195                        dialect="duckdb"
196                    ): ":memory:"
197                }
198            )
199        )
201    def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig:
202        if isinstance(self.gateways, dict):
203            if name is None:
204                if self.default_gateway:
205                    if self.default_gateway not in self.gateways:
206                        raise ConfigError(f"Missing gateway with name '{self.default_gateway}'")
207                    return self.gateways[self.default_gateway]
209                if "" in self.gateways:
210                    return self.gateways[""]
212                return first(self.gateways.values())
214            if name not in self.gateways:
215                raise ConfigError(f"Missing gateway with name '{name}'.")
217            return self.gateways[name]
218        else:
219            if name is not None:
220                raise ConfigError(
221                    "Gateway name is not supported when only one gateway is configured."
222                )
223            return self.gateways
225    def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig:
226        return self.get_gateway(gateway_name).connection or self.default_connection
228    def get_state_connection(
229        self, gateway_name: t.Optional[str] = None
230    ) -> t.Optional[ConnectionConfig]:
231        return self.get_gateway(gateway_name).state_connection
233    def get_test_connection(
234        self,
235        gateway_name: t.Optional[str] = None,
236        default_catalog: t.Optional[str] = None,
237        default_catalog_dialect: t.Optional[str] = None,
238    ) -> ConnectionConfig:
239        return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection(
240            default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect
241        )
243    def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig:
244        return self.get_gateway(gateway_name).scheduler or self.default_scheduler
246    def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]:
247        return self.get_gateway(gateway_name).state_schema
249    @property
250    def default_gateway_name(self) -> str:
251        if self.default_gateway:
252            return self.default_gateway
253        if "" in self.gateways:
254            return ""
255        return first(self.gateways)
257    @property
258    def dialect(self) -> t.Optional[str]:
259        return self.model_defaults.dialect
261    @property
262    def fingerprint(self) -> str:
263        return str(zlib.crc32(pickle.dumps(self.dict(exclude={"loader", "notification_targets"}))))
