Edit on GitHub

PlanEvaluator

A plan evaluator is responsible for evaluating a plan when it is being applied.

Evaluation steps

At a high level, when a plan is evaluated, SQLMesh will:

  • Push new snapshots to the state sync.
  • Create snapshot tables.
  • Backfill data.
  • Promote the snapshots.

Refer to sqlmesh.core.plan.

  1"""
  2# PlanEvaluator
  3
  4A plan evaluator is responsible for evaluating a plan when it is being applied.
  5
  6# Evaluation steps
  7
  8At a high level, when a plan is evaluated, SQLMesh will:
  9- Push new snapshots to the state sync.
 10- Create snapshot tables.
 11- Backfill data.
 12- Promote the snapshots.
 13
 14Refer to `sqlmesh.core.plan`.
 15"""
 16
 17import abc
 18import logging
 19import typing as t
 20from sqlmesh.core import analytics
 21from sqlmesh.core import constants as c
 22from sqlmesh.core.console import Console, get_console
 23from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements
 24from sqlmesh.core.macros import RuntimeStage
 25from sqlmesh.core.snapshot.definition import to_view_mapping, SnapshotTableInfo
 26from sqlmesh.core.plan import stages
 27from sqlmesh.core.plan.definition import EvaluatablePlan
 28from sqlmesh.core.scheduler import Scheduler
 29from sqlmesh.core.snapshot import (
 30    DeployabilityIndex,
 31    Snapshot,
 32    SnapshotEvaluator,
 33    SnapshotIntervals,
 34    SnapshotId,
 35    SnapshotInfoLike,
 36    SnapshotCreationFailedError,
 37)
 38from sqlmesh.utils import to_snake_case
 39from sqlmesh.core.state_sync import StateSync
 40from sqlmesh.core.plan.common import identify_restatement_intervals_across_snapshot_versions
 41from sqlmesh.utils import CorrelationId
 42from sqlmesh.utils.concurrency import NodeExecutionFailedError
 43from sqlmesh.utils.errors import PlanError, ConflictingPlanError, SQLMeshError
 44from sqlmesh.utils.date import now, to_timestamp
 45
 46logger = logging.getLogger(__name__)
 47
 48
 49class PlanEvaluator(abc.ABC):
 50    @abc.abstractmethod
 51    def evaluate(
 52        self,
 53        plan: EvaluatablePlan,
 54        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 55    ) -> None:
 56        """Evaluates a plan by pushing snapshots and backfilling data.
 57
 58        Given a plan, it pushes snapshots into the state and then kicks off
 59        the backfill process for all affected snapshots. Once backfill is done,
 60        snapshots that are part of the plan are promoted in the environment targeted
 61        by this plan.
 62
 63        Args:
 64            plan: The plan to evaluate.
 65            circuit_breaker: The circuit breaker to use.
 66        """
 67
 68
 69class BuiltInPlanEvaluator(PlanEvaluator):
 70    def __init__(
 71        self,
 72        state_sync: StateSync,
 73        snapshot_evaluator: SnapshotEvaluator,
 74        create_scheduler: t.Callable[[t.Iterable[Snapshot], SnapshotEvaluator], Scheduler],
 75        default_catalog: t.Optional[str],
 76        console: t.Optional[Console] = None,
 77    ):
 78        self.state_sync = state_sync
 79        self.snapshot_evaluator = snapshot_evaluator
 80        self.create_scheduler = create_scheduler
 81        self.default_catalog = default_catalog
 82        self.console = console or get_console()
 83        self._circuit_breaker: t.Optional[t.Callable[[], bool]] = None
 84
 85    def evaluate(
 86        self,
 87        plan: EvaluatablePlan,
 88        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 89    ) -> None:
 90        self._circuit_breaker = circuit_breaker
 91        self.snapshot_evaluator = self.snapshot_evaluator.set_correlation_id(
 92            CorrelationId.from_plan_id(plan.plan_id)
 93        )
 94
 95        self.console.start_plan_evaluation(plan)
 96        analytics.collector.on_plan_apply_start(
 97            plan=plan,
 98            engine_type=self.snapshot_evaluator.adapter.dialect,
 99            state_sync_type=self.state_sync.state_type(),
100            scheduler_type=c.BUILTIN,
101        )
102
103        try:
104            plan_stages = stages.build_plan_stages(plan, self.state_sync, self.default_catalog)
105            self._evaluate_stages(plan_stages, plan)
106        except Exception as e:
107            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
108            raise
109        else:
110            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
111        finally:
112            self.snapshot_evaluator.recycle()
113            self.console.stop_plan_evaluation()
114
115    def _evaluate_stages(
116        self, plan_stages: t.List[stages.PlanStage], plan: EvaluatablePlan
117    ) -> None:
118        for stage in plan_stages:
119            stage_name = stage.__class__.__name__
120            handler_name = f"visit_{to_snake_case(stage_name)}"
121            if not hasattr(self, handler_name):
122                raise SQLMeshError(f"Unexpected plan stage: {stage_name}")
123            logger.info("Evaluating plan stage %s", stage_name)
124            handler = getattr(self, handler_name)
125            handler(stage, plan)
126
127    def visit_before_all_stage(self, stage: stages.BeforeAllStage, plan: EvaluatablePlan) -> None:
128        execute_environment_statements(
129            adapter=self.snapshot_evaluator.adapter,
130            environment_statements=stage.statements,
131            runtime_stage=RuntimeStage.BEFORE_ALL,
132            environment_naming_info=plan.environment.naming_info,
133            default_catalog=self.default_catalog,
134            snapshots=stage.all_snapshots,
135            start=plan.start,
136            end=plan.end,
137            execution_time=plan.execution_time,
138            selected_models=plan.selected_models,
139        )
140
141    def visit_after_all_stage(self, stage: stages.AfterAllStage, plan: EvaluatablePlan) -> None:
142        execute_environment_statements(
143            adapter=self.snapshot_evaluator.adapter,
144            environment_statements=stage.statements,
145            runtime_stage=RuntimeStage.AFTER_ALL,
146            environment_naming_info=plan.environment.naming_info,
147            default_catalog=self.default_catalog,
148            snapshots=stage.all_snapshots,
149            start=plan.start,
150            end=plan.end,
151            execution_time=plan.execution_time,
152            selected_models=plan.selected_models,
153        )
154
155    def visit_create_snapshot_records_stage(
156        self, stage: stages.CreateSnapshotRecordsStage, plan: EvaluatablePlan
157    ) -> None:
158        self.state_sync.push_snapshots(stage.snapshots)
159        analytics.collector.on_snapshots_created(
160            new_snapshots=stage.snapshots, plan_id=plan.plan_id
161        )
162        # Update the intervals for the new forward-only snapshots
163        self._update_intervals_for_new_snapshots(stage.snapshots)
164
165    def visit_physical_layer_update_stage(
166        self, stage: stages.PhysicalLayerUpdateStage, plan: EvaluatablePlan
167    ) -> None:
168        skip_message = "" if plan.restatements else "\nSKIP: No physical layer updates to perform"
169
170        snapshots_to_create = stage.snapshots
171        if not snapshots_to_create:
172            self.console.log_success(skip_message)
173            return
174
175        completion_status = None
176        progress_stopped = False
177        try:
178            completion_status = self.snapshot_evaluator.create(
179                snapshots_to_create,
180                stage.all_snapshots,
181                allow_destructive_snapshots=plan.allow_destructive_models,
182                allow_additive_snapshots=plan.allow_additive_models,
183                deployability_index=stage.deployability_index,
184                on_start=lambda x: self.console.start_creation_progress(
185                    x, plan.environment, self.default_catalog
186                ),
187                on_complete=self.console.update_creation_progress,
188            )
189            if completion_status.is_nothing_to_do:
190                self.console.log_success(skip_message)
191                return
192        except SnapshotCreationFailedError as ex:
193            self.console.stop_creation_progress(success=False)
194            progress_stopped = True
195
196            for error in ex.errors:
197                logger.info(str(error), exc_info=error)
198
199            self.console.log_skipped_models({s.name for s in ex.skipped})
200            self.console.log_failed_models(ex.errors)
201
202            raise PlanError("Plan application failed.")
203        finally:
204            if not progress_stopped:
205                self.console.stop_creation_progress(
206                    success=completion_status is not None and completion_status.is_success
207                )
208
209    def visit_physical_layer_schema_creation_stage(
210        self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
211    ) -> None:
212        try:
213            self.snapshot_evaluator.create_physical_schemas(
214                stage.snapshots, stage.deployability_index
215            )
216        except Exception as ex:
217            raise PlanError("Plan application failed.") from ex
218
219    def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePlan) -> None:
220        if plan.empty_backfill:
221            intervals_to_add = []
222            for snapshot in stage.all_snapshots.values():
223                if not snapshot.evaluatable or not plan.is_selected_for_backfill(snapshot.name):
224                    # Skip snapshots that are not evaluatable or not selected for backfill.
225                    continue
226                intervals = [
227                    snapshot.inclusive_exclusive(plan.start, plan.end, strict=False, expand=False)
228                ]
229                is_deployable = stage.deployability_index.is_deployable(snapshot)
230                intervals_to_add.append(
231                    SnapshotIntervals(
232                        name=snapshot.name,
233                        identifier=snapshot.identifier,
234                        version=snapshot.version,
235                        dev_version=snapshot.dev_version,
236                        intervals=intervals if is_deployable else [],
237                        dev_intervals=intervals if not is_deployable else [],
238                    )
239                )
240            self.state_sync.add_snapshots_intervals(intervals_to_add)
241            self.console.log_success("SKIP: No model batches to execute")
242            return
243
244        if not stage.snapshot_to_intervals:
245            self.console.log_success("SKIP: No model batches to execute")
246            return
247
248        scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
249        errors, _ = scheduler.run_merged_intervals(
250            merged_intervals=stage.snapshot_to_intervals,
251            deployability_index=stage.deployability_index,
252            environment_naming_info=plan.environment.naming_info,
253            execution_time=plan.execution_time,
254            circuit_breaker=self._circuit_breaker,
255            start=plan.start,
256            end=plan.end,
257            allow_destructive_snapshots=plan.allow_destructive_models,
258            allow_additive_snapshots=plan.allow_additive_models,
259            selected_snapshot_ids=stage.selected_snapshot_ids,
260            selected_models=plan.selected_models,
261            is_restatement=bool(plan.restatements),
262        )
263        if errors:
264            raise PlanError("Plan application failed.")
265
266    def visit_audit_only_run_stage(
267        self, stage: stages.AuditOnlyRunStage, plan: EvaluatablePlan
268    ) -> None:
269        audit_snapshots = stage.snapshots
270        if not audit_snapshots:
271            return
272
273        # If there are any snapshots to be audited, we'll reuse the scheduler's internals to audit them
274        scheduler = self.create_scheduler(audit_snapshots, self.snapshot_evaluator)
275        completion_status = scheduler.audit(
276            plan.environment,
277            plan.start,
278            plan.end,
279            execution_time=plan.execution_time,
280            end_bounded=plan.end_bounded,
281            start_override_per_model=plan.start_override_per_model,
282            end_override_per_model=plan.end_override_per_model,
283        )
284
285        if completion_status.is_failure:
286            raise PlanError("Plan application failed.")
287
288    def visit_restatement_stage(
289        self, stage: stages.RestatementStage, plan: EvaluatablePlan
290    ) -> None:
291        # Restating intervals on prod plans means that once the data for the intervals being restated has been backfilled
292        # (which happens in the backfill stage) then we need to clear those intervals *from state* across all other environments.
293        #
294        # This ensures that work done in dev environments can still be promoted to prod by forcing dev environments to
295        # re-run intervals that changed in prod (because after this stage runs they are cleared from state and thus show as missing)
296        #
297        # It also means that any new dev environments created while this restatement plan was running also get the
298        # correct intervals cleared because we look up matching snapshots as at right now and not as at the time the plan
299        # was created, which could have been several hours ago if there was a lot of data to restate.
300        #
301        # Without this rule, its possible that promoting a dev table to prod will introduce old data to prod
302
303        intervals_to_clear = identify_restatement_intervals_across_snapshot_versions(
304            state_reader=self.state_sync,
305            prod_restatements=plan.restatements,
306            disable_restatement_models=plan.disabled_restatement_models,
307            loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
308            current_ts=to_timestamp(plan.execution_time or now()),
309        )
310
311        if not intervals_to_clear:
312            # Nothing to do
313            return
314
315        # While the restatements were being processed, did any of the snapshots being restated get new versions deployed?
316        # If they did, they will not reflect the data that just got restated, so we need to notify the user
317        deployed_during_restatement: t.Dict[
318            str, t.Tuple[SnapshotTableInfo, SnapshotTableInfo]
319        ] = {}  # tuple of (restated_snapshot, current_prod_snapshot)
320
321        if deployed_env := self.state_sync.get_environment(plan.environment.name):
322            promoted_snapshots_by_name = {s.name: s for s in deployed_env.snapshots}
323
324            for name in plan.restatements:
325                snapshot = stage.all_snapshots[name]
326                version = snapshot.table_info.version
327                if (
328                    prod_snapshot := promoted_snapshots_by_name.get(name)
329                ) and prod_snapshot.version != version:
330                    deployed_during_restatement[name] = (
331                        snapshot.table_info,
332                        prod_snapshot.table_info,
333                    )
334
335        # we need to *not* clear the intervals on the snapshots where new versions were deployed while the restatement was running in order to prevent
336        # subsequent plans from having unexpected intervals to backfill.
337        # we instead list the affected models and abort the plan with an error so the user can decide what to do
338        # (either re-attempt the restatement plan or leave things as they are)
339        filtered_intervals_to_clear = [
340            (s.snapshot, s.interval)
341            for s in intervals_to_clear.values()
342            if s.snapshot.name not in deployed_during_restatement
343        ]
344
345        if filtered_intervals_to_clear:
346            # We still clear intervals in other envs for models that were successfully restated without having new versions promoted during restatement
347            self.state_sync.remove_intervals(
348                snapshot_intervals=filtered_intervals_to_clear,
349                remove_shared_versions=plan.is_prod,
350            )
351
352        if deployed_env and deployed_during_restatement:
353            self.console.log_models_updated_during_restatement(
354                list(deployed_during_restatement.values()),
355                plan.environment.naming_info,
356                self.default_catalog,
357            )
358            raise ConflictingPlanError(
359                f"Another plan ({deployed_env.summary.plan_id}) deployed new versions of {len(deployed_during_restatement)} models in the target environment '{plan.environment.name}' while they were being restated by this plan.\n"
360                "Please re-apply your plan if these new versions should be restated."
361            )
362
363    def visit_environment_record_update_stage(
364        self, stage: stages.EnvironmentRecordUpdateStage, plan: EvaluatablePlan
365    ) -> None:
366        self.state_sync.promote(
367            plan.environment,
368            no_gaps_snapshot_names=stage.no_gaps_snapshot_names if plan.no_gaps else set(),
369            environment_statements=plan.environment_statements,
370        )
371
372    def visit_migrate_schemas_stage(
373        self, stage: stages.MigrateSchemasStage, plan: EvaluatablePlan
374    ) -> None:
375        try:
376            self.snapshot_evaluator.migrate(
377                stage.snapshots,
378                stage.all_snapshots,
379                allow_destructive_snapshots=plan.allow_destructive_models,
380                allow_additive_snapshots=plan.allow_additive_models,
381                deployability_index=stage.deployability_index,
382            )
383        except NodeExecutionFailedError as ex:
384            raise PlanError(str(ex.__cause__) if ex.__cause__ else str(ex))
385
386    def visit_unpause_stage(self, stage: stages.UnpauseStage, plan: EvaluatablePlan) -> None:
387        self.state_sync.unpause_snapshots(stage.promoted_snapshots, plan.end)
388
389    def visit_virtual_layer_update_stage(
390        self, stage: stages.VirtualLayerUpdateStage, plan: EvaluatablePlan
391    ) -> None:
392        environment = plan.environment
393
394        self.console.start_promotion_progress(
395            list(stage.promoted_snapshots) + list(stage.demoted_snapshots),
396            environment.naming_info,
397            self.default_catalog,
398        )
399
400        completed = False
401        try:
402            self._promote_snapshots(
403                plan,
404                [stage.all_snapshots[s.snapshot_id] for s in stage.promoted_snapshots],
405                environment.naming_info,
406                deployability_index=stage.deployability_index,
407                on_complete=lambda s: self.console.update_promotion_progress(s, True),
408                snapshots=stage.all_snapshots,
409            )
410            if stage.demoted_environment_naming_info:
411                self._demote_snapshots(
412                    [stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots],
413                    stage.demoted_environment_naming_info,
414                    deployability_index=stage.deployability_index,
415                    on_complete=lambda s: self.console.update_promotion_progress(s, False),
416                    snapshots=stage.all_snapshots,
417                )
418
419            completed = True
420        finally:
421            self.console.stop_promotion_progress(success=completed)
422
423    def visit_finalize_environment_stage(
424        self, stage: stages.FinalizeEnvironmentStage, plan: EvaluatablePlan
425    ) -> None:
426        self.state_sync.finalize(plan.environment)
427
428    def _promote_snapshots(
429        self,
430        plan: EvaluatablePlan,
431        target_snapshots: t.Iterable[Snapshot],
432        environment_naming_info: EnvironmentNamingInfo,
433        snapshots: t.Dict[SnapshotId, Snapshot],
434        deployability_index: t.Optional[DeployabilityIndex] = None,
435        on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
436    ) -> None:
437        self.snapshot_evaluator.promote(
438            target_snapshots,
439            start=plan.start,
440            end=plan.end,
441            execution_time=plan.execution_time or now(),
442            snapshots=snapshots,
443            table_mapping=to_view_mapping(
444                snapshots.values(),
445                environment_naming_info,
446                default_catalog=self.default_catalog,
447                dialect=self.snapshot_evaluator.adapter.dialect,
448            ),
449            environment_naming_info=environment_naming_info,
450            deployability_index=deployability_index,
451            on_complete=on_complete,
452        )
453
454    def _demote_snapshots(
455        self,
456        target_snapshots: t.Iterable[Snapshot],
457        environment_naming_info: EnvironmentNamingInfo,
458        snapshots: t.Dict[SnapshotId, Snapshot],
459        deployability_index: t.Optional[DeployabilityIndex] = None,
460        on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
461    ) -> None:
462        self.snapshot_evaluator.demote(
463            target_snapshots,
464            environment_naming_info,
465            table_mapping=to_view_mapping(
466                snapshots.values(),
467                environment_naming_info,
468                default_catalog=self.default_catalog,
469                dialect=self.snapshot_evaluator.adapter.dialect,
470            ),
471            deployability_index=deployability_index,
472            on_complete=on_complete,
473        )
474
475    def _update_intervals_for_new_snapshots(self, snapshots: t.Collection[Snapshot]) -> None:
476        snapshots_intervals: t.List[SnapshotIntervals] = []
477        for snapshot in snapshots:
478            if snapshot.is_forward_only:
479                snapshots_intervals.append(
480                    SnapshotIntervals(
481                        name=snapshot.name,
482                        identifier=snapshot.identifier,
483                        version=snapshot.version,
484                        dev_version=snapshot.dev_version,
485                        dev_intervals=snapshot.dev_intervals,
486                    )
487                )
488
489        if snapshots_intervals:
490            self.state_sync.add_snapshots_intervals(snapshots_intervals)
logger = <Logger sqlmesh.core.plan.evaluator (WARNING)>
class PlanEvaluator(abc.ABC):
class PlanEvaluator(abc.ABC):
    """Abstract interface for components that apply an evaluatable plan."""

    @abc.abstractmethod
    def evaluate(
        self,
        plan: EvaluatablePlan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Apply a plan: push its snapshots, backfill data, then promote.

        The snapshots of the plan are first pushed into the state, after which
        the backfill process is started for every affected snapshot. When the
        backfill has finished, the snapshots that belong to the plan are
        promoted in the environment the plan targets.

        Args:
            plan: The plan to evaluate.
            circuit_breaker: The circuit breaker to use.
        """

Helper class that provides a standard way to create an ABC using inheritance.

@abc.abstractmethod
def evaluate( self, plan: sqlmesh.core.plan.definition.EvaluatablePlan, circuit_breaker: Optional[Callable[[], bool]] = None) -> None:
51    @abc.abstractmethod
52    def evaluate(
53        self,
54        plan: EvaluatablePlan,
55        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
56    ) -> None:
57        """Evaluates a plan by pushing snapshots and backfilling data.
58
59        Given a plan, it pushes snapshots into the state and then kicks off
60        the backfill process for all affected snapshots. Once backfill is done,
61        snapshots that are part of the plan are promoted in the environment targeted
62        by this plan.
63
64        Args:
65            plan: The plan to evaluate.
66            circuit_breaker: The circuit breaker to use.
67        """

Evaluates a plan by pushing snapshots and backfilling data.

Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.

Arguments:
  • plan: The plan to evaluate.
  • circuit_breaker: The circuit breaker to use.
class BuiltInPlanEvaluator(PlanEvaluator):
 70class BuiltInPlanEvaluator(PlanEvaluator):
 71    def __init__(
 72        self,
 73        state_sync: StateSync,
 74        snapshot_evaluator: SnapshotEvaluator,
 75        create_scheduler: t.Callable[[t.Iterable[Snapshot], SnapshotEvaluator], Scheduler],
 76        default_catalog: t.Optional[str],
 77        console: t.Optional[Console] = None,
 78    ):
 79        self.state_sync = state_sync
 80        self.snapshot_evaluator = snapshot_evaluator
 81        self.create_scheduler = create_scheduler
 82        self.default_catalog = default_catalog
 83        self.console = console or get_console()
 84        self._circuit_breaker: t.Optional[t.Callable[[], bool]] = None
 85
 86    def evaluate(
 87        self,
 88        plan: EvaluatablePlan,
 89        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 90    ) -> None:
 91        self._circuit_breaker = circuit_breaker
 92        self.snapshot_evaluator = self.snapshot_evaluator.set_correlation_id(
 93            CorrelationId.from_plan_id(plan.plan_id)
 94        )
 95
 96        self.console.start_plan_evaluation(plan)
 97        analytics.collector.on_plan_apply_start(
 98            plan=plan,
 99            engine_type=self.snapshot_evaluator.adapter.dialect,
100            state_sync_type=self.state_sync.state_type(),
101            scheduler_type=c.BUILTIN,
102        )
103
104        try:
105            plan_stages = stages.build_plan_stages(plan, self.state_sync, self.default_catalog)
106            self._evaluate_stages(plan_stages, plan)
107        except Exception as e:
108            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
109            raise
110        else:
111            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
112        finally:
113            self.snapshot_evaluator.recycle()
114            self.console.stop_plan_evaluation()
115
116    def _evaluate_stages(
117        self, plan_stages: t.List[stages.PlanStage], plan: EvaluatablePlan
118    ) -> None:
119        for stage in plan_stages:
120            stage_name = stage.__class__.__name__
121            handler_name = f"visit_{to_snake_case(stage_name)}"
122            if not hasattr(self, handler_name):
123                raise SQLMeshError(f"Unexpected plan stage: {stage_name}")
124            logger.info("Evaluating plan stage %s", stage_name)
125            handler = getattr(self, handler_name)
126            handler(stage, plan)
127
128    def visit_before_all_stage(self, stage: stages.BeforeAllStage, plan: EvaluatablePlan) -> None:
129        execute_environment_statements(
130            adapter=self.snapshot_evaluator.adapter,
131            environment_statements=stage.statements,
132            runtime_stage=RuntimeStage.BEFORE_ALL,
133            environment_naming_info=plan.environment.naming_info,
134            default_catalog=self.default_catalog,
135            snapshots=stage.all_snapshots,
136            start=plan.start,
137            end=plan.end,
138            execution_time=plan.execution_time,
139            selected_models=plan.selected_models,
140        )
141
142    def visit_after_all_stage(self, stage: stages.AfterAllStage, plan: EvaluatablePlan) -> None:
143        execute_environment_statements(
144            adapter=self.snapshot_evaluator.adapter,
145            environment_statements=stage.statements,
146            runtime_stage=RuntimeStage.AFTER_ALL,
147            environment_naming_info=plan.environment.naming_info,
148            default_catalog=self.default_catalog,
149            snapshots=stage.all_snapshots,
150            start=plan.start,
151            end=plan.end,
152            execution_time=plan.execution_time,
153            selected_models=plan.selected_models,
154        )
155
156    def visit_create_snapshot_records_stage(
157        self, stage: stages.CreateSnapshotRecordsStage, plan: EvaluatablePlan
158    ) -> None:
159        self.state_sync.push_snapshots(stage.snapshots)
160        analytics.collector.on_snapshots_created(
161            new_snapshots=stage.snapshots, plan_id=plan.plan_id
162        )
163        # Update the intervals for the new forward-only snapshots
164        self._update_intervals_for_new_snapshots(stage.snapshots)
165
166    def visit_physical_layer_update_stage(
167        self, stage: stages.PhysicalLayerUpdateStage, plan: EvaluatablePlan
168    ) -> None:
169        skip_message = "" if plan.restatements else "\nSKIP: No physical layer updates to perform"
170
171        snapshots_to_create = stage.snapshots
172        if not snapshots_to_create:
173            self.console.log_success(skip_message)
174            return
175
176        completion_status = None
177        progress_stopped = False
178        try:
179            completion_status = self.snapshot_evaluator.create(
180                snapshots_to_create,
181                stage.all_snapshots,
182                allow_destructive_snapshots=plan.allow_destructive_models,
183                allow_additive_snapshots=plan.allow_additive_models,
184                deployability_index=stage.deployability_index,
185                on_start=lambda x: self.console.start_creation_progress(
186                    x, plan.environment, self.default_catalog
187                ),
188                on_complete=self.console.update_creation_progress,
189            )
190            if completion_status.is_nothing_to_do:
191                self.console.log_success(skip_message)
192                return
193        except SnapshotCreationFailedError as ex:
194            self.console.stop_creation_progress(success=False)
195            progress_stopped = True
196
197            for error in ex.errors:
198                logger.info(str(error), exc_info=error)
199
200            self.console.log_skipped_models({s.name for s in ex.skipped})
201            self.console.log_failed_models(ex.errors)
202
203            raise PlanError("Plan application failed.")
204        finally:
205            if not progress_stopped:
206                self.console.stop_creation_progress(
207                    success=completion_status is not None and completion_status.is_success
208                )
209
210    def visit_physical_layer_schema_creation_stage(
211        self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
212    ) -> None:
213        try:
214            self.snapshot_evaluator.create_physical_schemas(
215                stage.snapshots, stage.deployability_index
216            )
217        except Exception as ex:
218            raise PlanError("Plan application failed.") from ex
219
220    def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePlan) -> None:
221        if plan.empty_backfill:
222            intervals_to_add = []
223            for snapshot in stage.all_snapshots.values():
224                if not snapshot.evaluatable or not plan.is_selected_for_backfill(snapshot.name):
225                    # Skip snapshots that are not evaluatable or not selected for backfill.
226                    continue
227                intervals = [
228                    snapshot.inclusive_exclusive(plan.start, plan.end, strict=False, expand=False)
229                ]
230                is_deployable = stage.deployability_index.is_deployable(snapshot)
231                intervals_to_add.append(
232                    SnapshotIntervals(
233                        name=snapshot.name,
234                        identifier=snapshot.identifier,
235                        version=snapshot.version,
236                        dev_version=snapshot.dev_version,
237                        intervals=intervals if is_deployable else [],
238                        dev_intervals=intervals if not is_deployable else [],
239                    )
240                )
241            self.state_sync.add_snapshots_intervals(intervals_to_add)
242            self.console.log_success("SKIP: No model batches to execute")
243            return
244
245        if not stage.snapshot_to_intervals:
246            self.console.log_success("SKIP: No model batches to execute")
247            return
248
249        scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
250        errors, _ = scheduler.run_merged_intervals(
251            merged_intervals=stage.snapshot_to_intervals,
252            deployability_index=stage.deployability_index,
253            environment_naming_info=plan.environment.naming_info,
254            execution_time=plan.execution_time,
255            circuit_breaker=self._circuit_breaker,
256            start=plan.start,
257            end=plan.end,
258            allow_destructive_snapshots=plan.allow_destructive_models,
259            allow_additive_snapshots=plan.allow_additive_models,
260            selected_snapshot_ids=stage.selected_snapshot_ids,
261            selected_models=plan.selected_models,
262            is_restatement=bool(plan.restatements),
263        )
264        if errors:
265            raise PlanError("Plan application failed.")
266
267    def visit_audit_only_run_stage(
268        self, stage: stages.AuditOnlyRunStage, plan: EvaluatablePlan
269    ) -> None:
270        audit_snapshots = stage.snapshots
271        if not audit_snapshots:
272            return
273
274        # If there are any snapshots to be audited, we'll reuse the scheduler's internals to audit them
275        scheduler = self.create_scheduler(audit_snapshots, self.snapshot_evaluator)
276        completion_status = scheduler.audit(
277            plan.environment,
278            plan.start,
279            plan.end,
280            execution_time=plan.execution_time,
281            end_bounded=plan.end_bounded,
282            start_override_per_model=plan.start_override_per_model,
283            end_override_per_model=plan.end_override_per_model,
284        )
285
286        if completion_status.is_failure:
287            raise PlanError("Plan application failed.")
288
289    def visit_restatement_stage(
290        self, stage: stages.RestatementStage, plan: EvaluatablePlan
291    ) -> None:
292        # Restating intervals on prod plans means that once the data for the intervals being restated has been backfilled
293        # (which happens in the backfill stage) then we need to clear those intervals *from state* across all other environments.
294        #
295        # This ensures that work done in dev environments can still be promoted to prod by forcing dev environments to
296        # re-run intervals that changed in prod (because after this stage runs they are cleared from state and thus show as missing)
297        #
298        # It also means that any new dev environments created while this restatement plan was running also get the
299        # correct intervals cleared because we look up matching snapshots as at right now and not as at the time the plan
300        # was created, which could have been several hours ago if there was a lot of data to restate.
301        #
302        # Without this rule, its possible that promoting a dev table to prod will introduce old data to prod
303
304        intervals_to_clear = identify_restatement_intervals_across_snapshot_versions(
305            state_reader=self.state_sync,
306            prod_restatements=plan.restatements,
307            disable_restatement_models=plan.disabled_restatement_models,
308            loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
309            current_ts=to_timestamp(plan.execution_time or now()),
310        )
311
312        if not intervals_to_clear:
313            # Nothing to do
314            return
315
316        # While the restatements were being processed, did any of the snapshots being restated get new versions deployed?
317        # If they did, they will not reflect the data that just got restated, so we need to notify the user
318        deployed_during_restatement: t.Dict[
319            str, t.Tuple[SnapshotTableInfo, SnapshotTableInfo]
320        ] = {}  # tuple of (restated_snapshot, current_prod_snapshot)
321
322        if deployed_env := self.state_sync.get_environment(plan.environment.name):
323            promoted_snapshots_by_name = {s.name: s for s in deployed_env.snapshots}
324
325            for name in plan.restatements:
326                snapshot = stage.all_snapshots[name]
327                version = snapshot.table_info.version
328                if (
329                    prod_snapshot := promoted_snapshots_by_name.get(name)
330                ) and prod_snapshot.version != version:
331                    deployed_during_restatement[name] = (
332                        snapshot.table_info,
333                        prod_snapshot.table_info,
334                    )
335
336        # we need to *not* clear the intervals on the snapshots where new versions were deployed while the restatement was running in order to prevent
337        # subsequent plans from having unexpected intervals to backfill.
338        # we instead list the affected models and abort the plan with an error so the user can decide what to do
339        # (either re-attempt the restatement plan or leave things as they are)
340        filtered_intervals_to_clear = [
341            (s.snapshot, s.interval)
342            for s in intervals_to_clear.values()
343            if s.snapshot.name not in deployed_during_restatement
344        ]
345
346        if filtered_intervals_to_clear:
347            # We still clear intervals in other envs for models that were successfully restated without having new versions promoted during restatement
348            self.state_sync.remove_intervals(
349                snapshot_intervals=filtered_intervals_to_clear,
350                remove_shared_versions=plan.is_prod,
351            )
352
353        if deployed_env and deployed_during_restatement:
354            self.console.log_models_updated_during_restatement(
355                list(deployed_during_restatement.values()),
356                plan.environment.naming_info,
357                self.default_catalog,
358            )
359            raise ConflictingPlanError(
360                f"Another plan ({deployed_env.summary.plan_id}) deployed new versions of {len(deployed_during_restatement)} models in the target environment '{plan.environment.name}' while they were being restated by this plan.\n"
361                "Please re-apply your plan if these new versions should be restated."
362            )
363
364    def visit_environment_record_update_stage(
365        self, stage: stages.EnvironmentRecordUpdateStage, plan: EvaluatablePlan
366    ) -> None:
367        self.state_sync.promote(
368            plan.environment,
369            no_gaps_snapshot_names=stage.no_gaps_snapshot_names if plan.no_gaps else set(),
370            environment_statements=plan.environment_statements,
371        )
372
373    def visit_migrate_schemas_stage(
374        self, stage: stages.MigrateSchemasStage, plan: EvaluatablePlan
375    ) -> None:
376        try:
377            self.snapshot_evaluator.migrate(
378                stage.snapshots,
379                stage.all_snapshots,
380                allow_destructive_snapshots=plan.allow_destructive_models,
381                allow_additive_snapshots=plan.allow_additive_models,
382                deployability_index=stage.deployability_index,
383            )
384        except NodeExecutionFailedError as ex:
385            raise PlanError(str(ex.__cause__) if ex.__cause__ else str(ex))
386
387    def visit_unpause_stage(self, stage: stages.UnpauseStage, plan: EvaluatablePlan) -> None:
388        self.state_sync.unpause_snapshots(stage.promoted_snapshots, plan.end)
389
390    def visit_virtual_layer_update_stage(
391        self, stage: stages.VirtualLayerUpdateStage, plan: EvaluatablePlan
392    ) -> None:
393        environment = plan.environment
394
395        self.console.start_promotion_progress(
396            list(stage.promoted_snapshots) + list(stage.demoted_snapshots),
397            environment.naming_info,
398            self.default_catalog,
399        )
400
401        completed = False
402        try:
403            self._promote_snapshots(
404                plan,
405                [stage.all_snapshots[s.snapshot_id] for s in stage.promoted_snapshots],
406                environment.naming_info,
407                deployability_index=stage.deployability_index,
408                on_complete=lambda s: self.console.update_promotion_progress(s, True),
409                snapshots=stage.all_snapshots,
410            )
411            if stage.demoted_environment_naming_info:
412                self._demote_snapshots(
413                    [stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots],
414                    stage.demoted_environment_naming_info,
415                    deployability_index=stage.deployability_index,
416                    on_complete=lambda s: self.console.update_promotion_progress(s, False),
417                    snapshots=stage.all_snapshots,
418                )
419
420            completed = True
421        finally:
422            self.console.stop_promotion_progress(success=completed)
423
424    def visit_finalize_environment_stage(
425        self, stage: stages.FinalizeEnvironmentStage, plan: EvaluatablePlan
426    ) -> None:
427        self.state_sync.finalize(plan.environment)
428
429    def _promote_snapshots(
430        self,
431        plan: EvaluatablePlan,
432        target_snapshots: t.Iterable[Snapshot],
433        environment_naming_info: EnvironmentNamingInfo,
434        snapshots: t.Dict[SnapshotId, Snapshot],
435        deployability_index: t.Optional[DeployabilityIndex] = None,
436        on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
437    ) -> None:
438        self.snapshot_evaluator.promote(
439            target_snapshots,
440            start=plan.start,
441            end=plan.end,
442            execution_time=plan.execution_time or now(),
443            snapshots=snapshots,
444            table_mapping=to_view_mapping(
445                snapshots.values(),
446                environment_naming_info,
447                default_catalog=self.default_catalog,
448                dialect=self.snapshot_evaluator.adapter.dialect,
449            ),
450            environment_naming_info=environment_naming_info,
451            deployability_index=deployability_index,
452            on_complete=on_complete,
453        )
454
455    def _demote_snapshots(
456        self,
457        target_snapshots: t.Iterable[Snapshot],
458        environment_naming_info: EnvironmentNamingInfo,
459        snapshots: t.Dict[SnapshotId, Snapshot],
460        deployability_index: t.Optional[DeployabilityIndex] = None,
461        on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
462    ) -> None:
463        self.snapshot_evaluator.demote(
464            target_snapshots,
465            environment_naming_info,
466            table_mapping=to_view_mapping(
467                snapshots.values(),
468                environment_naming_info,
469                default_catalog=self.default_catalog,
470                dialect=self.snapshot_evaluator.adapter.dialect,
471            ),
472            deployability_index=deployability_index,
473            on_complete=on_complete,
474        )
475
476    def _update_intervals_for_new_snapshots(self, snapshots: t.Collection[Snapshot]) -> None:
477        snapshots_intervals: t.List[SnapshotIntervals] = []
478        for snapshot in snapshots:
479            if snapshot.is_forward_only:
480                snapshots_intervals.append(
481                    SnapshotIntervals(
482                        name=snapshot.name,
483                        identifier=snapshot.identifier,
484                        version=snapshot.version,
485                        dev_version=snapshot.dev_version,
486                        dev_intervals=snapshot.dev_intervals,
487                    )
488                )
489
490        if snapshots_intervals:
491            self.state_sync.add_snapshots_intervals(snapshots_intervals)

Helper class that provides a standard way to create an ABC using inheritance.

BuiltInPlanEvaluator( state_sync: sqlmesh.core.state_sync.base.StateSync, snapshot_evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, create_scheduler: Callable[[Iterable[sqlmesh.core.snapshot.definition.Snapshot], sqlmesh.core.snapshot.evaluator.SnapshotEvaluator], sqlmesh.core.scheduler.Scheduler], default_catalog: Optional[str], console: Optional[sqlmesh.core.console.Console] = None)
71    def __init__(
72        self,
73        state_sync: StateSync,
74        snapshot_evaluator: SnapshotEvaluator,
75        create_scheduler: t.Callable[[t.Iterable[Snapshot], SnapshotEvaluator], Scheduler],
76        default_catalog: t.Optional[str],
77        console: t.Optional[Console] = None,
78    ):
79        self.state_sync = state_sync
80        self.snapshot_evaluator = snapshot_evaluator
81        self.create_scheduler = create_scheduler
82        self.default_catalog = default_catalog
83        self.console = console or get_console()
84        self._circuit_breaker: t.Optional[t.Callable[[], bool]] = None
state_sync
snapshot_evaluator
create_scheduler
default_catalog
console
def evaluate( self, plan: sqlmesh.core.plan.definition.EvaluatablePlan, circuit_breaker: Optional[Callable[[], bool]] = None) -> None:
 86    def evaluate(
 87        self,
 88        plan: EvaluatablePlan,
 89        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 90    ) -> None:
 91        self._circuit_breaker = circuit_breaker
 92        self.snapshot_evaluator = self.snapshot_evaluator.set_correlation_id(
 93            CorrelationId.from_plan_id(plan.plan_id)
 94        )
 95
 96        self.console.start_plan_evaluation(plan)
 97        analytics.collector.on_plan_apply_start(
 98            plan=plan,
 99            engine_type=self.snapshot_evaluator.adapter.dialect,
100            state_sync_type=self.state_sync.state_type(),
101            scheduler_type=c.BUILTIN,
102        )
103
104        try:
105            plan_stages = stages.build_plan_stages(plan, self.state_sync, self.default_catalog)
106            self._evaluate_stages(plan_stages, plan)
107        except Exception as e:
108            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
109            raise
110        else:
111            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
112        finally:
113            self.snapshot_evaluator.recycle()
114            self.console.stop_plan_evaluation()

Evaluates a plan by pushing snapshots and backfilling data.

Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.

Arguments:
  • plan: The plan to evaluate.
  • circuit_breaker: The circuit breaker to use.
def visit_before_all_stage( self, stage: sqlmesh.core.plan.stages.BeforeAllStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
128    def visit_before_all_stage(self, stage: stages.BeforeAllStage, plan: EvaluatablePlan) -> None:
129        execute_environment_statements(
130            adapter=self.snapshot_evaluator.adapter,
131            environment_statements=stage.statements,
132            runtime_stage=RuntimeStage.BEFORE_ALL,
133            environment_naming_info=plan.environment.naming_info,
134            default_catalog=self.default_catalog,
135            snapshots=stage.all_snapshots,
136            start=plan.start,
137            end=plan.end,
138            execution_time=plan.execution_time,
139            selected_models=plan.selected_models,
140        )
def visit_after_all_stage( self, stage: sqlmesh.core.plan.stages.AfterAllStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
142    def visit_after_all_stage(self, stage: stages.AfterAllStage, plan: EvaluatablePlan) -> None:
143        execute_environment_statements(
144            adapter=self.snapshot_evaluator.adapter,
145            environment_statements=stage.statements,
146            runtime_stage=RuntimeStage.AFTER_ALL,
147            environment_naming_info=plan.environment.naming_info,
148            default_catalog=self.default_catalog,
149            snapshots=stage.all_snapshots,
150            start=plan.start,
151            end=plan.end,
152            execution_time=plan.execution_time,
153            selected_models=plan.selected_models,
154        )
def visit_create_snapshot_records_stage( self, stage: sqlmesh.core.plan.stages.CreateSnapshotRecordsStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
156    def visit_create_snapshot_records_stage(
157        self, stage: stages.CreateSnapshotRecordsStage, plan: EvaluatablePlan
158    ) -> None:
159        self.state_sync.push_snapshots(stage.snapshots)
160        analytics.collector.on_snapshots_created(
161            new_snapshots=stage.snapshots, plan_id=plan.plan_id
162        )
163        # Update the intervals for the new forward-only snapshots
164        self._update_intervals_for_new_snapshots(stage.snapshots)
def visit_physical_layer_update_stage( self, stage: sqlmesh.core.plan.stages.PhysicalLayerUpdateStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
166    def visit_physical_layer_update_stage(
167        self, stage: stages.PhysicalLayerUpdateStage, plan: EvaluatablePlan
168    ) -> None:
169        skip_message = "" if plan.restatements else "\nSKIP: No physical layer updates to perform"
170
171        snapshots_to_create = stage.snapshots
172        if not snapshots_to_create:
173            self.console.log_success(skip_message)
174            return
175
176        completion_status = None
177        progress_stopped = False
178        try:
179            completion_status = self.snapshot_evaluator.create(
180                snapshots_to_create,
181                stage.all_snapshots,
182                allow_destructive_snapshots=plan.allow_destructive_models,
183                allow_additive_snapshots=plan.allow_additive_models,
184                deployability_index=stage.deployability_index,
185                on_start=lambda x: self.console.start_creation_progress(
186                    x, plan.environment, self.default_catalog
187                ),
188                on_complete=self.console.update_creation_progress,
189            )
190            if completion_status.is_nothing_to_do:
191                self.console.log_success(skip_message)
192                return
193        except SnapshotCreationFailedError as ex:
194            self.console.stop_creation_progress(success=False)
195            progress_stopped = True
196
197            for error in ex.errors:
198                logger.info(str(error), exc_info=error)
199
200            self.console.log_skipped_models({s.name for s in ex.skipped})
201            self.console.log_failed_models(ex.errors)
202
203            raise PlanError("Plan application failed.")
204        finally:
205            if not progress_stopped:
206                self.console.stop_creation_progress(
207                    success=completion_status is not None and completion_status.is_success
208                )
def visit_physical_layer_schema_creation_stage( self, stage: sqlmesh.core.plan.stages.PhysicalLayerSchemaCreationStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
210    def visit_physical_layer_schema_creation_stage(
211        self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
212    ) -> None:
213        try:
214            self.snapshot_evaluator.create_physical_schemas(
215                stage.snapshots, stage.deployability_index
216            )
217        except Exception as ex:
218            raise PlanError("Plan application failed.") from ex
def visit_backfill_stage( self, stage: sqlmesh.core.plan.stages.BackfillStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
220    def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePlan) -> None:
221        if plan.empty_backfill:
222            intervals_to_add = []
223            for snapshot in stage.all_snapshots.values():
224                if not snapshot.evaluatable or not plan.is_selected_for_backfill(snapshot.name):
225                    # Skip snapshots that are not evaluatable or not selected for backfill.
226                    continue
227                intervals = [
228                    snapshot.inclusive_exclusive(plan.start, plan.end, strict=False, expand=False)
229                ]
230                is_deployable = stage.deployability_index.is_deployable(snapshot)
231                intervals_to_add.append(
232                    SnapshotIntervals(
233                        name=snapshot.name,
234                        identifier=snapshot.identifier,
235                        version=snapshot.version,
236                        dev_version=snapshot.dev_version,
237                        intervals=intervals if is_deployable else [],
238                        dev_intervals=intervals if not is_deployable else [],
239                    )
240                )
241            self.state_sync.add_snapshots_intervals(intervals_to_add)
242            self.console.log_success("SKIP: No model batches to execute")
243            return
244
245        if not stage.snapshot_to_intervals:
246            self.console.log_success("SKIP: No model batches to execute")
247            return
248
249        scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
250        errors, _ = scheduler.run_merged_intervals(
251            merged_intervals=stage.snapshot_to_intervals,
252            deployability_index=stage.deployability_index,
253            environment_naming_info=plan.environment.naming_info,
254            execution_time=plan.execution_time,
255            circuit_breaker=self._circuit_breaker,
256            start=plan.start,
257            end=plan.end,
258            allow_destructive_snapshots=plan.allow_destructive_models,
259            allow_additive_snapshots=plan.allow_additive_models,
260            selected_snapshot_ids=stage.selected_snapshot_ids,
261            selected_models=plan.selected_models,
262            is_restatement=bool(plan.restatements),
263        )
264        if errors:
265            raise PlanError("Plan application failed.")
def visit_audit_only_run_stage( self, stage: sqlmesh.core.plan.stages.AuditOnlyRunStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
267    def visit_audit_only_run_stage(
268        self, stage: stages.AuditOnlyRunStage, plan: EvaluatablePlan
269    ) -> None:
270        audit_snapshots = stage.snapshots
271        if not audit_snapshots:
272            return
273
274        # If there are any snapshots to be audited, we'll reuse the scheduler's internals to audit them
275        scheduler = self.create_scheduler(audit_snapshots, self.snapshot_evaluator)
276        completion_status = scheduler.audit(
277            plan.environment,
278            plan.start,
279            plan.end,
280            execution_time=plan.execution_time,
281            end_bounded=plan.end_bounded,
282            start_override_per_model=plan.start_override_per_model,
283            end_override_per_model=plan.end_override_per_model,
284        )
285
286        if completion_status.is_failure:
287            raise PlanError("Plan application failed.")
def visit_restatement_stage( self, stage: sqlmesh.core.plan.stages.RestatementStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
289    def visit_restatement_stage(
290        self, stage: stages.RestatementStage, plan: EvaluatablePlan
291    ) -> None:
292        # Restating intervals on prod plans means that once the data for the intervals being restated has been backfilled
293        # (which happens in the backfill stage) then we need to clear those intervals *from state* across all other environments.
294        #
295        # This ensures that work done in dev environments can still be promoted to prod by forcing dev environments to
296        # re-run intervals that changed in prod (because after this stage runs they are cleared from state and thus show as missing)
297        #
298        # It also means that any new dev environments created while this restatement plan was running also get the
299        # correct intervals cleared because we look up matching snapshots as at right now and not as at the time the plan
300        # was created, which could have been several hours ago if there was a lot of data to restate.
301        #
302        # Without this rule, its possible that promoting a dev table to prod will introduce old data to prod
303
304        intervals_to_clear = identify_restatement_intervals_across_snapshot_versions(
305            state_reader=self.state_sync,
306            prod_restatements=plan.restatements,
307            disable_restatement_models=plan.disabled_restatement_models,
308            loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
309            current_ts=to_timestamp(plan.execution_time or now()),
310        )
311
312        if not intervals_to_clear:
313            # Nothing to do
314            return
315
316        # While the restatements were being processed, did any of the snapshots being restated get new versions deployed?
317        # If they did, they will not reflect the data that just got restated, so we need to notify the user
318        deployed_during_restatement: t.Dict[
319            str, t.Tuple[SnapshotTableInfo, SnapshotTableInfo]
320        ] = {}  # tuple of (restated_snapshot, current_prod_snapshot)
321
322        if deployed_env := self.state_sync.get_environment(plan.environment.name):
323            promoted_snapshots_by_name = {s.name: s for s in deployed_env.snapshots}
324
325            for name in plan.restatements:
326                snapshot = stage.all_snapshots[name]
327                version = snapshot.table_info.version
328                if (
329                    prod_snapshot := promoted_snapshots_by_name.get(name)
330                ) and prod_snapshot.version != version:
331                    deployed_during_restatement[name] = (
332                        snapshot.table_info,
333                        prod_snapshot.table_info,
334                    )
335
336        # we need to *not* clear the intervals on the snapshots where new versions were deployed while the restatement was running in order to prevent
337        # subsequent plans from having unexpected intervals to backfill.
338        # we instead list the affected models and abort the plan with an error so the user can decide what to do
339        # (either re-attempt the restatement plan or leave things as they are)
340        filtered_intervals_to_clear = [
341            (s.snapshot, s.interval)
342            for s in intervals_to_clear.values()
343            if s.snapshot.name not in deployed_during_restatement
344        ]
345
346        if filtered_intervals_to_clear:
347            # We still clear intervals in other envs for models that were successfully restated without having new versions promoted during restatement
348            self.state_sync.remove_intervals(
349                snapshot_intervals=filtered_intervals_to_clear,
350                remove_shared_versions=plan.is_prod,
351            )
352
353        if deployed_env and deployed_during_restatement:
354            self.console.log_models_updated_during_restatement(
355                list(deployed_during_restatement.values()),
356                plan.environment.naming_info,
357                self.default_catalog,
358            )
359            raise ConflictingPlanError(
360                f"Another plan ({deployed_env.summary.plan_id}) deployed new versions of {len(deployed_during_restatement)} models in the target environment '{plan.environment.name}' while they were being restated by this plan.\n"
361                "Please re-apply your plan if these new versions should be restated."
362            )
def visit_environment_record_update_stage( self, stage: sqlmesh.core.plan.stages.EnvironmentRecordUpdateStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
364    def visit_environment_record_update_stage(
365        self, stage: stages.EnvironmentRecordUpdateStage, plan: EvaluatablePlan
366    ) -> None:
367        self.state_sync.promote(
368            plan.environment,
369            no_gaps_snapshot_names=stage.no_gaps_snapshot_names if plan.no_gaps else set(),
370            environment_statements=plan.environment_statements,
371        )
def visit_migrate_schemas_stage( self, stage: sqlmesh.core.plan.stages.MigrateSchemasStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
373    def visit_migrate_schemas_stage(
374        self, stage: stages.MigrateSchemasStage, plan: EvaluatablePlan
375    ) -> None:
376        try:
377            self.snapshot_evaluator.migrate(
378                stage.snapshots,
379                stage.all_snapshots,
380                allow_destructive_snapshots=plan.allow_destructive_models,
381                allow_additive_snapshots=plan.allow_additive_models,
382                deployability_index=stage.deployability_index,
383            )
384        except NodeExecutionFailedError as ex:
385            raise PlanError(str(ex.__cause__) if ex.__cause__ else str(ex))
def visit_unpause_stage( self, stage: sqlmesh.core.plan.stages.UnpauseStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
387    def visit_unpause_stage(self, stage: stages.UnpauseStage, plan: EvaluatablePlan) -> None:
388        self.state_sync.unpause_snapshots(stage.promoted_snapshots, plan.end)
def visit_virtual_layer_update_stage( self, stage: sqlmesh.core.plan.stages.VirtualLayerUpdateStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
390    def visit_virtual_layer_update_stage(
391        self, stage: stages.VirtualLayerUpdateStage, plan: EvaluatablePlan
392    ) -> None:
393        environment = plan.environment
394
395        self.console.start_promotion_progress(
396            list(stage.promoted_snapshots) + list(stage.demoted_snapshots),
397            environment.naming_info,
398            self.default_catalog,
399        )
400
401        completed = False
402        try:
403            self._promote_snapshots(
404                plan,
405                [stage.all_snapshots[s.snapshot_id] for s in stage.promoted_snapshots],
406                environment.naming_info,
407                deployability_index=stage.deployability_index,
408                on_complete=lambda s: self.console.update_promotion_progress(s, True),
409                snapshots=stage.all_snapshots,
410            )
411            if stage.demoted_environment_naming_info:
412                self._demote_snapshots(
413                    [stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots],
414                    stage.demoted_environment_naming_info,
415                    deployability_index=stage.deployability_index,
416                    on_complete=lambda s: self.console.update_promotion_progress(s, False),
417                    snapshots=stage.all_snapshots,
418                )
419
420            completed = True
421        finally:
422            self.console.stop_promotion_progress(success=completed)
def visit_finalize_environment_stage( self, stage: sqlmesh.core.plan.stages.FinalizeEnvironmentStage, plan: sqlmesh.core.plan.definition.EvaluatablePlan) -> None:
424    def visit_finalize_environment_stage(
425        self, stage: stages.FinalizeEnvironmentStage, plan: EvaluatablePlan
426    ) -> None:
427        self.state_sync.finalize(plan.environment)