Edit on GitHub

sqlmesh.core.config.root

  1from __future__ import annotations
  2
  3import pickle
  4import re
  5import typing as t
  6import zlib
  7
  8from pydantic import Field
  9from sqlglot import exp
 10from sqlglot.helper import first
 11from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
 12
 13from sqlmesh.cicd.config import CICDBotConfig
 14from sqlmesh.core import constants as c
 15from sqlmesh.core.config import EnvironmentSuffixTarget
 16from sqlmesh.core.config.base import BaseConfig, UpdateStrategy
 17from sqlmesh.core.config.common import variables_validator
 18from sqlmesh.core.config.connection import (
 19    ConnectionConfig,
 20    DuckDBConnectionConfig,
 21    SerializableConnectionConfig,
 22    connection_config_validator,
 23)
 24from sqlmesh.core.config.feature_flag import FeatureFlag
 25from sqlmesh.core.config.format import FormatConfig
 26from sqlmesh.core.config.gateway import GatewayConfig
 27from sqlmesh.core.config.migration import MigrationConfig
 28from sqlmesh.core.config.model import ModelDefaultsConfig
 29from sqlmesh.core.config.plan import PlanConfig
 30from sqlmesh.core.config.run import RunConfig
 31from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig, SchedulerConfig
 32from sqlmesh.core.config.ui import UIConfig
 33from sqlmesh.core.loader import Loader, SqlMeshLoader
 34from sqlmesh.core.notification_target import NotificationTarget
 35from sqlmesh.core.user import User
 36from sqlmesh.utils.errors import ConfigError
 37from sqlmesh.utils.pydantic import (
 38    field_validator,
 39    model_validator,
 40    model_validator_v1_args,
 41)
 42
 43
 44class Config(BaseConfig):
 45    """An object used by a Context to configure your SQLMesh project.
 46
 47    Args:
 48        gateways: Supported gateways and their configurations. Key represents a unique name of a gateway.
 49        default_connection: The default connection to use if one is not specified in a gateway.
 50        default_test_connection: The default connection to use for tests if one is not specified in a gateway.
 51        default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
 52        default_gateway: The default gateway.
 53        notification_targets: The notification targets to use.
 54        project: The project name of this config. Used for multi-repo setups.
 55        snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
 56        environment_ttl: The period of time that a development environment should exist before being deleted.
 57        ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder.
 58        time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d.
 59            This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.
 60        users: A list of users that can be used for approvals/notifications.
 61        username: Name of a single user who should receive approvals/notification, instead of all users in the `users` list.
 62        pinned_environments: A list of development environment names that should not be deleted by the janitor task.
 63        loader: Loader class used for loading project files.
 64        loader_kwargs: Key-value arguments to pass to the loader instance.
 65        env_vars: A dictionary of environmental variable names and values.
 66        model_defaults: Default values for model definitions.
 67        physical_schema_override: A mapping from model schema names to names of schemas in which physical tables for corresponding models will be placed.
 68        environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
 69        default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands.
 70        log_limit: The default number of logs to keep.
 71        format: The formatting options for SQL code.
 72        ui: The UI configuration for SQLMesh.
 73        feature_flags: Feature flags to enable/disable certain features.
 74        plan: The plan configuration.
 75        migration: The migration configuration.
 76        variables: A dictionary of variables that can be used in models / macros.
 77        disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
 78    """
 79
 80    gateways: t.Dict[str, GatewayConfig] = {"": GatewayConfig()}
 81    default_connection: SerializableConnectionConfig = DuckDBConnectionConfig()
 82    default_test_connection_: t.Optional[SerializableConnectionConfig] = Field(
 83        default=None, alias="default_test_connection"
 84    )
 85    default_scheduler: SchedulerConfig = BuiltInSchedulerConfig()
 86    default_gateway: str = ""
 87    notification_targets: t.List[NotificationTarget] = []
 88    project: str = ""
 89    snapshot_ttl: str = c.DEFAULT_SNAPSHOT_TTL
 90    environment_ttl: t.Optional[str] = c.DEFAULT_ENVIRONMENT_TTL
 91    ignore_patterns: t.List[str] = c.IGNORE_PATTERNS
 92    time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT
 93    users: t.List[User] = []
 94    model_defaults: ModelDefaultsConfig = ModelDefaultsConfig()
 95    pinned_environments: t.Set[str] = set()
 96    loader: t.Type[Loader] = SqlMeshLoader
 97    loader_kwargs: t.Dict[str, t.Any] = {}
 98    env_vars: t.Dict[str, str] = {}
 99    username: str = ""
