
sqlmesh.core.config.scheduler

from __future__ import annotations

import abc
import logging
import sys
import typing as t

from pydantic import Field
from requests import Session

from sqlmesh.core.config.base import BaseConfig
from sqlmesh.core.config.common import concurrent_tasks_validator
from sqlmesh.core.console import Console
from sqlmesh.core.plan import (
    AirflowPlanEvaluator,
    BuiltInPlanEvaluator,
    MWAAPlanEvaluator,
    PlanEvaluator,
)
from sqlmesh.core.state_sync import EngineAdapterStateSync, StateSync
from sqlmesh.schedulers.airflow.client import AirflowClient
from sqlmesh.schedulers.airflow.mwaa_client import MWAAClient
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.pydantic import model_validator, model_validator_v1_args

if t.TYPE_CHECKING:
    from google.auth.transport.requests import AuthorizedSession

    from sqlmesh.core.context import GenericContext

if sys.version_info >= (3, 9):
    from typing import Annotated, Literal
else:
    from typing_extensions import Annotated, Literal


logger = logging.getLogger(__name__)


class _SchedulerConfig(abc.ABC):
    """Abstract base class for Scheduler configurations."""

    @abc.abstractmethod
    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        """Creates a Plan Evaluator instance.

        Args:
            context: The SQLMesh Context.
        """

    @abc.abstractmethod
    def create_state_sync(self, context: GenericContext) -> StateSync:
        """Creates a State Sync instance.

        Args:
            context: The SQLMesh Context.

        Returns:
            The StateSync instance.
        """

    @abc.abstractmethod
    def get_default_catalog(self, context: GenericContext) -> t.Optional[str]:
        """Returns the default catalog for the Scheduler.

        Args:
            context: The SQLMesh Context.
        """


class _EngineAdapterStateSyncSchedulerConfig(_SchedulerConfig):
    def create_state_sync(self, context: GenericContext) -> StateSync:
        state_connection = (
            context.config.get_state_connection(context.gateway) or context._connection_config
        )
        engine_adapter = state_connection.create_engine_adapter()
        if not engine_adapter.SUPPORTS_ROW_LEVEL_OP:
            raise ConfigError(
                f"The {engine_adapter.DIALECT.upper()} engine cannot be used to store SQLMesh state - please specify a different `state_connection` engine."
                + " See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#gateways for more information."
            )
        schema = context.config.get_state_schema(context.gateway)
        return EngineAdapterStateSync(
            engine_adapter, schema=schema, context_path=context.path, console=context.console
        )


class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
    """The Built-In Scheduler configuration."""

    type_: Literal["builtin"] = Field(alias="type", default="builtin")

    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        return BuiltInPlanEvaluator(
            state_sync=context.state_sync,
            snapshot_evaluator=context.snapshot_evaluator,
            default_catalog=self.get_default_catalog(context),
            backfill_concurrent_tasks=context.concurrent_tasks,
            console=context.console,
            notification_target_manager=context.notification_target_manager,
        )

    def get_default_catalog(self, context: GenericContext) -> t.Optional[str]:
        return context.engine_adapter.default_catalog


class _BaseAirflowSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig):
    dag_run_poll_interval_secs: int
    dag_creation_poll_interval_secs: int
    dag_creation_max_retry_attempts: int

    backfill_concurrent_tasks: int
    ddl_concurrent_tasks: int

    use_state_connection: bool

    default_catalog_override: t.Optional[str]

    @abc.abstractmethod
    def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
        """Constructs the Airflow Client instance."""

    def create_state_sync(self, context: GenericContext) -> StateSync:
        if self.use_state_connection:
            return super().create_state_sync(context)

        from sqlmesh.schedulers.airflow.state_sync import HttpStateSync

        return HttpStateSync(
            client=self.get_client(context.console),
            dag_run_poll_interval_secs=self.dag_run_poll_interval_secs,
            console=context.console,
        )

    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        return AirflowPlanEvaluator(
            airflow_client=self.get_client(context.console),
            dag_run_poll_interval_secs=self.dag_run_poll_interval_secs,
            dag_creation_poll_interval_secs=self.dag_creation_poll_interval_secs,
            dag_creation_max_retry_attempts=self.dag_creation_max_retry_attempts,
            console=context.console,
            notification_targets=context.notification_targets,
            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
            users=context.users,
            state_sync=context.state_sync if self.use_state_connection else None,
        )

    def get_default_catalog(self, context: GenericContext) -> t.Optional[str]:
        # The default catalog must still be set on the Airflow side.
        default_catalog = self.get_client(context.console).default_catalog
        return self.default_catalog_override or default_catalog


class AirflowSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig):
    """The Airflow Scheduler configuration.

    Args:
        airflow_url: The URL of the Airflow Webserver.
        username: The Airflow username.
        password: The Airflow password.
        dag_run_poll_interval_secs: How often a running DAG is polled (in seconds).
        dag_creation_poll_interval_secs: How often SQLMesh checks whether a DAG has been created (in seconds).
        dag_creation_max_retry_attempts: The maximum number of attempts SQLMesh will make while checking
            whether a DAG has been created.
        backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
        ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
        max_snapshot_ids_per_request: The maximum number of snapshot IDs that can be sent in a single HTTP GET request to the Airflow Webserver.
        use_state_connection: Whether to use the `state_connection` configuration to access the SQLMesh state.
        default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence
            over the default catalog value set on the Airflow side.
    """

    airflow_url: str = "http://localhost:8080/"
    username: str = "airflow"
    password: str = "airflow"
    dag_run_poll_interval_secs: int = 10
    dag_creation_poll_interval_secs: int = 30
    dag_creation_max_retry_attempts: int = 10

    backfill_concurrent_tasks: int = 4
    ddl_concurrent_tasks: int = 4

    max_snapshot_ids_per_request: t.Optional[int] = None
    use_state_connection: bool = False

    default_catalog_override: t.Optional[str] = None

    type_: Literal["airflow"] = Field(alias="type", default="airflow")

    _concurrent_tasks_validator = concurrent_tasks_validator

    def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
        session = Session()
        session.headers.update({"Content-Type": "application/json"})
        session.auth = (self.username, self.password)

        return AirflowClient(
            session=session,
            airflow_url=self.airflow_url,
            console=console,
            snapshot_ids_batch_size=self.max_snapshot_ids_per_request,
        )


class CloudComposerSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig, extra="allow"):
    """The Google Cloud Composer configuration.

    Args:
        airflow_url: The URL of the Airflow Webserver.
        dag_run_poll_interval_secs: How often a running DAG is polled (in seconds).
        dag_creation_poll_interval_secs: How often SQLMesh checks whether a DAG has been created (in seconds).
        dag_creation_max_retry_attempts: The maximum number of attempts SQLMesh will make while checking
            whether a DAG has been created.
        backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
        ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
        max_snapshot_ids_per_request: The maximum number of snapshot IDs that can be sent in a single HTTP GET request to the Airflow Webserver.
        use_state_connection: Whether to use the `state_connection` configuration to access the SQLMesh state.
        default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence
            over the default catalog value set on the Airflow side.
    """

    airflow_url: str
    dag_run_poll_interval_secs: int = 10
    dag_creation_poll_interval_secs: int = 30
    dag_creation_max_retry_attempts: int = 10

    backfill_concurrent_tasks: int = 4
    ddl_concurrent_tasks: int = 4

    max_snapshot_ids_per_request: t.Optional[int] = 20
    use_state_connection: bool = False

    default_catalog_override: t.Optional[str] = None

    type_: Literal["cloud_composer"] = Field(alias="type", default="cloud_composer")

    _concurrent_tasks_validator = concurrent_tasks_validator

    def __init__(self, **data: t.Any) -> None:
        super().__init__(**data)
        self._session: t.Optional[AuthorizedSession] = data.get("session")

    @property
    def session(self) -> AuthorizedSession:
        import google.auth
        from google.auth.transport.requests import AuthorizedSession

        if self._session is None:
            self._session = AuthorizedSession(
                google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])[0]
            )
            self._session.headers.update({"Content-Type": "application/json"})
        return self._session

    def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
        return AirflowClient(
            airflow_url=self.airflow_url,
            session=self.session,
            console=console,
            snapshot_ids_batch_size=self.max_snapshot_ids_per_request,
        )

    @model_validator(mode="before")
    @model_validator_v1_args
    def check_supported_fields(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        allowed_field_names = {field.alias or name for name, field in cls.all_field_infos().items()}
        allowed_field_names.add("session")

        for field_name in values:
            if field_name not in allowed_field_names:
                raise ValueError(f"Unsupported Field: {field_name}")
        return values


class MWAASchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
    """The AWS MWAA Scheduler configuration.

    Args:
        environment: The name of the MWAA environment.
        dag_run_poll_interval_secs: How often a running DAG is polled (in seconds).
        dag_creation_poll_interval_secs: How often SQLMesh checks whether a DAG has been created (in seconds).
        dag_creation_max_retry_attempts: The maximum number of attempts SQLMesh will make while checking
            whether a DAG has been created.
        backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
        ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
        default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence
            over the default catalog value set on the Airflow side.
    """

    environment: str
    dag_run_poll_interval_secs: int = 10
    dag_creation_poll_interval_secs: int = 30
    dag_creation_max_retry_attempts: int = 10

    backfill_concurrent_tasks: int = 4
    ddl_concurrent_tasks: int = 4

    default_catalog_override: t.Optional[str] = None

    type_: Literal["mwaa"] = Field(alias="type", default="mwaa")

    _concurrent_tasks_validator = concurrent_tasks_validator

    def get_client(self, console: t.Optional[Console] = None) -> MWAAClient:
        return MWAAClient(self.environment, console=console)

    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        return MWAAPlanEvaluator(
            client=self.get_client(context.console),
            state_sync=context.state_sync,
            console=context.console,
            dag_run_poll_interval_secs=self.dag_run_poll_interval_secs,
            dag_creation_poll_interval_secs=self.dag_creation_poll_interval_secs,
            dag_creation_max_retry_attempts=self.dag_creation_max_retry_attempts,
            notification_targets=context.notification_targets,
            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
            users=context.users,
        )

    def get_default_catalog(self, context: GenericContext) -> t.Optional[str]:
        # The default catalog must still be set on the Airflow side.
        default_catalog = self.get_client(context.console).default_catalog
        return self.default_catalog_override or default_catalog


SchedulerConfig = Annotated[
    t.Union[
        BuiltInSchedulerConfig,
        AirflowSchedulerConfig,
        CloudComposerSchedulerConfig,
        MWAASchedulerConfig,
    ],
    Field(discriminator="type_"),
]
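
`SchedulerConfig` is a pydantic discriminated union: the `type` key in the input selects the concrete scheduler class via the aliased `type_` discriminator field. A minimal sketch of how that resolution works, assuming pydantic v2; the `GatewaySketch` wrapper model is hypothetical and used here only for illustration:

from pydantic import BaseModel

from sqlmesh.core.config.scheduler import SchedulerConfig


class GatewaySketch(BaseModel):
    # Hypothetical wrapper for illustration only; in a real project the
    # scheduler is set through the SQLMesh gateway configuration.
    scheduler: SchedulerConfig


# The "type" key is matched against the `type_` discriminator (aliased
# to "type"), so this resolves to an AirflowSchedulerConfig instance.
config = GatewaySketch.model_validate(
    {"scheduler": {"type": "airflow", "airflow_url": "http://localhost:8080/"}}
)
assert config.scheduler.type_ == "airflow"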
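
The scheduler configs can also be constructed directly. A sketch using `AirflowSchedulerConfig` with its default URL and credentials (illustrative values, not recommendations); `get_client` builds an `AirflowClient` over a basic-auth `requests.Session`:

from sqlmesh.core.config.scheduler import AirflowSchedulerConfig

scheduler = AirflowSchedulerConfig(
    airflow_url="http://localhost:8080/",
    username="airflow",
    password="airflow",
    backfill_concurrent_tasks=8,  # illustrative override of the default (4)
)
# The returned AirflowClient reuses a requests.Session configured with
# basic auth and a JSON Content-Type header.
client = scheduler.get_client()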
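
`CloudComposerSchedulerConfig` authenticates with Google application-default credentials: the `session` property lazily builds an `AuthorizedSession` on first use, and the `check_supported_fields` validator explicitly permits a pre-built `session` to be passed in instead. A sketch of supplying your own session, assuming Google credentials are available in the environment (the Composer URL is a placeholder):

import google.auth
from google.auth.transport.requests import AuthorizedSession

from sqlmesh.core.config.scheduler import CloudComposerSchedulerConfig

# Resolve credentials once and reuse them, rather than letting the
# `session` property call google.auth.default() itself.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
scheduler = CloudComposerSchedulerConfig(
    airflow_url="https://example-environment.composer.googleusercontent.com/",  # placeholder
    session=AuthorizedSession(credentials),
)
client = scheduler.get_client()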