Edit on GitHub

PlanEvaluator

A plan evaluator is responsible for evaluating a plan when it is being applied.

Evaluation steps

At a high level, when a plan is evaluated, SQLMesh will:

  • Push new snapshots to the state sync.
  • Create snapshot tables.
  • Backfill data.
  • Promote the snapshots.

Refer to sqlmesh.core.plan.

  1"""
  2# PlanEvaluator
  3
  4A plan evaluator is responsible for evaluating a plan when it is being applied.
  5
  6# Evaluation steps
  7
  8At a high level, when a plan is evaluated, SQLMesh will:
  9- Push new snapshots to the state sync.
 10- Create snapshot tables.
 11- Backfill data.
 12- Promote the snapshots.
 13
 14Refer to `sqlmesh.core.plan`.
 15"""
 16
 17import abc
 18import logging
 19import typing as t
 20
 21from sqlmesh.core import analytics
 22from sqlmesh.core import constants as c
 23from sqlmesh.core.console import Console, get_console
 24from sqlmesh.core.notification_target import (
 25    NotificationTarget,
 26    NotificationTargetManager,
 27)
 28from sqlmesh.core.plan.definition import Plan
 29from sqlmesh.core.scheduler import Scheduler
 30from sqlmesh.core.snapshot import DeployabilityIndex, Snapshot, SnapshotEvaluator
 31from sqlmesh.core.state_sync import StateSync
 32from sqlmesh.core.state_sync.base import PromotionResult
 33from sqlmesh.core.user import User
 34from sqlmesh.schedulers.airflow import common as airflow_common
 35from sqlmesh.schedulers.airflow.client import AirflowClient, BaseAirflowClient
 36from sqlmesh.schedulers.airflow.mwaa_client import MWAAClient
 37from sqlmesh.utils.errors import SQLMeshError
 38
 39logger = logging.getLogger(__name__)
 40
 41
 42class PlanEvaluator(abc.ABC):
 43    @abc.abstractmethod
 44    def evaluate(
 45        self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]] = None
 46    ) -> None:
 47        """Evaluates a plan by pushing snapshots and backfilling data.
 48
 49        Given a plan, it pushes snapshots into the state and then kicks off
 50        the backfill process for all affected snapshots. Once backfill is done,
 51        snapshots that are part of the plan are promoted in the environment targeted
 52        by this plan.
 53
 54        Args:
 55            plan: The plan to evaluate.
 56        """
 57
 58
 59class BuiltInPlanEvaluator(PlanEvaluator):
 60    def __init__(
 61        self,
 62        state_sync: StateSync,
 63        snapshot_evaluator: SnapshotEvaluator,
 64        default_catalog: t.Optional[str],
 65        backfill_concurrent_tasks: int = 1,
 66        console: t.Optional[Console] = None,
 67        notification_target_manager: t.Optional[NotificationTargetManager] = None,
 68    ):
 69        self.state_sync = state_sync
 70        self.snapshot_evaluator = snapshot_evaluator
 71        self.default_catalog = default_catalog
 72        self.backfill_concurrent_tasks = backfill_concurrent_tasks
 73        self.console = console or get_console()
 74        self.notification_target_manager = notification_target_manager
 75
 76    def evaluate(
 77        self,
 78        plan: Plan,
 79        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 80    ) -> None:
 81        self.console.start_plan_evaluation(plan)
 82        analytics.collector.on_plan_apply_start(
 83            plan=plan,
 84            engine_type=self.snapshot_evaluator.adapter.dialect,
 85            state_sync_type=self.state_sync.state_type(),
 86            scheduler_type=c.BUILTIN,
 87        )
 88
 89        try:
 90            snapshots = plan.snapshots
 91            all_names = {
 92                s.name for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
 93            }
 94            deployability_index_for_evaluation = DeployabilityIndex.create(snapshots)
 95            deployability_index_for_creation = deployability_index_for_evaluation
 96            if plan.is_dev:
 97                before_promote_snapshots = all_names
 98                after_promote_snapshots = set()
 99            else:
