PlanEvaluator
A plan evaluator is responsible for evaluating a plan when it is being applied.
Evaluation steps
At a high level, when a plan is evaluated, SQLMesh will:
- Push new snapshots to the state sync.
- Create snapshot tables.
- Backfill data.
- Promote the snapshots.
Refer to sqlmesh.core.plan.
"""
# PlanEvaluator

A plan evaluator is responsible for evaluating a plan when it is being applied.

# Evaluation steps

At a high level, when a plan is evaluated, SQLMesh will:
- Push new snapshots to the state sync.
- Create snapshot tables.
- Backfill data.
- Promote the snapshots.

Refer to `sqlmesh.core.plan`.
"""

import abc
import logging
import typing as t
from sqlmesh.core import analytics
from sqlmesh.core import constants as c
from sqlmesh.core.console import Console, get_console
from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements
from sqlmesh.core.macros import RuntimeStage
from sqlmesh.core.snapshot.definition import to_view_mapping, SnapshotTableInfo
from sqlmesh.core.plan import stages
from sqlmesh.core.plan.definition import EvaluatablePlan
from sqlmesh.core.scheduler import Scheduler
from sqlmesh.core.snapshot import (
    DeployabilityIndex,
    Snapshot,
    SnapshotEvaluator,
    SnapshotIntervals,
    SnapshotId,
    SnapshotInfoLike,
    SnapshotCreationFailedError,
)
from sqlmesh.utils import to_snake_case
from sqlmesh.core.state_sync import StateSync
from sqlmesh.core.plan.common import identify_restatement_intervals_across_snapshot_versions
from sqlmesh.utils import CorrelationId
from sqlmesh.utils.concurrency import NodeExecutionFailedError
from sqlmesh.utils.errors import PlanError, ConflictingPlanError, SQLMeshError
from sqlmesh.utils.date import now, to_timestamp

logger = logging.getLogger(__name__)


class PlanEvaluator(abc.ABC):
    """Abstract interface for objects that can apply an evaluatable plan."""

    @abc.abstractmethod
    def evaluate(
        self,
        plan: EvaluatablePlan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Evaluates a plan by pushing snapshots and backfilling data.

        Given a plan, it pushes snapshots into the state and then kicks off
        the backfill process for all affected snapshots. Once backfill is done,
        snapshots that are part of the plan are promoted in the environment targeted
        by this plan.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: The circuit breaker to use.
        """


class BuiltInPlanEvaluator(PlanEvaluator):
    """Plan evaluator that builds the plan's stages and runs each one through a `visit_*` method.

    Stages come from `stages.build_plan_stages`; each stage is dispatched to the method named
    `visit_<snake_case stage class name>` (see `_evaluate_stages`).
    """

    def __init__(
        self,
        state_sync: StateSync,
        snapshot_evaluator: SnapshotEvaluator,
        create_scheduler: t.Callable[[t.Iterable[Snapshot], SnapshotEvaluator], Scheduler],
        default_catalog: t.Optional[str],
        console: t.Optional[Console] = None,
    ):
        """Stores the collaborators used while evaluating a plan.

        Args:
            state_sync: The state sync that snapshot, interval and environment changes are written to.
            snapshot_evaluator: The evaluator used to create, migrate, promote and demote snapshots.
            create_scheduler: Factory called with snapshots and the snapshot evaluator to build a scheduler.
            default_catalog: The default catalog forwarded to helpers that need it.
            console: The console to report progress to; falls back to `get_console()` when not provided.
        """
        self.state_sync = state_sync
        self.snapshot_evaluator = snapshot_evaluator
        self.create_scheduler = create_scheduler
        self.default_catalog = default_catalog
        self.console = console or get_console()
        # Set on every `evaluate` call and forwarded to the scheduler by the backfill stage.
        self._circuit_breaker: t.Optional[t.Callable[[], bool]] = None

    def evaluate(
        self,
        plan: EvaluatablePlan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Builds the stages of the plan and evaluates them in order.

        Analytics are notified when the application starts and when it ends (with the error, if any).
        Any exception raised while building or evaluating stages is re-raised.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: The circuit breaker to use.
        """
        self._circuit_breaker = circuit_breaker
        # The evaluator returned by `set_correlation_id` replaces the stored one for the rest of the run.
        self.snapshot_evaluator = self.snapshot_evaluator.set_correlation_id(
            CorrelationId.from_plan_id(plan.plan_id)
        )

        self.console.start_plan_evaluation(plan)
        analytics.collector.on_plan_apply_start(
            plan=plan,
            engine_type=self.snapshot_evaluator.adapter.dialect,
            state_sync_type=self.state_sync.state_type(),
            scheduler_type=c.BUILTIN,
        )

        try:
            plan_stages = stages.build_plan_stages(plan, self.state_sync, self.default_catalog)
            self._evaluate_stages(plan_stages, plan)
        except Exception as e:
            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
            raise
        else:
            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
        finally:
            self.snapshot_evaluator.recycle()
            self.console.stop_plan_evaluation()

    def _evaluate_stages(
        self, plan_stages: t.List[stages.PlanStage], plan: EvaluatablePlan
    ) -> None:
        """Dispatches every stage to its `visit_*` handler, in list order.

        Raises:
            SQLMeshError: If no handler exists for a stage's class.
        """
        for stage in plan_stages:
            stage_name = stage.__class__.__name__
            # The handler name is derived from the stage class name.
            handler_name = f"visit_{to_snake_case(stage_name)}"
            if not hasattr(self, handler_name):
                raise SQLMeshError(f"Unexpected plan stage: {stage_name}")
            logger.info("Evaluating plan stage %s", stage_name)
            handler = getattr(self, handler_name)
            handler(stage, plan)

    def visit_before_all_stage(self, stage: stages.BeforeAllStage, plan: EvaluatablePlan) -> None:
        """Executes the stage's environment statements with the BEFORE_ALL runtime stage."""
        execute_environment_statements(
            adapter=self.snapshot_evaluator.adapter,
            environment_statements=stage.statements,
            runtime_stage=RuntimeStage.BEFORE_ALL,
            environment_naming_info=plan.environment.naming_info,
            default_catalog=self.default_catalog,
            snapshots=stage.all_snapshots,
            start=plan.start,
            end=plan.end,
            execution_time=plan.execution_time,
            selected_models=plan.selected_models,
        )

    def visit_after_all_stage(self, stage: stages.AfterAllStage, plan: EvaluatablePlan) -> None:
        """Executes the stage's environment statements with the AFTER_ALL runtime stage."""
        execute_environment_statements(
            adapter=self.snapshot_evaluator.adapter,
            environment_statements=stage.statements,
            runtime_stage=RuntimeStage.AFTER_ALL,
            environment_naming_info=plan.environment.naming_info,
            default_catalog=self.default_catalog,
            snapshots=stage.all_snapshots,
            start=plan.start,
            end=plan.end,
            execution_time=plan.execution_time,
            selected_models=plan.selected_models,
        )

    def visit_create_snapshot_records_stage(
        self, stage: stages.CreateSnapshotRecordsStage, plan: EvaluatablePlan
    ) -> None:
        """Pushes the stage's snapshots to the state sync and reports them to analytics."""
        self.state_sync.push_snapshots(stage.snapshots)
        analytics.collector.on_snapshots_created(
            new_snapshots=stage.snapshots, plan_id=plan.plan_id
        )
        # Update the intervals for the new forward-only snapshots
        self._update_intervals_for_new_snapshots(stage.snapshots)

    def visit_physical_layer_update_stage(
        self, stage: stages.PhysicalLayerUpdateStage, plan: EvaluatablePlan
    ) -> None:
        """Creates the stage's snapshots through the snapshot evaluator, reporting progress to the console.

        Raises:
            PlanError: If snapshot creation fails with `SnapshotCreationFailedError`.
        """
        # For plans with restatements the message logged on skip is an empty string.
        skip_message = "" if plan.restatements else "\nSKIP: No physical layer updates to perform"

        snapshots_to_create = stage.snapshots
        if not snapshots_to_create:
            self.console.log_success(skip_message)
            return

        completion_status = None
        # Tracks whether the failure path already stopped the progress display,
        # so the `finally` clause does not stop it a second time.
        progress_stopped = False
        try:
            completion_status = self.snapshot_evaluator.create(
                snapshots_to_create,
                stage.all_snapshots,
                allow_destructive_snapshots=plan.allow_destructive_models,
                allow_additive_snapshots=plan.allow_additive_models,
                deployability_index=stage.deployability_index,
                on_start=lambda x: self.console.start_creation_progress(
                    x, plan.environment, self.default_catalog
                ),
                on_complete=self.console.update_creation_progress,
            )
            if completion_status.is_nothing_to_do:
                self.console.log_success(skip_message)
                return
        except SnapshotCreationFailedError as ex:
            self.console.stop_creation_progress(success=False)
            progress_stopped = True

            for error in ex.errors:
                logger.info(str(error), exc_info=error)

            self.console.log_skipped_models({s.name for s in ex.skipped})
            self.console.log_failed_models(ex.errors)

            raise PlanError("Plan application failed.")
        finally:
            if not progress_stopped:
                # Also reached on the early return above and on unexpected exceptions.
                self.console.stop_creation_progress(
                    success=completion_status is not None and completion_status.is_success
                )

    def visit_physical_layer_schema_creation_stage(
        self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
    ) -> None:
        """Creates the physical schemas for the stage's snapshots.

        Raises:
            PlanError: If schema creation raises any exception (chained as the cause).
        """
        try:
            self.snapshot_evaluator.create_physical_schemas(
                stage.snapshots, stage.deployability_index
            )
        except Exception as ex:
            raise PlanError("Plan application failed.") from ex

    def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePlan) -> None:
        """Backfills the stage's intervals, or only records them in state for an empty backfill.

        Raises:
            PlanError: If the scheduler reports any errors.
        """
        if plan.empty_backfill:
            # Nothing is executed: the plan's range is recorded as intervals in state instead.
            intervals_to_add = []
            for snapshot in stage.all_snapshots.values():
                if not snapshot.evaluatable or not plan.is_selected_for_backfill(snapshot.name):
                    # Skip snapshots that are not evaluatable or not selected for backfill.
                    continue
                intervals = [
                    snapshot.inclusive_exclusive(plan.start, plan.end, strict=False, expand=False)
                ]
                is_deployable = stage.deployability_index.is_deployable(snapshot)
                # Deployable snapshots get `intervals`; all others get `dev_intervals`.
                intervals_to_add.append(
                    SnapshotIntervals(
                        name=snapshot.name,
                        identifier=snapshot.identifier,
                        version=snapshot.version,
                        dev_version=snapshot.dev_version,
                        intervals=intervals if is_deployable else [],
                        dev_intervals=intervals if not is_deployable else [],
                    )
                )
            self.state_sync.add_snapshots_intervals(intervals_to_add)
            self.console.log_success("SKIP: No model batches to execute")
            return

        if not stage.snapshot_to_intervals:
            self.console.log_success("SKIP: No model batches to execute")
            return

        scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
        errors, _ = scheduler.run_merged_intervals(
            merged_intervals=stage.snapshot_to_intervals,
            deployability_index=stage.deployability_index,
            environment_naming_info=plan.environment.naming_info,
            execution_time=plan.execution_time,
            circuit_breaker=self._circuit_breaker,
            start=plan.start,
            end=plan.end,
            allow_destructive_snapshots=plan.allow_destructive_models,
            allow_additive_snapshots=plan.allow_additive_models,
            selected_snapshot_ids=stage.selected_snapshot_ids,
            selected_models=plan.selected_models,
            is_restatement=bool(plan.restatements),
        )
        if errors:
            raise PlanError("Plan application failed.")

    def visit_audit_only_run_stage(
        self, stage: stages.AuditOnlyRunStage, plan: EvaluatablePlan
    ) -> None:
        """Audits the stage's snapshots through a scheduler.

        Raises:
            PlanError: If the audit run's completion status is a failure.
        """
        audit_snapshots = stage.snapshots
        if not audit_snapshots:
            return

        # If there are any snapshots to be audited, we'll reuse the scheduler's internals to audit them
        scheduler = self.create_scheduler(audit_snapshots, self.snapshot_evaluator)
        completion_status = scheduler.audit(
            plan.environment,
            plan.start,
            plan.end,
            execution_time=plan.execution_time,
            end_bounded=plan.end_bounded,
            start_override_per_model=plan.start_override_per_model,
            end_override_per_model=plan.end_override_per_model,
        )

        if completion_status.is_failure:
            raise PlanError("Plan application failed.")

    def visit_restatement_stage(
        self, stage: stages.RestatementStage, plan: EvaluatablePlan
    ) -> None:
        """Clears restated intervals from state and detects versions deployed during the restatement.

        Raises:
            ConflictingPlanError: If the target environment now holds different versions of
                restated models than the ones this plan restated.
        """
        # Restating intervals on prod plans means that once the data for the intervals being restated has been backfilled
        # (which happens in the backfill stage) then we need to clear those intervals *from state* across all other environments.
        #
        # This ensures that work done in dev environments can still be promoted to prod by forcing dev environments to
        # re-run intervals that changed in prod (because after this stage runs they are cleared from state and thus show as missing)
        #
        # It also means that any new dev environments created while this restatement plan was running also get the
        # correct intervals cleared because we look up matching snapshots as at right now and not as at the time the plan
        # was created, which could have been several hours ago if there was a lot of data to restate.
        #
        # Without this rule, it's possible that promoting a dev table to prod will introduce old data to prod

        intervals_to_clear = identify_restatement_intervals_across_snapshot_versions(
            state_reader=self.state_sync,
            prod_restatements=plan.restatements,
            disable_restatement_models=plan.disabled_restatement_models,
            loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
            current_ts=to_timestamp(plan.execution_time or now()),
        )

        if not intervals_to_clear:
            # Nothing to do
            return

        # While the restatements were being processed, did any of the snapshots being restated get new versions deployed?
        # If they did, they will not reflect the data that just got restated, so we need to notify the user
        deployed_during_restatement: t.Dict[
            str, t.Tuple[SnapshotTableInfo, SnapshotTableInfo]
        ] = {}  # tuple of (restated_snapshot, current_prod_snapshot)

        if deployed_env := self.state_sync.get_environment(plan.environment.name):
            promoted_snapshots_by_name = {s.name: s for s in deployed_env.snapshots}

            for name in plan.restatements:
                snapshot = stage.all_snapshots[name]
                version = snapshot.table_info.version
                if (
                    prod_snapshot := promoted_snapshots_by_name.get(name)
                ) and prod_snapshot.version != version:
                    deployed_during_restatement[name] = (
                        snapshot.table_info,
                        prod_snapshot.table_info,
                    )

        # we need to *not* clear the intervals on the snapshots where new versions were deployed while the restatement was running in order to prevent
        # subsequent plans from having unexpected intervals to backfill.
        # we instead list the affected models and abort the plan with an error so the user can decide what to do
        # (either re-attempt the restatement plan or leave things as they are)
        filtered_intervals_to_clear = [
            (s.snapshot, s.interval)
            for s in intervals_to_clear.values()
            if s.snapshot.name not in deployed_during_restatement
        ]

        if filtered_intervals_to_clear:
            # We still clear intervals in other envs for models that were successfully restated without having new versions promoted during restatement
            self.state_sync.remove_intervals(
                snapshot_intervals=filtered_intervals_to_clear,
                remove_shared_versions=plan.is_prod,
            )

        if deployed_env and deployed_during_restatement:
            self.console.log_models_updated_during_restatement(
                list(deployed_during_restatement.values()),
                plan.environment.naming_info,
                self.default_catalog,
            )
            raise ConflictingPlanError(
                f"Another plan ({deployed_env.summary.plan_id}) deployed new versions of {len(deployed_during_restatement)} models in the target environment '{plan.environment.name}' while they were being restated by this plan.\n"
                "Please re-apply your plan if these new versions should be restated."
            )

    def visit_environment_record_update_stage(
        self, stage: stages.EnvironmentRecordUpdateStage, plan: EvaluatablePlan
    ) -> None:
        """Promotes the plan's environment in the state sync.

        The stage's no-gaps snapshot names are only passed along when `plan.no_gaps` is set.
        """
        self.state_sync.promote(
            plan.environment,
            no_gaps_snapshot_names=stage.no_gaps_snapshot_names if plan.no_gaps else set(),
            environment_statements=plan.environment_statements,
        )

    def visit_migrate_schemas_stage(
        self, stage: stages.MigrateSchemasStage, plan: EvaluatablePlan
    ) -> None:
        """Migrates the stage's snapshots through the snapshot evaluator.

        Raises:
            PlanError: If migration fails with `NodeExecutionFailedError`; the message is taken
                from the error's cause when there is one, otherwise from the error itself.
        """
        try:
            self.snapshot_evaluator.migrate(
                stage.snapshots,
                stage.all_snapshots,
                allow_destructive_snapshots=plan.allow_destructive_models,
                allow_additive_snapshots=plan.allow_additive_models,
                deployability_index=stage.deployability_index,
            )
        except NodeExecutionFailedError as ex:
            raise PlanError(str(ex.__cause__) if ex.__cause__ else str(ex))

    def visit_unpause_stage(self, stage: stages.UnpauseStage, plan: EvaluatablePlan) -> None:
        """Unpauses the stage's promoted snapshots in the state sync, passing the plan's end."""
        self.state_sync.unpause_snapshots(stage.promoted_snapshots, plan.end)

    def visit_virtual_layer_update_stage(
        self, stage: stages.VirtualLayerUpdateStage, plan: EvaluatablePlan
    ) -> None:
        """Promotes and, when applicable, demotes the stage's snapshots while reporting progress."""
        environment = plan.environment

        self.console.start_promotion_progress(
            list(stage.promoted_snapshots) + list(stage.demoted_snapshots),
            environment.naming_info,
            self.default_catalog,
        )

        # Only flipped once both promotion and demotion finished without raising.
        completed = False
        try:
            self._promote_snapshots(
                plan,
                [stage.all_snapshots[s.snapshot_id] for s in stage.promoted_snapshots],
                environment.naming_info,
                deployability_index=stage.deployability_index,
                on_complete=lambda s: self.console.update_promotion_progress(s, True),
                snapshots=stage.all_snapshots,
            )
            if stage.demoted_environment_naming_info:
                self._demote_snapshots(
                    [stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots],
                    stage.demoted_environment_naming_info,
                    deployability_index=stage.deployability_index,
                    on_complete=lambda s: self.console.update_promotion_progress(s, False),
                    snapshots=stage.all_snapshots,
                )

            completed = True
        finally:
            self.console.stop_promotion_progress(success=completed)

    def visit_finalize_environment_stage(
        self, stage: stages.FinalizeEnvironmentStage, plan: EvaluatablePlan
    ) -> None:
        """Finalizes the plan's environment in the state sync."""
        self.state_sync.finalize(plan.environment)

    def _promote_snapshots(
        self,
        plan: EvaluatablePlan,
        target_snapshots: t.Iterable[Snapshot],
        environment_naming_info: EnvironmentNamingInfo,
        snapshots: t.Dict[SnapshotId, Snapshot],
        deployability_index: t.Optional[DeployabilityIndex] = None,
        on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
    ) -> None:
        """Promotes the target snapshots through the snapshot evaluator.

        The table mapping is built with `to_view_mapping` over all of `snapshots`, and the
        execution time falls back to `now()` when the plan does not set one.
        """
        self.snapshot_evaluator.promote(
            target_snapshots,
            start=plan.start,
            end=plan.end,
            execution_time=plan.execution_time or now(),
            snapshots=snapshots,
            table_mapping=to_view_mapping(
                snapshots.values(),
                environment_naming_info,
                default_catalog=self.default_catalog,
                dialect=self.snapshot_evaluator.adapter.dialect,
            ),
            environment_naming_info=environment_naming_info,
            deployability_index=deployability_index,
            on_complete=on_complete,
        )

    def _demote_snapshots(
        self,
        target_snapshots: t.Iterable[Snapshot],
        environment_naming_info: EnvironmentNamingInfo,
        snapshots: t.Dict[SnapshotId, Snapshot],
        deployability_index: t.Optional[DeployabilityIndex] = None,
        on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
    ) -> None:
        """Demotes the target snapshots through the snapshot evaluator.

        The table mapping is built with `to_view_mapping` over all of `snapshots`.
        """
        self.snapshot_evaluator.demote(
            target_snapshots,
            environment_naming_info,
            table_mapping=to_view_mapping(
                snapshots.values(),
                environment_naming_info,
                default_catalog=self.default_catalog,
                dialect=self.snapshot_evaluator.adapter.dialect,
            ),
            deployability_index=deployability_index,
            on_complete=on_complete,
        )

    def _update_intervals_for_new_snapshots(self, snapshots: t.Collection[Snapshot]) -> None:
        """Adds the dev intervals of forward-only snapshots to the state sync.

        Nothing is written when none of the given snapshots is forward-only.
        """
        snapshots_intervals: t.List[SnapshotIntervals] = []
        for snapshot in snapshots:
            if snapshot.is_forward_only:
                snapshots_intervals.append(
                    SnapshotIntervals(
                        name=snapshot.name,
                        identifier=snapshot.identifier,
                        version=snapshot.version,
                        dev_version=snapshot.dev_version,
                        dev_intervals=snapshot.dev_intervals,
                    )
                )

        if snapshots_intervals:
            self.state_sync.add_snapshots_intervals(snapshots_intervals)
class PlanEvaluator(abc.ABC):
    """Abstract interface for objects that can apply an evaluatable plan.

    Subclasses must implement `evaluate`.
    """

    @abc.abstractmethod
    def evaluate(
        self,
        plan: EvaluatablePlan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Apply a plan: push its snapshots, backfill them, then promote them.

        The snapshots of the plan are pushed into the state first, after which
        the backfill process is started for every affected snapshot. When the
        backfill has finished, the snapshots that belong to the plan are
        promoted in the environment the plan targets.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: The circuit breaker to use.
        """
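Abstract base class that defines the plan evaluator interface; concrete evaluators such as BuiltInPlanEvaluator implement `evaluate`. It derives from `abc.ABC`, the helper class that provides a standard way to create an ABC using inheritance.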
@abc.abstractmethod
def evaluate(
    self,
    plan: EvaluatablePlan,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
) -> None:
    """Apply a plan: push its snapshots, backfill them, then promote them.

    The snapshots of the plan are pushed into the state first, after which
    the backfill process is started for every affected snapshot. When the
    backfill has finished, the snapshots that belong to the plan are
    promoted in the environment the plan targets.

    Args:
        plan: The plan to evaluate.
        circuit_breaker: The circuit breaker to use.
    """
Evaluates a plan by pushing snapshots and backfilling data.
Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.
Arguments:
- plan: The plan to evaluate.
- circuit_breaker: The circuit breaker to use.
class BuiltInPlanEvaluator(PlanEvaluator):
    """Plan evaluator that builds the plan's stages and runs each one through a `visit_*` method.

    Stages come from `stages.build_plan_stages`; each stage is dispatched to the method named
    `visit_<snake_case stage class name>` (see `_evaluate_stages`).
    """

    def __init__(
        self,
        state_sync: StateSync,
        snapshot_evaluator: SnapshotEvaluator,
        create_scheduler: t.Callable[[t.Iterable[Snapshot], SnapshotEvaluator], Scheduler],
        default_catalog: t.Optional[str],
        console: t.Optional[Console] = None,
    ):
        """Stores the collaborators used while evaluating a plan.

        Args:
            state_sync: The state sync that snapshot, interval and environment changes are written to.
            snapshot_evaluator: The evaluator used to create, migrate, promote and demote snapshots.
            create_scheduler: Factory called with snapshots and the snapshot evaluator to build a scheduler.
            default_catalog: The default catalog forwarded to helpers that need it.
            console: The console to report progress to; falls back to `get_console()` when not provided.
        """
        self.state_sync = state_sync
        self.snapshot_evaluator = snapshot_evaluator
        self.create_scheduler = create_scheduler
        self.default_catalog = default_catalog
        self.console = console or get_console()
        # Set on every `evaluate` call and forwarded to the scheduler by the backfill stage.
        self._circuit_breaker: t.Optional[t.Callable[[], bool]] = None

    def evaluate(
        self,
        plan: EvaluatablePlan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Builds the stages of the plan and evaluates them in order.

        Analytics are notified when the application starts and when it ends (with the error, if any).
        Any exception raised while building or evaluating stages is re-raised.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: The circuit breaker to use.
        """
        self._circuit_breaker = circuit_breaker
        # The evaluator returned by `set_correlation_id` replaces the stored one for the rest of the run.
        self.snapshot_evaluator = self.snapshot_evaluator.set_correlation_id(
            CorrelationId.from_plan_id(plan.plan_id)
        )

        self.console.start_plan_evaluation(plan)
        analytics.collector.on_plan_apply_start(
            plan=plan,
            engine_type=self.snapshot_evaluator.adapter.dialect,
            state_sync_type=self.state_sync.state_type(),
            scheduler_type=c.BUILTIN,
        )

        try:
            plan_stages = stages.build_plan_stages(plan, self.state_sync, self.default_catalog)
            self._evaluate_stages(plan_stages, plan)
        except Exception as e:
            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
            raise
        else:
            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
        finally:
            # Nested so that the console is always told to stop, even if recycling
            # the snapshot evaluator raises.
            try:
                self.snapshot_evaluator.recycle()
            finally:
                self.console.stop_plan_evaluation()

    def _evaluate_stages(
        self, plan_stages: t.List[stages.PlanStage], plan: EvaluatablePlan
    ) -> None:
        """Dispatches every stage to its `visit_*` handler, in list order.

        Raises:
            SQLMeshError: If no handler exists for a stage's class.
        """
        for stage in plan_stages:
            stage_name = stage.__class__.__name__
            # The handler name is derived from the stage class name.
            handler = getattr(self, f"visit_{to_snake_case(stage_name)}", None)
            if handler is None:
                raise SQLMeshError(f"Unexpected plan stage: {stage_name}")
            logger.info("Evaluating plan stage %s", stage_name)
            handler(stage, plan)

    def _run_environment_statements(
        self,
        stage: t.Union[stages.BeforeAllStage, stages.AfterAllStage],
        plan: EvaluatablePlan,
        runtime_stage: RuntimeStage,
    ) -> None:
        """Executes the stage's environment statements for the given runtime stage."""
        execute_environment_statements(
            adapter=self.snapshot_evaluator.adapter,
            environment_statements=stage.statements,
            runtime_stage=runtime_stage,
            environment_naming_info=plan.environment.naming_info,
            default_catalog=self.default_catalog,
            snapshots=stage.all_snapshots,
            start=plan.start,
            end=plan.end,
            execution_time=plan.execution_time,
            selected_models=plan.selected_models,
        )

    def visit_before_all_stage(self, stage: stages.BeforeAllStage, plan: EvaluatablePlan) -> None:
        """Executes the stage's environment statements with the BEFORE_ALL runtime stage."""
        self._run_environment_statements(stage, plan, RuntimeStage.BEFORE_ALL)

    def visit_after_all_stage(self, stage: stages.AfterAllStage, plan: EvaluatablePlan) -> None:
        """Executes the stage's environment statements with the AFTER_ALL runtime stage."""
        self._run_environment_statements(stage, plan, RuntimeStage.AFTER_ALL)

    def visit_create_snapshot_records_stage(
        self, stage: stages.CreateSnapshotRecordsStage, plan: EvaluatablePlan
    ) -> None:
        """Pushes the stage's snapshots to the state sync and reports them to analytics."""
        self.state_sync.push_snapshots(stage.snapshots)
        analytics.collector.on_snapshots_created(
            new_snapshots=stage.snapshots, plan_id=plan.plan_id
        )
        # Update the intervals for the new forward-only snapshots
        self._update_intervals_for_new_snapshots(stage.snapshots)

    def visit_physical_layer_update_stage(
        self, stage: stages.PhysicalLayerUpdateStage, plan: EvaluatablePlan
    ) -> None:
        """Creates the stage's snapshots through the snapshot evaluator, reporting progress to the console.

        Raises:
            PlanError: If snapshot creation fails with `SnapshotCreationFailedError`
                (chained as the cause).
        """
        # For plans with restatements the message logged on skip is an empty string.
        skip_message = "" if plan.restatements else "\nSKIP: No physical layer updates to perform"

        snapshots_to_create = stage.snapshots
        if not snapshots_to_create:
            self.console.log_success(skip_message)
            return

        completion_status = None
        # Tracks whether the failure path already stopped the progress display,
        # so the `finally` clause does not stop it a second time.
        progress_stopped = False
        try:
            completion_status = self.snapshot_evaluator.create(
                snapshots_to_create,
                stage.all_snapshots,
                allow_destructive_snapshots=plan.allow_destructive_models,
                allow_additive_snapshots=plan.allow_additive_models,
                deployability_index=stage.deployability_index,
                on_start=lambda x: self.console.start_creation_progress(
                    x, plan.environment, self.default_catalog
                ),
                on_complete=self.console.update_creation_progress,
            )
            if completion_status.is_nothing_to_do:
                self.console.log_success(skip_message)
                return
        except SnapshotCreationFailedError as ex:
            self.console.stop_creation_progress(success=False)
            progress_stopped = True

            for error in ex.errors:
                logger.info(str(error), exc_info=error)

            self.console.log_skipped_models({s.name for s in ex.skipped})
            self.console.log_failed_models(ex.errors)

            # Chained explicitly, matching the schema creation stage.
            raise PlanError("Plan application failed.") from ex
        finally:
            if not progress_stopped:
                # Also reached on the early return above and on unexpected exceptions.
                self.console.stop_creation_progress(
                    success=completion_status is not None and completion_status.is_success
                )

    def visit_physical_layer_schema_creation_stage(
        self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
    ) -> None:
        """Creates the physical schemas for the stage's snapshots.

        Raises:
            PlanError: If schema creation raises any exception (chained as the cause).
        """
        try:
            self.snapshot_evaluator.create_physical_schemas(
                stage.snapshots, stage.deployability_index
            )
        except Exception as ex:
            raise PlanError("Plan application failed.") from ex

    def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePlan) -> None:
        """Backfills the stage's intervals, or only records them in state for an empty backfill.

        Raises:
            PlanError: If the scheduler reports any errors.
        """
        if plan.empty_backfill:
            # Nothing is executed: the plan's range is recorded as intervals in state instead.
            intervals_to_add = []
            for snapshot in stage.all_snapshots.values():
                if not snapshot.evaluatable or not plan.is_selected_for_backfill(snapshot.name):
                    # Skip snapshots that are not evaluatable or not selected for backfill.
                    continue
                intervals = [
                    snapshot.inclusive_exclusive(plan.start, plan.end, strict=False, expand=False)
                ]
                is_deployable = stage.deployability_index.is_deployable(snapshot)
                # Deployable snapshots get `intervals`; all others get `dev_intervals`.
                intervals_to_add.append(
                    SnapshotIntervals(
                        name=snapshot.name,
                        identifier=snapshot.identifier,
                        version=snapshot.version,
                        dev_version=snapshot.dev_version,
                        intervals=intervals if is_deployable else [],
                        dev_intervals=intervals if not is_deployable else [],
                    )
                )
            self.state_sync.add_snapshots_intervals(intervals_to_add)
            self.console.log_success("SKIP: No model batches to execute")
            return

        if not stage.snapshot_to_intervals:
            self.console.log_success("SKIP: No model batches to execute")
            return

        scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
        errors, _ = scheduler.run_merged_intervals(
            merged_intervals=stage.snapshot_to_intervals,
            deployability_index=stage.deployability_index,
            environment_naming_info=plan.environment.naming_info,
            execution_time=plan.execution_time,
            circuit_breaker=self._circuit_breaker,
            start=plan.start,
            end=plan.end,
            allow_destructive_snapshots=plan.allow_destructive_models,
            allow_additive_snapshots=plan.allow_additive_models,
            selected_snapshot_ids=stage.selected_snapshot_ids,
            selected_models=plan.selected_models,
            is_restatement=bool(plan.restatements),
        )
        if errors:
            raise PlanError("Plan application failed.")

    def visit_audit_only_run_stage(
        self, stage: stages.AuditOnlyRunStage, plan: EvaluatablePlan
    ) -> None:
        """Audits the stage's snapshots through a scheduler.

        Raises:
            PlanError: If the audit run's completion status is a failure.
        """
        audit_snapshots = stage.snapshots
        if not audit_snapshots:
            return

        # If there are any snapshots to be audited, we'll reuse the scheduler's internals to audit them
        scheduler = self.create_scheduler(audit_snapshots, self.snapshot_evaluator)
        completion_status = scheduler.audit(
            plan.environment,
            plan.start,
            plan.end,
            execution_time=plan.execution_time,
            end_bounded=plan.end_bounded,
            start_override_per_model=plan.start_override_per_model,
            end_override_per_model=plan.end_override_per_model,
        )

        if completion_status.is_failure:
            raise PlanError("Plan application failed.")

    def visit_restatement_stage(
        self, stage: stages.RestatementStage, plan: EvaluatablePlan
    ) -> None:
        """Clears restated intervals from state and detects versions deployed during the restatement.

        Raises:
            ConflictingPlanError: If the target environment now holds different versions of
                restated models than the ones this plan restated.
        """
        # Restating intervals on prod plans means that once the data for the intervals being restated has been backfilled
        # (which happens in the backfill stage) then we need to clear those intervals *from state* across all other environments.
        #
        # This ensures that work done in dev environments can still be promoted to prod by forcing dev environments to
        # re-run intervals that changed in prod (because after this stage runs they are cleared from state and thus show as missing)
        #
        # It also means that any new dev environments created while this restatement plan was running also get the
        # correct intervals cleared because we look up matching snapshots as at right now and not as at the time the plan
        # was created, which could have been several hours ago if there was a lot of data to restate.
        #
        # Without this rule, it's possible that promoting a dev table to prod will introduce old data to prod

        intervals_to_clear = identify_restatement_intervals_across_snapshot_versions(
            state_reader=self.state_sync,
            prod_restatements=plan.restatements,
            disable_restatement_models=plan.disabled_restatement_models,
            loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
            current_ts=to_timestamp(plan.execution_time or now()),
        )

        if not intervals_to_clear:
            # Nothing to do
            return

        # While the restatements were being processed, did any of the snapshots being restated get new versions deployed?
        # If they did, they will not reflect the data that just got restated, so we need to notify the user
        deployed_during_restatement: t.Dict[
            str, t.Tuple[SnapshotTableInfo, SnapshotTableInfo]
        ] = {}  # tuple of (restated_snapshot, current_prod_snapshot)

        if deployed_env := self.state_sync.get_environment(plan.environment.name):
            promoted_snapshots_by_name = {s.name: s for s in deployed_env.snapshots}

            for name in plan.restatements:
                snapshot = stage.all_snapshots[name]
                version = snapshot.table_info.version
                if (
                    prod_snapshot := promoted_snapshots_by_name.get(name)
                ) and prod_snapshot.version != version:
                    deployed_during_restatement[name] = (
                        snapshot.table_info,
                        prod_snapshot.table_info,
                    )

        # we need to *not* clear the intervals on the snapshots where new versions were deployed while the restatement was running in order to prevent
        # subsequent plans from having unexpected intervals to backfill.
        # we instead list the affected models and abort the plan with an error so the user can decide what to do
        # (either re-attempt the restatement plan or leave things as they are)
        filtered_intervals_to_clear = [
            (s.snapshot, s.interval)
            for s in intervals_to_clear.values()
            if s.snapshot.name not in deployed_during_restatement
        ]

        if filtered_intervals_to_clear:
            # We still clear intervals in other envs for models that were successfully restated without having new versions promoted during restatement
            self.state_sync.remove_intervals(
                snapshot_intervals=filtered_intervals_to_clear,
                remove_shared_versions=plan.is_prod,
            )

        if deployed_env and deployed_during_restatement:
            self.console.log_models_updated_during_restatement(
                list(deployed_during_restatement.values()),
                plan.environment.naming_info,
                self.default_catalog,
            )
            raise ConflictingPlanError(
                f"Another plan ({deployed_env.summary.plan_id}) deployed new versions of {len(deployed_during_restatement)} models in the target environment '{plan.environment.name}' while they were being restated by this plan.\n"
                "Please re-apply your plan if these new versions should be restated."
            )

    def visit_environment_record_update_stage(
        self, stage: stages.EnvironmentRecordUpdateStage, plan: EvaluatablePlan
    ) -> None:
        """Promotes the plan's environment in the state sync.

        The stage's no-gaps snapshot names are only passed along when `plan.no_gaps` is set.
        """
        self.state_sync.promote(
            plan.environment,
            no_gaps_snapshot_names=stage.no_gaps_snapshot_names if plan.no_gaps else set(),
            environment_statements=plan.environment_statements,
        )

    def visit_migrate_schemas_stage(
        self, stage: stages.MigrateSchemasStage, plan: EvaluatablePlan
    ) -> None:
        """Migrates the stage's snapshots through the snapshot evaluator.

        Raises:
            PlanError: If migration fails with `NodeExecutionFailedError`; the message is taken
                from the error's cause when there is one, otherwise from the error itself.
                The original error is chained as the cause.
        """
        try:
            self.snapshot_evaluator.migrate(
                stage.snapshots,
                stage.all_snapshots,
                allow_destructive_snapshots=plan.allow_destructive_models,
                allow_additive_snapshots=plan.allow_additive_models,
                deployability_index=stage.deployability_index,
            )
        except NodeExecutionFailedError as ex:
            # Chained explicitly, matching the schema creation stage.
            raise PlanError(str(ex.__cause__) if ex.__cause__ else str(ex)) from ex

    def visit_unpause_stage(self, stage: stages.UnpauseStage, plan: EvaluatablePlan) -> None:
        """Unpauses the stage's promoted snapshots in the state sync, passing the plan's end."""
        self.state_sync.unpause_snapshots(stage.promoted_snapshots, plan.end)

    def visit_virtual_layer_update_stage(
        self, stage: stages.VirtualLayerUpdateStage, plan: EvaluatablePlan
    ) -> None:
        """Promotes and, when applicable, demotes the stage's snapshots while reporting progress."""
        environment = plan.environment

        self.console.start_promotion_progress(
            list(stage.promoted_snapshots) + list(stage.demoted_snapshots),
            environment.naming_info,
            self.default_catalog,
        )

        # Only flipped once both promotion and demotion finished without raising.
        completed = False
        try:
            self._promote_snapshots(
                plan,
                [stage.all_snapshots[s.snapshot_id] for s in stage.promoted_snapshots],
                environment.naming_info,
                deployability_index=stage.deployability_index,
                on_complete=lambda s: self.console.update_promotion_progress(s, True),
                snapshots=stage.all_snapshots,
            )
            if stage.demoted_environment_naming_info:
                self._demote_snapshots(
                    [stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots],
                    stage.demoted_environment_naming_info,
                    deployability_index=stage.deployability_index,
                    on_complete=lambda s: self.console.update_promotion_progress(s, False),
                    snapshots=stage.all_snapshots,
                )

            completed = True
        finally:
            self.console.stop_promotion_progress(success=completed)

    def visit_finalize_environment_stage(
        self, stage: stages.FinalizeEnvironmentStage, plan: EvaluatablePlan
    ) -> None:
        """Finalizes the plan's environment in the state sync."""
        self.state_sync.finalize(plan.environment)

    def _promote_snapshots(
        self,
        plan: EvaluatablePlan,
        target_snapshots: t.Iterable[Snapshot],
        environment_naming_info: EnvironmentNamingInfo,
        snapshots: t.Dict[SnapshotId, Snapshot],
        deployability_index: t.Optional[DeployabilityIndex] = None,
        on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
    ) -> None:
        """Promotes the target snapshots through the snapshot evaluator.

        The table mapping is built with `to_view_mapping` over all of `snapshots`, and the
        execution time falls back to `now()` when the plan does not set one.
        """
        self.snapshot_evaluator.promote(
            target_snapshots,
            start=plan.start,
            end=plan.end,
            execution_time=plan.execution_time or now(),
            snapshots=snapshots,
            table_mapping=to_view_mapping(
                snapshots.values(),
                environment_naming_info,
                default_catalog=self.default_catalog,
                dialect=self.snapshot_evaluator.adapter.dialect,
            ),
            environment_naming_info=environment_naming_info,
            deployability_index=deployability_index,
            on_complete=on_complete,
        )

    def _demote_snapshots(
        self,
        target_snapshots: t.Iterable[Snapshot],
        environment_naming_info: EnvironmentNamingInfo,
        snapshots: t.Dict[SnapshotId, Snapshot],
        deployability_index: t.Optional[DeployabilityIndex] = None,
        on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
    ) -> None:
        """Demotes the target snapshots through the snapshot evaluator.

        The table mapping is built with `to_view_mapping` over all of `snapshots`.
        """
        self.snapshot_evaluator.demote(
            target_snapshots,
            environment_naming_info,
            table_mapping=to_view_mapping(
                snapshots.values(),
                environment_naming_info,
                default_catalog=self.default_catalog,
                dialect=self.snapshot_evaluator.adapter.dialect,
            ),
            deployability_index=deployability_index,
            on_complete=on_complete,
        )

    def _update_intervals_for_new_snapshots(self, snapshots: t.Collection[Snapshot]) -> None:
        """Adds the dev intervals of forward-only snapshots to the state sync.

        Nothing is written when none of the given snapshots is forward-only.
        """
        snapshots_intervals: t.List[SnapshotIntervals] = [
            SnapshotIntervals(
                name=snapshot.name,
                identifier=snapshot.identifier,
                version=snapshot.version,
                dev_version=snapshot.dev_version,
                dev_intervals=snapshot.dev_intervals,
            )
            for snapshot in snapshots
            if snapshot.is_forward_only
        ]

        if snapshots_intervals:
            self.state_sync.add_snapshots_intervals(snapshots_intervals)
Helper class that provides a standard way to create an ABC using inheritance.
def __init__(
    self,
    state_sync: StateSync,
    snapshot_evaluator: SnapshotEvaluator,
    create_scheduler: t.Callable[[t.Iterable[Snapshot], SnapshotEvaluator], Scheduler],
    default_catalog: t.Optional[str],
    console: t.Optional[Console] = None,
):
    """Store the collaborators used while evaluating plan stages.

    Args:
        state_sync: State store the stage visitors read from and write to.
        snapshot_evaluator: Evaluator used for physical and virtual layer operations.
        create_scheduler: Factory called with the snapshots and the evaluator
            whenever a stage needs a scheduler.
        default_catalog: Catalog passed through to console and naming helpers; may be None.
        console: Console used for progress reporting. When falsy, the result of
            ``get_console()`` is used instead.
    """
    # No circuit breaker until evaluate() installs one.
    self._circuit_breaker: t.Optional[t.Callable[[], bool]] = None

    self.console = console if console else get_console()
    self.default_catalog = default_catalog
    self.create_scheduler = create_scheduler
    self.snapshot_evaluator = snapshot_evaluator
    self.state_sync = state_sync
def evaluate(
    self,
    plan: EvaluatablePlan,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
) -> None:
    """Evaluate a plan by building its stages and running them in order.

    Args:
        plan: The plan to evaluate.
        circuit_breaker: Optional callable stored on the instance and later handed
            to the scheduler during backfill.

    Raises:
        Exception: Any error raised while building or evaluating the stages is
            reported to analytics and then re-raised unchanged.
    """
    self._circuit_breaker = circuit_breaker

    # Tag the evaluator with an id derived from the plan id.
    evaluator = self.snapshot_evaluator
    self.snapshot_evaluator = evaluator.set_correlation_id(
        CorrelationId.from_plan_id(plan.plan_id)
    )

    self.console.start_plan_evaluation(plan)
    analytics.collector.on_plan_apply_start(
        plan=plan,
        engine_type=self.snapshot_evaluator.adapter.dialect,
        state_sync_type=self.state_sync.state_type(),
        scheduler_type=c.BUILTIN,
    )

    try:
        self._evaluate_stages(
            stages.build_plan_stages(plan, self.state_sync, self.default_catalog),
            plan,
        )
    except Exception as failure:
        analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=failure)
        raise
    else:
        analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
    finally:
        # Runs on both success and failure.
        self.snapshot_evaluator.recycle()
        self.console.stop_plan_evaluation()
Evaluates a plan by pushing snapshots and backfilling data.
Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.
Arguments:
- plan: The plan to evaluate.
- circuit_breaker: The circuit breaker to use.
def visit_before_all_stage(self, stage: stages.BeforeAllStage, plan: EvaluatablePlan) -> None:
    """Execute the stage's environment statements at the BEFORE_ALL runtime stage.

    Args:
        stage: Supplies the statements to run and the snapshots passed alongside them.
        plan: Supplies the environment naming info, time range, execution time
            and selected models.
    """
    engine_adapter = self.snapshot_evaluator.adapter
    naming_info = plan.environment.naming_info

    execute_environment_statements(
        adapter=engine_adapter,
        environment_statements=stage.statements,
        runtime_stage=RuntimeStage.BEFORE_ALL,
        environment_naming_info=naming_info,
        default_catalog=self.default_catalog,
        snapshots=stage.all_snapshots,
        start=plan.start,
        end=plan.end,
        execution_time=plan.execution_time,
        selected_models=plan.selected_models,
    )
def visit_after_all_stage(self, stage: stages.AfterAllStage, plan: EvaluatablePlan) -> None:
    """Execute the stage's environment statements at the AFTER_ALL runtime stage.

    Args:
        stage: Supplies the statements to run and the snapshots passed alongside them.
        plan: Supplies the environment naming info, time range, execution time
            and selected models.
    """
    engine_adapter = self.snapshot_evaluator.adapter
    naming_info = plan.environment.naming_info

    execute_environment_statements(
        adapter=engine_adapter,
        environment_statements=stage.statements,
        runtime_stage=RuntimeStage.AFTER_ALL,
        environment_naming_info=naming_info,
        default_catalog=self.default_catalog,
        snapshots=stage.all_snapshots,
        start=plan.start,
        end=plan.end,
        execution_time=plan.execution_time,
        selected_models=plan.selected_models,
    )
def visit_create_snapshot_records_stage(
    self, stage: stages.CreateSnapshotRecordsStage, plan: EvaluatablePlan
) -> None:
    """Persist the stage's new snapshots and record their creation.

    Args:
        stage: Supplies the new snapshots to push to state.
        plan: Supplies the plan id reported to analytics.
    """
    state = self.state_sync
    state.push_snapshots(stage.snapshots)

    analytics.collector.on_snapshots_created(
        new_snapshots=stage.snapshots,
        plan_id=plan.plan_id,
    )

    # Forward-only snapshots among the new ones get their intervals updated in state.
    self._update_intervals_for_new_snapshots(stage.snapshots)
def visit_physical_layer_update_stage(
    self, stage: stages.PhysicalLayerUpdateStage, plan: EvaluatablePlan
) -> None:
    """Create the physical tables for the stage's snapshots, reporting progress.

    Args:
        stage: Supplies the snapshots to create, all snapshots, and the deployability index.
        plan: Supplies the destructive/additive allowances, the environment and
            the restatements (which silence the skip message when present).

    Raises:
        PlanError: When snapshot creation fails; failed and skipped models are
            logged to the console first.
    """
    if plan.restatements:
        nothing_to_do_message = ""
    else:
        nothing_to_do_message = "\nSKIP: No physical layer updates to perform"

    pending = stage.snapshots
    if not pending:
        self.console.log_success(nothing_to_do_message)
        return

    def _on_start(snapshots_being_created):  # type: ignore[no-untyped-def]
        return self.console.start_creation_progress(
            snapshots_being_created, plan.environment, self.default_catalog
        )

    status = None
    # Set once the failure handler has closed the progress display, so the
    # ``finally`` clause does not close it a second time.
    already_stopped = False
    try:
        status = self.snapshot_evaluator.create(
            pending,
            stage.all_snapshots,
            allow_destructive_snapshots=plan.allow_destructive_models,
            allow_additive_snapshots=plan.allow_additive_models,
            deployability_index=stage.deployability_index,
            on_start=_on_start,
            on_complete=self.console.update_creation_progress,
        )
        if status.is_nothing_to_do:
            self.console.log_success(nothing_to_do_message)
            return
    except SnapshotCreationFailedError as creation_failure:
        self.console.stop_creation_progress(success=False)
        already_stopped = True

        for node_error in creation_failure.errors:
            logger.info(str(node_error), exc_info=node_error)

        self.console.log_skipped_models({s.name for s in creation_failure.skipped})
        self.console.log_failed_models(creation_failure.errors)

        raise PlanError("Plan application failed.")
    finally:
        if not already_stopped:
            succeeded = status is not None and status.is_success
            self.console.stop_creation_progress(success=succeeded)
def visit_physical_layer_schema_creation_stage(
    self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
) -> None:
    """Create the physical schemas needed by the stage's snapshots.

    Args:
        stage: Supplies the snapshots and the deployability index.
        plan: The plan being evaluated (not read by this stage).

    Raises:
        PlanError: Wraps any exception raised during schema creation, with the
            original exception attached as the cause.
    """
    evaluator = self.snapshot_evaluator
    try:
        evaluator.create_physical_schemas(stage.snapshots, stage.deployability_index)
    except Exception as schema_error:
        raise PlanError("Plan application failed.") from schema_error
def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePlan) -> None:
    """Backfill the stage's intervals, or only record them for an empty backfill.

    Args:
        stage: Supplies all snapshots, the intervals to run per snapshot, the
            deployability index and the selected snapshot ids.
        plan: Supplies the time range, selection, allowances and the
            ``empty_backfill`` flag.

    Raises:
        PlanError: When the scheduler reports any errors.
    """
    no_batches_message = "SKIP: No model batches to execute"

    if plan.empty_backfill:
        # Record the plan's range as processed without running any model.
        recorded = []
        for candidate in stage.all_snapshots.values():
            # Only evaluatable snapshots that were selected for backfill are recorded.
            if not (candidate.evaluatable and plan.is_selected_for_backfill(candidate.name)):
                continue

            covered = [
                candidate.inclusive_exclusive(plan.start, plan.end, strict=False, expand=False)
            ]
            # Deployable snapshots get regular intervals, the rest get dev intervals.
            if stage.deployability_index.is_deployable(candidate):
                regular, dev = covered, []
            else:
                regular, dev = [], covered

            recorded.append(
                SnapshotIntervals(
                    name=candidate.name,
                    identifier=candidate.identifier,
                    version=candidate.version,
                    dev_version=candidate.dev_version,
                    intervals=regular,
                    dev_intervals=dev,
                )
            )

        self.state_sync.add_snapshots_intervals(recorded)
        self.console.log_success(no_batches_message)
        return

    if not stage.snapshot_to_intervals:
        self.console.log_success(no_batches_message)
        return

    scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
    run_errors, _ = scheduler.run_merged_intervals(
        merged_intervals=stage.snapshot_to_intervals,
        deployability_index=stage.deployability_index,
        environment_naming_info=plan.environment.naming_info,
        execution_time=plan.execution_time,
        circuit_breaker=self._circuit_breaker,
        start=plan.start,
        end=plan.end,
        allow_destructive_snapshots=plan.allow_destructive_models,
        allow_additive_snapshots=plan.allow_additive_models,
        selected_snapshot_ids=stage.selected_snapshot_ids,
        selected_models=plan.selected_models,
        is_restatement=bool(plan.restatements),
    )
    if run_errors:
        raise PlanError("Plan application failed.")
def visit_audit_only_run_stage(
    self, stage: stages.AuditOnlyRunStage, plan: EvaluatablePlan
) -> None:
    """Run audits for the stage's snapshots without evaluating the models.

    Args:
        stage: Supplies the snapshots to audit; nothing happens when it has none.
        plan: Supplies the environment, time range, execution time and per-model overrides.

    Raises:
        PlanError: When the audit run reports a failure.
    """
    to_audit = stage.snapshots
    if to_audit:
        # The scheduler's own audit entry point is reused instead of a separate code path.
        audit_scheduler = self.create_scheduler(to_audit, self.snapshot_evaluator)
        outcome = audit_scheduler.audit(
            plan.environment,
            plan.start,
            plan.end,
            execution_time=plan.execution_time,
            end_bounded=plan.end_bounded,
            start_override_per_model=plan.start_override_per_model,
            end_override_per_model=plan.end_override_per_model,
        )

        if outcome.is_failure:
            raise PlanError("Plan application failed.")
def visit_restatement_stage(
    self, stage: stages.RestatementStage, plan: EvaluatablePlan
) -> None:
    """Clear restated intervals from state and detect models re-deployed mid-restatement.

    Args:
        stage: Supplies the snapshots loaded for this plan.
        plan: Supplies the restatements, the models with restatement disabled,
            the execution time and the target environment.

    Raises:
        ConflictingPlanError: When the target environment currently holds a
            different version of at least one restated model. Intervals for
            the unaffected models are still cleared before this is raised.
    """
    # Restating intervals on prod plans means that once the data for the intervals being restated has been backfilled
    # (which happens in the backfill stage) then we need to clear those intervals *from state* across all other environments.
    #
    # This ensures that work done in dev environments can still be promoted to prod by forcing dev environments to
    # re-run intervals that changed in prod (because after this stage runs they are cleared from state and thus show as missing)
    #
    # It also means that any new dev environments created while this restatement plan was running also get the
    # correct intervals cleared because we look up matching snapshots as at right now and not as at the time the plan
    # was created, which could have been several hours ago if there was a lot of data to restate.
    #
    # Without this rule, it's possible that promoting a dev table to prod will introduce old data to prod

    intervals_to_clear = identify_restatement_intervals_across_snapshot_versions(
        state_reader=self.state_sync,
        prod_restatements=plan.restatements,
        disable_restatement_models=plan.disabled_restatement_models,
        loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
        current_ts=to_timestamp(plan.execution_time or now()),
    )

    if not intervals_to_clear:
        # Nothing to do
        return

    # While the restatements were being processed, did any of the snapshots being restated get new versions deployed?
    # If they did, they will not reflect the data that just got restated, so we need to notify the user
    deployed_during_restatement: t.Dict[
        str, t.Tuple[SnapshotTableInfo, SnapshotTableInfo]
    ] = {}  # tuple of (restated_snapshot, current_prod_snapshot)

    # Re-read the environment from state so the comparison uses what is deployed right now.
    if deployed_env := self.state_sync.get_environment(plan.environment.name):
        promoted_snapshots_by_name = {s.name: s for s in deployed_env.snapshots}

        for name in plan.restatements:
            snapshot = stage.all_snapshots[name]
            version = snapshot.table_info.version
            # A version mismatch means the environment now points at a different version than the one restated.
            if (
                prod_snapshot := promoted_snapshots_by_name.get(name)
            ) and prod_snapshot.version != version:
                deployed_during_restatement[name] = (
                    snapshot.table_info,
                    prod_snapshot.table_info,
                )

    # we need to *not* clear the intervals on the snapshots where new versions were deployed while the restatement was running in order to prevent
    # subsequent plans from having unexpected intervals to backfill.
    # we instead list the affected models and abort the plan with an error so the user can decide what to do
    # (either re-attempt the restatement plan or leave things as they are)
    filtered_intervals_to_clear = [
        (s.snapshot, s.interval)
        for s in intervals_to_clear.values()
        if s.snapshot.name not in deployed_during_restatement
    ]

    if filtered_intervals_to_clear:
        # We still clear intervals in other envs for models that were successfully restated without having new versions promoted during restatement
        self.state_sync.remove_intervals(
            snapshot_intervals=filtered_intervals_to_clear,
            remove_shared_versions=plan.is_prod,
        )

    if deployed_env and deployed_during_restatement:
        self.console.log_models_updated_during_restatement(
            list(deployed_during_restatement.values()),
            plan.environment.naming_info,
            self.default_catalog,
        )
        raise ConflictingPlanError(
            f"Another plan ({deployed_env.summary.plan_id}) deployed new versions of {len(deployed_during_restatement)} models in the target environment '{plan.environment.name}' while they were being restated by this plan.\n"
            "Please re-apply your plan if these new versions should be restated."
        )
def visit_environment_record_update_stage(
    self, stage: stages.EnvironmentRecordUpdateStage, plan: EvaluatablePlan
) -> None:
    """Promote the plan's environment record in state.

    Args:
        stage: Supplies the snapshot names subject to the no-gaps check.
        plan: Supplies the environment, its statements and the ``no_gaps`` flag.
    """
    # The no-gaps names are only passed on when the plan asks for the check.
    if plan.no_gaps:
        names_requiring_no_gaps = stage.no_gaps_snapshot_names
    else:
        names_requiring_no_gaps = set()

    self.state_sync.promote(
        plan.environment,
        no_gaps_snapshot_names=names_requiring_no_gaps,
        environment_statements=plan.environment_statements,
    )
def visit_migrate_schemas_stage(
    self, stage: stages.MigrateSchemasStage, plan: EvaluatablePlan
) -> None:
    """Migrate the schemas of the stage's snapshots.

    Args:
        stage: Supplies the snapshots to migrate, all snapshots, and the deployability index.
        plan: Supplies the destructive and additive model allowances.

    Raises:
        PlanError: When a node fails during migration. The message is taken from
            the underlying cause when one is attached, otherwise from the
            failure itself.
    """
    evaluator = self.snapshot_evaluator
    try:
        evaluator.migrate(
            stage.snapshots,
            stage.all_snapshots,
            allow_destructive_snapshots=plan.allow_destructive_models,
            allow_additive_snapshots=plan.allow_additive_models,
            deployability_index=stage.deployability_index,
        )
    except NodeExecutionFailedError as node_failure:
        if node_failure.__cause__:
            message = str(node_failure.__cause__)
        else:
            message = str(node_failure)
        raise PlanError(message)
def visit_virtual_layer_update_stage(
    self, stage: stages.VirtualLayerUpdateStage, plan: EvaluatablePlan
) -> None:
    """Promote and demote snapshots in the plan's environment, reporting progress.

    Args:
        stage: Supplies the promoted and demoted snapshots, all snapshots keyed
            by snapshot id, the deployability index and the naming info to use
            for demotion.
        plan: Supplies the target environment.
    """
    naming_info = plan.environment.naming_info
    every_snapshot = stage.all_snapshots

    def _mark_promoted(snapshot):  # type: ignore[no-untyped-def]
        return self.console.update_promotion_progress(snapshot, True)

    def _mark_demoted(snapshot):  # type: ignore[no-untyped-def]
        return self.console.update_promotion_progress(snapshot, False)

    self.console.start_promotion_progress(
        [*stage.promoted_snapshots, *stage.demoted_snapshots],
        naming_info,
        self.default_catalog,
    )

    # Stays False unless both steps finish, so a failure is reported as unsuccessful.
    succeeded = False
    try:
        to_promote = [every_snapshot[info.snapshot_id] for info in stage.promoted_snapshots]
        self._promote_snapshots(
            plan,
            to_promote,
            naming_info,
            deployability_index=stage.deployability_index,
            on_complete=_mark_promoted,
            snapshots=every_snapshot,
        )

        # Demotion only runs when the stage provides naming info for it.
        if stage.demoted_environment_naming_info:
            to_demote = [every_snapshot[info.snapshot_id] for info in stage.demoted_snapshots]
            self._demote_snapshots(
                to_demote,
                stage.demoted_environment_naming_info,
                deployability_index=stage.deployability_index,
                on_complete=_mark_demoted,
                snapshots=every_snapshot,
            )

        succeeded = True
    finally:
        self.console.stop_promotion_progress(success=succeeded)