100    physical_schema_override: t.Dict[str, str] = {}
101    environment_suffix_target: EnvironmentSuffixTarget = Field(
102        default=EnvironmentSuffixTarget.default
103    )
104    environment_catalog_mapping: t.Dict[re.Pattern, str] = {}
105    default_target_environment: str = c.PROD
106    log_limit: int = c.DEFAULT_LOG_LIMIT
107    cicd_bot: t.Optional[CICDBotConfig] = None
108    run: RunConfig = RunConfig()
109    format: FormatConfig = FormatConfig()
110    ui: UIConfig = UIConfig()
111    feature_flags: FeatureFlag = FeatureFlag()
112    plan: PlanConfig = PlanConfig()
113    migration: MigrationConfig = MigrationConfig()
114    variables: t.Dict[str, t.Any] = {}
115    disable_anonymized_analytics: bool = False
116
117    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
118        "gateways": UpdateStrategy.KEY_UPDATE,
119        "notification_targets": UpdateStrategy.EXTEND,
120        "ignore_patterns": UpdateStrategy.EXTEND,
121        "users": UpdateStrategy.EXTEND,
122        "model_defaults": UpdateStrategy.NESTED_UPDATE,
123        "auto_categorize_changes": UpdateStrategy.NESTED_UPDATE,
124        "pinned_environments": UpdateStrategy.EXTEND,
125        "physical_schema_override": UpdateStrategy.KEY_UPDATE,
126        "run": UpdateStrategy.NESTED_UPDATE,
127        "format": UpdateStrategy.NESTED_UPDATE,
128        "ui": UpdateStrategy.NESTED_UPDATE,
129        "loader_kwargs": UpdateStrategy.KEY_UPDATE,
130        "plan": UpdateStrategy.NESTED_UPDATE,
131    }
132
133    _connection_config_validator = connection_config_validator
134    _variables_validator = variables_validator
135
136    @field_validator("gateways", mode="before", always=True)
137    @classmethod
138    def _gateways_ensure_dict(cls, value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
139        try:
140            if not isinstance(value, GatewayConfig):
141                GatewayConfig.parse_obj(value)
142            return {"": value}
143        except Exception:
144            return value
145
146    @field_validator("environment_catalog_mapping", mode="before")
147    @classmethod
148    def _validate_regex_keys(
149        cls, value: t.Dict[str | re.Pattern, t.Any]
150    ) -> t.Dict[re.Pattern, t.Any]:
151        compiled_regexes = {}
152        for k, v in value.items():
153            try:
154                compiled_regexes[re.compile(k)] = v
155            except re.error:
156                raise ConfigError(f"`{k}` is not a valid regular expression.")
157        return compiled_regexes
158
159    @model_validator(mode="before")
160    @model_validator_v1_args
161    def _normalize_and_validate_fields(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
162        if "gateways" not in values and "gateway" in values:
163            values["gateways"] = values.pop("gateway")
164
165        for plan_deprecated in ("auto_categorize_changes", "include_unmodified"):
166            if plan_deprecated in values:
167                raise ConfigError(
168                    f"The `{plan_deprecated}` config is deprecated. Please use the `plan.{plan_deprecated}` config instead."
169                )
170
171        return values
172
173    @model_validator(mode="after")
174    @model_validator_v1_args
175    def _normalize_fields_after(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
176        dialect = values["model_defaults"].dialect
177        values["environment_catalog_mapping"] = {
178            k: normalize_identifiers(v, dialect=dialect).name
179            for k, v in values.get("environment_catalog_mapping", {}).items()
180        }
181        return values
182
183    def get_default_test_connection(
184        self,
185        default_catalog: t.Optional[str] = None,
186        default_catalog_dialect: t.Optional[str] = None,
187    ) -> ConnectionConfig:
188        return self.default_test_connection_ or DuckDBConnectionConfig(
189            catalogs=(
190                None
191                if default_catalog is None
192                else {
193                    # transpile catalog name from main connection dialect to DuckDB
194                    exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql(
195                        dialect="duckdb"
196                    ): ":memory:"
197                }
198            )
199        )
200
201    def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig:
202        if isinstance(self.gateways, dict):
203            if name is None:
204                if self.default_gateway:
205                    if self.default_gateway not in self.gateways:
206                        raise ConfigError(f"Missing gateway with name '{self.default_gateway}'")
207                    return self.gateways[self.default_gateway]
208
209                if "" in self.gateways:
210                    return self.gateways[""]
211
212                return first(self.gateways.values())
213
214            if name not in self.gateways:
215                raise ConfigError(f"Missing gateway with name '{name}'.")
216
217            return self.gateways[name]
218        else:
219            if name is not None:
220                raise ConfigError(
221                    "Gateway name is not supported when only one gateway is configured."
222                )
223            return self.gateways
224
225    def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig:
226        return self.get_gateway(gateway_name).connection or self.default_connection
227
228    def get_state_connection(
229        self, gateway_name: t.Optional[str] = None
230    ) -> t.Optional[ConnectionConfig]:
231        return self.get_gateway(gateway_name).state_connection
232
233    def get_test_connection(
234        self,
235        gateway_name: t.Optional[str] = None,
236        default_catalog: t.Optional[str] = None,
237        default_catalog_dialect: t.Optional[str] = None,
238    ) -> ConnectionConfig:
239        return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection(
240            default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect
241        )
242
243    def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig:
244        return self.get_gateway(gateway_name).scheduler or self.default_scheduler
245
246    def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]:
247        return self.get_gateway(gateway_name).state_schema
248
249    @property
250    def default_gateway_name(self) -> str:
251        if self.default_gateway:
252            return self.default_gateway
253        if "" in self.gateways:
254            return ""
255        return first(self.gateways)
256
257    @property
258    def dialect(self) -> t.Optional[str]:
259        return self.model_defaults.dialect
260
261    @property
262    def fingerprint(self) -> str:
263        return str(zlib.crc32(pickle.dumps(self.dict(exclude={"loader", "notification_targets"}))))
class Config(sqlmesh.core.config.base.BaseConfig):
 45class Config(BaseConfig):
 46    """An object used by a Context to configure your SQLMesh project.
 47
 48    Args:
 49        gateways: Supported gateways and their configurations. Key represents a unique name of a gateway.
 50        default_connection: The default connection to use if one is not specified in a gateway.
 51        default_test_connection: The default connection to use for tests if one is not specified in a gateway.
 52        default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
 53        default_gateway: The default gateway.
 54        notification_targets: The notification targets to use.
 55        project: The project name of this config. Used for multi-repo setups.
 56        snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
 57        environment_ttl: The period of time that a development environment should exist before being deleted.
 58        ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder.
 59        time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d.
 60            This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.
 61        users: A list of users that can be used for approvals/notifications.
 62        username: Name of a single user who should receive approvals/notification, instead of all users in the `users` list.
 63        pinned_environments: A list of development environment names that should not be deleted by the janitor task.
 64        loader: Loader class used for loading project files.
 65        loader_kwargs: Key-value arguments to pass to the loader instance.
 66        env_vars: A dictionary of environmental variable names and values.
 67        model_defaults: Default values for model definitions.
 68        physical_schema_override: A mapping from model schema names to names of schemas in which physical tables for corresponding models will be placed.
 69        environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
 70        default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands.
 71        log_limit: The default number of logs to keep.
 72        format: The formatting options for SQL code.
 73        ui: The UI configuration for SQLMesh.
 74        feature_flags: Feature flags to enable/disable certain features.
 75        plan: The plan configuration.
 76        migration: The migration configuration.
 77        variables: A dictionary of variables that can be used in models / macros.
 78        disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
 79    """
 80
 81    gateways: t.Dict[str, GatewayConfig] = {"": GatewayConfig()}
 82    default_connection: SerializableConnectionConfig = DuckDBConnectionConfig()
 83    default_test_connection_: t.Optional[SerializableConnectionConfig] = Field(
 84        default=None, alias="default_test_connection"
 85    )
 86    default_scheduler: SchedulerConfig = BuiltInSchedulerConfig()
 87    default_gateway: str = ""
 88    notification_targets: t.List[NotificationTarget] = []
 89    project: str = ""
 90    snapshot_ttl: str = c.DEFAULT_SNAPSHOT_TTL
 91    environment_ttl: t.Optional[str] = c.DEFAULT_ENVIRONMENT_TTL
 92    ignore_patterns: t.List[str] = c.IGNORE_PATTERNS
 93    time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT
 94    users: t.List[User] = []
 95    model_defaults: ModelDefaultsConfig = ModelDefaultsConfig()
 96    pinned_environments: t.Set[str] = set()
 97    loader: t.Type[Loader] = SqlMeshLoader
 98    loader_kwargs: t.Dict[str, t.Any] = {}
 99    env_vars: t.Dict[str, str] = {}
100    username: str = ""
101    physical_schema_override: t.Dict[str, str] = {}
102    environment_suffix_target: EnvironmentSuffixTarget = Field(
103        default=EnvironmentSuffixTarget.default
104    )
105    environment_catalog_mapping: t.Dict[re.Pattern, str] = {}
106    default_target_environment: str = c.PROD
107    log_limit: int = c.DEFAULT_LOG_LIMIT
108    cicd_bot: t.Optional[CICDBotConfig] = None
109    run: RunConfig = RunConfig()
110    format: FormatConfig = FormatConfig()
111    ui: UIConfig = UIConfig()
112    feature_flags: FeatureFlag = FeatureFlag()
113    plan: PlanConfig = PlanConfig()
114    migration: MigrationConfig = MigrationConfig()
115    variables: t.Dict[str, t.Any] = {}
116    disable_anonymized_analytics: bool = False
117
118    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
119        "gateways": UpdateStrategy.KEY_UPDATE,
120        "notification_targets": UpdateStrategy.EXTEND,
121        "ignore_patterns": UpdateStrategy.EXTEND,
122        "users": UpdateStrategy.EXTEND,
123        "model_defaults": UpdateStrategy.NESTED_UPDATE,
124        "auto_categorize_changes": UpdateStrategy.NESTED_UPDATE,
125        "pinned_environments": UpdateStrategy.EXTEND,
126        "physical_schema_override": UpdateStrategy.KEY_UPDATE,
127        "run": UpdateStrategy.NESTED_UPDATE,
128        "format": UpdateStrategy.NESTED_UPDATE,
129        "ui": UpdateStrategy.NESTED_UPDATE,
130        "loader_kwargs": UpdateStrategy.KEY_UPDATE,
131        "plan": UpdateStrategy.NESTED_UPDATE,
132    }
133
134    _connection_config_validator = connection_config_validator
135    _variables_validator = variables_validator
136
137    @field_validator("gateways", mode="before", always=True)
138    @classmethod
139    def _gateways_ensure_dict(cls, value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
140        try:
141            if not isinstance(value, GatewayConfig):
142                GatewayConfig.parse_obj(value)
143            return {"": value}
144        except Exception:
145            return value
146
147    @field_validator("environment_catalog_mapping", mode="before")
148    @classmethod
149    def _validate_regex_keys(
150        cls, value: t.Dict[str | re.Pattern, t.Any]
151    ) -> t.Dict[re.Pattern, t.Any]:
152        compiled_regexes = {}
153        for k, v in value.items():
154            try:
155                compiled_regexes[re.compile(k)] = v
156            except re.error:
157                raise ConfigError(f"`{k}` is not a valid regular expression.")
158        return compiled_regexes
159
160    @model_validator(mode="before")
161    @model_validator_v1_args
162    def _normalize_and_validate_fields(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
163        if "gateways" not in values and "gateway" in values:
164            values["gateways"] = values.pop("gateway")
165
166        for plan_deprecated in ("auto_categorize_changes", "include_unmodified"):
167            if plan_deprecated in values:
168                raise ConfigError(
169                    f"The `{plan_deprecated}` config is deprecated. Please use the `plan.{plan_deprecated}` config instead."
170                )
171
172        return values
173
174    @model_validator(mode="after")
175    @model_validator_v1_args
176    def _normalize_fields_after(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
177        dialect = values["model_defaults"].dialect
178        values["environment_catalog_mapping"] = {
179            k: normalize_identifiers(v, dialect=dialect).name
180            for k, v in values.get("environment_catalog_mapping", {}).items()
181        }
182        return values
183
184    def get_default_test_connection(
185        self,
186        default_catalog: t.Optional[str] = None,
187        default_catalog_dialect: t.Optional[str] = None,
188    ) -> ConnectionConfig:
189        return self.default_test_connection_ or DuckDBConnectionConfig(
190            catalogs=(
191                None
192                if default_catalog is None
193                else {
194                    # transpile catalog name from main connection dialect to DuckDB
195                    exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql(
196                        dialect="duckdb"
197                    ): ":memory:"
198                }
199            )
200        )
201
202    def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig:
203        if isinstance(self.gateways, dict):
204            if name is None:
205                if self.default_gateway:
206                    if self.default_gateway not in self.gateways:
207                        raise ConfigError(f"Missing gateway with name '{self.default_gateway}'")
208                    return self.gateways[self.default_gateway]
209
210                if "" in self.gateways:
211                    return self.gateways[""]
212
213                return first(self.gateways.values())
214
215            if name not in self.gateways:
216                raise ConfigError(f"Missing gateway with name '{name}'.")
217
218            return self.gateways[name]
219        else:
220            if name is not None:
221                raise ConfigError(
222                    "Gateway name is not supported when only one gateway is configured."
223                )
224            return self.gateways
225
226    def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig:
227        return self.get_gateway(gateway_name).connection or self.default_connection
228
229    def get_state_connection(
230        self, gateway_name: t.Optional[str] = None
231    ) -> t.Optional[ConnectionConfig]:
232        return self.get_gateway(gateway_name).state_connection
233
234    def get_test_connection(
235        self,
236        gateway_name: t.Optional[str] = None,
237        default_catalog: t.Optional[str] = None,
238        default_catalog_dialect: t.Optional[str] = None,
239    ) -> ConnectionConfig:
240        return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection(
241            default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect
242        )
243
244    def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig:
245        return self.get_gateway(gateway_name).scheduler or self.default_scheduler
246
247    def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]:
248        return self.get_gateway(gateway_name).state_schema
249
250    @property
251    def default_gateway_name(self) -> str:
252        if self.default_gateway:
253            return self.default_gateway
254        if "" in self.gateways:
255            return ""
256        return first(self.gateways)
257
258    @property
259    def dialect(self) -> t.Optional[str]:
260        return self.model_defaults.dialect
261
262    @property
263    def fingerprint(self) -> str:
264        return str(zlib.crc32(pickle.dumps(self.dict(exclude={"loader", "notification_targets"}))))

An object used by a Context to configure your SQLMesh project.

Arguments:
  • gateways: Supported gateways and their configurations. Key represents a unique name of a gateway.
  • default_connection: The default connection to use if one is not specified in a gateway.
  • default_test_connection: The default connection to use for tests if one is not specified in a gateway.
  • default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
  • default_gateway: The default gateway.
  • notification_targets: The notification targets to use.
  • project: The project name of this config. Used for multi-repo setups.
  • snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
  • environment_ttl: The period of time that a development environment should exist before being deleted.
  • ignore_patterns: Files that match glob patterns specified in this list are ignored when scanning the project folder.
  • time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d. This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.
  • users: A list of users that can be used for approvals/notifications.
  • username: Name of a single user who should receive approvals/notifications, instead of all users in the `users` list.
  • pinned_environments: A list of development environment names that should not be deleted by the janitor task.
  • loader: Loader class used for loading project files.
  • loader_kwargs: Key-value arguments to pass to the loader instance.
  • env_vars: A dictionary of environment variable names and values.
  • model_defaults: Default values for model definitions.
  • physical_schema_override: A mapping from model schema names to names of schemas in which physical tables for corresponding models will be placed.
  • environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
  • default_target_environment: The name of the environment that will be the default target for the sqlmesh plan and sqlmesh run commands.
  • log_limit: The default number of logs to keep.
  • format: The formatting options for SQL code.
  • ui: The UI configuration for SQLMesh.
  • feature_flags: Feature flags to enable/disable certain features.
  • plan: The plan configuration.
  • migration: The migration configuration.
  • variables: A dictionary of variables that can be used in models / macros.
  • disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
def get_default_test_connection( self, default_catalog: Union[str, NoneType] = None, default_catalog_dialect: Union[str, NoneType] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
184    def get_default_test_connection(
185        self,
186        default_catalog: t.Optional[str] = None,
187        default_catalog_dialect: t.Optional[str] = None,
188    ) -> ConnectionConfig:
189        return self.default_test_connection_ or DuckDBConnectionConfig(
190            catalogs=(
191                None
192                if default_catalog is None
193                else {
194                    # transpile catalog name from main connection dialect to DuckDB
195                    exp.parse_identifier(default_catalog, dialect=default_catalog_dialect).sql(
196                        dialect="duckdb"
197                    ): ":memory:"
198                }
199            )
200        )
def get_gateway( self, name: Union[str, NoneType] = None) -> sqlmesh.core.config.gateway.GatewayConfig:
202    def get_gateway(self, name: t.Optional[str] = None) -> GatewayConfig:
203        if isinstance(self.gateways, dict):
204            if name is None:
205                if self.default_gateway:
206                    if self.default_gateway not in self.gateways:
207                        raise ConfigError(f"Missing gateway with name '{self.default_gateway}'")
208                    return self.gateways[self.default_gateway]
209
210                if "" in self.gateways:
211                    return self.gateways[""]
212
213                return first(self.gateways.values())
214
215            if name not in self.gateways:
216                raise ConfigError(f"Missing gateway with name '{name}'.")
217
218            return self.gateways[name]
219        else:
220            if name is not None:
221                raise ConfigError(
222                    "Gateway name is not supported when only one gateway is configured."
223                )
224            return self.gateways
def get_connection( self, gateway_name: Union[str, NoneType] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
226    def get_connection(self, gateway_name: t.Optional[str] = None) -> ConnectionConfig:
227        return self.get_gateway(gateway_name).connection or self.default_connection
def get_state_connection( self, gateway_name: Union[str, NoneType] = None) -> Union[sqlmesh.core.config.connection.ConnectionConfig, NoneType]:
229    def get_state_connection(
230        self, gateway_name: t.Optional[str] = None
231    ) -> t.Optional[ConnectionConfig]:
232        return self.get_gateway(gateway_name).state_connection
def get_test_connection( self, gateway_name: Union[str, NoneType] = None, default_catalog: Union[str, NoneType] = None, default_catalog_dialect: Union[str, NoneType] = None) -> sqlmesh.core.config.connection.ConnectionConfig:
234    def get_test_connection(
235        self,
236        gateway_name: t.Optional[str] = None,
237        default_catalog: t.Optional[str] = None,
238        default_catalog_dialect: t.Optional[str] = None,
239    ) -> ConnectionConfig:
240        return self.get_gateway(gateway_name).test_connection or self.get_default_test_connection(
241            default_catalog=default_catalog, default_catalog_dialect=default_catalog_dialect
242        )
def get_scheduler( self, gateway_name: Union[str, NoneType] = None) -> typing_extensions.Annotated[Union[sqlmesh.core.config.scheduler.BuiltInSchedulerConfig, sqlmesh.core.config.scheduler.AirflowSchedulerConfig, sqlmesh.core.config.scheduler.CloudComposerSchedulerConfig, sqlmesh.core.config.scheduler.MWAASchedulerConfig], FieldInfo(annotation=NoneType, required=True, discriminator='type_')]:
244    def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig:
245        return self.get_gateway(gateway_name).scheduler or self.default_scheduler
def get_state_schema(self, gateway_name: Union[str, NoneType] = None) -> Union[str, NoneType]:
247    def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]:
248        return self.get_gateway(gateway_name).state_schema
def model_post_init(self: pydantic.main.BaseModel, _ModelMetaclass__context: Any) -> None:
                    def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
                        """We need to both initialize private attributes and call the user-defined model_post_init
                        method.
                        """
                        # Private attributes are set up first, then the original hook is invoked
                        # with the same context.
                        init_private_attributes(self, __context)
                        original_model_post_init(self, __context)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields