Edit on GitHub

sqlmesh.core.config.scheduler

  1from __future__ import annotations
  2
  3import abc
  4import typing as t
  5
  6from pydantic import Field, ValidationError
  7
  8from sqlglot.helper import subclasses
  9from sqlmesh.core.config.base import BaseConfig
 10from sqlmesh.core.console import get_console
 11from sqlmesh.core.plan import (
 12    BuiltInPlanEvaluator,
 13    PlanEvaluator,
 14)
 15from sqlmesh.core.config import DuckDBConnectionConfig
 16from sqlmesh.core.state_sync import EngineAdapterStateSync, StateSync
 17from sqlmesh.utils.errors import ConfigError
 18from sqlmesh.utils.hashing import md5
 19from sqlmesh.utils.pydantic import field_validator, validation_error_message
 20
 21if t.TYPE_CHECKING:
 22    from sqlmesh.core.context import GenericContext
 23
 24from sqlmesh.utils.config import sensitive_fields, excluded_fields
 25
 26
 27class SchedulerConfig(abc.ABC):
 28    """Abstract base class for Scheduler configurations."""
 29
 30    @abc.abstractmethod
 31    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
 32        """Creates a Plan Evaluator instance.
 33
 34        Args:
 35            context: The SQLMesh Context.
 36        """
 37
 38    @abc.abstractmethod
 39    def create_state_sync(self, context: GenericContext) -> StateSync:
 40        """Creates a State Sync instance.
 41
 42        Args:
 43            context: The SQLMesh Context.
 44
 45        Returns:
 46            The StateSync instance.
 47        """
 48
 49    @abc.abstractmethod
 50    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
 51        """Returns the default catalog for each gateway.
 52
 53        Args:
 54            context: The SQLMesh Context.
 55        """
 56
 57    @abc.abstractmethod
 58    def state_sync_fingerprint(self, context: GenericContext) -> str:
 59        """Returns the fingerprint of the State Sync configuration.
 60
 61        Args:
 62            context: The SQLMesh Context.
 63        """
 64
 65
 66class _EngineAdapterStateSyncSchedulerConfig(SchedulerConfig):
 67    def create_state_sync(self, context: GenericContext) -> StateSync:
 68        state_connection = (
 69            context.config.get_state_connection(context.gateway) or context.connection_config
 70        )
 71
 72        warehouse_connection = context.config.get_connection(context.gateway)
 73
 74        if (
 75            isinstance(state_connection, DuckDBConnectionConfig)
 76            and state_connection.concurrent_tasks <= 1
 77        ):
 78            # If we are using DuckDB, ensure that multithreaded mode gets enabled if necessary
 79            if warehouse_connection.concurrent_tasks > 1:
 80                get_console().log_warning(
 81                    "The duckdb state connection is configured for single threaded mode but the warehouse connection is configured for "
 82                    + f"multi threaded mode with {warehouse_connection.concurrent_tasks} concurrent tasks."
 83                    + " This can cause SQLMesh to hang. Overriding the duckdb state connection config to use multi threaded mode."
 84                )
 85                # this triggers multithreaded mode and has to happen before the engine adapter is created below
 86                state_connection.concurrent_tasks = warehouse_connection.concurrent_tasks
 87
 88        engine_adapter = state_connection.create_engine_adapter()
 89        if state_connection.is_forbidden_for_state_sync:
 90            raise ConfigError(
 91                f"The {engine_adapter.DIALECT.upper()} engine cannot be used to store SQLMesh state - please specify a different `state_connection` engine."
 92                + " See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#gateways for more information."
 93            )
 94
 95        # If the user is using DuckDB for both the state and the warehouse connection, they are most likely running an example project
 96        # or POC. To reduce friction, we wont log a warning about DuckDB being used for state until they change to a proper warehouse
 97        if not isinstance(state_connection, DuckDBConnectionConfig) or not isinstance(
 98            warehouse_connection, DuckDBConnectionConfig
 99        ):
