Edit on GitHub

sqlmesh.core.scheduler

  1from __future__ import annotations
  2
  3import logging
  4import traceback
  5import typing as t
  6from datetime import datetime
  7
  8from sqlmesh.core import constants as c
  9from sqlmesh.core.console import Console, get_console
 10from sqlmesh.core.environment import EnvironmentNamingInfo
 11from sqlmesh.core.model import SeedModel
 12from sqlmesh.core.notification_target import (
 13    NotificationEvent,
 14    NotificationTargetManager,
 15)
 16from sqlmesh.core.snapshot import (
 17    DeployabilityIndex,
 18    Snapshot,
 19    SnapshotEvaluator,
 20    earliest_start_date,
 21    missing_intervals,
 22)
 23from sqlmesh.core.snapshot.definition import Interval as SnapshotInterval
 24from sqlmesh.core.snapshot.definition import SnapshotId
 25from sqlmesh.core.state_sync import StateSync
 26from sqlmesh.utils import format_exception
 27from sqlmesh.utils.concurrency import concurrent_apply_to_dag
 28from sqlmesh.utils.dag import DAG
 29from sqlmesh.utils.date import (
 30    TimeLike,
 31    now,
 32    now_timestamp,
 33    to_datetime,
 34    validate_date_range,
 35)
 36from sqlmesh.utils.errors import AuditError, CircuitBreakerError, SQLMeshError
 37
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# A (start, end) pair of datetimes.
Interval = t.Tuple[datetime, datetime]
# The list of intervals to evaluate for a single snapshot.
Batch = t.List[Interval]
# Maps each snapshot to the intervals that should be evaluated for it.
SnapshotToBatches = t.Dict[Snapshot, Batch]
# We store the snapshot name instead of snapshots / snapshot ids because pydantic
# is extremely slow to hash. Snapshot names should be unique within a DAG run.
# A scheduling unit has the shape (snapshot name, ((start, end), batch index)).
SchedulingUnit = t.Tuple[str, t.Tuple[Interval, int]]
 45
 46
 47class Scheduler:
 48    """Schedules and manages the evaluation of snapshots.
 49
 50    The scheduler evaluates multiple snapshots with date intervals in the correct
 51    topological order. It consults the state sync to understand what intervals for each
 52    snapshot needs to be backfilled.
 53
 54    The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine.
 55
 56    Args:
 57        snapshots: A collection of snapshots.
 58        snapshot_evaluator: The snapshot evaluator to execute queries.
 59        state_sync: The state sync to pull saved snapshots.
 60        max_workers: The maximum number of parallel queries to run.
 61        console: The rich instance used for printing scheduling information.
 62    """
 63
 64    def __init__(
 65        self,
 66        snapshots: t.Iterable[Snapshot],
 67        snapshot_evaluator: SnapshotEvaluator,
 68        state_sync: StateSync,
 69        default_catalog: t.Optional[str],
 70        max_workers: int = 1,
 71        console: t.Optional[Console] = None,
 72        notification_target_manager: t.Optional[NotificationTargetManager] = None,
 73    ):
 74        self.state_sync = state_sync
 75        self.snapshots = {s.snapshot_id: s for s in snapshots}
 76        self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values())
 77        self.default_catalog = default_catalog
 78        self.snapshot_evaluator = snapshot_evaluator
 79        self.max_workers = max_workers
 80        self.console = console or get_console()
 81        self.notification_target_manager = (
 82            notification_target_manager or NotificationTargetManager()
 83        )
 84
 85    def batches(
 86        self,
 87        start: t.Optional[TimeLike] = None,
 88        end: t.Optional[TimeLike] = None,
 89        execution_time: t.Optional[TimeLike] = None,
 90        deployability_index: t.Optional[DeployabilityIndex] = None,
 91        restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
 92        ignore_cron: bool = False,
 93        end_bounded: bool = False,
 94        selected_snapshots: t.Optional[t.Set[str]] = None,
 95    ) -> SnapshotToBatches:
 96        """Find the optimal date interval paramaters based on what needs processing and maximal batch size.
 97
 98        For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
 99        calculate the missing intervals that need to be processed given the passed in start and end intervals.
100
101        If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than
102        or equal to the configured one. If no batch size is specified, then it uses the intervals that correspond to the node's cron expression.
103        For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs
104        with 30 days and 1 job with 10.
105
106        Args:
107            start: The start of the run. Defaults to the min node start date.
108            end: The end of the run. Defaults to now.
109            execution_time: The date/time time reference to use for execution time. Defaults to now.
110            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
111            restatements: A set of snapshot names being restated.
112            ignore_cron: Whether to ignore the node's cron schedule.
113            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
114                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
115            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
116        """
117        restatements = restatements or {}
118        validate_date_range(start, end)
119
120        snapshots: t.Collection[Snapshot] = self.snapshot_per_version.values()
121        if selected_snapshots is not None:
122            snapshots = [s for s in snapshots if s.name in selected_snapshots]
123
124        self.state_sync.refresh_snapshot_intervals(snapshots)
125
126        return compute_interval_params(
127            snapshots,
128            start=start or earliest_start_date(snapshots),
129            end=end or now(),
130            deployability_index=deployability_index,
131            execution_time=execution_time or now(),
132            restatements=restatements,
133            ignore_cron=ignore_cron,
134            end_bounded=end_bounded,
135        )
136
137    def evaluate(
138        self,
139        snapshot: Snapshot,
140        start: TimeLike,
141        end: TimeLike,
142        execution_time: TimeLike,
143        deployability_index: DeployabilityIndex,
144        **kwargs: t.Any,
145    ) -> None:
146        """Evaluate a snapshot and add the processed interval to the state sync.
147
148        Args:
149            snapshot: Snapshot to evaluate.
150            start: The start datetime to render.
151            end: The end datetime to render.
152            execution_time: The date/time time reference to use for execution time. Defaults to now.
153            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
154            kwargs: Additional kwargs to pass to the renderer.
155        """
156        validate_date_range(start, end)
157
158        snapshots = {
159            self.snapshots[p_sid].name: self.snapshots[p_sid] for p_sid in snapshot.parents
160        }
161        snapshots[snapshot.name] = snapshot
162
163        if isinstance(snapshot.node, SeedModel) and not snapshot.node.is_hydrated:
164            snapshot = self.state_sync.get_snapshots([snapshot], hydrate_seeds=True)[
165                snapshot.snapshot_id
166            ]
167
168        is_deployable = deployability_index.is_deployable(snapshot)
169
170        wap_id = self.snapshot_evaluator.evaluate(
171            snapshot,
172            start=start,
173            end=end,
174            execution_time=execution_time,
175            snapshots=snapshots,
176            deployability_index=deployability_index,
177            **kwargs,
178        )
179        try:
180            self.snapshot_evaluator.audit(
181                snapshot=snapshot,
182                start=start,
183                end=end,
184                execution_time=execution_time,
185                snapshots=snapshots,
186                deployability_index=deployability_index,
187                wap_id=wap_id,
188                **kwargs,
189            )
190        except AuditError as e:
191            self.notification_target_manager.notify(NotificationEvent.AUDIT_FAILURE, e)
192            if is_deployable and snapshot.node.owner:
193                self.notification_target_manager.notify_user(
194                    NotificationEvent.AUDIT_FAILURE, snapshot.node.owner, e
195                )
196            logger.error(f"Audit Failure: {traceback.format_exc()}")
197            raise e
198
199        self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable)
200
201    def run(
202        self,
203        environment: str | EnvironmentNamingInfo,
204        start: t.Optional[TimeLike] = None,
205        end: t.Optional[TimeLike] = None,
206        execution_time: t.Optional[TimeLike] = None,
207        restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
208        ignore_cron: bool = False,
209        end_bounded: bool = False,
210        selected_snapshots: t.Optional[t.Set[str]] = None,
211        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
212        deployability_index: t.Optional[DeployabilityIndex] = None,
213    ) -> bool:
214        """Concurrently runs all snapshots in topological order.
215
216        Args:
217            environment: The environment naming info the user is targeting when applying their change.
218                Can just be the environment name if the user is targeting a remote environment and wants to get the remote
219                naming info
220            start: The start of the run. Defaults to the min node start date.
221            end: The end of the run. Defaults to now.
222            execution_time: The date/time time reference to use for execution time. Defaults to now.
223            restatements: A dict of snapshots to restate and their intervals.
224            ignore_cron: Whether to ignore the node's cron schedule.
225            end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
226                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
227            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
228            circuit_breaker: An optional handler which checks if the run should be aborted.
229            deployability_index: Determines snapshots that are deployable in the context of this render.
230
231        Returns:
232            True if the execution was successful and False otherwise.
233        """
234        restatements = restatements or {}
235        validate_date_range(start, end)
236        if isinstance(environment, str):
237            env = self.state_sync.get_environment(environment)
238            if not env:
239                raise SQLMeshError(
240                    "Was not provided an environment suffix target and the environment doesn't exist."
241                    "Are you running for the first time and need to run plan/apply first?"
242                )
243            environment_naming_info = env.naming_info
244        else:
245            environment_naming_info = environment
246
247        deployability_index = deployability_index or (
248            DeployabilityIndex.create(self.snapshots.values())
249            if environment_naming_info.name != c.PROD
250            else DeployabilityIndex.all_deployable()
251        )
252        execution_time = execution_time or now()
253        batches = self.batches(
254            start,
255            end,
256            execution_time,
257            deployability_index=deployability_index,
258            restatements=restatements,
259            ignore_cron=ignore_cron,
260            end_bounded=end_bounded,
261            selected_snapshots=selected_snapshots,
262        )
263        if not batches:
264            return True
265
266        dag = self._dag(batches)
267
268        self.console.start_evaluation_progress(
269            {snapshot: len(intervals) for snapshot, intervals in batches.items()},
270            environment_naming_info,
271            self.default_catalog,
272        )
273
274        snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()}
275
276        def evaluate_node(node: SchedulingUnit) -> None:
277            if circuit_breaker and circuit_breaker():
278                raise CircuitBreakerError()
279
280            snapshot_name, ((start, end), batch_idx) = node
281            if batch_idx == -1:
282                return
283            snapshot = snapshots_by_name[snapshot_name]
284
285            self.console.start_snapshot_evaluation_progress(snapshot)
286
287            execution_start_ts = now_timestamp()
288            evaluation_duration_ms: t.Optional[int] = None
289
290            try:
291                assert execution_time  # mypy
292                assert deployability_index  # mypy
293                self.evaluate(snapshot, start, end, execution_time, deployability_index)
294                evaluation_duration_ms = now_timestamp() - execution_start_ts
295            finally:
296                self.console.update_snapshot_evaluation_progress(
297                    snapshot, batch_idx, evaluation_duration_ms
298                )
299
300        try:
301            with self.snapshot_evaluator.concurrent_context():
302                errors, skipped_intervals = concurrent_apply_to_dag(
303                    dag,
304                    evaluate_node,
305                    self.max_workers,
306                    raise_on_error=False,
307                )
308        finally:
309            self.state_sync.recycle()
310
311        self.console.stop_evaluation_progress(success=not errors)
312
313        skipped_snapshots = {i[0] for i in skipped_intervals}
314        for skipped in skipped_snapshots:
315            log_message = f"SKIPPED snapshot {skipped}\n"
316            self.console.log_status_update(log_message)
317            logger.info(log_message)
318
319        for error in errors:
320            if isinstance(error.__cause__, CircuitBreakerError):
321                raise error.__cause__
322            sid = error.node[0]
323            formatted_exception = "".join(format_exception(error.__cause__ or error))
324            log_message = f"FAILED processing snapshot {sid}\n{formatted_exception}"
325            self.console.log_error(log_message)
326            # Log with INFO level to prevent duplicate messages in the console.
327            logger.info(log_message)
328
329        return not errors
330
331    def _dag(self, batches: SnapshotToBatches) -> DAG[SchedulingUnit]:
332        """Builds a DAG of snapshot intervals to be evaluated.
333
334        Args:
335            batches: The batches of snapshots and intervals to evaluate.
336
337        Returns:
338            A DAG of snapshot intervals to be evaluated.
339        """
340
341        intervals_per_snapshot = {
342            snapshot.name: intervals for snapshot, intervals in batches.items()
343        }
344
345        dag = DAG[SchedulingUnit]()
346        terminal_node = ((to_datetime(0), to_datetime(0)), -1)
347
348        for snapshot, intervals in batches.items():
349            if not intervals:
350                continue
351
352            upstream_dependencies = []
353
354            for p_sid in snapshot.parents:
355                if p_sid in self.snapshots:
356                    p_intervals = intervals_per_snapshot.get(p_sid.name, [])
357
358                    if len(p_intervals) > 1:
359                        upstream_dependencies.append((p_sid.name, terminal_node))
360                    else:
361                        for i, interval in enumerate(p_intervals):
362                            upstream_dependencies.append((p_sid.name, (interval, i)))
363
364            batch_concurrency = snapshot.node.batch_concurrency
365            if snapshot.depends_on_past:
366                batch_concurrency = 1
367
368            for i, interval in enumerate(intervals):
369                node = (snapshot.name, (interval, i))
370                dag.add(node, upstream_dependencies)
371
372                if len(intervals) > 1:
373                    dag.add((snapshot.name, terminal_node), [node])
374
375                if batch_concurrency and i >= batch_concurrency:
376                    batch_idx_to_wait_for = i - batch_concurrency
377                    dag.add(
378                        node,
379                        [
380                            (
381                                snapshot.name,
382                                (intervals[batch_idx_to_wait_for], batch_idx_to_wait_for),
383                            )
384                        ],
385                    )
386        return dag
387
388
def compute_interval_params(
    snapshots: t.Collection[Snapshot],
    *,
    start: TimeLike,
    end: TimeLike,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> SnapshotToBatches:
    """Group the missing intervals of each snapshot into batches to be evaluated.

    The missing intervals are obtained from `missing_intervals`. For every snapshot,
    adjacent intervals (the start of one equals the end of the previous one) are merged
    into a single batch. A batch is closed as soon as it holds `batch_size` intervals
    (when the snapshot's node defines a batch size) or when the next interval is not
    adjacent to it. For example, 70 adjacent daily intervals with a batch size of 30
    result in 2 batches spanning 30 days and 1 batch spanning 10.

    Args:
        snapshots: A set of target snapshots for which intervals should be computed.
        start: Start of the interval.
        end: End of the interval.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        execution_time: The date/time time reference to use for execution time.
        restatements: A dict of snapshots being restated and their intervals.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
            allow_partials, and other attributes that could cause the intervals to exceed the target end date.

    Returns:
        A dict containing all snapshots needing to be run with their associated interval params.
    """
    missing = missing_intervals(
        snapshots,
        start=start,
        end=end,
        execution_time=execution_time,
        restatements=restatements,
        deployability_index=deployability_index,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    )

    result = {}

    for snapshot, intervals in missing.items():
        max_batch_size = snapshot.node.batch_size
        merged: t.List[t.Tuple[int, int]] = []
        pending: t.List[t.Tuple[int, int]] = []

        for current in intervals:
            # The batch already holds the maximum number of intervals.
            batch_is_full = bool(max_batch_size) and len(pending) >= max_batch_size
            # The current interval does not start where the pending batch ends.
            has_gap = bool(pending) and current[0] != pending[-1][-1]

            if batch_is_full or has_gap:
                merged.append((pending[0][0], pending[-1][-1]))
                pending = []
            pending.append(current)

        # Flush whatever is left after the last interval.
        if pending:
            merged.append((pending[0][0], pending[-1][-1]))

        result[snapshot] = [
            (to_datetime(batch_start), to_datetime(batch_end))
            for batch_start, batch_end in merged
        ]

    return result
453
454
def _resolve_one_snapshot_per_version(
    snapshots: t.Iterable[Snapshot],
) -> t.Dict[t.Tuple[str, str], Snapshot]:
    """Pick a single snapshot for every (name, version) pair.

    The first snapshot seen for a pair is kept unless a later one with a truthy
    `unpaused_ts` comes along and either the kept one has no `unpaused_ts` or the
    later one has a greater `created_ts`.

    Args:
        snapshots: The snapshots to choose from.

    Returns:
        A dict of (snapshot name, snapshot version) to the chosen snapshot.
    """
    chosen: t.Dict[t.Tuple[str, str], Snapshot] = {}

    for candidate in snapshots:
        version_key = (candidate.name, candidate.version_get_or_generate())

        if version_key not in chosen:
            chosen[version_key] = candidate
            continue

        # A candidate without unpaused_ts never replaces the current choice.
        if not candidate.unpaused_ts:
            continue

        incumbent = chosen[version_key]
        if not incumbent.unpaused_ts or candidate.created_ts > incumbent.created_ts:
            chosen[version_key] = candidate

    return chosen
class Scheduler:
 48class Scheduler:
 49    """Schedules and manages the evaluation of snapshots.
 50
 51    The scheduler evaluates multiple snapshots with date intervals in the correct
 52    topological order. It consults the state sync to understand what intervals for each
 53    snapshot needs to be backfilled.
 54
 55    The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine.
 56
 57    Args:
 58        snapshots: A collection of snapshots.
 59        snapshot_evaluator: The snapshot evaluator to execute queries.
 60        state_sync: The state sync to pull saved snapshots.
 61        max_workers: The maximum number of parallel queries to run.
 62        console: The rich instance used for printing scheduling information.
 63    """
 64
 65    def __init__(
 66        self,
 67        snapshots: t.Iterable[Snapshot],
 68        snapshot_evaluator: SnapshotEvaluator,
 69        state_sync: StateSync,
 70        default_catalog: t.Optional[str],
 71        max_workers: int = 1,
 72        console: t.Optional[Console] = None,
 73        notification_target_manager: t.Optional[NotificationTargetManager] = None,
 74    ):
 75        self.state_sync = state_sync
 76        self.snapshots = {s.snapshot_id: s for s in snapshots}
 77        self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values())
 78        self.default_catalog = default_catalog
 79        self.snapshot_evaluator = snapshot_evaluator
 80        self.max_workers = max_workers
 81        self.console = console or get_console()
 82        self.notification_target_manager = (
 83            notification_target_manager or NotificationTargetManager()
 84        )
 85
 86    def batches(
 87        self,
 88        start: t.Optional[TimeLike] = None,
 89        end: t.Optional[TimeLike] = None,
 90        execution_time: t.Optional[TimeLike] = None,
 91        deployability_index: t.Optional[DeployabilityIndex] = None,
 92        restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
 93        ignore_cron: bool = False,
 94        end_bounded: bool = False,
 95        selected_snapshots: t.Optional[t.Set[str]] = None,
 96    ) -> SnapshotToBatches:
 97        """Find the optimal date interval paramaters based on what needs processing and maximal batch size.
 98
 99        For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
100        calculate the missing intervals that need to be processed given the passed in start and end intervals.
101
102        If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than
103        or equal to the configured one. If no batch size is specified, then it uses the intervals that correspond to the node's cron expression.
104        For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs
105        with 30 days and 1 job with 10.
106
107        Args:
108            start: The start of the run. Defaults to the min node start date.
109            end: The end of the run. Defaults to now.
110            execution_time: The date/time time reference to use for execution time. Defaults to now.
111            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
112            restatements: A set of snapshot names being restated.
113            ignore_cron: Whether to ignore the node's cron schedule.
114            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
115                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
116            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
117        """
118        restatements = restatements or {}
119        validate_date_range(start, end)
120
121        snapshots: t.Collection[Snapshot] = self.snapshot_per_version.values()
122        if selected_snapshots is not None:
123            snapshots = [s for s in snapshots if s.name in selected_snapshots]
124
125        self.state_sync.refresh_snapshot_intervals(snapshots)
126
127        return compute_interval_params(
128            snapshots,
129            start=start or earliest_start_date(snapshots),
130            end=end or now(),
131            deployability_index=deployability_index,
132            execution_time=execution_time or now(),
133            restatements=restatements,
134            ignore_cron=ignore_cron,
135            end_bounded=end_bounded,
136        )
137
138    def evaluate(
139        self,
140        snapshot: Snapshot,
141        start: TimeLike,
142        end: TimeLike,
143        execution_time: TimeLike,
144        deployability_index: DeployabilityIndex,
145        **kwargs: t.Any,
146    ) -> None:
147        """Evaluate a snapshot and add the processed interval to the state sync.
148
149        Args:
150            snapshot: Snapshot to evaluate.
151            start: The start datetime to render.
152            end: The end datetime to render.
153            execution_time: The date/time time reference to use for execution time. Defaults to now.
154            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
155            kwargs: Additional kwargs to pass to the renderer.
156        """
157        validate_date_range(start, end)
158
159        snapshots = {
160            self.snapshots[p_sid].name: self.snapshots[p_sid] for p_sid in snapshot.parents
161        }
162        snapshots[snapshot.name] = snapshot
163
164        if isinstance(snapshot.node, SeedModel) and not snapshot.node.is_hydrated:
165            snapshot = self.state_sync.get_snapshots([snapshot], hydrate_seeds=True)[
166                snapshot.snapshot_id
167            ]
168
169        is_deployable = deployability_index.is_deployable(snapshot)
170
171        wap_id = self.snapshot_evaluator.evaluate(
172            snapshot,
173            start=start,
174            end=end,
175            execution_time=execution_time,
176            snapshots=snapshots,
177            deployability_index=deployability_index,
178            **kwargs,
179        )
180        try:
181            self.snapshot_evaluator.audit(
182                snapshot=snapshot,
183                start=start,
184                end=end,
185                execution_time=execution_time,
186                snapshots=snapshots,
187                deployability_index=deployability_index,
188                wap_id=wap_id,
189                **kwargs,
190            )
191        except AuditError as e:
192            self.notification_target_manager.notify(NotificationEvent.AUDIT_FAILURE, e)
193            if is_deployable and snapshot.node.owner:
194                self.notification_target_manager.notify_user(
195                    NotificationEvent.AUDIT_FAILURE, snapshot.node.owner, e
196                )
197            logger.error(f"Audit Failure: {traceback.format_exc()}")
198            raise e
199
200        self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable)
201
202    def run(
203        self,
204        environment: str | EnvironmentNamingInfo,
205        start: t.Optional[TimeLike] = None,
206        end: t.Optional[TimeLike] = None,
207        execution_time: t.Optional[TimeLike] = None,
208        restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
209        ignore_cron: bool = False,
210        end_bounded: bool = False,
211        selected_snapshots: t.Optional[t.Set[str]] = None,
212        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
213        deployability_index: t.Optional[DeployabilityIndex] = None,
214    ) -> bool:
215        """Concurrently runs all snapshots in topological order.
216
217        Args:
218            environment: The environment naming info the user is targeting when applying their change.
219                Can just be the environment name if the user is targeting a remote environment and wants to get the remote
220                naming info
221            start: The start of the run. Defaults to the min node start date.
222            end: The end of the run. Defaults to now.
223            execution_time: The date/time time reference to use for execution time. Defaults to now.
224            restatements: A dict of snapshots to restate and their intervals.
225            ignore_cron: Whether to ignore the node's cron schedule.
226            end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
227                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
228            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
229            circuit_breaker: An optional handler which checks if the run should be aborted.
230            deployability_index: Determines snapshots that are deployable in the context of this render.
231
232        Returns:
233            True if the execution was successful and False otherwise.
234        """
235        restatements = restatements or {}
236        validate_date_range(start, end)
237        if isinstance(environment, str):
238            env = self.state_sync.get_environment(environment)
239            if not env:
240                raise SQLMeshError(
241                    "Was not provided an environment suffix target and the environment doesn't exist."
242                    "Are you running for the first time and need to run plan/apply first?"
243                )
244            environment_naming_info = env.naming_info
245        else:
246            environment_naming_info = environment
247
248        deployability_index = deployability_index or (
249            DeployabilityIndex.create(self.snapshots.values())
250            if environment_naming_info.name != c.PROD
251            else DeployabilityIndex.all_deployable()
252        )
253        execution_time = execution_time or now()
254        batches = self.batches(
255            start,
256            end,
257            execution_time,
258            deployability_index=deployability_index,
259            restatements=restatements,
260            ignore_cron=ignore_cron,
261            end_bounded=end_bounded,
262            selected_snapshots=selected_snapshots,
263        )
264        if not batches:
265            return True
266
267        dag = self._dag(batches)
268
269        self.console.start_evaluation_progress(
270            {snapshot: len(intervals) for snapshot, intervals in batches.items()},
271            environment_naming_info,
272            self.default_catalog,
273        )
274
275        snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()}
276
277        def evaluate_node(node: SchedulingUnit) -> None:
278            if circuit_breaker and circuit_breaker():
279                raise CircuitBreakerError()
280
281            snapshot_name, ((start, end), batch_idx) = node
282            if batch_idx == -1:
283                return
284            snapshot = snapshots_by_name[snapshot_name]
285
286            self.console.start_snapshot_evaluation_progress(snapshot)
287
288            execution_start_ts = now_timestamp()
289            evaluation_duration_ms: t.Optional[int] = None
290
291            try:
292                assert execution_time  # mypy
293                assert deployability_index  # mypy
294                self.evaluate(snapshot, start, end, execution_time, deployability_index)
295                evaluation_duration_ms = now_timestamp() - execution_start_ts
296            finally:
297                self.console.update_snapshot_evaluation_progress(
298                    snapshot, batch_idx, evaluation_duration_ms
299                )
300
301        try:
302            with self.snapshot_evaluator.concurrent_context():
303                errors, skipped_intervals = concurrent_apply_to_dag(
304                    dag,
305                    evaluate_node,
306                    self.max_workers,
307                    raise_on_error=False,
308                )
309        finally:
310            self.state_sync.recycle()
311
312        self.console.stop_evaluation_progress(success=not errors)
313
314        skipped_snapshots = {i[0] for i in skipped_intervals}
315        for skipped in skipped_snapshots:
316            log_message = f"SKIPPED snapshot {skipped}\n"
317            self.console.log_status_update(log_message)
318            logger.info(log_message)
319
320        for error in errors:
321            if isinstance(error.__cause__, CircuitBreakerError):
322                raise error.__cause__
323            sid = error.node[0]
324            formatted_exception = "".join(format_exception(error.__cause__ or error))
325            log_message = f"FAILED processing snapshot {sid}\n{formatted_exception}"
326            self.console.log_error(log_message)
327            # Log with INFO level to prevent duplicate messages in the console.
328            logger.info(log_message)
329
330        return not errors
331
332    def _dag(self, batches: SnapshotToBatches) -> DAG[SchedulingUnit]:
333        """Builds a DAG of snapshot intervals to be evaluated.
334
335        Args:
336            batches: The batches of snapshots and intervals to evaluate.
337
338        Returns:
339            A DAG of snapshot intervals to be evaluated.
340        """
341
342        intervals_per_snapshot = {
343            snapshot.name: intervals for snapshot, intervals in batches.items()
344        }
345
346        dag = DAG[SchedulingUnit]()
347        terminal_node = ((to_datetime(0), to_datetime(0)), -1)
348
349        for snapshot, intervals in batches.items():
350            if not intervals:
351                continue
352
353            upstream_dependencies = []
354
355            for p_sid in snapshot.parents:
356                if p_sid in self.snapshots:
357                    p_intervals = intervals_per_snapshot.get(p_sid.name, [])
358
359                    if len(p_intervals) > 1:
360                        upstream_dependencies.append((p_sid.name, terminal_node))
361                    else:
362                        for i, interval in enumerate(p_intervals):
363                            upstream_dependencies.append((p_sid.name, (interval, i)))
364
365            batch_concurrency = snapshot.node.batch_concurrency
366            if snapshot.depends_on_past:
367                batch_concurrency = 1
368
369            for i, interval in enumerate(intervals):
370                node = (snapshot.name, (interval, i))
371                dag.add(node, upstream_dependencies)
372
373                if len(intervals) > 1:
374                    dag.add((snapshot.name, terminal_node), [node])
375
376                if batch_concurrency and i >= batch_concurrency:
377                    batch_idx_to_wait_for = i - batch_concurrency
378                    dag.add(
379                        node,
380                        [
381                            (
382                                snapshot.name,
383                                (intervals[batch_idx_to_wait_for], batch_idx_to_wait_for),
384                            )
385                        ],
386                    )
387        return dag