100                before_promote_snapshots = {
101                    s.name
102                    for s in snapshots.values()
103                    if deployability_index_for_evaluation.is_representative(s)
104                    and plan.is_selected_for_backfill(s.name)
105                }
106                after_promote_snapshots = all_names - before_promote_snapshots
107                deployability_index_for_evaluation = DeployabilityIndex.all_deployable()
108
109            self._push(plan, deployability_index_for_creation)
110            update_intervals_for_new_snapshots(plan.new_snapshots, self.state_sync)
111            self._restate(plan)
112            self._backfill(
113                plan,
114                before_promote_snapshots,
115                deployability_index_for_evaluation,
116                circuit_breaker=circuit_breaker,
117            )
118            promotion_result = self._promote(plan, before_promote_snapshots)
119            self._backfill(
120                plan,
121                after_promote_snapshots,
122                deployability_index_for_evaluation,
123                circuit_breaker=circuit_breaker,
124            )
125            self._update_views(plan, promotion_result, deployability_index_for_evaluation)
126
127            if not plan.requires_backfill:
128                self.console.log_success("Virtual Update executed successfully")
129        except Exception as e:
130            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
131            raise
132        finally:
133            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
134            self.console.stop_plan_evaluation()
135
136    def _backfill(
137        self,
138        plan: Plan,
139        selected_snapshots: t.Set[str],
140        deployability_index: DeployabilityIndex,
141        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
142    ) -> None:
143        """Backfill missing intervals for snapshots that are part of the given plan.
144
145        Args:
146            plan: The plan to source snapshots from.
147            selected_snapshots: The snapshots to backfill.
148        """
149        if not plan.requires_backfill or not selected_snapshots:
150            return
151
152        snapshots = plan.snapshots
153        scheduler = Scheduler(
154            snapshots.values(),
155            self.snapshot_evaluator,
156            self.state_sync,
157            default_catalog=self.default_catalog,
158            max_workers=self.backfill_concurrent_tasks,
159            console=self.console,
160            notification_target_manager=self.notification_target_manager,
161        )
162        is_run_successful = scheduler.run(
163            plan.environment_naming_info,
164            plan.start,
165            plan.end,
166            execution_time=plan.execution_time,
167            restatements=plan.restatements,
168            selected_snapshots=selected_snapshots,
169            deployability_index=deployability_index,
170            circuit_breaker=circuit_breaker,
171            end_bounded=plan.end_bounded,
172        )
173        if not is_run_successful:
174            raise SQLMeshError("Plan application failed.")
175
176    def _push(self, plan: Plan, deployability_index: t.Optional[DeployabilityIndex] = None) -> None:
177        """Push the snapshots to the state sync.
178
179        As a part of plan pushing, snapshot tables are created.
180
181        Args:
182            plan: The plan to source snapshots from.
183            deployability_index: Indicates which snapshots are deployable in the context of this creation.
184        """
185        snapshots_to_create = [
186            s
187            for s in plan.snapshots.values()
188            if s.is_model and not s.is_symbolic and plan.is_selected_for_backfill(s.name)
189        ]
190        snapshots_to_create_count = len(snapshots_to_create)
191
192        if snapshots_to_create_count > 0:
193            self.console.start_creation_progress(
194                snapshots_to_create_count, plan.environment_naming_info, self.default_catalog
195            )
196
197        completed = False
198        try:
199            self.snapshot_evaluator.create(
200                snapshots_to_create,
201                plan.snapshots,
202                deployability_index=deployability_index,
203                on_complete=self.console.update_creation_progress,
204            )
205            completed = True
206        finally:
207            self.console.stop_creation_progress(success=completed)
208
209        self.state_sync.push_snapshots(plan.new_snapshots)
210
211        analytics.collector.on_snapshots_created(
212            new_snapshots=plan.new_snapshots, plan_id=plan.plan_id
213        )
214
215    def _promote(
216        self, plan: Plan, no_gaps_snapshot_names: t.Optional[t.Set[str]] = None
217    ) -> PromotionResult:
218        """Promote a plan.
219
220        Args:
221            plan: The plan to promote.
222            no_gaps_snapshot_names: The names of snapshots to check for gaps if the no gaps check is enabled in the plan.
223            If not provided, all snapshots are checked.
224        """
225        promotion_result = self.state_sync.promote(
226            plan.environment,
227            no_gaps_snapshot_names=no_gaps_snapshot_names if plan.no_gaps else set(),
228        )
229
230        if not plan.is_dev:
231            self.snapshot_evaluator.migrate(
232                [s for s in plan.snapshots.values() if s.is_paused],
233                plan.snapshots,
234            )
235            if not plan.ensure_finalized_snapshots:
236                # Only unpause at this point if we don't have to use the finalized snapshots
237                # for subsequent plan applications. Otherwise, unpause right before finalizing
238                # the environment.
239                self.state_sync.unpause_snapshots(promotion_result.added, plan.end)
240
241        return promotion_result
242
243    def _update_views(
244        self,
245        plan: Plan,
246        promotion_result: PromotionResult,
247        deployability_index: t.Optional[DeployabilityIndex] = None,
248    ) -> None:
249        """Update environment views.
250
251        Args:
252            plan: The plan to promote.
253            promotion_result: The result of the promotion.
254            deployability_index: Indicates which snapshots are deployable in the context of this promotion.
255        """
256        if not plan.is_dev and plan.ensure_finalized_snapshots:
257            # Unpause right before finalizing the environment in case when
258            # we need to use the finalized snapshots for subsequent plan applications.
259            # Otherwise, unpause right after updatig the environment record.
260            self.state_sync.unpause_snapshots(promotion_result.added, plan.end)
261
262        environment = plan.environment
263
264        self.console.start_promotion_progress(
265            len(promotion_result.added) + len(promotion_result.removed),
266            environment.naming_info,
267            self.default_catalog,
268        )
269
270        completed = False
271        try:
272            self.snapshot_evaluator.promote(
273                [plan.context_diff.snapshots[s.snapshot_id] for s in promotion_result.added],
274                environment.naming_info,
275                deployability_index=deployability_index,
276                on_complete=lambda s: self.console.update_promotion_progress(s, True),
277            )
278            if promotion_result.removed_environment_naming_info:
279                self.snapshot_evaluator.demote(
280                    promotion_result.removed,
281                    promotion_result.removed_environment_naming_info,
282                    on_complete=lambda s: self.console.update_promotion_progress(s, False),
283                )
284            self.state_sync.finalize(environment)
285            completed = True
286        finally:
287            self.console.stop_promotion_progress(success=completed)
288
289    def _restate(self, plan: Plan) -> None:
290        if not plan.restatements:
291            return
292
293        self.state_sync.remove_interval(
294            [
295                (plan.context_diff.snapshots[s_id], interval)
296                for s_id, interval in plan.restatements.items()
297            ],
298            remove_shared_versions=not plan.is_dev,
299        )
300
301
302class BaseAirflowPlanEvaluator(PlanEvaluator):
303    def __init__(
304        self,
305        console: t.Optional[Console],
306        blocking: bool,
307        dag_run_poll_interval_secs: int,
308        dag_creation_poll_interval_secs: int,
309        dag_creation_max_retry_attempts: int,
310    ):
311        self.blocking = blocking
312        self.dag_run_poll_interval_secs = dag_run_poll_interval_secs
313        self.dag_creation_poll_interval_secs = dag_creation_poll_interval_secs
314        self.dag_creation_max_retry_attempts = dag_creation_max_retry_attempts
315        self.console = console or get_console()
316
317    def evaluate(
318        self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]] = None
319    ) -> None:
320        plan_request_id = plan.plan_id
321        self._apply_plan(plan, plan_request_id)
322
323        analytics.collector.on_plan_apply_start(
324            plan=plan,
325            engine_type=None,
326            state_sync_type=None,
327            scheduler_type=c.AIRFLOW,
328        )
329
330        if self.blocking:
331            plan_application_dag_id = airflow_common.plan_application_dag_id(
332                plan.environment_naming_info.name, plan_request_id
333            )
334
335            self.console.log_status_update(
336                f"Waiting for the plan application DAG '{plan_application_dag_id}' to be provisioned on Airflow"
337            )
338
339            plan_application_dag_run_id = self.client.wait_for_first_dag_run(
340                plan_application_dag_id,
341                self.dag_creation_poll_interval_secs,
342                self.dag_creation_max_retry_attempts,
343            )
344
345            self.client.print_tracking_url(
346                plan_application_dag_id,
347                plan_application_dag_run_id,
348                "plan application",
349            )
350            plan_application_succeeded = self.client.wait_for_dag_run_completion(
351                plan_application_dag_id,
352                plan_application_dag_run_id,
353                self.dag_run_poll_interval_secs,
354            )
355            if not plan_application_succeeded:
356                raise SQLMeshError("Plan application failed.")
357
358            self.console.log_success("The plan has been applied successfully")
359
360    @property
361    def client(self) -> BaseAirflowClient:
362        raise NotImplementedError
363
364    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
365        raise NotImplementedError
366
367
368class StateBasedAirflowPlanEvaluator(BaseAirflowPlanEvaluator):
369    backfill_concurrent_tasks: int
370    ddl_concurrent_tasks: int
371    notification_targets: t.Optional[t.List[NotificationTarget]]
372    users: t.Optional[t.List[User]]
373
374    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
375        from sqlmesh.schedulers.airflow.plan import PlanDagState, create_plan_dag_spec
376
377        plan_application_request = airflow_common.PlanApplicationRequest(
378            new_snapshots=plan.new_snapshots,
379            environment=plan.environment,
380            no_gaps=plan.no_gaps,
381            skip_backfill=plan.skip_backfill,
382            request_id=plan_request_id,
383            restatements=plan.restatements or {},
384            notification_targets=self.notification_targets or [],
385            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
386            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
387            users=self.users or [],
388            is_dev=plan.is_dev,
389            forward_only=plan.forward_only,
390            models_to_backfill=plan.models_to_backfill,
391            end_bounded=plan.end_bounded,
392            ensure_finalized_snapshots=plan.ensure_finalized_snapshots,
393            directly_modified_snapshots=list(plan.directly_modified),
394            indirectly_modified_snapshots={
395                change_source.name: list(snapshots)
396                for change_source, snapshots in plan.indirectly_modified.items()
397            },
398            removed_snapshots=list(plan.context_diff.removed_snapshots),
399            execution_time=plan.execution_time,
400        )
401        plan_dag_spec = create_plan_dag_spec(plan_application_request, self.state_sync)
402        PlanDagState.from_state_sync(self.state_sync).add_dag_spec(plan_dag_spec)
403
404    @property
405    def state_sync(self) -> StateSync:
406        raise NotImplementedError
407
408
409class AirflowPlanEvaluator(StateBasedAirflowPlanEvaluator):
410    def __init__(
411        self,
412        airflow_client: AirflowClient,
413        console: t.Optional[Console] = None,
414        blocking: bool = True,
415        dag_run_poll_interval_secs: int = 10,
416        dag_creation_poll_interval_secs: int = 30,
417        dag_creation_max_retry_attempts: int = 10,
418        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
419        backfill_concurrent_tasks: int = 1,
420        ddl_concurrent_tasks: int = 1,
421        users: t.Optional[t.List[User]] = None,
422        state_sync: t.Optional[StateSync] = None,
423    ):
424        super().__init__(
425            console,
426            blocking,
427            dag_run_poll_interval_secs,
428            dag_creation_poll_interval_secs,
429            dag_creation_max_retry_attempts,
430        )
431        self._airflow_client = airflow_client
432        self.notification_targets = notification_targets or []
433        self.backfill_concurrent_tasks = backfill_concurrent_tasks
434        self.ddl_concurrent_tasks = ddl_concurrent_tasks
435        self.users = users or []
436
437        self._state_sync = state_sync
438
439    @property
440    def client(self) -> BaseAirflowClient:
441        return self._airflow_client
442
443    @property
444    def state_sync(self) -> StateSync:
445        if self._state_sync is None:
446            raise SQLMeshError("State Sync is not configured")
447        return self._state_sync
448
449    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
450        if self._state_sync is not None:
451            super()._apply_plan(plan, plan_request_id)
452            return
453
454        self._airflow_client.apply_plan(
455            plan.new_snapshots,
456            plan.environment,
457            plan_request_id,
458            no_gaps=plan.no_gaps,
459            skip_backfill=plan.skip_backfill,
460            restatements=plan.restatements,
461            notification_targets=self.notification_targets,
462            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
463            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
464            users=self.users,
465            is_dev=plan.is_dev,
466            forward_only=plan.forward_only,
467            models_to_backfill=plan.models_to_backfill,
468            end_bounded=plan.end_bounded,
469            ensure_finalized_snapshots=plan.ensure_finalized_snapshots,
470            directly_modified_snapshots=list(plan.directly_modified),
471            indirectly_modified_snapshots={
472                change_source.name: list(snapshots)
473                for change_source, snapshots in plan.indirectly_modified.items()
474            },
475            removed_snapshots=list(plan.context_diff.removed_snapshots),
476            execution_time=plan.execution_time,
477        )
478
479
480class MWAAPlanEvaluator(StateBasedAirflowPlanEvaluator):
481    def __init__(
482        self,
483        client: MWAAClient,
484        state_sync: StateSync,
485        console: t.Optional[Console] = None,
486        blocking: bool = True,
487        dag_run_poll_interval_secs: int = 10,
488        dag_creation_poll_interval_secs: int = 30,
489        dag_creation_max_retry_attempts: int = 10,
490        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
491        backfill_concurrent_tasks: int = 1,
492        ddl_concurrent_tasks: int = 1,
493        users: t.Optional[t.List[User]] = None,
494    ):
495        super().__init__(
496            console,
497            blocking,
498            dag_run_poll_interval_secs,
499            dag_creation_poll_interval_secs,
500            dag_creation_max_retry_attempts,
501        )
502        self._mwaa_client = client
503        self._state_sync = state_sync
504        self.notification_targets = notification_targets or []
505        self.backfill_concurrent_tasks = backfill_concurrent_tasks
506        self.ddl_concurrent_tasks = ddl_concurrent_tasks
507        self.users = users or []
508
509    @property
510    def client(self) -> BaseAirflowClient:
511        return self._mwaa_client
512
513    @property
514    def state_sync(self) -> StateSync:
515        return self._state_sync
516
517
518def update_intervals_for_new_snapshots(
519    snapshots: t.Collection[Snapshot], state_sync: StateSync
520) -> None:
521    for snapshot in state_sync.refresh_snapshot_intervals(snapshots):
522        if snapshot.is_forward_only:
523            snapshot.dev_intervals = snapshot.intervals.copy()
524            for start, end in snapshot.dev_intervals:
525                state_sync.add_interval(snapshot, start, end, is_dev=True)
class PlanEvaluator(abc.ABC):
43class PlanEvaluator(abc.ABC):
44    @abc.abstractmethod
45    def evaluate(
46        self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]] = None
47    ) -> None:
48        """Evaluates a plan by pushing snapshots and backfilling data.
49
50        Given a plan, it pushes snapshots into the state and then kicks off
51        the backfill process for all affected snapshots. Once backfill is done,
52        snapshots that are part of the plan are promoted in the environment targeted
53        by this plan.
54
55        Args:
56            plan: The plan to evaluate.
57        """