100            if not state_connection.is_recommended_for_state_sync:
101                get_console().log_warning(
102                    f"The {state_connection.type_} engine is not recommended for storing SQLMesh state in production deployments. Please see"
103                    + " https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#state-connection for a list of recommended engines and more information."
104                )
105
106        schema = context.config.get_state_schema(context.gateway)
107        return EngineAdapterStateSync(
108            engine_adapter, schema=schema, cache_dir=context.cache_dir, console=context.console
109        )
110
111    def state_sync_fingerprint(self, context: GenericContext) -> str:
112        state_connection = (
113            context.config.get_state_connection(context.gateway) or context.connection_config
114        )
115        return md5(
116            [
117                state_connection.json(
118                    sort_keys=True,
119                    exclude=sensitive_fields.union(excluded_fields),
120                )
121            ]
122        )
123
124
class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
    """Configuration of the Built-In Scheduler (scheduler type ``builtin``)."""

    type_: t.Literal["builtin"] = Field(alias="type", default="builtin")

    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        """Create a `BuiltInPlanEvaluator` wired to the given context.

        Args:
            context: The SQLMesh Context.

        Returns:
            The plan evaluator built from the context's state sync, snapshot
            evaluator, scheduler factory, default catalog and console.
        """
        evaluator_kwargs = dict(
            state_sync=context.state_sync,
            snapshot_evaluator=context.snapshot_evaluator,
            create_scheduler=context.create_scheduler,
            default_catalog=context.default_catalog,
            console=context.console,
        )
        return BuiltInPlanEvaluator(**evaluator_kwargs)

    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
        """Collect the default catalog of every engine adapter in the context.

        Args:
            context: The SQLMesh Context.

        Returns:
            A mapping of gateway name to default catalog; gateways whose adapter
            has no (truthy) default catalog are left out.
        """
        return {
            gateway_name: default_catalog
            for gateway_name, engine_adapter in context.engine_adapters.items()
            if (default_catalog := engine_adapter.default_catalog)
        }
145
146
# Registry mapping each scheduler ``type`` value to its configuration class.
#
# `subclasses` returns every BaseConfig subclass found in this module's
# namespace, which includes classes that are merely imported here (for example
# DuckDBConnectionConfig, whose `type_` default is "duckdb"). Only classes that
# also implement SchedulerConfig are valid schedulers, so everything else is
# filtered out; otherwise a connection type would be accepted as a scheduler type.
SCHEDULER_CONFIG_TO_TYPE = {
    tpe.all_field_infos()["type_"].default: tpe
    for tpe in subclasses(__name__, BaseConfig, exclude={BaseConfig})
    if issubclass(tpe, SchedulerConfig)
}
151
152
def _scheduler_config_validator(
    cls: t.Type, v: SchedulerConfig | t.Dict[str, t.Any] | None
) -> SchedulerConfig | None:
    """Coerce a raw scheduler setting into a scheduler config instance.

    Args:
        cls: The class the validator is attached to (unused).
        v: The raw value: ``None``, an already built SchedulerConfig, or a
            dictionary of scheduler settings that must contain a ``type`` key.

    Returns:
        ``None`` or the given SchedulerConfig unchanged, otherwise a new
        instance of the class registered for the dictionary's ``type``.

    Raises:
        ConfigError: If ``type`` is missing, is not a registered scheduler type,
            or the settings fail validation for that scheduler type.
    """
    if v is None or isinstance(v, SchedulerConfig):
        return v

    if "type" not in v:
        raise ConfigError("Missing scheduler type.")

    scheduler_type = v["type"]
    if scheduler_type not in SCHEDULER_CONFIG_TO_TYPE:
        raise ConfigError(f"Unknown scheduler type '{scheduler_type}'.")

    try:
        return SCHEDULER_CONFIG_TO_TYPE[scheduler_type](**v)
    except ValidationError as e:
        # Chain explicitly so the original validation error is kept as the cause.
        raise ConfigError(
            validation_error_message(e, f"Invalid '{scheduler_type}' scheduler config:")
            + "\n\nVerify your config.yaml and environment variables."
        ) from e
173
174
# Field validator wrapping `_scheduler_config_validator` for the "scheduler" and
# "default_scheduler" fields. It runs in "before" mode, and `check_fields=False`
# means the class using it is not required to declare both fields.
scheduler_config_validator = field_validator(
    "scheduler", "default_scheduler", mode="before", check_fields=False
)(_scheduler_config_validator)
class SchedulerConfig(abc.ABC):
28class SchedulerConfig(abc.ABC):
29    """Abstract base class for Scheduler configurations."""
30
31    @abc.abstractmethod
32    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
33        """Creates a Plan Evaluator instance.
34
35        Args:
36            context: The SQLMesh Context.
37        """
38
39    @abc.abstractmethod
40    def create_state_sync(self, context: GenericContext) -> StateSync:
41        """Creates a State Sync instance.
42
43        Args:
44            context: The SQLMesh Context.
45
46        Returns:
47            The StateSync instance.
48        """
49
50    @abc.abstractmethod
51    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
52        """Returns the default catalog for each gateway.
53
54        Args:
55            context: The SQLMesh Context.
56        """
57
58    @abc.abstractmethod
59    def state_sync_fingerprint(self, context: GenericContext) -> str:
60        """Returns the fingerprint of the State Sync configuration.
61
62        Args:
63            context: The SQLMesh Context.
64        """

Abstract base class for Scheduler configurations.

@abc.abstractmethod
def create_plan_evaluator( self, context: sqlmesh.core.context.GenericContext) -> sqlmesh.core.plan.evaluator.PlanEvaluator:
31    @abc.abstractmethod
32    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
33        """Creates a Plan Evaluator instance.
34
35        Args:
36            context: The SQLMesh Context.
37        """

Creates a Plan Evaluator instance.