Schedules and manages the evaluation of snapshots.

The scheduler evaluates multiple snapshots with date intervals in the correct topological order. It consults the state sync to understand which intervals of each snapshot need to be backfilled.

The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine.

Arguments:
  • snapshots: A collection of snapshots.
  • snapshot_evaluator: The snapshot evaluator to execute queries.
  • state_sync: The state sync to pull saved snapshots.
  • max_workers: The maximum number of parallel queries to run.
  • console: The rich instance used for printing scheduling information.
Scheduler( snapshots: Iterable[sqlmesh.core.snapshot.definition.Snapshot], snapshot_evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, state_sync: sqlmesh.core.state_sync.base.StateSync, default_catalog: Union[str, NoneType], max_workers: int = 1, console: Union[sqlmesh.core.console.Console, NoneType] = None, notification_target_manager: Union[sqlmesh.core.notification_target.NotificationTargetManager, NoneType] = None)
65    def __init__(
66        self,
67        snapshots: t.Iterable[Snapshot],
68        snapshot_evaluator: SnapshotEvaluator,
69        state_sync: StateSync,
70        default_catalog: t.Optional[str],
71        max_workers: int = 1,
72        console: t.Optional[Console] = None,
73        notification_target_manager: t.Optional[NotificationTargetManager] = None,
74    ):
75        self.state_sync = state_sync
76        self.snapshots = {s.snapshot_id: s for s in snapshots}
77        self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values())
78        self.default_catalog = default_catalog
79        self.snapshot_evaluator = snapshot_evaluator
80        self.max_workers = max_workers
81        self.console = console or get_console()
82        self.notification_target_manager = (
83            notification_target_manager or NotificationTargetManager()
84        )
def batches( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None, restatements: Union[Dict[sqlmesh.core.snapshot.definition.SnapshotId, Tuple[int, int]], NoneType] = None, ignore_cron: bool = False, end_bounded: bool = False, selected_snapshots: Union[Set[str], NoneType] = None) -> Dict[sqlmesh.core.snapshot.definition.Snapshot, List[Tuple[datetime.datetime, datetime.datetime]]]:
 86    def batches(
 87        self,
 88        start: t.Optional[TimeLike] = None,
 89        end: t.Optional[TimeLike] = None,
 90        execution_time: t.Optional[TimeLike] = None,
 91        deployability_index: t.Optional[DeployabilityIndex] = None,
 92        restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
 93        ignore_cron: bool = False,
 94        end_bounded: bool = False,
 95        selected_snapshots: t.Optional[t.Set[str]] = None,
 96    ) -> SnapshotToBatches:
 97        """Find the optimal date interval paramaters based on what needs processing and maximal batch size.
 98
 99        For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
100        calculate the missing intervals that need to be processed given the passed in start and end intervals.
101
102        If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than
103        or equal to the configured one. If no batch size is specified, then it uses the intervals that correspond to the node's cron expression.
104        For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs
105        with 30 days and 1 job with 10.
106
107        Args:
108            start: The start of the run. Defaults to the min node start date.
109            end: The end of the run. Defaults to now.
110            execution_time: The date/time time reference to use for execution time. Defaults to now.
111            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
112            restatements: A set of snapshot names being restated.
113            ignore_cron: Whether to ignore the node's cron schedule.
114            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
115                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
116            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
117        """
118        restatements = restatements or {}
119        validate_date_range(start, end)
120
121        snapshots: t.Collection[Snapshot] = self.snapshot_per_version.values()
122        if selected_snapshots is not None:
123            snapshots = [s for s in snapshots if s.name in selected_snapshots]
124
125        self.state_sync.refresh_snapshot_intervals(snapshots)
126
127        return compute_interval_params(
128            snapshots,
129            start=start or earliest_start_date(snapshots),
130            end=end or now(),
131            deployability_index=deployability_index,
132            execution_time=execution_time or now(),
133            restatements=restatements,
134            ignore_cron=ignore_cron,
135            end_bounded=end_bounded,
136        )