Helper class that provides a standard way to create an ABC using inheritance.

@abc.abstractmethod
def evaluate( self, plan: sqlmesh.core.plan.definition.Plan, circuit_breaker: Union[Callable[[], bool], NoneType] = None) -> None:
44    @abc.abstractmethod
45    def evaluate(
46        self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]] = None
47    ) -> None:
48        """Evaluates a plan by pushing snapshots and backfilling data.
49
50        Given a plan, it pushes snapshots into the state and then kicks off
51        the backfill process for all affected snapshots. Once backfill is done,
52        snapshots that are part of the plan are promoted in the environment targeted
53        by this plan.
54
55        Args:
56            plan: The plan to evaluate.
57        """

Evaluates a plan by pushing snapshots and backfilling data.

Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.

Arguments:
  • plan: The plan to evaluate.
class BuiltInPlanEvaluator(PlanEvaluator):
 60class BuiltInPlanEvaluator(PlanEvaluator):
 61    def __init__(
 62        self,
 63        state_sync: StateSync,
 64        snapshot_evaluator: SnapshotEvaluator,
 65        default_catalog: t.Optional[str],
 66        backfill_concurrent_tasks: int = 1,
 67        console: t.Optional[Console] = None,
 68        notification_target_manager: t.Optional[NotificationTargetManager] = None,
 69    ):
 70        self.state_sync = state_sync
 71        self.snapshot_evaluator = snapshot_evaluator
 72        self.default_catalog = default_catalog
 73        self.backfill_concurrent_tasks = backfill_concurrent_tasks
 74        self.console = console or get_console()
 75        self.notification_target_manager = notification_target_manager
 76
 77    def evaluate(
 78        self,
 79        plan: Plan,
 80        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 81    ) -> None:
 82        self.console.start_plan_evaluation(plan)
 83        analytics.collector.on_plan_apply_start(
 84            plan=plan,
 85            engine_type=self.snapshot_evaluator.adapter.dialect,
 86            state_sync_type=self.state_sync.state_type(),
 87            scheduler_type=c.BUILTIN,
 88        )
 89
 90        try:
 91            snapshots = plan.snapshots
 92            all_names = {
 93                s.name for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
 94            }
 95            deployability_index_for_evaluation = DeployabilityIndex.create(snapshots)
 96            deployability_index_for_creation = deployability_index_for_evaluation
 97            if plan.is_dev:
 98                before_promote_snapshots = all_names
 99                after_promote_snapshots = set()
