sqlmesh.core.config.scheduler
from __future__ import annotations

import abc
import typing as t

from pydantic import Field, ValidationError

from sqlglot.helper import subclasses
from sqlmesh.core.config.base import BaseConfig
from sqlmesh.core.console import get_console
from sqlmesh.core.plan import (
    BuiltInPlanEvaluator,
    PlanEvaluator,
)
from sqlmesh.core.config import DuckDBConnectionConfig
from sqlmesh.core.state_sync import EngineAdapterStateSync, StateSync
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.hashing import md5
from sqlmesh.utils.pydantic import field_validator, validation_error_message

if t.TYPE_CHECKING:
    from sqlmesh.core.context import GenericContext

from sqlmesh.utils.config import sensitive_fields, excluded_fields


class SchedulerConfig(abc.ABC):
    """Abstract base class for Scheduler configurations."""

    @abc.abstractmethod
    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        """Creates a Plan Evaluator instance.

        Args:
            context: The SQLMesh Context.

        Returns:
            The PlanEvaluator instance.
        """

    @abc.abstractmethod
    def create_state_sync(self, context: GenericContext) -> StateSync:
        """Creates a State Sync instance.

        Args:
            context: The SQLMesh Context.

        Returns:
            The StateSync instance.
        """

    @abc.abstractmethod
    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
        """Returns the default catalog for each gateway.

        Args:
            context: The SQLMesh Context.

        Returns:
            A mapping from gateway name to that gateway's default catalog.
        """

    @abc.abstractmethod
    def state_sync_fingerprint(self, context: GenericContext) -> str:
        """Returns the fingerprint of the State Sync configuration.

        Args:
            context: The SQLMesh Context.

        Returns:
            A string fingerprint of the state sync configuration.
        """


class _EngineAdapterStateSyncSchedulerConfig(SchedulerConfig):
    """Shared base for scheduler configs that store state through an engine adapter.

    The state connection is the gateway's configured state connection, falling back to
    the context's connection config when none is set.
    """

    def create_state_sync(self, context: GenericContext) -> StateSync:
        """Creates an ``EngineAdapterStateSync`` backed by the state connection.

        Args:
            context: The SQLMesh Context.

        Returns:
            The StateSync instance.

        Raises:
            ConfigError: If the state connection's engine is forbidden for storing state.
        """
        state_connection = (
            context.config.get_state_connection(context.gateway) or context.connection_config
        )

        warehouse_connection = context.config.get_connection(context.gateway)

        if (
            isinstance(state_connection, DuckDBConnectionConfig)
            and state_connection.concurrent_tasks <= 1
        ):
            # If we are using DuckDB, ensure that multithreaded mode gets enabled if necessary
            if warehouse_connection.concurrent_tasks > 1:
                get_console().log_warning(
                    "The duckdb state connection is configured for single threaded mode but the warehouse connection is configured for "
                    + f"multi threaded mode with {warehouse_connection.concurrent_tasks} concurrent tasks."
                    + " This can cause SQLMesh to hang. Overriding the duckdb state connection config to use multi threaded mode."
                )
                # This triggers multithreaded mode and has to happen before the engine adapter
                # is created below. Note that it mutates the state connection config in place.
                state_connection.concurrent_tasks = warehouse_connection.concurrent_tasks

        engine_adapter = state_connection.create_engine_adapter()
        if state_connection.is_forbidden_for_state_sync:
            raise ConfigError(
                f"The {engine_adapter.DIALECT.upper()} engine cannot be used to store SQLMesh state - please specify a different `state_connection` engine."
                + " See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#gateways for more information."
            )

        # If the user is using DuckDB for both the state and the warehouse connection, they are most likely running an example project
        # or POC. To reduce friction, we won't log a warning about DuckDB being used for state until they change to a proper warehouse.
        if not isinstance(state_connection, DuckDBConnectionConfig) or not isinstance(
            warehouse_connection, DuckDBConnectionConfig
        ):
            if not state_connection.is_recommended_for_state_sync:
                get_console().log_warning(
                    f"The {state_connection.type_} engine is not recommended for storing SQLMesh state in production deployments. Please see"
                    + " https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#state-connection for a list of recommended engines and more information."
                )

        schema = context.config.get_state_schema(context.gateway)
        return EngineAdapterStateSync(
            engine_adapter, schema=schema, cache_dir=context.cache_dir, console=context.console
        )

    def state_sync_fingerprint(self, context: GenericContext) -> str:
        """Returns an MD5 fingerprint of the state connection configuration.

        The connection config is serialized to JSON with sorted keys, excluding the fields
        listed in ``sensitive_fields`` and ``excluded_fields``, and then hashed.

        Args:
            context: The SQLMesh Context.

        Returns:
            The MD5 hex digest string.
        """
        state_connection = (
            context.config.get_state_connection(context.gateway) or context.connection_config
        )
        return md5(
            [
                state_connection.json(
                    sort_keys=True,
                    exclude=sensitive_fields.union(excluded_fields),
                )
            ]
        )


class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
    """The Built-In Scheduler configuration."""

    type_: t.Literal["builtin"] = Field(alias="type", default="builtin")

    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        """Creates a ``BuiltInPlanEvaluator`` wired to the context's components.

        Args:
            context: The SQLMesh Context.

        Returns:
            The BuiltInPlanEvaluator instance.
        """
        return BuiltInPlanEvaluator(
            state_sync=context.state_sync,
            snapshot_evaluator=context.snapshot_evaluator,
            create_scheduler=context.create_scheduler,
            default_catalog=context.default_catalog,
            console=context.console,
        )

    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
        """Returns the default catalog of each gateway's engine adapter.

        Gateways whose adapter reports no (falsy) default catalog are omitted.

        Args:
            context: The SQLMesh Context.

        Returns:
            A mapping from gateway name to default catalog.
        """
        default_catalogs_per_gateway: t.Dict[str, str] = {}
        for gateway, adapter in context.engine_adapters.items():
            if catalog := adapter.default_catalog:
                default_catalogs_per_gateway[gateway] = catalog
        return default_catalogs_per_gateway


# `subclasses` collects every BaseConfig subclass visible in this module's namespace, which
# includes imported configs such as DuckDBConnectionConfig. Keep only real scheduler configs so
# that connection types (e.g. "duckdb") are not accepted as scheduler types.
SCHEDULER_CONFIG_TO_TYPE = {
    tpe.all_field_infos()["type_"].default: tpe
    for tpe in subclasses(__name__, BaseConfig, exclude={BaseConfig})
    if issubclass(tpe, SchedulerConfig)
}


def _scheduler_config_validator(
    cls: t.Type, v: SchedulerConfig | t.Dict[str, t.Any] | None
) -> SchedulerConfig | None:
    """Coerces a raw scheduler config value into a ``SchedulerConfig`` instance.

    Args:
        cls: The model class being validated (unused).
        v: ``None``, an existing ``SchedulerConfig``, or a dict with a ``type`` key.

    Returns:
        ``v`` unchanged if it is ``None`` or already a ``SchedulerConfig``; otherwise an
        instance of the scheduler config registered for ``v["type"]``.

    Raises:
        ConfigError: If the type is missing or unknown, or the config fails validation.
    """
    if v is None or isinstance(v, SchedulerConfig):
        return v

    if "type" not in v:
        raise ConfigError("Missing scheduler type.")

    scheduler_type = v["type"]
    if scheduler_type not in SCHEDULER_CONFIG_TO_TYPE:
        raise ConfigError(f"Unknown scheduler type '{scheduler_type}'.")

    try:
        return SCHEDULER_CONFIG_TO_TYPE[scheduler_type](**v)
    except ValidationError as e:
        # Chain the pydantic error so the original traceback is preserved for debugging.
        raise ConfigError(
            validation_error_message(e, f"Invalid '{scheduler_type}' scheduler config:")
            + "\n\nVerify your config.yaml and environment variables."
        ) from e


scheduler_config_validator = field_validator(
    "scheduler",
    "default_scheduler",
    mode="before",
    check_fields=False,
)(_scheduler_config_validator)
class
SchedulerConfig(abc.ABC):
class SchedulerConfig(abc.ABC):
    """Interface implemented by every scheduler configuration.

    Concrete configs build the plan evaluator and state sync for a context, report
    the default catalog per gateway, and fingerprint their state sync setup.
    """

    @abc.abstractmethod
    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        """Build the Plan Evaluator for ``context``.

        Args:
            context: The SQLMesh Context.
        """

    @abc.abstractmethod
    def create_state_sync(self, context: GenericContext) -> StateSync:
        """Build the State Sync for ``context``.

        Args:
            context: The SQLMesh Context.

        Returns:
            The StateSync instance.
        """

    @abc.abstractmethod
    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
        """Map each gateway to its default catalog.

        Args:
            context: The SQLMesh Context.
        """

    @abc.abstractmethod
    def state_sync_fingerprint(self, context: GenericContext) -> str:
        """Compute a fingerprint of the State Sync configuration.

        Args:
            context: The SQLMesh Context.
        """
Abstract base class for Scheduler configurations.
@abc.abstractmethod
def
create_plan_evaluator( self, context: sqlmesh.core.context.GenericContext) -> sqlmesh.core.plan.evaluator.PlanEvaluator:
@abc.abstractmethod
def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
    """Build the Plan Evaluator to use for ``context``.

    Args:
        context: The SQLMesh Context.

    Returns:
        The PlanEvaluator instance.
    """
Creates a Plan Evaluator instance.
Arguments:
- context: The SQLMesh Context.
@abc.abstractmethod
def
create_state_sync( self, context: sqlmesh.core.context.GenericContext) -> sqlmesh.core.state_sync.base.StateSync:
@abc.abstractmethod
def create_state_sync(self, context: GenericContext) -> StateSync:
    """Build the State Sync to use for ``context``.

    Args:
        context: The SQLMesh Context.

    Returns:
        The StateSync instance.
    """
Creates a State Sync instance.
Arguments:
- context: The SQLMesh Context.
Returns:
The StateSync instance.
@abc.abstractmethod
def
get_default_catalog_per_gateway(self, context: sqlmesh.core.context.GenericContext) -> Dict[str, str]:
@abc.abstractmethod
def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
    """Map every gateway to its default catalog.

    Args:
        context: The SQLMesh Context.

    Returns:
        A dict keyed by gateway name whose values are default catalog names.
    """
Returns the default catalog for each gateway.
Arguments:
- context: The SQLMesh Context.
@abc.abstractmethod
def
state_sync_fingerprint(self, context: sqlmesh.core.context.GenericContext) -> str:
@abc.abstractmethod
def state_sync_fingerprint(self, context: GenericContext) -> str:
    """Compute a fingerprint string for the State Sync configuration.

    Args:
        context: The SQLMesh Context.

    Returns:
        The fingerprint string.
    """
Returns the fingerprint of the State Sync configuration.
Arguments:
- context: The SQLMesh Context.
class
BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, sqlmesh.core.config.base.BaseConfig):
class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
    """Configuration for SQLMesh's built-in scheduler (``type: builtin``)."""

    type_: t.Literal["builtin"] = Field(alias="type", default="builtin")

    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        """Assemble a ``BuiltInPlanEvaluator`` from the components held by ``context``.

        Args:
            context: The SQLMesh Context.

        Returns:
            The new BuiltInPlanEvaluator.
        """
        evaluator = BuiltInPlanEvaluator(
            state_sync=context.state_sync,
            snapshot_evaluator=context.snapshot_evaluator,
            create_scheduler=context.create_scheduler,
            default_catalog=context.default_catalog,
            console=context.console,
        )
        return evaluator

    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
        """Collect the default catalog reported by each gateway's engine adapter.

        Gateways whose adapter has a falsy ``default_catalog`` are left out.

        Args:
            context: The SQLMesh Context.

        Returns:
            A dict mapping gateway name to default catalog.
        """
        return {
            gateway: catalog
            for gateway, adapter in context.engine_adapters.items()
            if (catalog := adapter.default_catalog)
        }
The Built-In Scheduler configuration.
def
create_plan_evaluator( self, context: sqlmesh.core.context.GenericContext) -> sqlmesh.core.plan.evaluator.PlanEvaluator:
def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
    """Assemble a ``BuiltInPlanEvaluator`` from the components held by ``context``.

    Args:
        context: The SQLMesh Context supplying the state sync, snapshot evaluator,
            scheduler factory, default catalog and console.

    Returns:
        The new BuiltInPlanEvaluator.
    """
    evaluator = BuiltInPlanEvaluator(
        state_sync=context.state_sync,
        snapshot_evaluator=context.snapshot_evaluator,
        create_scheduler=context.create_scheduler,
        default_catalog=context.default_catalog,
        console=context.console,
    )
    return evaluator
Creates a Plan Evaluator instance.
Arguments:
- context: The SQLMesh Context.
def
get_default_catalog_per_gateway(self, context: sqlmesh.core.context.GenericContext) -> Dict[str, str]:
def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
    """Collect the default catalog reported by each gateway's engine adapter.

    Gateways whose adapter has a falsy ``default_catalog`` (e.g. ``None`` or ``""``)
    are left out of the result.

    Args:
        context: The SQLMesh Context.

    Returns:
        A dict mapping gateway name to default catalog.
    """
    return {
        gateway: catalog
        for gateway, adapter in context.engine_adapters.items()
        if (catalog := adapter.default_catalog)
    }
Returns the default catalog for each gateway.
Arguments:
- context: The SQLMesh Context.
model_config =
{'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}
Configuration for the model; it should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
SCHEDULER_CONFIG_TO_TYPE =
{'builtin': <class 'BuiltInSchedulerConfig'>, 'duckdb': <class 'sqlmesh.core.config.connection.DuckDBConnectionConfig'>}
def
scheduler_config_validator( cls: Type, v: Union[SchedulerConfig, Dict[str, Any], NoneType]) -> SchedulerConfig | None:
def _scheduler_config_validator(
    cls: t.Type, v: SchedulerConfig | t.Dict[str, t.Any] | None
) -> SchedulerConfig | None:
    """Coerces a raw scheduler config value into a ``SchedulerConfig`` instance.

    Used as a pydantic "before" validator for the ``scheduler`` and
    ``default_scheduler`` fields.

    Args:
        cls: The model class being validated (unused).
        v: ``None``, an existing ``SchedulerConfig``, or a dict with a ``type`` key.

    Returns:
        ``v`` unchanged if it is ``None`` or already a ``SchedulerConfig``; otherwise an
        instance of the scheduler config registered for ``v["type"]``.

    Raises:
        ConfigError: If the type is missing or unknown, or the config fails validation.
    """
    if v is None or isinstance(v, SchedulerConfig):
        return v

    if "type" not in v:
        raise ConfigError("Missing scheduler type.")

    scheduler_type = v["type"]
    config_type = SCHEDULER_CONFIG_TO_TYPE.get(scheduler_type)
    # The registry is built by scanning module-level BaseConfig subclasses and can contain
    # non-scheduler configs (e.g. DuckDBConnectionConfig under "duckdb"); reject those.
    if config_type is None or not issubclass(config_type, SchedulerConfig):
        raise ConfigError(f"Unknown scheduler type '{scheduler_type}'.")

    try:
        return config_type(**v)
    except ValidationError as e:
        # Chain the pydantic error so the original traceback is preserved for debugging.
        raise ConfigError(
            validation_error_message(e, f"Invalid '{scheduler_type}' scheduler config:")
            + "\n\nVerify your config.yaml and environment variables."
        ) from e
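Pydantic `field_validator` (mode="before", check_fields=False) applied to the `scheduler` and `default_scheduler` fields. It returns `None` or an existing scheduler config unchanged and converts a dict with a `type` key into the matching scheduler config. The object rendered here is pydantic's decorator proxy, whose generic description follows. Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class' attributes.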
This class' __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.
Attributes:
- wrapped: The decorator that has to be wrapped.
- decorator_info: The decorator info.
- shim: A wrapper function to wrap V1 style function.