Find the optimal date interval parameters based on what needs processing and maximal batch size.

For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, calculate the missing intervals that need to be processed given the passed in start and end intervals.

If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than or equal to the configured one. If no batch size is specified, then it uses the intervals that correspond to the node's cron expression. For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs with 30 days and 1 job with 10.

Arguments:
  • start: The start of the run. Defaults to the min node start date.
  • end: The end of the run. Defaults to now.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • restatements: A dict of snapshot IDs being restated and their intervals.
  • ignore_cron: Whether to ignore the node's cron schedule.
  • end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
  • selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
def evaluate( self, snapshot: sqlmesh.core.snapshot.definition.Snapshot, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], execution_time: Union[datetime.date, datetime.datetime, str, int, float], deployability_index: sqlmesh.core.snapshot.definition.DeployabilityIndex, **kwargs: Any) -> None:
138    def evaluate(
139        self,
140        snapshot: Snapshot,
141        start: TimeLike,
142        end: TimeLike,
143        execution_time: TimeLike,
144        deployability_index: DeployabilityIndex,
145        **kwargs: t.Any,
146    ) -> None:
147        """Evaluate a snapshot and add the processed interval to the state sync.
148
149        Args:
150            snapshot: Snapshot to evaluate.
151            start: The start datetime to render.
152            end: The end datetime to render.
153            execution_time: The date/time time reference to use for execution time. Defaults to now.
154            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
155            kwargs: Additional kwargs to pass to the renderer.
156        """
157        validate_date_range(start, end)
158
159        snapshots = {
160            self.snapshots[p_sid].name: self.snapshots[p_sid] for p_sid in snapshot.parents
161        }
162        snapshots[snapshot.name] = snapshot
163
164        if isinstance(snapshot.node, SeedModel) and not snapshot.node.is_hydrated:
165            snapshot = self.state_sync.get_snapshots([snapshot], hydrate_seeds=True)[
166                snapshot.snapshot_id
167            ]
168
169        is_deployable = deployability_index.is_deployable(snapshot)
170
171        wap_id = self.snapshot_evaluator.evaluate(
172            snapshot,
173            start=start,
174            end=end,
175            execution_time=execution_time,
176            snapshots=snapshots,
177            deployability_index=deployability_index,
178            **kwargs,
179        )
180        try:
181            self.snapshot_evaluator.audit(
182                snapshot=snapshot,
183                start=start,
184                end=end,
185                execution_time=execution_time,
186                snapshots=snapshots,
187                deployability_index=deployability_index,
188                wap_id=wap_id,
189                **kwargs,
190            )
191        except AuditError as e:
192            self.notification_target_manager.notify(NotificationEvent.AUDIT_FAILURE, e)
193            if is_deployable and snapshot.node.owner:
194                self.notification_target_manager.notify_user(
195                    NotificationEvent.AUDIT_FAILURE, snapshot.node.owner, e
196                )
197            logger.error(f"Audit Failure: {traceback.format_exc()}")
198            raise e
199
200        self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable)