100            else:
101                before_promote_snapshots = {
102                    s.name
103                    for s in snapshots.values()
104                    if deployability_index_for_evaluation.is_representative(s)
105                    and plan.is_selected_for_backfill(s.name)
106                }
107                after_promote_snapshots = all_names - before_promote_snapshots
108                deployability_index_for_evaluation = DeployabilityIndex.all_deployable()
109
110            self._push(plan, deployability_index_for_creation)
111            update_intervals_for_new_snapshots(plan.new_snapshots, self.state_sync)
112            self._restate(plan)
113            self._backfill(
114                plan,
115                before_promote_snapshots,
116                deployability_index_for_evaluation,
117                circuit_breaker=circuit_breaker,
118            )
119            promotion_result = self._promote(plan, before_promote_snapshots)
120            self._backfill(
121                plan,
122                after_promote_snapshots,
123                deployability_index_for_evaluation,
124                circuit_breaker=circuit_breaker,
125            )
126            self._update_views(plan, promotion_result, deployability_index_for_evaluation)
127
128            if not plan.requires_backfill:
129                self.console.log_success("Virtual Update executed successfully")
130        except Exception as e:
131            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
132            raise
133        finally:
134            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
135            self.console.stop_plan_evaluation()
136
137    def _backfill(
138        self,
139        plan: Plan,
140        selected_snapshots: t.Set[str],
141        deployability_index: DeployabilityIndex,
142        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
143    ) -> None:
144        """Backfill missing intervals for snapshots that are part of the given plan.
145
146        Args:
147            plan: The plan to source snapshots from.
148            selected_snapshots: The snapshots to backfill.
149        """
150        if not plan.requires_backfill or not selected_snapshots:
151            return
152
153        snapshots = plan.snapshots
154        scheduler = Scheduler(
155            snapshots.values(),
156            self.snapshot_evaluator,
157            self.state_sync,
158            default_catalog=self.default_catalog,
159            max_workers=self.backfill_concurrent_tasks,
160            console=self.console,
161            notification_target_manager=self.notification_target_manager,
162        )
163        is_run_successful = scheduler.run(
164            plan.environment_naming_info,
165            plan.start,
166            plan.end,
167            execution_time=plan.execution_time,
168            restatements=plan.restatements,
169            selected_snapshots=selected_snapshots,
170            deployability_index=deployability_index,
171            circuit_breaker=circuit_breaker,
172            end_bounded=plan.end_bounded,
173        )
174        if not is_run_successful:
175            raise SQLMeshError("Plan application failed.")
176
177    def _push(self, plan: Plan, deployability_index: t.Optional[DeployabilityIndex] = None) -> None:
178        """Push the snapshots to the state sync.
179
180        As a part of plan pushing, snapshot tables are created.
181
182        Args:
183            plan: The plan to source snapshots from.
184            deployability_index: Indicates which snapshots are deployable in the context of this creation.
185        """
186        snapshots_to_create = [
187            s
188            for s in plan.snapshots.values()
189            if s.is_model and not s.is_symbolic and plan.is_selected_for_backfill(s.name)
190        ]
191        snapshots_to_create_count = len(snapshots_to_create)
192
193        if snapshots_to_create_count > 0:
194            self.console.start_creation_progress(
195                snapshots_to_create_count, plan.environment_naming_info, self.default_catalog
196            )
197
198        completed = False
199        try:
200            self.snapshot_evaluator.create(
201                snapshots_to_create,
202                plan.snapshots,
203                deployability_index=deployability_index,
204                on_complete=self.console.update_creation_progress,
205            )
206            completed = True
207        finally:
208            self.console.stop_creation_progress(success=completed)
209
210        self.state_sync.push_snapshots(plan.new_snapshots)
211
212        analytics.collector.on_snapshots_created(
213            new_snapshots=plan.new_snapshots, plan_id=plan.plan_id
214        )
215
216    def _promote(
217        self, plan: Plan, no_gaps_snapshot_names: t.Optional[t.Set[str]] = None
218    ) -> PromotionResult:
219        """Promote a plan.
220
221        Args:
222            plan: The plan to promote.
223            no_gaps_snapshot_names: The names of snapshots to check for gaps if the no gaps check is enabled in the plan.
224            If not provided, all snapshots are checked.
225        """
226        promotion_result = self.state_sync.promote(
227            plan.environment,
228            no_gaps_snapshot_names=no_gaps_snapshot_names if plan.no_gaps else set(),
229        )
230
231        if not plan.is_dev:
232            self.snapshot_evaluator.migrate(
233                [s for s in plan.snapshots.values() if s.is_paused],
234                plan.snapshots,
235            )
236            if not plan.ensure_finalized_snapshots:
237                # Only unpause at this point if we don't have to use the finalized snapshots
238                # for subsequent plan applications. Otherwise, unpause right before finalizing
239                # the environment.
240                self.state_sync.unpause_snapshots(promotion_result.added, plan.end)
241
242        return promotion_result
243
244    def _update_views(
245        self,
246        plan: Plan,
247        promotion_result: PromotionResult,
248        deployability_index: t.Optional[DeployabilityIndex] = None,
249    ) -> None:
250        """Update environment views.
251
252        Args:
253            plan: The plan to promote.
254            promotion_result: The result of the promotion.
255            deployability_index: Indicates which snapshots are deployable in the context of this promotion.
256        """
257        if not plan.is_dev and plan.ensure_finalized_snapshots:
258            # Unpause right before finalizing the environment in case when
259            # we need to use the finalized snapshots for subsequent plan applications.
260            # Otherwise, unpause right after updatig the environment record.
261            self.state_sync.unpause_snapshots(promotion_result.added, plan.end)
262
263        environment = plan.environment
264
265        self.console.start_promotion_progress(
266            len(promotion_result.added) + len(promotion_result.removed),
267            environment.naming_info,
268            self.default_catalog,
269        )
270
271        completed = False
272        try:
273            self.snapshot_evaluator.promote(
274                [plan.context_diff.snapshots[s.snapshot_id] for s in promotion_result.added],
275                environment.naming_info,
276                deployability_index=deployability_index,
277                on_complete=lambda s: self.console.update_promotion_progress(s, True),
278            )
279            if promotion_result.removed_environment_naming_info:
280                self.snapshot_evaluator.demote(
281                    promotion_result.removed,
282                    promotion_result.removed_environment_naming_info,
283                    on_complete=lambda s: self.console.update_promotion_progress(s, False),
284                )
285            self.state_sync.finalize(environment)
286            completed = True
287        finally:
288            self.console.stop_promotion_progress(success=completed)
289
290    def _restate(self, plan: Plan) -> None:
291        if not plan.restatements:
292            return
293
294        self.state_sync.remove_interval(
295            [
296                (plan.context_diff.snapshots[s_id], interval)
297                for s_id, interval in plan.restatements.items()
298            ],
299            remove_shared_versions=not plan.is_dev,
300        )