Arguments:
  • context: The SQLMesh Context.
@abc.abstractmethod
def create_state_sync( self, context: sqlmesh.core.context.GenericContext) -> sqlmesh.core.state_sync.base.StateSync:
39    @abc.abstractmethod
40    def create_state_sync(self, context: GenericContext) -> StateSync:
41        """Creates a State Sync instance.
42
43        Args:
44            context: The SQLMesh Context.
45
46        Returns:
47            The StateSync instance.
48        """

Creates a State Sync instance.

Arguments:
  • context: The SQLMesh Context.
Returns:

The StateSync instance.

@abc.abstractmethod
def get_default_catalog_per_gateway(self, context: sqlmesh.core.context.GenericContext) -> Dict[str, str]:
50    @abc.abstractmethod
51    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
52        """Returns the default catalog for each gateway.
53
54        Args:
55            context: The SQLMesh Context.
56        """

Returns the default catalog for each gateway.

Arguments:
  • context: The SQLMesh Context.
@abc.abstractmethod
def state_sync_fingerprint(self, context: sqlmesh.core.context.GenericContext) -> str:
58    @abc.abstractmethod
59    def state_sync_fingerprint(self, context: GenericContext) -> str:
60        """Returns the fingerprint of the State Sync configuration.
61
62        Args:
63            context: The SQLMesh Context.
64        """

Returns the fingerprint of the State Sync configuration.

Arguments:
  • context: The SQLMesh Context.
126class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
127    """The Built-In Scheduler configuration."""
128
129    type_: t.Literal["builtin"] = Field(alias="type", default="builtin")
130
131    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
132        return BuiltInPlanEvaluator(
133            state_sync=context.state_sync,
134            snapshot_evaluator=context.snapshot_evaluator,
135            create_scheduler=context.create_scheduler,
136            default_catalog=context.default_catalog,
137            console=context.console,
138        )
139
140    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
141        default_catalogs_per_gateway: t.Dict[str, str] = {}
142        for gateway, adapter in context.engine_adapters.items():
143            if catalog := adapter.default_catalog:
144                default_catalogs_per_gateway[gateway] = catalog
145        return default_catalogs_per_gateway

The Built-In Scheduler configuration.

type_: Literal['builtin']
def create_plan_evaluator( self, context: sqlmesh.core.context.GenericContext) -> sqlmesh.core.plan.evaluator.PlanEvaluator:
131    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
132        return BuiltInPlanEvaluator(
133            state_sync=context.state_sync,
134            snapshot_evaluator=context.snapshot_evaluator,
135            create_scheduler=context.create_scheduler,
136            default_catalog=context.default_catalog,
137            console=context.console,
138        )

Creates a Plan Evaluator instance.

Arguments:
  • context: The SQLMesh Context.
def get_default_catalog_per_gateway(self, context: sqlmesh.core.context.GenericContext) -> Dict[str, str]:
140    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
141        default_catalogs_per_gateway: t.Dict[str, str] = {}
142        for gateway, adapter in context.engine_adapters.items():
143            if catalog := adapter.default_catalog:
144                default_catalogs_per_gateway[gateway] = catalog
145        return default_catalogs_per_gateway

Returns the default catalog for each gateway.

Arguments:
  • context: The SQLMesh Context.
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model; it should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
_EngineAdapterStateSyncSchedulerConfig
create_state_sync
state_sync_fingerprint
sqlmesh.core.config.base.BaseConfig
update_with
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
SCHEDULER_CONFIG_TO_TYPE = {'builtin': <class 'BuiltInSchedulerConfig'>, 'duckdb': <class 'sqlmesh.core.config.connection.DuckDBConnectionConfig'>}
def scheduler_config_validator( cls: Type, v: Union[SchedulerConfig, Dict[str, Any], NoneType]) -> SchedulerConfig | None:
154def _scheduler_config_validator(
155    cls: t.Type, v: SchedulerConfig | t.Dict[str, t.Any] | None
156) -> SchedulerConfig | None:
157    if v is None or isinstance(v, SchedulerConfig):
158        return v
159
160    if "type" not in v:
161        raise ConfigError("Missing scheduler type.")
162
163    scheduler_type = v["type"]
164    if scheduler_type not in SCHEDULER_CONFIG_TO_TYPE:
165        raise ConfigError(f"Unknown scheduler type '{scheduler_type}'.")
166
167    try:
168        return SCHEDULER_CONFIG_TO_TYPE[scheduler_type](**v)
169    except ValidationError as e:
170        raise ConfigError(
171            validation_error_message(e, f"Invalid '{scheduler_type}' scheduler config:")
172            + "\n\nVerify your config.yaml and environment variables."
173        )

Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class' attributes.

This class' __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.

Attributes:
  • wrapped: The decorator that has to be wrapped.
  • decorator_info: The decorator info.
  • shim: A wrapper function to wrap V1 style function.