Evaluate a snapshot and add the processed interval to the state sync.

Arguments:
  • snapshot: Snapshot to evaluate.
  • start: The start datetime to render.
  • end: The end datetime to render.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • kwargs: Additional kwargs to pass to the renderer.
def run( self, environment: 'str | EnvironmentNamingInfo', start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, restatements: Union[Dict[sqlmesh.core.snapshot.definition.SnapshotId, Tuple[int, int]], NoneType] = None, ignore_cron: bool = False, end_bounded: bool = False, selected_snapshots: Union[Set[str], NoneType] = None, circuit_breaker: Union[Callable[[], bool], NoneType] = None, deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None) -> bool:
202    def run(
203        self,
204        environment: str | EnvironmentNamingInfo,
205        start: t.Optional[TimeLike] = None,
206        end: t.Optional[TimeLike] = None,
207        execution_time: t.Optional[TimeLike] = None,
208        restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
209        ignore_cron: bool = False,
210        end_bounded: bool = False,
211        selected_snapshots: t.Optional[t.Set[str]] = None,
212        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
213        deployability_index: t.Optional[DeployabilityIndex] = None,
214    ) -> bool:
215        """Concurrently runs all snapshots in topological order.
216
217        Args:
218            environment: The environment naming info the user is targeting when applying their change.
219                Can just be the environment name if the user is targeting a remote environment and wants to get the remote
220                naming info
221            start: The start of the run. Defaults to the min node start date.
222            end: The end of the run. Defaults to now.
223            execution_time: The date/time time reference to use for execution time. Defaults to now.
224            restatements: A dict of snapshots to restate and their intervals.
225            ignore_cron: Whether to ignore the node's cron schedule.
226            end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
227                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
228            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
229            circuit_breaker: An optional handler which checks if the run should be aborted.
230            deployability_index: Determines snapshots that are deployable in the context of this render.
231
232        Returns:
233            True if the execution was successful and False otherwise.
234        """
235        restatements = restatements or {}
236        validate_date_range(start, end)
237        if isinstance(environment, str):
238            env = self.state_sync.get_environment(environment)
239            if not env:
240                raise SQLMeshError(
241                    "Was not provided an environment suffix target and the environment doesn't exist."
242                    "Are you running for the first time and need to run plan/apply first?"
243                )
244            environment_naming_info = env.naming_info
245        else:
246            environment_naming_info = environment
247
248        deployability_index = deployability_index or (
249            DeployabilityIndex.create(self.snapshots.values())
250            if environment_naming_info.name != c.PROD
251            else DeployabilityIndex.all_deployable()
252        )
253        execution_time = execution_time or now()
254        batches = self.batches(
255            start,
256            end,
257            execution_time,
258            deployability_index=deployability_index,
259            restatements=restatements,
260            ignore_cron=ignore_cron,
261            end_bounded=end_bounded,
262            selected_snapshots=selected_snapshots,
263        )
264        if not batches:
265            return True
266
267        dag = self._dag(batches)
268
269        self.console.start_evaluation_progress(
270            {snapshot: len(intervals) for snapshot, intervals in batches.items()},
271            environment_naming_info,
272            self.default_catalog,
273        )
274
275        snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()}
276
277        def evaluate_node(node: SchedulingUnit) -> None:
278            if circuit_breaker and circuit_breaker():
279                raise CircuitBreakerError()
280
281            snapshot_name, ((start, end), batch_idx) = node
282            if batch_idx == -1:
283                return
284            snapshot = snapshots_by_name[snapshot_name]
285
286            self.console.start_snapshot_evaluation_progress(snapshot)
287
288            execution_start_ts = now_timestamp()
289            evaluation_duration_ms: t.Optional[int] = None
290
291            try:
292                assert execution_time  # mypy
293                assert deployability_index  # mypy
294                self.evaluate(snapshot, start, end, execution_time, deployability_index)
295                evaluation_duration_ms = now_timestamp() - execution_start_ts
296            finally:
297                self.console.update_snapshot_evaluation_progress(
298                    snapshot, batch_idx, evaluation_duration_ms
299                )
300
301        try:
302            with self.snapshot_evaluator.concurrent_context():
303                errors, skipped_intervals = concurrent_apply_to_dag(
304                    dag,
305                    evaluate_node,
306                    self.max_workers,
307                    raise_on_error=False,
308                )
309        finally:
310            self.state_sync.recycle()
311
312        self.console.stop_evaluation_progress(success=not errors)
313
314        skipped_snapshots = {i[0] for i in skipped_intervals}
315        for skipped in skipped_snapshots:
316            log_message = f"SKIPPED snapshot {skipped}\n"
317            self.console.log_status_update(log_message)
318            logger.info(log_message)
319
320        for error in errors:
321            if isinstance(error.__cause__, CircuitBreakerError):
322                raise error.__cause__
323            sid = error.node[0]
324            formatted_exception = "".join(format_exception(error.__cause__ or error))
325            log_message = f"FAILED processing snapshot {sid}\n{formatted_exception}"
326            self.console.log_error(log_message)
327            # Log with INFO level to prevent duplicate messages in the console.
328            logger.info(log_message)
329
330        return not errors