Helper class that provides a standard way to create an ABC using inheritance.

BuiltInPlanEvaluator( state_sync: sqlmesh.core.state_sync.base.StateSync, snapshot_evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, default_catalog: Union[str, NoneType], backfill_concurrent_tasks: int = 1, console: Union[sqlmesh.core.console.Console, NoneType] = None, notification_target_manager: Union[sqlmesh.core.notification_target.NotificationTargetManager, NoneType] = None)
61    def __init__(
62        self,
63        state_sync: StateSync,
64        snapshot_evaluator: SnapshotEvaluator,
65        default_catalog: t.Optional[str],
66        backfill_concurrent_tasks: int = 1,
67        console: t.Optional[Console] = None,
68        notification_target_manager: t.Optional[NotificationTargetManager] = None,
69    ):
70        self.state_sync = state_sync
71        self.snapshot_evaluator = snapshot_evaluator
72        self.default_catalog = default_catalog
73        self.backfill_concurrent_tasks = backfill_concurrent_tasks
74        self.console = console or get_console()
75        self.notification_target_manager = notification_target_manager
def evaluate( self, plan: sqlmesh.core.plan.definition.Plan, circuit_breaker: Union[Callable[[], bool], NoneType] = None) -> None:
 77    def evaluate(
 78        self,
 79        plan: Plan,
 80        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 81    ) -> None:
 82        self.console.start_plan_evaluation(plan)
 83        analytics.collector.on_plan_apply_start(
 84            plan=plan,
 85            engine_type=self.snapshot_evaluator.adapter.dialect,
 86            state_sync_type=self.state_sync.state_type(),
 87            scheduler_type=c.BUILTIN,
 88        )
 89
 90        try:
 91            snapshots = plan.snapshots
 92            all_names = {
 93                s.name for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
 94            }
 95            deployability_index_for_evaluation = DeployabilityIndex.create(snapshots)
 96            deployability_index_for_creation = deployability_index_for_evaluation
 97            if plan.is_dev:
 98                before_promote_snapshots = all_names
 99                after_promote_snapshots = set()
