sqlmesh.core.config.scheduler
```python
from __future__ import annotations

import abc
import logging
import sys
import typing as t

from pydantic import Field
from requests import Session

from sqlmesh.core.config.base import BaseConfig
from sqlmesh.core.config.common import concurrent_tasks_validator
from sqlmesh.core.console import Console
from sqlmesh.core.plan import (
    AirflowPlanEvaluator,
    BuiltInPlanEvaluator,
    MWAAPlanEvaluator,
    PlanEvaluator,
)
from sqlmesh.core.state_sync import EngineAdapterStateSync, StateSync
from sqlmesh.schedulers.airflow.client import AirflowClient
from sqlmesh.schedulers.airflow.mwaa_client import MWAAClient
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.pydantic import model_validator, model_validator_v1_args

if t.TYPE_CHECKING:
    from google.auth.transport.requests import AuthorizedSession

    from sqlmesh.core.context import GenericContext

if sys.version_info >= (3, 9):
    from typing import Annotated, Literal
else:
    from typing_extensions import Annotated, Literal


logger = logging.getLogger(__name__)


class _SchedulerConfig(abc.ABC):
    """Abstract base class for Scheduler configurations."""

    @abc.abstractmethod
    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        """Creates a Plan Evaluator instance.

        Args:
            context: The SQLMesh Context.
        """

    @abc.abstractmethod
    def create_state_sync(self, context: GenericContext) -> StateSync:
        """Creates a State Sync instance.

        Args:
            context: The SQLMesh Context.

        Returns:
            The StateSync instance.
        """

    @abc.abstractmethod
    def get_default_catalog(self, context: GenericContext) -> t.Optional[str]:
        """Returns the default catalog for the Scheduler.

        Args:
            context: The SQLMesh Context.
        """


class _EngineAdapterStateSyncSchedulerConfig(_SchedulerConfig):
    def create_state_sync(self, context: GenericContext) -> StateSync:
        state_connection = (
            context.config.get_state_connection(context.gateway) or context._connection_config
        )
        engine_adapter = state_connection.create_engine_adapter()
        if not engine_adapter.SUPPORTS_ROW_LEVEL_OP:
            raise ConfigError(
                f"The {engine_adapter.DIALECT.upper()} engine cannot be used to store SQLMesh state - please specify a different `state_connection` engine."
                + " See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#gateways for more information."
            )
        schema = context.config.get_state_schema(context.gateway)
        return EngineAdapterStateSync(
            engine_adapter, schema=schema, context_path=context.path, console=context.console
        )


class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
    """The Built-In Scheduler configuration."""

    type_: Literal["builtin"] = Field(alias="type", default="builtin")

    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        return BuiltInPlanEvaluator(
            state_sync=context.state_sync,
            snapshot_evaluator=context.snapshot_evaluator,
            default_catalog=self.get_default_catalog(context),
            backfill_concurrent_tasks=context.concurrent_tasks,
            console=context.console,
            notification_target_manager=context.notification_target_manager,
        )

    def get_default_catalog(self, context: GenericContext) -> t.Optional[str]:
        return context.engine_adapter.default_catalog


class _BaseAirflowSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig):
    dag_run_poll_interval_secs: int
    dag_creation_poll_interval_secs: int
    dag_creation_max_retry_attempts: int

    backfill_concurrent_tasks: int
    ddl_concurrent_tasks: int

    use_state_connection: bool

    default_catalog_override: t.Optional[str]

    @abc.abstractmethod
    def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
        """Constructs the Airflow Client instance."""

    def create_state_sync(self, context: GenericContext) -> StateSync:
        if self.use_state_connection:
            return super().create_state_sync(context)

        from sqlmesh.schedulers.airflow.state_sync import HttpStateSync

        return HttpStateSync(
            client=self.get_client(context.console),
            dag_run_poll_interval_secs=self.dag_run_poll_interval_secs,
            console=context.console,
        )

    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        return AirflowPlanEvaluator(
            airflow_client=self.get_client(context.console),
            dag_run_poll_interval_secs=self.dag_run_poll_interval_secs,
            dag_creation_poll_interval_secs=self.dag_creation_poll_interval_secs,
            dag_creation_max_retry_attempts=self.dag_creation_max_retry_attempts,
            console=context.console,
            notification_targets=context.notification_targets,
            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
            users=context.users,
            state_sync=context.state_sync if self.use_state_connection else None,
        )

    def get_default_catalog(self, context: GenericContext) -> t.Optional[str]:
        # The default catalog must still be set on the Airflow side.
        default_catalog = self.get_client(context.console).default_catalog
        return self.default_catalog_override or default_catalog


class AirflowSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig):
    """The Airflow Scheduler configuration.

    Args:
        airflow_url: The URL of the Airflow Webserver.
        username: The Airflow username.
        password: The Airflow password.
        dag_run_poll_interval_secs: Determines how often a running DAG can be polled (in seconds).
        dag_creation_poll_interval_secs: Determines how often SQLMesh should check whether a DAG has been created (in seconds).
        dag_creation_max_retry_attempts: Determines the maximum number of attempts that SQLMesh will make while checking for
            whether a DAG has been created.
        backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
        ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
        max_snapshot_ids_per_request: The maximum number of snapshot IDs that can be sent in a single HTTP GET request to the Airflow Webserver.
        use_state_connection: Whether to use the `state_connection` configuration to access the SQLMesh state.
        default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence
            over the default catalog value set on the Airflow side.
    """

    airflow_url: str = "http://localhost:8080/"
    username: str = "airflow"
    password: str = "airflow"
    dag_run_poll_interval_secs: int = 10
    dag_creation_poll_interval_secs: int = 30
    dag_creation_max_retry_attempts: int = 10

    backfill_concurrent_tasks: int = 4
    ddl_concurrent_tasks: int = 4

    max_snapshot_ids_per_request: t.Optional[int] = None
    use_state_connection: bool = False

    default_catalog_override: t.Optional[str] = None

    type_: Literal["airflow"] = Field(alias="type", default="airflow")

    _concurrent_tasks_validator = concurrent_tasks_validator

    def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
        session = Session()
        session.headers.update({"Content-Type": "application/json"})
        session.auth = (self.username, self.password)

        return AirflowClient(
            session=session,
            airflow_url=self.airflow_url,
            console=console,
            snapshot_ids_batch_size=self.max_snapshot_ids_per_request,
        )


class CloudComposerSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig, extra="allow"):
    """The Google Cloud Composer configuration.

    Args:
        airflow_url: The URL of the Airflow Webserver.
        dag_run_poll_interval_secs: Determines how often a running DAG can be polled (in seconds).
        dag_creation_poll_interval_secs: Determines how often SQLMesh should check whether a DAG has been created (in seconds).
        dag_creation_max_retry_attempts: Determines the maximum number of attempts that SQLMesh will make while checking for
            whether a DAG has been created.
        backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
        ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
        max_snapshot_ids_per_request: The maximum number of snapshot IDs that can be sent in a single HTTP GET request to the Airflow Webserver.
        use_state_connection: Whether to use the `state_connection` configuration to access the SQLMesh state.
        default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence
            over the default catalog value set on the Airflow side.
    """

    airflow_url: str
    dag_run_poll_interval_secs: int = 10
    dag_creation_poll_interval_secs: int = 30
    dag_creation_max_retry_attempts: int = 10

    backfill_concurrent_tasks: int = 4
    ddl_concurrent_tasks: int = 4

    max_snapshot_ids_per_request: t.Optional[int] = 20
    use_state_connection: bool = False

    default_catalog_override: t.Optional[str] = None

    type_: Literal["cloud_composer"] = Field(alias="type", default="cloud_composer")

    _concurrent_tasks_validator = concurrent_tasks_validator

    def __init__(self, **data: t.Any) -> None:
        super().__init__(**data)
        self._session: t.Optional[AuthorizedSession] = data.get("session")

    @property
    def session(self) -> AuthorizedSession:
        import google.auth
        from google.auth.transport.requests import AuthorizedSession

        if self._session is None:
            self._session = AuthorizedSession(
                google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])[0]
            )
            self._session.headers.update({"Content-Type": "application/json"})
        return self._session

    def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
        return AirflowClient(
            airflow_url=self.airflow_url,
            session=self.session,
            console=console,
            snapshot_ids_batch_size=self.max_snapshot_ids_per_request,
        )

    @model_validator(mode="before")
    @model_validator_v1_args
    def check_supported_fields(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        allowed_field_names = {field.alias or name for name, field in cls.all_field_infos().items()}
        allowed_field_names.add("session")

        for field_name in values:
            if field_name not in allowed_field_names:
                raise ValueError(f"Unsupported Field: {field_name}")
        return values


class MWAASchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
    """The AWS MWAA Scheduler configuration.

    Args:
        environment: The name of the MWAA environment.
        dag_run_poll_interval_secs: Determines how often a running DAG can be polled (in seconds).
        dag_creation_poll_interval_secs: Determines how often SQLMesh should check whether a DAG has been created (in seconds).
        dag_creation_max_retry_attempts: Determines the maximum number of attempts that SQLMesh will make while checking for
            whether a DAG has been created.
        backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
        ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
        default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence
            over the default catalog value set on the Airflow side.
    """

    environment: str
    dag_run_poll_interval_secs: int = 10
    dag_creation_poll_interval_secs: int = 30
    dag_creation_max_retry_attempts: int = 10

    backfill_concurrent_tasks: int = 4
    ddl_concurrent_tasks: int = 4

    default_catalog_override: t.Optional[str] = None

    type_: Literal["mwaa"] = Field(alias="type", default="mwaa")

    _concurrent_tasks_validator = concurrent_tasks_validator

    def get_client(self, console: t.Optional[Console] = None) -> MWAAClient:
        return MWAAClient(self.environment, console=console)

    def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
        return MWAAPlanEvaluator(
            client=self.get_client(context.console),
            state_sync=context.state_sync,
            console=context.console,
            dag_run_poll_interval_secs=self.dag_run_poll_interval_secs,
            dag_creation_poll_interval_secs=self.dag_creation_poll_interval_secs,
            dag_creation_max_retry_attempts=self.dag_creation_max_retry_attempts,
            notification_targets=context.notification_targets,
            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
            users=context.users,
        )

    def get_default_catalog(self, context: GenericContext) -> t.Optional[str]:
        # The default catalog must still be set on the Airflow side.
        default_catalog = self.get_client(context.console).default_catalog
        return self.default_catalog_override or default_catalog


SchedulerConfig = Annotated[
    t.Union[
        BuiltInSchedulerConfig,
        AirflowSchedulerConfig,
        CloudComposerSchedulerConfig,
        MWAASchedulerConfig,
    ],
    Field(discriminator="type_"),
]
```
class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
The Built-In Scheduler configuration.
def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
Creates a Plan Evaluator instance.
Arguments:
- context: The SQLMesh Context.
def get_default_catalog(self, context: GenericContext) -> t.Optional[str]:
Returns the default catalog for the Scheduler.
Arguments:
- context: The SQLMesh Context.
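For illustration, the built-in scheduler needs nothing beyond its discriminator; state sync, snapshot evaluation, and concurrency all come from the SQLMesh `Context` passed to `create_plan_evaluator`. A minimal sketch:

```python
from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig

scheduler = BuiltInSchedulerConfig()
assert scheduler.type_ == "builtin"

# Given an existing SQLMesh Context `context`, the evaluator is wired from it:
# evaluator = scheduler.create_plan_evaluator(context)
```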
class AirflowSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig):
The Airflow Scheduler configuration.
Arguments:
- airflow_url: The URL of the Airflow Webserver.
- username: The Airflow username.
- password: The Airflow password.
- dag_run_poll_interval_secs: Determines how often a running DAG can be polled (in seconds).
- dag_creation_poll_interval_secs: Determines how often SQLMesh should check whether a DAG has been created (in seconds).
- dag_creation_max_retry_attempts: Determines the maximum number of attempts that SQLMesh will make while checking whether a DAG has been created.
- backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
- ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
- max_snapshot_ids_per_request: The maximum number of snapshot IDs that can be sent in a single HTTP GET request to the Airflow Webserver.
- use_state_connection: Whether to use the `state_connection` configuration to access the SQLMesh state.
- default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence over the default catalog value set on the Airflow side.
def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
Constructs the Airflow Client instance.
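For illustration, each `get_client` call builds a fresh `requests.Session` with basic auth and a JSON `Content-Type` header; the URL and credentials below are just the field defaults, not a recommendation:

```python
from sqlmesh.core.config.scheduler import AirflowSchedulerConfig

scheduler = AirflowSchedulerConfig(
    airflow_url="http://localhost:8080/",  # field default; point at your webserver
    username="airflow",
    password="airflow",
    backfill_concurrent_tasks=8,  # raises the default of 4
)
client = scheduler.get_client()  # AirflowClient with an authenticated session
```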
class CloudComposerSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig, extra="allow"):
The Google Cloud Composer configuration.
Arguments:
- airflow_url: The URL of the Airflow Webserver.
- dag_run_poll_interval_secs: Determines how often a running DAG can be polled (in seconds).
- dag_creation_poll_interval_secs: Determines how often SQLMesh should check whether a DAG has been created (in seconds).
- dag_creation_max_retry_attempts: Determines the maximum number of attempts that SQLMesh will make while checking whether a DAG has been created.
- backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
- ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
- max_snapshot_ids_per_request: The maximum number of snapshot IDs that can be sent in a single HTTP GET request to the Airflow Webserver.
- use_state_connection: Whether to use the `state_connection` configuration to access the SQLMesh state.
- default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence over the default catalog value set on the Airflow side.
CloudComposerSchedulerConfig(**data: Any)
Create a new model by parsing and validating input data from keyword arguments.
Raises `ValidationError` if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.
def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
Constructs the Airflow Client instance.
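Unlike the plain Airflow config, Cloud Composer authenticates via Google Application Default Credentials: the `session` property lazily constructs an `AuthorizedSession` on first access unless one was supplied to the constructor under the `session` key. A sketch, assuming default credentials are available in the environment and using a hypothetical Composer webserver URL:

```python
from sqlmesh.core.config.scheduler import CloudComposerSchedulerConfig

scheduler = CloudComposerSchedulerConfig(
    airflow_url="https://example-dot-us-central1.composer.example.com",  # hypothetical URL
)
client = scheduler.get_client()  # first access triggers google.auth.default()
```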
@model_validator(mode='before')
@model_validator_v1_args
def check_supported_fields(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
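This `before`-mode validator exists because the model is declared with `extra="allow"` (so the non-field `session` attribute can be injected); without it, a typo in a gateway config would be silently accepted. For illustration:

```python
from sqlmesh.core.config.scheduler import CloudComposerSchedulerConfig

try:
    CloudComposerSchedulerConfig(
        airflow_url="https://composer.example.com",  # hypothetical URL
        airflow_uri="oops",  # unknown key is rejected despite extra="allow"
    )
except ValueError as exc:
    print(exc)  # the message names the offender: Unsupported Field: airflow_uri
```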
class MWAASchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
The AWS MWAA Scheduler configuration.
Arguments:
- environment: The name of the MWAA environment.
- dag_run_poll_interval_secs: Determines how often a running DAG can be polled (in seconds).
- dag_creation_poll_interval_secs: Determines how often SQLMesh should check whether a DAG has been created (in seconds).
- dag_creation_max_retry_attempts: Determines the maximum number of attempts that SQLMesh will make while checking whether a DAG has been created.
- backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
- ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
- default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence over the default catalog value set on the Airflow side.
def get_client(self, console: t.Optional[Console] = None) -> MWAAClient:
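A minimal sketch; the environment name is hypothetical, and `MWAAClient` is presumed to resolve that environment's Airflow webserver through ambient AWS credentials:

```python
from sqlmesh.core.config.scheduler import MWAASchedulerConfig

scheduler = MWAASchedulerConfig(environment="my-mwaa-env")  # hypothetical environment name
client = scheduler.get_client()  # MWAAClient bound to that environment
```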
def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
Creates a Plan Evaluator instance.
Arguments:
- context: The SQLMesh Context.
def get_default_catalog(self, context: GenericContext) -> t.Optional[str]:
Returns the default catalog for the Scheduler.
Arguments:
- context: The SQLMesh Context.