Concurrently runs all snapshots in topological order.

Arguments:
  • environment: The environment naming info the user is targeting when applying their change. Can just be the environment name if the user is targeting a remote environment and wants to get the remote naming info
  • start: The start of the run. Defaults to the min node start date.
  • end: The end of the run. Defaults to now.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • restatements: A dict of snapshots to restate and their intervals.
  • ignore_cron: Whether to ignore the node's cron schedule.
  • end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
  • selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
  • circuit_breaker: An optional handler which checks if the run should be aborted.
  • deployability_index: Determines snapshots that are deployable in the context of this render.
Returns:

True if the execution was successful and False otherwise.

def compute_interval_params( snapshots: Collection[sqlmesh.core.snapshot.definition.Snapshot], *, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, restatements: Union[Dict[sqlmesh.core.snapshot.definition.SnapshotId, Tuple[int, int]], NoneType] = None, ignore_cron: bool = False, end_bounded: bool = False) -> Dict[sqlmesh.core.snapshot.definition.Snapshot, List[Tuple[datetime.datetime, datetime.datetime]]]:
def compute_interval_params(
    snapshots: t.Collection[Snapshot],
    *,
    start: TimeLike,
    end: TimeLike,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> SnapshotToBatches:
    """Groups the missing intervals of each snapshot into batches to be evaluated.

    The missing intervals of every target snapshot are computed for the given start and end.
    Contiguous intervals are then merged into batches: a new batch is started whenever the next
    interval doesn't begin where the previous one ended, or when the snapshot's node specifies
    a batch size and the current batch has reached it. For example, if a node is supposed to run
    daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs
    with 30 days and 1 job with 10.

    Args:
        snapshots: A set of target snapshots for which intervals should be computed.
        start: Start of the interval.
        end: End of the interval.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        execution_time: The date/time reference to use for execution time.
        restatements: A dict of snapshot IDs being restated and their intervals.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
            allow_partials, and other attributes that could cause the intervals to exceed the target end date.

    Returns:
        A dict containing all snapshots needing to be run with their associated interval params.
    """
    missing_by_snapshot = missing_intervals(
        snapshots,
        start=start,
        end=end,
        execution_time=execution_time,
        restatements=restatements,
        deployability_index=deployability_index,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    )

    batches_by_snapshot = {}

    for snapshot, intervals in missing_by_snapshot.items():
        size_limit = snapshot.node.batch_size
        spans = []
        current: t.List[t.Tuple[int, int]] = []

        for interval in intervals:
            is_full = size_limit and len(current) >= size_limit
            # A gap exists when this interval doesn't start where the previous one ended.
            is_gap = current and interval[0] != current[-1][-1]

            if is_full or is_gap:
                # Close the current batch and start a new one with this interval.
                spans.append((current[0][0], current[-1][-1]))
                current = [interval]
            else:
                current.append(interval)

        if current:
            spans.append((current[0][0], current[-1][-1]))

        batches_by_snapshot[snapshot] = [
            (to_datetime(span_start), to_datetime(span_end)) for span_start, span_end in spans
        ]

    return batches_by_snapshot

Find the optimal date interval parameters based on what needs processing and maximal batch size.

For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, calculate the missing intervals that need to be processed given the passed in start and end intervals.

If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than or equal to the configured one. If no batch size is specified, then it uses the intervals that correspond to the node's cron expression. For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs with 30 days and 1 job with 10.

Arguments:
  • snapshots: A set of target snapshots for which intervals should be computed.
  • intervals: A list of all snapshot intervals that should be considered.
  • start: Start of the interval.
  • end: End of the interval.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • execution_time: The date/time reference to use for execution time.
  • restatements: A dict of snapshot names being restated and their intervals.
  • ignore_cron: Whether to ignore the node's cron schedule.
  • end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
Returns:

A dict containing all snapshots needing to be run with their associated interval params.