100            else:
101                before_promote_snapshots = {
102                    s.name
103                    for s in snapshots.values()
104                    if deployability_index_for_evaluation.is_representative(s)
105                    and plan.is_selected_for_backfill(s.name)
106                }
107                after_promote_snapshots = all_names - before_promote_snapshots
108                deployability_index_for_evaluation = DeployabilityIndex.all_deployable()
109
110            self._push(plan, deployability_index_for_creation)
111            update_intervals_for_new_snapshots(plan.new_snapshots, self.state_sync)
112            self._restate(plan)
113            self._backfill(
114                plan,
115                before_promote_snapshots,
116                deployability_index_for_evaluation,
117                circuit_breaker=circuit_breaker,
118            )
119            promotion_result = self._promote(plan, before_promote_snapshots)
120            self._backfill(
121                plan,
122                after_promote_snapshots,
123                deployability_index_for_evaluation,
124                circuit_breaker=circuit_breaker,
125            )
126            self._update_views(plan, promotion_result, deployability_index_for_evaluation)
127
128            if not plan.requires_backfill:
129                self.console.log_success("Virtual Update executed successfully")
130        except Exception as e:
131            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id, error=e)
132            raise
133        finally:
134            analytics.collector.on_plan_apply_end(plan_id=plan.plan_id)
135            self.console.stop_plan_evaluation()

Evaluates a plan by pushing snapshots and backfilling data.

Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.

Arguments:
  • plan: The plan to evaluate.
class BaseAirflowPlanEvaluator(PlanEvaluator):
303class BaseAirflowPlanEvaluator(PlanEvaluator):
304    def __init__(
305        self,
306        console: t.Optional[Console],
307        blocking: bool,
308        dag_run_poll_interval_secs: int,
309        dag_creation_poll_interval_secs: int,
310        dag_creation_max_retry_attempts: int,
311    ):
312        self.blocking = blocking
313        self.dag_run_poll_interval_secs = dag_run_poll_interval_secs
314        self.dag_creation_poll_interval_secs = dag_creation_poll_interval_secs
315        self.dag_creation_max_retry_attempts = dag_creation_max_retry_attempts
316        self.console = console or get_console()
317
318    def evaluate(
319        self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]] = None
320    ) -> None:
321        plan_request_id = plan.plan_id
322        self._apply_plan(plan, plan_request_id)
323
324        analytics.collector.on_plan_apply_start(
325            plan=plan,
326            engine_type=None,
327            state_sync_type=None,
328            scheduler_type=c.AIRFLOW,
329        )
330
331        if self.blocking:
332            plan_application_dag_id = airflow_common.plan_application_dag_id(
333                plan.environment_naming_info.name, plan_request_id
334            )
335
336            self.console.log_status_update(
337                f"Waiting for the plan application DAG '{plan_application_dag_id}' to be provisioned on Airflow"
338            )
339
340            plan_application_dag_run_id = self.client.wait_for_first_dag_run(
341                plan_application_dag_id,
342                self.dag_creation_poll_interval_secs,
343                self.dag_creation_max_retry_attempts,
344            )
345
346            self.client.print_tracking_url(
347                plan_application_dag_id,
348                plan_application_dag_run_id,
349                "plan application",
350            )
351            plan_application_succeeded = self.client.wait_for_dag_run_completion(
352                plan_application_dag_id,
353                plan_application_dag_run_id,
354                self.dag_run_poll_interval_secs,
355            )
356            if not plan_application_succeeded:
357                raise SQLMeshError("Plan application failed.")
358
359            self.console.log_success("The plan has been applied successfully")
360
361    @property
362    def client(self) -> BaseAirflowClient:
363        raise NotImplementedError
364
365    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
366        raise NotImplementedError

Helper class that provides a standard way to create an ABC using inheritance.

BaseAirflowPlanEvaluator( console: Union[sqlmesh.core.console.Console, NoneType], blocking: bool, dag_run_poll_interval_secs: int, dag_creation_poll_interval_secs: int, dag_creation_max_retry_attempts: int)
304    def __init__(
305        self,
306        console: t.Optional[Console],
307        blocking: bool,
308        dag_run_poll_interval_secs: int,
309        dag_creation_poll_interval_secs: int,
310        dag_creation_max_retry_attempts: int,
311    ):
312        self.blocking = blocking
313        self.dag_run_poll_interval_secs = dag_run_poll_interval_secs
314        self.dag_creation_poll_interval_secs = dag_creation_poll_interval_secs
315        self.dag_creation_max_retry_attempts = dag_creation_max_retry_attempts
316        self.console = console or get_console()
def evaluate( self, plan: sqlmesh.core.plan.definition.Plan, circuit_breaker: Union[Callable[[], bool], NoneType] = None) -> None:
318    def evaluate(
319        self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]] = None
320    ) -> None:
321        plan_request_id = plan.plan_id
322        self._apply_plan(plan, plan_request_id)
323
324        analytics.collector.on_plan_apply_start(
325            plan=plan,
326            engine_type=None,
327            state_sync_type=None,
328            scheduler_type=c.AIRFLOW,
329        )
330
331        if self.blocking:
332            plan_application_dag_id = airflow_common.plan_application_dag_id(
333                plan.environment_naming_info.name, plan_request_id
334            )
335
336            self.console.log_status_update(
337                f"Waiting for the plan application DAG '{plan_application_dag_id}' to be provisioned on Airflow"
338            )
339
340            plan_application_dag_run_id = self.client.wait_for_first_dag_run(
341                plan_application_dag_id,
342                self.dag_creation_poll_interval_secs,
343                self.dag_creation_max_retry_attempts,
344            )
345
346            self.client.print_tracking_url(
347                plan_application_dag_id,
348                plan_application_dag_run_id,
349                "plan application",
350            )
351            plan_application_succeeded = self.client.wait_for_dag_run_completion(
352                plan_application_dag_id,
353                plan_application_dag_run_id,
354                self.dag_run_poll_interval_secs,
355            )
356            if not plan_application_succeeded:
357                raise SQLMeshError("Plan application failed.")
358
359            self.console.log_success("The plan has been applied successfully")

