PlanEvaluator
A plan evaluator is responsible for evaluating a plan when it is being applied.
Evaluation steps
At a high level, when a plan is evaluated, SQLMesh will:
- Push new snapshots to the state sync.
- Create snapshot tables.
- Backfill data.
- Promote the snapshots.
Refer to `sqlmesh.core.plan`.
"""
# PlanEvaluator

A plan evaluator is responsible for evaluating a plan when it is being applied.

# Evaluation steps

At a high level, when a plan is evaluated, SQLMesh will:
- Push new snapshots to the state sync.
- Create snapshot tables.
- Backfill data.
- Promote the snapshots.

Refer to `sqlmesh.core.plan`.
"""

import abc
import logging
import typing as t

from sqlmesh.core import analytics
from sqlmesh.core import constants as c
from sqlmesh.core.console import Console, get_console
from sqlmesh.core.notification_target import (
    NotificationTarget,
    NotificationTargetManager,
)
from sqlmesh.core.plan.definition import Plan
from sqlmesh.core.scheduler import Scheduler
from sqlmesh.core.snapshot import DeployabilityIndex, Snapshot, SnapshotEvaluator
from sqlmesh.core.state_sync import StateSync
from sqlmesh.core.state_sync.base import PromotionResult
from sqlmesh.core.user import User
from sqlmesh.schedulers.airflow import common as airflow_common
from sqlmesh.schedulers.airflow.client import AirflowClient, BaseAirflowClient
from sqlmesh.schedulers.airflow.mwaa_client import MWAAClient
from sqlmesh.utils.errors import SQLMeshError

logger = logging.getLogger(__name__)


class PlanEvaluator(abc.ABC):
    """Abstract interface for applying a plan."""

    @abc.abstractmethod
    def evaluate(
        self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]] = None
    ) -> None:
        """Evaluates a plan by pushing snapshots and backfilling data.

        Given a plan, it pushes snapshots into the state and then kicks off
        the backfill process for all affected snapshots. Once backfill is done,
        snapshots that are part of the plan are promoted in the environment targeted
        by this plan.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: Optional zero-argument callable returning a bool. How it
                is used is up to the implementation.
        """


class BuiltInPlanEvaluator(PlanEvaluator):
    """Applies a plan in-process using the state sync, snapshot evaluator and scheduler."""

    def __init__(
        self,
        state_sync: StateSync,
        snapshot_evaluator: SnapshotEvaluator,
        default_catalog: t.Optional[str],
        backfill_concurrent_tasks: int = 1,
        console: t.Optional[Console] = None,
        notification_target_manager: t.Optional[NotificationTargetManager] = None,
    ):
        """Store the collaborators used while applying a plan.

        Args:
            state_sync: State sync that snapshots, intervals and environments are written to.
            snapshot_evaluator: Evaluator used to create, migrate, promote and demote snapshots.
            default_catalog: Passed to the scheduler and to console progress reporting.
            backfill_concurrent_tasks: Passed to the scheduler as `max_workers`.
            console: Console used for progress output. Falls back to `get_console()`.
            notification_target_manager: Passed through to the scheduler.
        """
        self.state_sync = state_sync
        self.snapshot_evaluator = snapshot_evaluator
        self.default_catalog = default_catalog
        self.backfill_concurrent_tasks = backfill_concurrent_tasks
        self.console = console or get_console()
        self.notification_target_manager = notification_target_manager

    def evaluate(
        self,
        plan: Plan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Apply a plan: push, restate, backfill, promote, backfill the rest, update views.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: Optional callable forwarded unchanged to the scheduler
                for both backfill passes.

        Raises:
            SQLMeshError: If a backfill run reports failure.
        """
        self.console.start_plan_evaluation(plan)
        analytics.collector.on_plan_apply_start(
            plan=plan,
            engine_type=self.snapshot_evaluator.adapter.dialect,
            state_sync_type=self.state_sync.state_type(),
            scheduler_type=c.BUILTIN,
        )

        try:
            snapshots = plan.snapshots
            all_names = {
                s.name for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
            }
            deployability_index_for_evaluation = DeployabilityIndex.create(snapshots)
            # Table creation always uses the index computed from the plan's snapshots,
            # even when the evaluation index is swapped out below.
            deployability_index_for_creation = deployability_index_for_evaluation
            if plan.is_dev:
                # Dev plans backfill every selected snapshot before promotion.
                before_promote_snapshots = all_names
                after_promote_snapshots = set()
            else:
                # Non-dev plans backfill only the snapshots the index reports as
                # representative before promotion; the rest run after it.
                before_promote_snapshots = {
                    s.name
                    for s in snapshots.values()
                    if deployability_index_for_evaluation.is_representative(s)
                    and plan.is_selected_for_backfill(s.name)
                }
                after_promote_snapshots = all_names - before_promote_snapshots
                deployability_index_for_evaluation = DeployabilityIndex.all_deployable()

            self._push(plan, deployability_index_for_creation)
            update_intervals_for_new_snapshots(plan.new_snapshots, self.state_sync)
            self._restate(plan)
            self._backfill(
                plan,
                before_promote_snapshots,
                deployability_index_for_evaluation,
                circuit_breaker=circuit_breaker,
            )
            promotion_result = self._promote(plan, before_promote_snapshots)
            self._backfill(
                plan,
                after_promote_snapshots,
                deployability_index_for_evaluation,
                circuit_breaker=circuit_breaker,
            )
            self._update_views(plan, promotion_result, deployability_index_for_evaluation)

            if not plan.requires_backfill:
                self.console.log_success("Virtual Update executed successfully")
        except Exception as e:
            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
            raise
        else:
            # Report success only when nothing was raised. Reporting from `finally`
            # emitted a second, error-free end event after every failure.
            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
        finally:
            self.console.stop_plan_evaluation()

    def _backfill(
        self,
        plan: Plan,
        selected_snapshots: t.Set[str],
        deployability_index: DeployabilityIndex,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Backfill missing intervals for snapshots that are part of the given plan.

        Does nothing when the plan does not require a backfill or when no
        snapshots are selected.

        Args:
            plan: The plan to source snapshots from.
            selected_snapshots: The names of the snapshots to backfill.
            deployability_index: Passed through to the scheduler run.
            circuit_breaker: Passed through to the scheduler run.

        Raises:
            SQLMeshError: If the scheduler run reports failure.
        """
        if not plan.requires_backfill or not selected_snapshots:
            return

        snapshots = plan.snapshots
        scheduler = Scheduler(
            snapshots.values(),
            self.snapshot_evaluator,
            self.state_sync,
            default_catalog=self.default_catalog,
            max_workers=self.backfill_concurrent_tasks,
            console=self.console,
            notification_target_manager=self.notification_target_manager,
        )
        is_run_successful = scheduler.run(
            plan.environment_naming_info,
            plan.start,
            plan.end,
            execution_time=plan.execution_time,
            restatements=plan.restatements,
            selected_snapshots=selected_snapshots,
            deployability_index=deployability_index,
            circuit_breaker=circuit_breaker,
            end_bounded=plan.end_bounded,
        )
        if not is_run_successful:
            raise SQLMeshError("Plan application failed.")

    def _push(self, plan: Plan, deployability_index: t.Optional[DeployabilityIndex] = None) -> None:
        """Push the snapshots to the state sync.

        As a part of plan pushing, snapshot tables are created.

        Args:
            plan: The plan to source snapshots from.
            deployability_index: Indicates which snapshots are deployable in the context of this creation.
        """
        snapshots_to_create = [
            s
            for s in plan.snapshots.values()
            if s.is_model and not s.is_symbolic and plan.is_selected_for_backfill(s.name)
        ]
        snapshots_to_create_count = len(snapshots_to_create)

        if snapshots_to_create_count > 0:
            self.console.start_creation_progress(
                snapshots_to_create_count, plan.environment_naming_info, self.default_catalog
            )

        completed = False
        try:
            self.snapshot_evaluator.create(
                snapshots_to_create,
                plan.snapshots,
                deployability_index=deployability_index,
                on_complete=self.console.update_creation_progress,
            )
            completed = True
        finally:
            # Always close the progress display, flagging whether creation finished.
            self.console.stop_creation_progress(success=completed)

        # New snapshots are recorded in state only after their tables were created.
        self.state_sync.push_snapshots(plan.new_snapshots)

        analytics.collector.on_snapshots_created(
            new_snapshots=plan.new_snapshots, plan_id=plan.plan_id
        )

    def _promote(
        self, plan: Plan, no_gaps_snapshot_names: t.Optional[t.Set[str]] = None
    ) -> PromotionResult:
        """Promote a plan.

        Args:
            plan: The plan to promote.
            no_gaps_snapshot_names: The names of snapshots to check for gaps if the no gaps check is enabled in the plan.
                If not provided, all snapshots are checked.

        Returns:
            The promotion result returned by the state sync.
        """
        promotion_result = self.state_sync.promote(
            plan.environment,
            # An empty set is passed when the plan does not enable the no-gaps check.
            no_gaps_snapshot_names=no_gaps_snapshot_names if plan.no_gaps else set(),
        )

        if not plan.is_dev:
            self.snapshot_evaluator.migrate(
                [s for s in plan.snapshots.values() if s.is_paused],
                plan.snapshots,
            )
            if not plan.ensure_finalized_snapshots:
                # Only unpause at this point if we don't have to use the finalized snapshots
                # for subsequent plan applications. Otherwise, unpause right before finalizing
                # the environment.
                self.state_sync.unpause_snapshots(promotion_result.added, plan.end)

        return promotion_result

    def _update_views(
        self,
        plan: Plan,
        promotion_result: PromotionResult,
        deployability_index: t.Optional[DeployabilityIndex] = None,
    ) -> None:
        """Update environment views.

        Args:
            plan: The plan to promote.
            promotion_result: The result of the promotion.
            deployability_index: Indicates which snapshots are deployable in the context of this promotion.
        """
        if not plan.is_dev and plan.ensure_finalized_snapshots:
            # Unpause right before finalizing the environment in case when
            # we need to use the finalized snapshots for subsequent plan applications.
            # Otherwise, unpause right after updating the environment record.
            self.state_sync.unpause_snapshots(promotion_result.added, plan.end)

        environment = plan.environment

        self.console.start_promotion_progress(
            len(promotion_result.added) + len(promotion_result.removed),
            environment.naming_info,
            self.default_catalog,
        )

        completed = False
        try:
            self.snapshot_evaluator.promote(
                [plan.context_diff.snapshots[s.snapshot_id] for s in promotion_result.added],
                environment.naming_info,
                deployability_index=deployability_index,
                on_complete=lambda s: self.console.update_promotion_progress(s, True),
            )
            if promotion_result.removed_environment_naming_info:
                self.snapshot_evaluator.demote(
                    promotion_result.removed,
                    promotion_result.removed_environment_naming_info,
                    on_complete=lambda s: self.console.update_promotion_progress(s, False),
                )
            self.state_sync.finalize(environment)
            completed = True
        finally:
            # Always close the progress display, flagging whether promotion finished.
            self.console.stop_promotion_progress(success=completed)

    def _restate(self, plan: Plan) -> None:
        """Remove the intervals listed in the plan's restatements from the state.

        Does nothing when the plan has no restatements.

        Args:
            plan: The plan whose restatements should be applied.
        """
        if not plan.restatements:
            return

        self.state_sync.remove_interval(
            [
                (plan.context_diff.snapshots[s_id], interval)
                for s_id, interval in plan.restatements.items()
            ],
            remove_shared_versions=not plan.is_dev,
        )


class BaseAirflowPlanEvaluator(PlanEvaluator):
    """Base for evaluators that hand the plan to Airflow and optionally wait for the DAG run."""

    def __init__(
        self,
        console: t.Optional[Console],
        blocking: bool,
        dag_run_poll_interval_secs: int,
        dag_creation_poll_interval_secs: int,
        dag_creation_max_retry_attempts: int,
    ):
        """Store the polling configuration.

        Args:
            console: Console used for status output. Falls back to `get_console()`.
            blocking: Whether `evaluate` waits for the plan application DAG run to complete.
            dag_run_poll_interval_secs: Passed to the client when waiting for DAG run completion.
            dag_creation_poll_interval_secs: Passed to the client when waiting for the first DAG run.
            dag_creation_max_retry_attempts: Passed to the client when waiting for the first DAG run.
        """
        self.blocking = blocking
        self.dag_run_poll_interval_secs = dag_run_poll_interval_secs
        self.dag_creation_poll_interval_secs = dag_creation_poll_interval_secs
        self.dag_creation_max_retry_attempts = dag_creation_max_retry_attempts
        self.console = console or get_console()

    def evaluate(
        self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]] = None
    ) -> None:
        """Submit the plan and, in blocking mode, wait for its DAG run to finish.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: Accepted for interface compatibility; not used here.

        Raises:
            SQLMeshError: In blocking mode, if the DAG run does not succeed.
        """
        plan_request_id = plan.plan_id
        self._apply_plan(plan, plan_request_id)

        analytics.collector.on_plan_apply_start(
            plan=plan,
            engine_type=None,
            state_sync_type=None,
            scheduler_type=c.AIRFLOW,
        )

        if not self.blocking:
            return

        plan_application_dag_id = airflow_common.plan_application_dag_id(
            plan.environment_naming_info.name, plan_request_id
        )

        self.console.log_status_update(
            f"Waiting for the plan application DAG '{plan_application_dag_id}' to be provisioned on Airflow"
        )

        plan_application_dag_run_id = self.client.wait_for_first_dag_run(
            plan_application_dag_id,
            self.dag_creation_poll_interval_secs,
            self.dag_creation_max_retry_attempts,
        )

        self.client.print_tracking_url(
            plan_application_dag_id,
            plan_application_dag_run_id,
            "plan application",
        )
        plan_application_succeeded = self.client.wait_for_dag_run_completion(
            plan_application_dag_id,
            plan_application_dag_run_id,
            self.dag_run_poll_interval_secs,
        )
        if not plan_application_succeeded:
            raise SQLMeshError("Plan application failed.")

        self.console.log_success("The plan has been applied successfully")

    @property
    def client(self) -> BaseAirflowClient:
        """The Airflow client used for polling. Subclasses must override this."""
        raise NotImplementedError

    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
        """Submit the plan for application. Subclasses must override this."""
        raise NotImplementedError


class StateBasedAirflowPlanEvaluator(BaseAirflowPlanEvaluator):
    """Airflow evaluator that submits a plan by writing a plan DAG spec via the state sync."""

    # Set by concrete subclasses in their constructors.
    backfill_concurrent_tasks: int
    ddl_concurrent_tasks: int
    notification_targets: t.Optional[t.List[NotificationTarget]]
    users: t.Optional[t.List[User]]

    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
        """Build a plan application request, turn it into a DAG spec and store it.

        Args:
            plan: The plan to submit.
            plan_request_id: Identifier used as the request ID.
        """
        # Imported here rather than at module level, as in the original code.
        from sqlmesh.schedulers.airflow.plan import PlanDagState, create_plan_dag_spec

        plan_application_request = airflow_common.PlanApplicationRequest(
            new_snapshots=plan.new_snapshots,
            environment=plan.environment,
            no_gaps=plan.no_gaps,
            skip_backfill=plan.skip_backfill,
            request_id=plan_request_id,
            restatements=plan.restatements or {},
            notification_targets=self.notification_targets or [],
            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
            users=self.users or [],
            is_dev=plan.is_dev,
            forward_only=plan.forward_only,
            models_to_backfill=plan.models_to_backfill,
            end_bounded=plan.end_bounded,
            ensure_finalized_snapshots=plan.ensure_finalized_snapshots,
            directly_modified_snapshots=list(plan.directly_modified),
            indirectly_modified_snapshots={
                change_source.name: list(snapshots)
                for change_source, snapshots in plan.indirectly_modified.items()
            },
            removed_snapshots=list(plan.context_diff.removed_snapshots),
            execution_time=plan.execution_time,
        )
        plan_dag_spec = create_plan_dag_spec(plan_application_request, self.state_sync)
        PlanDagState.from_state_sync(self.state_sync).add_dag_spec(plan_dag_spec)

    @property
    def state_sync(self) -> StateSync:
        """The state sync the DAG spec is written to. Subclasses must override this."""
        raise NotImplementedError


class AirflowPlanEvaluator(StateBasedAirflowPlanEvaluator):
    """Airflow evaluator that uses the state sync when configured, else the Airflow client."""

    def __init__(
        self,
        airflow_client: AirflowClient,
        console: t.Optional[Console] = None,
        blocking: bool = True,
        dag_run_poll_interval_secs: int = 10,
        dag_creation_poll_interval_secs: int = 30,
        dag_creation_max_retry_attempts: int = 10,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        backfill_concurrent_tasks: int = 1,
        ddl_concurrent_tasks: int = 1,
        users: t.Optional[t.List[User]] = None,
        state_sync: t.Optional[StateSync] = None,
    ):
        """Store the client, the optional state sync and the request settings.

        Args:
            airflow_client: Client used for polling and, without a state sync, for submission.
            console: Console used for status output. Falls back to `get_console()`.
            blocking: Whether `evaluate` waits for the plan application DAG run.
            dag_run_poll_interval_secs: Polling interval while waiting for DAG run completion.
            dag_creation_poll_interval_secs: Polling interval while waiting for the first DAG run.
            dag_creation_max_retry_attempts: Retry limit while waiting for the first DAG run.
            notification_targets: Included in the plan application request. Defaults to an empty list.
            backfill_concurrent_tasks: Included in the plan application request.
            ddl_concurrent_tasks: Included in the plan application request.
            users: Included in the plan application request. Defaults to an empty list.
            state_sync: When provided, plans are submitted through it instead of the client.
        """
        super().__init__(
            console,
            blocking,
            dag_run_poll_interval_secs,
            dag_creation_poll_interval_secs,
            dag_creation_max_retry_attempts,
        )
        self._airflow_client = airflow_client
        self.notification_targets = notification_targets or []
        self.backfill_concurrent_tasks = backfill_concurrent_tasks
        self.ddl_concurrent_tasks = ddl_concurrent_tasks
        self.users = users or []

        self._state_sync = state_sync

    @property
    def client(self) -> BaseAirflowClient:
        """The Airflow client passed to the constructor."""
        return self._airflow_client

    @property
    def state_sync(self) -> StateSync:
        """The configured state sync.

        Raises:
            SQLMeshError: If no state sync was provided.
        """
        if self._state_sync is None:
            raise SQLMeshError("State Sync is not configured")
        return self._state_sync

    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
        """Submit the plan through the state sync if configured, else through the client.

        Args:
            plan: The plan to submit.
            plan_request_id: Identifier used as the request ID.
        """
        if self._state_sync is not None:
            super()._apply_plan(plan, plan_request_id)
            return

        self._airflow_client.apply_plan(
            plan.new_snapshots,
            plan.environment,
            plan_request_id,
            no_gaps=plan.no_gaps,
            skip_backfill=plan.skip_backfill,
            restatements=plan.restatements,
            notification_targets=self.notification_targets,
            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
            users=self.users,
            is_dev=plan.is_dev,
            forward_only=plan.forward_only,
            models_to_backfill=plan.models_to_backfill,
            end_bounded=plan.end_bounded,
            ensure_finalized_snapshots=plan.ensure_finalized_snapshots,
            directly_modified_snapshots=list(plan.directly_modified),
            indirectly_modified_snapshots={
                change_source.name: list(snapshots)
                for change_source, snapshots in plan.indirectly_modified.items()
            },
            removed_snapshots=list(plan.context_diff.removed_snapshots),
            execution_time=plan.execution_time,
        )


class MWAAPlanEvaluator(StateBasedAirflowPlanEvaluator):
    """State-based Airflow evaluator that polls through an MWAA client."""

    def __init__(
        self,
        client: MWAAClient,
        state_sync: StateSync,
        console: t.Optional[Console] = None,
        blocking: bool = True,
        dag_run_poll_interval_secs: int = 10,
        dag_creation_poll_interval_secs: int = 30,
        dag_creation_max_retry_attempts: int = 10,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        backfill_concurrent_tasks: int = 1,
        ddl_concurrent_tasks: int = 1,
        users: t.Optional[t.List[User]] = None,
    ):
        """Store the MWAA client, the state sync and the request settings.

        Args:
            client: MWAA client used for polling.
            state_sync: State sync the plan DAG spec is written to.
            console: Console used for status output. Falls back to `get_console()`.
            blocking: Whether `evaluate` waits for the plan application DAG run.
            dag_run_poll_interval_secs: Polling interval while waiting for DAG run completion.
            dag_creation_poll_interval_secs: Polling interval while waiting for the first DAG run.
            dag_creation_max_retry_attempts: Retry limit while waiting for the first DAG run.
            notification_targets: Included in the plan application request. Defaults to an empty list.
            backfill_concurrent_tasks: Included in the plan application request.
            ddl_concurrent_tasks: Included in the plan application request.
            users: Included in the plan application request. Defaults to an empty list.
        """
        super().__init__(
            console,
            blocking,
            dag_run_poll_interval_secs,
            dag_creation_poll_interval_secs,
            dag_creation_max_retry_attempts,
        )
        self._mwaa_client = client
        self._state_sync = state_sync
        self.notification_targets = notification_targets or []
        self.backfill_concurrent_tasks = backfill_concurrent_tasks
        self.ddl_concurrent_tasks = ddl_concurrent_tasks
        self.users = users or []

    @property
    def client(self) -> BaseAirflowClient:
        """The MWAA client passed to the constructor."""
        return self._mwaa_client

    @property
    def state_sync(self) -> StateSync:
        """The state sync passed to the constructor."""
        return self._state_sync


def update_intervals_for_new_snapshots(
    snapshots: t.Collection[Snapshot], state_sync: StateSync
) -> None:
    """Refresh snapshot intervals and mirror them as dev intervals for forward-only snapshots.

    Args:
        snapshots: The snapshots whose intervals should be refreshed.
        state_sync: The state sync used to refresh and to record intervals.
    """
    for snapshot in state_sync.refresh_snapshot_intervals(snapshots):
        if snapshot.is_forward_only:
            # Copy so that dev intervals do not alias the snapshot's interval list.
            snapshot.dev_intervals = snapshot.intervals.copy()
            for start, end in snapshot.dev_intervals:
                state_sync.add_interval(snapshot, start, end, is_dev=True)
class PlanEvaluator(abc.ABC):
    """Abstract interface for applying a plan."""

    @abc.abstractmethod
    def evaluate(
        self,
        plan: Plan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Apply the given plan.

        Implementations push the plan's snapshots into the state, backfill the
        affected snapshots and, once the backfill is done, promote the plan's
        snapshots in the environment the plan targets.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: Optional zero-argument callable returning a bool. How it
                is used is up to the implementation.
        """
Abstract base class for plan evaluators. Subclasses implement `evaluate` to apply a plan.
@abc.abstractmethod
def evaluate(
    self,
    plan: Plan,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
) -> None:
    """Apply the given plan.

    Implementations push the plan's snapshots into the state, backfill the
    affected snapshots and, once the backfill is done, promote the plan's
    snapshots in the environment the plan targets.

    Args:
        plan: The plan to evaluate.
        circuit_breaker: Optional zero-argument callable returning a bool. How it
            is used is up to the implementation.
    """
Evaluates a plan by pushing snapshots and backfilling data.
Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.
Arguments:
- plan: The plan to evaluate.
class BuiltInPlanEvaluator(PlanEvaluator):
    """Applies a plan in-process using the state sync, snapshot evaluator and scheduler."""

    def __init__(
        self,
        state_sync: StateSync,
        snapshot_evaluator: SnapshotEvaluator,
        default_catalog: t.Optional[str],
        backfill_concurrent_tasks: int = 1,
        console: t.Optional[Console] = None,
        notification_target_manager: t.Optional[NotificationTargetManager] = None,
    ):
        """Store the collaborators used while applying a plan.

        Args:
            state_sync: State sync that snapshots, intervals and environments are written to.
            snapshot_evaluator: Evaluator used to create, migrate, promote and demote snapshots.
            default_catalog: Passed to the scheduler and to console progress reporting.
            backfill_concurrent_tasks: Passed to the scheduler as `max_workers`.
            console: Console used for progress output. Falls back to `get_console()`.
            notification_target_manager: Passed through to the scheduler.
        """
        self.state_sync = state_sync
        self.snapshot_evaluator = snapshot_evaluator
        self.default_catalog = default_catalog
        self.backfill_concurrent_tasks = backfill_concurrent_tasks
        self.console = console or get_console()
        self.notification_target_manager = notification_target_manager

    def evaluate(
        self,
        plan: Plan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Apply a plan: push, restate, backfill, promote, backfill the rest, update views.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: Optional callable forwarded unchanged to the scheduler
                for both backfill passes.

        Raises:
            SQLMeshError: If a backfill run reports failure.
        """
        self.console.start_plan_evaluation(plan)
        analytics.collector.on_plan_apply_start(
            plan=plan,
            engine_type=self.snapshot_evaluator.adapter.dialect,
            state_sync_type=self.state_sync.state_type(),
            scheduler_type=c.BUILTIN,
        )

        try:
            snapshots = plan.snapshots
            all_names = {
                s.name for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
            }
            deployability_index_for_evaluation = DeployabilityIndex.create(snapshots)
            # Table creation always uses the index computed from the plan's snapshots,
            # even when the evaluation index is swapped out below.
            deployability_index_for_creation = deployability_index_for_evaluation
            if plan.is_dev:
                # Dev plans backfill every selected snapshot before promotion.
                before_promote_snapshots = all_names
                after_promote_snapshots = set()
            else:
                # Non-dev plans backfill only the snapshots the index reports as
                # representative before promotion; the rest run after it.
                before_promote_snapshots = {
                    s.name
                    for s in snapshots.values()
                    if deployability_index_for_evaluation.is_representative(s)
                    and plan.is_selected_for_backfill(s.name)
                }
                after_promote_snapshots = all_names - before_promote_snapshots
                deployability_index_for_evaluation = DeployabilityIndex.all_deployable()

            self._push(plan, deployability_index_for_creation)
            update_intervals_for_new_snapshots(plan.new_snapshots, self.state_sync)
            self._restate(plan)
            self._backfill(
                plan,
                before_promote_snapshots,
                deployability_index_for_evaluation,
                circuit_breaker=circuit_breaker,
            )
            promotion_result = self._promote(plan, before_promote_snapshots)
            self._backfill(
                plan,
                after_promote_snapshots,
                deployability_index_for_evaluation,
                circuit_breaker=circuit_breaker,
            )
            self._update_views(plan, promotion_result, deployability_index_for_evaluation)

            if not plan.requires_backfill:
                self.console.log_success("Virtual Update executed successfully")
        except Exception as e:
            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
            raise
        else:
            # Report success only when nothing was raised. Reporting from `finally`
            # emitted a second, error-free end event after every failure.
            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
        finally:
            self.console.stop_plan_evaluation()

    def _backfill(
        self,
        plan: Plan,
        selected_snapshots: t.Set[str],
        deployability_index: DeployabilityIndex,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Backfill missing intervals for snapshots that are part of the given plan.

        Does nothing when the plan does not require a backfill or when no
        snapshots are selected.

        Args:
            plan: The plan to source snapshots from.
            selected_snapshots: The names of the snapshots to backfill.
            deployability_index: Passed through to the scheduler run.
            circuit_breaker: Passed through to the scheduler run.

        Raises:
            SQLMeshError: If the scheduler run reports failure.
        """
        if not plan.requires_backfill or not selected_snapshots:
            return

        snapshots = plan.snapshots
        scheduler = Scheduler(
            snapshots.values(),
            self.snapshot_evaluator,
            self.state_sync,
            default_catalog=self.default_catalog,
            max_workers=self.backfill_concurrent_tasks,
            console=self.console,
            notification_target_manager=self.notification_target_manager,
        )
        is_run_successful = scheduler.run(
            plan.environment_naming_info,
            plan.start,
            plan.end,
            execution_time=plan.execution_time,
            restatements=plan.restatements,
            selected_snapshots=selected_snapshots,
            deployability_index=deployability_index,
            circuit_breaker=circuit_breaker,
            end_bounded=plan.end_bounded,
        )
        if not is_run_successful:
            raise SQLMeshError("Plan application failed.")

    def _push(self, plan: Plan, deployability_index: t.Optional[DeployabilityIndex] = None) -> None:
        """Push the snapshots to the state sync.

        As a part of plan pushing, snapshot tables are created.

        Args:
            plan: The plan to source snapshots from.
            deployability_index: Indicates which snapshots are deployable in the context of this creation.
        """
        snapshots_to_create = [
            s
            for s in plan.snapshots.values()
            if s.is_model and not s.is_symbolic and plan.is_selected_for_backfill(s.name)
        ]
        snapshots_to_create_count = len(snapshots_to_create)

        if snapshots_to_create_count > 0:
            self.console.start_creation_progress(
                snapshots_to_create_count, plan.environment_naming_info, self.default_catalog
            )

        completed = False
        try:
            self.snapshot_evaluator.create(
                snapshots_to_create,
                plan.snapshots,
                deployability_index=deployability_index,
                on_complete=self.console.update_creation_progress,
            )
            completed = True
        finally:
            # Always close the progress display, flagging whether creation finished.
            self.console.stop_creation_progress(success=completed)

        # New snapshots are recorded in state only after their tables were created.
        self.state_sync.push_snapshots(plan.new_snapshots)

        analytics.collector.on_snapshots_created(
            new_snapshots=plan.new_snapshots, plan_id=plan.plan_id
        )

    def _promote(
        self, plan: Plan, no_gaps_snapshot_names: t.Optional[t.Set[str]] = None
    ) -> PromotionResult:
        """Promote a plan.

        Args:
            plan: The plan to promote.
            no_gaps_snapshot_names: The names of snapshots to check for gaps if the no gaps check is enabled in the plan.
                If not provided, all snapshots are checked.

        Returns:
            The promotion result returned by the state sync.
        """
        promotion_result = self.state_sync.promote(
            plan.environment,
            # An empty set is passed when the plan does not enable the no-gaps check.
            no_gaps_snapshot_names=no_gaps_snapshot_names if plan.no_gaps else set(),
        )

        if not plan.is_dev:
            self.snapshot_evaluator.migrate(
                [s for s in plan.snapshots.values() if s.is_paused],
                plan.snapshots,
            )
            if not plan.ensure_finalized_snapshots:
                # Only unpause at this point if we don't have to use the finalized snapshots
                # for subsequent plan applications. Otherwise, unpause right before finalizing
                # the environment.
                self.state_sync.unpause_snapshots(promotion_result.added, plan.end)

        return promotion_result

    def _update_views(
        self,
        plan: Plan,
        promotion_result: PromotionResult,
        deployability_index: t.Optional[DeployabilityIndex] = None,
    ) -> None:
        """Update environment views.

        Args:
            plan: The plan to promote.
            promotion_result: The result of the promotion.
            deployability_index: Indicates which snapshots are deployable in the context of this promotion.
        """
        if not plan.is_dev and plan.ensure_finalized_snapshots:
            # Unpause right before finalizing the environment in case when
            # we need to use the finalized snapshots for subsequent plan applications.
            # Otherwise, unpause right after updating the environment record.
            self.state_sync.unpause_snapshots(promotion_result.added, plan.end)

        environment = plan.environment

        self.console.start_promotion_progress(
            len(promotion_result.added) + len(promotion_result.removed),
            environment.naming_info,
            self.default_catalog,
        )

        completed = False
        try:
            self.snapshot_evaluator.promote(
                [plan.context_diff.snapshots[s.snapshot_id] for s in promotion_result.added],
                environment.naming_info,
                deployability_index=deployability_index,
                on_complete=lambda s: self.console.update_promotion_progress(s, True),
            )
            if promotion_result.removed_environment_naming_info:
                self.snapshot_evaluator.demote(
                    promotion_result.removed,
                    promotion_result.removed_environment_naming_info,
                    on_complete=lambda s: self.console.update_promotion_progress(s, False),
                )
            self.state_sync.finalize(environment)
            completed = True
        finally:
            # Always close the progress display, flagging whether promotion finished.
            self.console.stop_promotion_progress(success=completed)

    def _restate(self, plan: Plan) -> None:
        """Remove the intervals listed in the plan's restatements from the state.

        Does nothing when the plan has no restatements.

        Args:
            plan: The plan whose restatements should be applied.
        """
        if not plan.restatements:
            return

        self.state_sync.remove_interval(
            [
                (plan.context_diff.snapshots[s_id], interval)
                for s_id, interval in plan.restatements.items()
            ],
            remove_shared_versions=not plan.is_dev,
        )
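Plan evaluator that applies a plan in-process: it pushes snapshots to the state sync, backfills them with the built-in scheduler, promotes them and updates the environment views.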
def __init__(
    self,
    state_sync: StateSync,
    snapshot_evaluator: SnapshotEvaluator,
    default_catalog: t.Optional[str],
    backfill_concurrent_tasks: int = 1,
    console: t.Optional[Console] = None,
    notification_target_manager: t.Optional[NotificationTargetManager] = None,
):
    """Keep references to everything needed to apply a plan.

    Args:
        state_sync: State sync kept on the instance as `state_sync`.
        snapshot_evaluator: Snapshot evaluator kept on the instance.
        default_catalog: Default catalog kept on the instance.
        backfill_concurrent_tasks: Concurrency setting kept on the instance.
        console: Console for progress output. Falls back to `get_console()`.
        notification_target_manager: Optional manager kept on the instance.
    """
    # Resolve the console first; every other attribute is a plain pass-through.
    self.console = console if console else get_console()
    self.notification_target_manager = notification_target_manager
    self.backfill_concurrent_tasks = backfill_concurrent_tasks
    self.default_catalog = default_catalog
    self.snapshot_evaluator = snapshot_evaluator
    self.state_sync = state_sync
def evaluate(
    self,
    plan: Plan,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
) -> None:
    """Apply a plan: push, restate, backfill, promote, backfill the rest, update views.

    Args:
        plan: The plan to evaluate.
        circuit_breaker: Optional callable forwarded unchanged to both backfill passes.
    """
    self.console.start_plan_evaluation(plan)
    analytics.collector.on_plan_apply_start(
        plan=plan,
        engine_type=self.snapshot_evaluator.adapter.dialect,
        state_sync_type=self.state_sync.state_type(),
        scheduler_type=c.BUILTIN,
    )

    try:
        snapshots = plan.snapshots
        all_names = {
            s.name for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
        }
        deployability_index_for_evaluation = DeployabilityIndex.create(snapshots)
        # Table creation always uses the index computed from the plan's snapshots,
        # even when the evaluation index is swapped out below.
        deployability_index_for_creation = deployability_index_for_evaluation
        if plan.is_dev:
            # Dev plans backfill every selected snapshot before promotion.
            before_promote_snapshots = all_names
            after_promote_snapshots = set()
        else:
            # Non-dev plans backfill only the snapshots the index reports as
            # representative before promotion; the rest run after it.
            before_promote_snapshots = {
                s.name
                for s in snapshots.values()
                if deployability_index_for_evaluation.is_representative(s)
                and plan.is_selected_for_backfill(s.name)
            }
            after_promote_snapshots = all_names - before_promote_snapshots
            deployability_index_for_evaluation = DeployabilityIndex.all_deployable()

        self._push(plan, deployability_index_for_creation)
        update_intervals_for_new_snapshots(plan.new_snapshots, self.state_sync)
        self._restate(plan)
        self._backfill(
            plan,
            before_promote_snapshots,
            deployability_index_for_evaluation,
            circuit_breaker=circuit_breaker,
        )
        promotion_result = self._promote(plan, before_promote_snapshots)
        self._backfill(
            plan,
            after_promote_snapshots,
            deployability_index_for_evaluation,
            circuit_breaker=circuit_breaker,
        )
        self._update_views(plan, promotion_result, deployability_index_for_evaluation)

        if not plan.requires_backfill:
            self.console.log_success("Virtual Update executed successfully")
    except Exception as e:
        analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
        raise
    else:
        # Report success only when nothing was raised. Reporting from `finally`
        # emitted a second, error-free end event after every failure.
        analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
    finally:
        self.console.stop_plan_evaluation()
Evaluates a plan by pushing snapshots and backfilling data.
Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.
Arguments:
- plan: The plan to evaluate.
class BaseAirflowPlanEvaluator(PlanEvaluator):
    """Base for evaluators that hand the plan to Airflow and optionally wait for the DAG run."""

    def __init__(
        self,
        console: t.Optional[Console],
        blocking: bool,
        dag_run_poll_interval_secs: int,
        dag_creation_poll_interval_secs: int,
        dag_creation_max_retry_attempts: int,
    ):
        """Keep the console and the polling configuration on the instance.

        Args:
            console: Console for status output. Falls back to `get_console()`.
            blocking: Whether `evaluate` waits for the plan application DAG run.
            dag_run_poll_interval_secs: Passed to the client when waiting for DAG run completion.
            dag_creation_poll_interval_secs: Passed to the client when waiting for the first DAG run.
            dag_creation_max_retry_attempts: Passed to the client when waiting for the first DAG run.
        """
        self.console = console if console else get_console()
        self.blocking = blocking
        self.dag_creation_max_retry_attempts = dag_creation_max_retry_attempts
        self.dag_creation_poll_interval_secs = dag_creation_poll_interval_secs
        self.dag_run_poll_interval_secs = dag_run_poll_interval_secs

    def evaluate(
        self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]] = None
    ) -> None:
        """Submit the plan and, in blocking mode, wait for its DAG run to finish.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: Accepted for interface compatibility; not used here.

        Raises:
            SQLMeshError: In blocking mode, if the DAG run does not succeed.
        """
        request_id = plan.plan_id
        self._apply_plan(plan, request_id)

        analytics.collector.on_plan_apply_start(
            plan=plan,
            engine_type=None,
            state_sync_type=None,
            scheduler_type=c.AIRFLOW,
        )

        # Fire-and-forget mode: nothing to wait for once the plan is submitted.
        if not self.blocking:
            return

        plan_application_dag_id = airflow_common.plan_application_dag_id(
            plan.environment_naming_info.name, request_id
        )
        self.console.log_status_update(
            f"Waiting for the plan application DAG '{plan_application_dag_id}' to be provisioned on Airflow"
        )

        dag_run_id = self.client.wait_for_first_dag_run(
            plan_application_dag_id,
            self.dag_creation_poll_interval_secs,
            self.dag_creation_max_retry_attempts,
        )
        self.client.print_tracking_url(
            plan_application_dag_id,
            dag_run_id,
            "plan application",
        )

        succeeded = self.client.wait_for_dag_run_completion(
            plan_application_dag_id,
            dag_run_id,
            self.dag_run_poll_interval_secs,
        )
        if not succeeded:
            raise SQLMeshError("Plan application failed.")

        self.console.log_success("The plan has been applied successfully")

    @property
    def client(self) -> BaseAirflowClient:
        """The Airflow client used for polling. Subclasses must override this."""
        raise NotImplementedError

    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
        """Submit the plan for application. Subclasses must override this."""
        raise NotImplementedError
Helper class that provides a standard way to create an ABC using inheritance.
def __init__(
    self,
    console: t.Optional[Console],
    blocking: bool,
    dag_run_poll_interval_secs: int,
    dag_creation_poll_interval_secs: int,
    dag_creation_max_retry_attempts: int,
):
    """Store the evaluator configuration.

    Args:
        console: Console used for status output. When falsy, the value
            returned by ``get_console()`` is used instead.
        blocking: Stored as ``self.blocking``.
        dag_run_poll_interval_secs: Stored as
            ``self.dag_run_poll_interval_secs``.
        dag_creation_poll_interval_secs: Stored as
            ``self.dag_creation_poll_interval_secs``.
        dag_creation_max_retry_attempts: Stored as
            ``self.dag_creation_max_retry_attempts``.
    """
    # Plain configuration values, stored as given.
    self.blocking = blocking
    self.dag_run_poll_interval_secs = dag_run_poll_interval_secs
    self.dag_creation_poll_interval_secs = dag_creation_poll_interval_secs
    self.dag_creation_max_retry_attempts = dag_creation_max_retry_attempts
    # Use the supplied console when there is one, otherwise the shared one.
    self.console = console if console else get_console()
def evaluate(
    self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]] = None
) -> None:
    """Submit the plan and, if blocking, wait for its DAG run to complete.

    Args:
        plan: The plan to evaluate.
        circuit_breaker: Accepted for interface compatibility; this
            implementation does not use it.

    Raises:
        SQLMeshError: If the plan application DAG run did not succeed.
    """
    request_id = plan.plan_id
    self._apply_plan(plan, request_id)

    analytics.collector.on_plan_apply_start(
        plan=plan,
        engine_type=None,
        state_sync_type=None,
        scheduler_type=c.AIRFLOW,
    )

    # Non-blocking evaluation ends as soon as the plan has been submitted.
    if not self.blocking:
        return

    environment_name = plan.environment_naming_info.name
    plan_application_dag_id = airflow_common.plan_application_dag_id(
        environment_name, request_id
    )

    self.console.log_status_update(
        f"Waiting for the plan application DAG '{plan_application_dag_id}' to be provisioned on Airflow"
    )

    dag_run_id = self.client.wait_for_first_dag_run(
        plan_application_dag_id,
        self.dag_creation_poll_interval_secs,
        self.dag_creation_max_retry_attempts,
    )

    self.client.print_tracking_url(
        plan_application_dag_id,
        dag_run_id,
        "plan application",
    )

    succeeded = self.client.wait_for_dag_run_completion(
        plan_application_dag_id,
        dag_run_id,
        self.dag_run_poll_interval_secs,
    )
    if not succeeded:
        raise SQLMeshError("Plan application failed.")

    self.console.log_success("The plan has been applied successfully")
Evaluates a plan by pushing snapshots and backfilling data.
Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.
Arguments:
- plan: The plan to evaluate.
class StateBasedAirflowPlanEvaluator(BaseAirflowPlanEvaluator):
    """Airflow plan evaluator that submits plans through the state sync.

    Instead of calling the Airflow client to apply a plan, ``_apply_plan``
    builds a plan DAG spec from the plan and records it via ``PlanDagState``.
    Subclasses must provide ``state_sync`` and set the attributes below.
    """

    # Forwarded as-is to the plan application request.
    backfill_concurrent_tasks: int
    ddl_concurrent_tasks: int
    # ``None`` is treated as an empty list when the request is built.
    notification_targets: t.Optional[t.List[NotificationTarget]]
    users: t.Optional[t.List[User]]

    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
        """Build a plan application request and store the resulting DAG spec.

        Args:
            plan: The plan whose contents populate the request.
            plan_request_id: Identifier passed as the request's ``request_id``.
        """
        from sqlmesh.schedulers.airflow.plan import PlanDagState, create_plan_dag_spec

        # Materialize the plan's collections into plain lists/dicts up front.
        directly_modified = list(plan.directly_modified)
        indirectly_modified = {
            source.name: list(children) for source, children in plan.indirectly_modified.items()
        }
        removed = list(plan.context_diff.removed_snapshots)

        request = airflow_common.PlanApplicationRequest(
            new_snapshots=plan.new_snapshots,
            environment=plan.environment,
            no_gaps=plan.no_gaps,
            skip_backfill=plan.skip_backfill,
            request_id=plan_request_id,
            restatements=plan.restatements or {},
            notification_targets=self.notification_targets or [],
            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
            users=self.users or [],
            is_dev=plan.is_dev,
            forward_only=plan.forward_only,
            models_to_backfill=plan.models_to_backfill,
            end_bounded=plan.end_bounded,
            ensure_finalized_snapshots=plan.ensure_finalized_snapshots,
            directly_modified_snapshots=directly_modified,
            indirectly_modified_snapshots=indirectly_modified,
            removed_snapshots=removed,
            execution_time=plan.execution_time,
        )

        dag_spec = create_plan_dag_spec(request, self.state_sync)
        dag_state = PlanDagState.from_state_sync(self.state_sync)
        dag_state.add_dag_spec(dag_spec)

    @property
    def state_sync(self) -> StateSync:
        """The state sync used to store DAG specs; subclasses must override."""
        raise NotImplementedError
Helper class that provides a standard way to create an ABC using inheritance.
Inherited Members
class AirflowPlanEvaluator(StateBasedAirflowPlanEvaluator):
    """Plan evaluator backed by an ``AirflowClient``.

    When a state sync is supplied, plans are submitted through the state-based
    path of the parent class; otherwise they are sent with the Airflow client's
    ``apply_plan``.

    Args:
        airflow_client: Client returned by ``client`` and used to apply plans
            when no state sync is configured.
        console: Passed to the base class initializer.
        blocking: Passed to the base class initializer.
        dag_run_poll_interval_secs: Passed to the base class initializer.
        dag_creation_poll_interval_secs: Passed to the base class initializer.
        dag_creation_max_retry_attempts: Passed to the base class initializer.
        notification_targets: Stored as a list; a falsy value becomes ``[]``.
        backfill_concurrent_tasks: Forwarded when applying a plan.
        ddl_concurrent_tasks: Forwarded when applying a plan.
        users: Stored as a list; a falsy value becomes ``[]``.
        state_sync: Optional state sync; selects the state-based submission
            path when not ``None``.
    """

    def __init__(
        self,
        airflow_client: AirflowClient,
        console: t.Optional[Console] = None,
        blocking: bool = True,
        dag_run_poll_interval_secs: int = 10,
        dag_creation_poll_interval_secs: int = 30,
        dag_creation_max_retry_attempts: int = 10,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        backfill_concurrent_tasks: int = 1,
        ddl_concurrent_tasks: int = 1,
        users: t.Optional[t.List[User]] = None,
        state_sync: t.Optional[StateSync] = None,
    ):
        super().__init__(
            console,
            blocking,
            dag_run_poll_interval_secs,
            dag_creation_poll_interval_secs,
            dag_creation_max_retry_attempts,
        )
        self._airflow_client = airflow_client
        self._state_sync = state_sync

        self.backfill_concurrent_tasks = backfill_concurrent_tasks
        self.ddl_concurrent_tasks = ddl_concurrent_tasks
        # Falsy inputs (including None) are normalized to empty lists.
        self.notification_targets = notification_targets or []
        self.users = users or []

    @property
    def client(self) -> BaseAirflowClient:
        """The Airflow client given at construction time."""
        return self._airflow_client

    @property
    def state_sync(self) -> StateSync:
        """The configured state sync.

        Raises:
            SQLMeshError: If no state sync was provided.
        """
        configured = self._state_sync
        if configured is None:
            raise SQLMeshError("State Sync is not configured")
        return configured

    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
        """Submit the plan via the Airflow client, or the state sync if set.

        Args:
            plan: The plan to submit.
            plan_request_id: Identifier of this plan application request.
        """
        if self._state_sync is None:
            # No state sync: hand the plan directly to the Airflow client.
            self._airflow_client.apply_plan(
                plan.new_snapshots,
                plan.environment,
                plan_request_id,
                no_gaps=plan.no_gaps,
                skip_backfill=plan.skip_backfill,
                restatements=plan.restatements,
                notification_targets=self.notification_targets,
                backfill_concurrent_tasks=self.backfill_concurrent_tasks,
                ddl_concurrent_tasks=self.ddl_concurrent_tasks,
                users=self.users,
                is_dev=plan.is_dev,
                forward_only=plan.forward_only,
                models_to_backfill=plan.models_to_backfill,
                end_bounded=plan.end_bounded,
                ensure_finalized_snapshots=plan.ensure_finalized_snapshots,
                directly_modified_snapshots=list(plan.directly_modified),
                indirectly_modified_snapshots={
                    source.name: list(children)
                    for source, children in plan.indirectly_modified.items()
                },
                removed_snapshots=list(plan.context_diff.removed_snapshots),
                execution_time=plan.execution_time,
            )
            return

        # A state sync is configured: use the state-based submission path.
        super()._apply_plan(plan, plan_request_id)
Helper class that provides a standard way to create an ABC using inheritance.
def __init__(
    self,
    airflow_client: AirflowClient,
    console: t.Optional[Console] = None,
    blocking: bool = True,
    dag_run_poll_interval_secs: int = 10,
    dag_creation_poll_interval_secs: int = 30,
    dag_creation_max_retry_attempts: int = 10,
    notification_targets: t.Optional[t.List[NotificationTarget]] = None,
    backfill_concurrent_tasks: int = 1,
    ddl_concurrent_tasks: int = 1,
    users: t.Optional[t.List[User]] = None,
    state_sync: t.Optional[StateSync] = None,
):
    """Initialize the evaluator with an Airflow client and optional state sync.

    Args:
        airflow_client: Stored as ``self._airflow_client``.
        console: Passed to the base class initializer.
        blocking: Passed to the base class initializer.
        dag_run_poll_interval_secs: Passed to the base class initializer.
        dag_creation_poll_interval_secs: Passed to the base class initializer.
        dag_creation_max_retry_attempts: Passed to the base class initializer.
        notification_targets: Stored as a list; a falsy value becomes ``[]``.
        backfill_concurrent_tasks: Stored as-is.
        ddl_concurrent_tasks: Stored as-is.
        users: Stored as a list; a falsy value becomes ``[]``.
        state_sync: Stored as ``self._state_sync``; may be ``None``.
    """
    super().__init__(
        console,
        blocking,
        dag_run_poll_interval_secs,
        dag_creation_poll_interval_secs,
        dag_creation_max_retry_attempts,
    )
    self._airflow_client = airflow_client
    self._state_sync = state_sync

    self.backfill_concurrent_tasks = backfill_concurrent_tasks
    self.ddl_concurrent_tasks = ddl_concurrent_tasks
    # Falsy inputs (including None) are normalized to empty lists.
    self.notification_targets = notification_targets or []
    self.users = users or []
Inherited Members
class MWAAPlanEvaluator(StateBasedAirflowPlanEvaluator):
    """Plan evaluator that pairs an ``MWAAClient`` with a required state sync.

    Plan submission is inherited from the state-based parent class; this class
    only wires up the client and the state sync.

    Args:
        client: Client returned by the ``client`` property.
        state_sync: State sync returned by the ``state_sync`` property.
        console: Passed to the base class initializer.
        blocking: Passed to the base class initializer.
        dag_run_poll_interval_secs: Passed to the base class initializer.
        dag_creation_poll_interval_secs: Passed to the base class initializer.
        dag_creation_max_retry_attempts: Passed to the base class initializer.
        notification_targets: Stored as a list; a falsy value becomes ``[]``.
        backfill_concurrent_tasks: Stored as-is.
        ddl_concurrent_tasks: Stored as-is.
        users: Stored as a list; a falsy value becomes ``[]``.
    """

    def __init__(
        self,
        client: MWAAClient,
        state_sync: StateSync,
        console: t.Optional[Console] = None,
        blocking: bool = True,
        dag_run_poll_interval_secs: int = 10,
        dag_creation_poll_interval_secs: int = 30,
        dag_creation_max_retry_attempts: int = 10,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        backfill_concurrent_tasks: int = 1,
        ddl_concurrent_tasks: int = 1,
        users: t.Optional[t.List[User]] = None,
    ):
        super().__init__(
            console,
            blocking,
            dag_run_poll_interval_secs,
            dag_creation_poll_interval_secs,
            dag_creation_max_retry_attempts,
        )
        self._state_sync = state_sync
        self._mwaa_client = client

        self.backfill_concurrent_tasks = backfill_concurrent_tasks
        self.ddl_concurrent_tasks = ddl_concurrent_tasks
        # Falsy inputs (including None) are normalized to empty lists.
        self.notification_targets = notification_targets or []
        self.users = users or []

    @property
    def client(self) -> BaseAirflowClient:
        """The MWAA client given at construction time."""
        return self._mwaa_client

    @property
    def state_sync(self) -> StateSync:
        """The state sync given at construction time."""
        return self._state_sync
Helper class that provides a standard way to create an ABC using inheritance.
def __init__(
    self,
    client: MWAAClient,
    state_sync: StateSync,
    console: t.Optional[Console] = None,
    blocking: bool = True,
    dag_run_poll_interval_secs: int = 10,
    dag_creation_poll_interval_secs: int = 30,
    dag_creation_max_retry_attempts: int = 10,
    notification_targets: t.Optional[t.List[NotificationTarget]] = None,
    backfill_concurrent_tasks: int = 1,
    ddl_concurrent_tasks: int = 1,
    users: t.Optional[t.List[User]] = None,
):
    """Initialize the evaluator with an MWAA client and a state sync.

    Args:
        client: Stored as ``self._mwaa_client``.
        state_sync: Stored as ``self._state_sync``.
        console: Passed to the base class initializer.
        blocking: Passed to the base class initializer.
        dag_run_poll_interval_secs: Passed to the base class initializer.
        dag_creation_poll_interval_secs: Passed to the base class initializer.
        dag_creation_max_retry_attempts: Passed to the base class initializer.
        notification_targets: Stored as a list; a falsy value becomes ``[]``.
        backfill_concurrent_tasks: Stored as-is.
        ddl_concurrent_tasks: Stored as-is.
        users: Stored as a list; a falsy value becomes ``[]``.
    """
    super().__init__(
        console,
        blocking,
        dag_run_poll_interval_secs,
        dag_creation_poll_interval_secs,
        dag_creation_max_retry_attempts,
    )
    self._state_sync = state_sync
    self._mwaa_client = client

    self.backfill_concurrent_tasks = backfill_concurrent_tasks
    self.ddl_concurrent_tasks = ddl_concurrent_tasks
    # Falsy inputs (including None) are normalized to empty lists.
    self.notification_targets = notification_targets or []
    self.users = users or []
Inherited Members
def update_intervals_for_new_snapshots(
    snapshots: t.Collection[Snapshot], state_sync: StateSync
) -> None:
    """Copy intervals to dev intervals for forward-only snapshots.

    The snapshots are first passed to ``state_sync.refresh_snapshot_intervals``.
    For each returned snapshot that is forward-only, its ``dev_intervals`` are
    replaced with a copy of its ``intervals`` and every such interval is added
    to the state sync with ``is_dev=True``.

    Args:
        snapshots: The snapshots whose intervals should be refreshed.
        state_sync: The state sync used to refresh and record intervals.
    """
    refreshed_snapshots = state_sync.refresh_snapshot_intervals(snapshots)
    for refreshed in refreshed_snapshots:
        # Only forward-only snapshots get their dev intervals populated.
        if not refreshed.is_forward_only:
            continue

        refreshed.dev_intervals = refreshed.intervals.copy()
        for interval in refreshed.dev_intervals:
            start, end = interval
            state_sync.add_interval(refreshed, start, end, is_dev=True)