Evaluates a plan by pushing snapshots and backfilling data.

Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.

Arguments:
  • plan: The plan to evaluate.
class StateBasedAirflowPlanEvaluator(BaseAirflowPlanEvaluator):
369class StateBasedAirflowPlanEvaluator(BaseAirflowPlanEvaluator):
370    backfill_concurrent_tasks: int
371    ddl_concurrent_tasks: int
372    notification_targets: t.Optional[t.List[NotificationTarget]]
373    users: t.Optional[t.List[User]]
374
375    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
376        from sqlmesh.schedulers.airflow.plan import PlanDagState, create_plan_dag_spec
377
378        plan_application_request = airflow_common.PlanApplicationRequest(
379            new_snapshots=plan.new_snapshots,
380            environment=plan.environment,
381            no_gaps=plan.no_gaps,
382            skip_backfill=plan.skip_backfill,
383            request_id=plan_request_id,
384            restatements=plan.restatements or {},
385            notification_targets=self.notification_targets or [],
386            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
387            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
388            users=self.users or [],
389            is_dev=plan.is_dev,
390            forward_only=plan.forward_only,
391            models_to_backfill=plan.models_to_backfill,
392            end_bounded=plan.end_bounded,
393            ensure_finalized_snapshots=plan.ensure_finalized_snapshots,
394            directly_modified_snapshots=list(plan.directly_modified),
395            indirectly_modified_snapshots={
396                change_source.name: list(snapshots)
397                for change_source, snapshots in plan.indirectly_modified.items()
398            },
399            removed_snapshots=list(plan.context_diff.removed_snapshots),
400            execution_time=plan.execution_time,
401        )
402        plan_dag_spec = create_plan_dag_spec(plan_application_request, self.state_sync)
403        PlanDagState.from_state_sync(self.state_sync).add_dag_spec(plan_dag_spec)
404
405    @property
406    def state_sync(self) -> StateSync:
407        raise NotImplementedError

Helper class that provides a standard way to create an ABC using inheritance.

class AirflowPlanEvaluator(StateBasedAirflowPlanEvaluator):
410class AirflowPlanEvaluator(StateBasedAirflowPlanEvaluator):
411    def __init__(
412        self,
413        airflow_client: AirflowClient,
414        console: t.Optional[Console] = None,
415        blocking: bool = True,
416        dag_run_poll_interval_secs: int = 10,
417        dag_creation_poll_interval_secs: int = 30,
418        dag_creation_max_retry_attempts: int = 10,
419        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
420        backfill_concurrent_tasks: int = 1,
421        ddl_concurrent_tasks: int = 1,
422        users: t.Optional[t.List[User]] = None,
423        state_sync: t.Optional[StateSync] = None,
424    ):
425        super().__init__(
426            console,
427            blocking,
428            dag_run_poll_interval_secs,
429            dag_creation_poll_interval_secs,
430            dag_creation_max_retry_attempts,
431        )
432        self._airflow_client = airflow_client
433        self.notification_targets = notification_targets or []
434        self.backfill_concurrent_tasks = backfill_concurrent_tasks
435        self.ddl_concurrent_tasks = ddl_concurrent_tasks
436        self.users = users or []
437
438        self._state_sync = state_sync
439
440    @property
441    def client(self) -> BaseAirflowClient:
442        return self._airflow_client
443
444    @property
445    def state_sync(self) -> StateSync:
446        if self._state_sync is None:
447            raise SQLMeshError("State Sync is not configured")
448        return self._state_sync
449
450    def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
451        if self._state_sync is not None:
452            super()._apply_plan(plan, plan_request_id)
453            return
454
455        self._airflow_client.apply_plan(
456            plan.new_snapshots,
457            plan.environment,
458            plan_request_id,
459            no_gaps=plan.no_gaps,
460            skip_backfill=plan.skip_backfill,
461            restatements=plan.restatements,
462            notification_targets=self.notification_targets,
463            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
464            ddl_concurrent_tasks=self.ddl_concurrent_tasks,
465            users=self.users,
466            is_dev=plan.is_dev,
467            forward_only=plan.forward_only,
468            models_to_backfill=plan.models_to_backfill,
469            end_bounded=plan.end_bounded,
470            ensure_finalized_snapshots=plan.ensure_finalized_snapshots,
471            directly_modified_snapshots=list(plan.directly_modified),
472            indirectly_modified_snapshots={
473                change_source.name: list(snapshots)
474                for change_source, snapshots in plan.indirectly_modified.items()
475            },
476            removed_snapshots=list(plan.context_diff.removed_snapshots),
477            execution_time=plan.execution_time,
478        )

Helper class that provides a standard way to create an ABC using inheritance.

AirflowPlanEvaluator( airflow_client: sqlmesh.schedulers.airflow.client.AirflowClient, console: Union[sqlmesh.core.console.Console, NoneType] = None, blocking: bool = True, dag_run_poll_interval_secs: int = 10, dag_creation_poll_interval_secs: int = 30, dag_creation_max_retry_attempts: int = 10, notification_targets: Union[List[typing_extensions.Annotated[Union[sqlmesh.core.notification_target.BasicSMTPNotificationTarget, sqlmesh.core.notification_target.ConsoleNotificationTarget, sqlmesh.core.notification_target.SlackApiNotificationTarget, sqlmesh.core.notification_target.SlackWebhookNotificationTarget], FieldInfo(annotation=NoneType, required=True, discriminator='type_')]], NoneType] = None, backfill_concurrent_tasks: int = 1, ddl_concurrent_tasks: int = 1, users: Union[List[sqlmesh.core.user.User], NoneType] = None, state_sync: Union[sqlmesh.core.state_sync.base.StateSync, NoneType] = None)
411    def __init__(
412        self,
413        airflow_client: AirflowClient,
414        console: t.Optional[Console] = None,
415        blocking: bool = True,
416        dag_run_poll_interval_secs: int = 10,
417        dag_creation_poll_interval_secs: int = 30,
418        dag_creation_max_retry_attempts: int = 10,
419        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
420        backfill_concurrent_tasks: int = 1,
421        ddl_concurrent_tasks: int = 1,
422        users: t.Optional[t.List[User]] = None,
423        state_sync: t.Optional[StateSync] = None,
424    ):
425        super().__init__(
426            console,
427            blocking,
428            dag_run_poll_interval_secs,
429            dag_creation_poll_interval_secs,
430            dag_creation_max_retry_attempts,
431        )
432        self._airflow_client = airflow_client
433        self.notification_targets = notification_targets or []
434        self.backfill_concurrent_tasks = backfill_concurrent_tasks
435        self.ddl_concurrent_tasks = ddl_concurrent_tasks
436        self.users = users or []
437
438        self._state_sync = state_sync
class MWAAPlanEvaluator(StateBasedAirflowPlanEvaluator):
481class MWAAPlanEvaluator(StateBasedAirflowPlanEvaluator):
482    def __init__(
483        self,
484        client: MWAAClient,
485        state_sync: StateSync,
486        console: t.Optional[Console] = None,
487        blocking: bool = True,
488        dag_run_poll_interval_secs: int = 10,
489        dag_creation_poll_interval_secs: int = 30,
490        dag_creation_max_retry_attempts: int = 10,
491        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
492        backfill_concurrent_tasks: int = 1,
493        ddl_concurrent_tasks: int = 1,
494        users: t.Optional[t.List[User]] = None,
495    ):
496        super().__init__(
497            console,
498            blocking,
499            dag_run_poll_interval_secs,
500            dag_creation_poll_interval_secs,
501            dag_creation_max_retry_attempts,
502        )
503        self._mwaa_client = client
504        self._state_sync = state_sync
505        self.notification_targets = notification_targets or []
506        self.backfill_concurrent_tasks = backfill_concurrent_tasks
507        self.ddl_concurrent_tasks = ddl_concurrent_tasks
508        self.users = users or []
509
510    @property
511    def client(self) -> BaseAirflowClient:
512        return self._mwaa_client
513
514    @property
515    def state_sync(self) -> StateSync:
516        return self._state_sync

Helper class that provides a standard way to create an ABC using inheritance.

MWAAPlanEvaluator( client: sqlmesh.schedulers.airflow.mwaa_client.MWAAClient, state_sync: sqlmesh.core.state_sync.base.StateSync, console: Union[sqlmesh.core.console.Console, NoneType] = None, blocking: bool = True, dag_run_poll_interval_secs: int = 10, dag_creation_poll_interval_secs: int = 30, dag_creation_max_retry_attempts: int = 10, notification_targets: Union[List[typing_extensions.Annotated[Union[sqlmesh.core.notification_target.BasicSMTPNotificationTarget, sqlmesh.core.notification_target.ConsoleNotificationTarget, sqlmesh.core.notification_target.SlackApiNotificationTarget, sqlmesh.core.notification_target.SlackWebhookNotificationTarget], FieldInfo(annotation=NoneType, required=True, discriminator='type_')]], NoneType] = None, backfill_concurrent_tasks: int = 1, ddl_concurrent_tasks: int = 1, users: Union[List[sqlmesh.core.user.User], NoneType] = None)
482    def __init__(
483        self,
484        client: MWAAClient,
485        state_sync: StateSync,
486        console: t.Optional[Console] = None,
487        blocking: bool = True,
488        dag_run_poll_interval_secs: int = 10,
489        dag_creation_poll_interval_secs: int = 30,
490        dag_creation_max_retry_attempts: int = 10,
491        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
492        backfill_concurrent_tasks: int = 1,
493        ddl_concurrent_tasks: int = 1,
494        users: t.Optional[t.List[User]] = None,
495    ):
496        super().__init__(
497            console,
498            blocking,
499            dag_run_poll_interval_secs,
500            dag_creation_poll_interval_secs,
501            dag_creation_max_retry_attempts,
502        )
503        self._mwaa_client = client
504        self._state_sync = state_sync
505        self.notification_targets = notification_targets or []
506        self.backfill_concurrent_tasks = backfill_concurrent_tasks
507        self.ddl_concurrent_tasks = ddl_concurrent_tasks
508        self.users = users or []
def update_intervals_for_new_snapshots( snapshots: Collection[sqlmesh.core.snapshot.definition.Snapshot], state_sync: sqlmesh.core.state_sync.base.StateSync) -> None:
519def update_intervals_for_new_snapshots(
520    snapshots: t.Collection[Snapshot], state_sync: StateSync
521) -> None:
522    for snapshot in state_sync.refresh_snapshot_intervals(snapshots):
523        if snapshot.is_forward_only:
524            snapshot.dev_intervals = snapshot.intervals.copy()
525            for start, end in snapshot.dev_intervals:
526                state_sync.add_interval(snapshot, start, end, is_dev=True)