Edit on GitHub

sqlmesh.core.scheduler

   1from __future__ import annotations
   2from dataclasses import dataclass
   3import abc
   4import logging
   5import typing as t
   6import time
   7from datetime import datetime
   8from sqlglot import exp
   9from sqlmesh.core import constants as c
  10from sqlmesh.core.console import Console, get_console
  11from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements
  12from sqlmesh.core.macros import RuntimeStage
  13from sqlmesh.core.model.definition import AuditResult
  14from sqlmesh.core.node import IntervalUnit
  15from sqlmesh.core.notification_target import (
  16    NotificationEvent,
  17    NotificationTargetManager,
  18)
  19from sqlmesh.core.snapshot import (
  20    DeployabilityIndex,
  21    Snapshot,
  22    SnapshotId,
  23    SnapshotIdBatch,
  24    SnapshotEvaluator,
  25    apply_auto_restatements,
  26    earliest_start_date,
  27    missing_intervals,
  28    merge_intervals,
  29    snapshots_to_dag,
  30    Intervals,
  31)
  32from sqlmesh.core.snapshot.definition import check_ready_intervals
  33from sqlmesh.core.snapshot.definition import (
  34    Interval,
  35    expand_range,
  36    parent_snapshots_by_name,
  37)
  38from sqlmesh.core.state_sync import StateSync
  39from sqlmesh.utils import CompletionStatus
  40from sqlmesh.utils.concurrency import concurrent_apply_to_dag, NodeExecutionFailedError
  41from sqlmesh.utils.dag import DAG
  42from sqlmesh.utils.date import (
  43    TimeLike,
  44    now_timestamp,
  45    validate_date_range,
  46)
  47from sqlmesh.utils.errors import (
  48    AuditError,
  49    NodeAuditsErrors,
  50    CircuitBreakerError,
  51    SQLMeshError,
  52    SignalEvalError,
  53)
  54
  55if t.TYPE_CHECKING:
  56    from sqlmesh.core.context import ExecutionContext
  57
  58logger = logging.getLogger(__name__)
  59SnapshotToIntervals = t.Dict[Snapshot, Intervals]
  60
  61
  62class SchedulingUnit(abc.ABC):
  63    snapshot_name: str
  64
  65    def __lt__(self, other: SchedulingUnit) -> bool:
  66        return (self.__class__.__name__, self.snapshot_name) < (
  67            other.__class__.__name__,
  68            other.snapshot_name,
  69        )
  70
  71
  72@dataclass(frozen=True)
  73class EvaluateNode(SchedulingUnit):
  74    snapshot_name: str
  75    interval: Interval
  76    batch_index: int
  77
  78    def __lt__(self, other: SchedulingUnit) -> bool:
  79        if not isinstance(other, EvaluateNode):
  80            return super().__lt__(other)
  81        return (self.__class__.__name__, self.snapshot_name, self.interval, self.batch_index) < (
  82            other.__class__.__name__,
  83            other.snapshot_name,
  84            other.interval,
  85            other.batch_index,
  86        )
  87
  88
  89@dataclass(frozen=True)
  90class CreateNode(SchedulingUnit):
  91    snapshot_name: str
  92
  93
  94@dataclass(frozen=True)
  95class DummyNode(SchedulingUnit):
  96    snapshot_name: str
  97
  98
  99class Scheduler:
 100    """Schedules and manages the evaluation of snapshots.
 101
 102    The scheduler evaluates multiple snapshots with date intervals in the correct
 103    topological order. It consults the state sync to understand what intervals for each
 104    snapshot needs to be backfilled.
 105
 106    The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine.
 107
 108    Args:
 109        snapshots: A collection of snapshots.
 110        snapshot_evaluator: The snapshot evaluator to execute queries.
 111        state_sync: The state sync to pull saved snapshots.
 112        max_workers: The maximum number of parallel queries to run.
 113        console: The rich instance used for printing scheduling information.
 114    """
 115
 116    def __init__(
 117        self,
 118        snapshots: t.Iterable[Snapshot],
 119        snapshot_evaluator: SnapshotEvaluator,
 120        state_sync: StateSync,
 121        default_catalog: t.Optional[str],
 122        max_workers: int = 1,
 123        console: t.Optional[Console] = None,
 124        notification_target_manager: t.Optional[NotificationTargetManager] = None,
 125    ):
 126        self.state_sync = state_sync
 127        self.snapshots = {s.snapshot_id: s for s in snapshots}
 128        self.snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()}
 129        self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values())
 130        self.default_catalog = default_catalog
 131        self.snapshot_evaluator = snapshot_evaluator
 132        self.max_workers = max_workers
 133        self.console = console or get_console()
 134        self.notification_target_manager = (
 135            notification_target_manager or NotificationTargetManager()
 136        )
 137
 138    def merged_missing_intervals(
 139        self,
 140        start: t.Optional[TimeLike] = None,
 141        end: t.Optional[TimeLike] = None,
 142        execution_time: t.Optional[TimeLike] = None,
 143        deployability_index: t.Optional[DeployabilityIndex] = None,
 144        restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
 145        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 146        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 147        ignore_cron: bool = False,
 148        end_bounded: bool = False,
 149        selected_snapshots: t.Optional[t.Set[str]] = None,
 150    ) -> SnapshotToIntervals:
 151        """Find the largest contiguous date interval parameters based only on what is missing.
 152
 153        For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
 154        calculate the missing intervals that need to be processed given the passed in start and end intervals.
 155
 156        This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.
 157
 158        Args:
 159            start: The start of the run. Defaults to the min node start date.
 160            end: The end of the run. Defaults to now.
 161            execution_time: The date/time reference to use for execution time. Defaults to now.
 162            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
 163            restatements: A set of snapshot names being restated.
 164            start_override_per_model: A mapping of model FQNs to target start dates.
 165            end_override_per_model: A mapping of model FQNs to target end dates.
 166            ignore_cron: Whether to ignore the node's cron schedule.
 167            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
 168                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
 169            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
 170        """
 171        snapshots_to_intervals = merged_missing_intervals(
 172            snapshots=self.snapshot_per_version.values(),
 173            start=start,
 174            end=end,
 175            execution_time=execution_time,
 176            deployability_index=deployability_index,
 177            restatements=restatements,
 178            start_override_per_model=start_override_per_model,
 179            end_override_per_model=end_override_per_model,
 180            ignore_cron=ignore_cron,
 181            end_bounded=end_bounded,
 182        )
 183        # Filtering snapshots after computing missing intervals because we need all snapshots in order
 184        # to correctly infer start dates.
 185        if selected_snapshots is not None:
 186            snapshots_to_intervals = {
 187                s: i for s, i in snapshots_to_intervals.items() if s.name in selected_snapshots
 188            }
 189        return snapshots_to_intervals
 190
 191    def evaluate(
 192        self,
 193        snapshot: Snapshot,
 194        start: TimeLike,
 195        end: TimeLike,
 196        execution_time: TimeLike,
 197        deployability_index: DeployabilityIndex,
 198        batch_index: int,
 199        environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
 200        allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
 201        allow_additive_snapshots: t.Optional[t.Set[str]] = None,
 202        target_table_exists: t.Optional[bool] = None,
 203        **kwargs: t.Any,
 204    ) -> t.List[AuditResult]:
 205        """Evaluate a snapshot and add the processed interval to the state sync.
 206
 207        Args:
 208            snapshot: Snapshot to evaluate.
 209            start: The start datetime to render.
 210            end: The end datetime to render.
 211            execution_time: The date/time time reference to use for execution time. Defaults to now.
 212            allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
 213            allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
 214            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
 215            batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it
 216            auto_restatement_enabled: Whether to enable auto restatements.
 217            target_table_exists: Whether the target table exists. If None, the table will be checked for existence.
 218            kwargs: Additional kwargs to pass to the renderer.
 219
 220        Returns:
 221            Tuple of list of all audit results from the evaluation and list of non-blocking audit errors to warn.
 222        """
 223        validate_date_range(start, end)
 224
 225        snapshots = parent_snapshots_by_name(snapshot, self.snapshots)
 226
 227        is_deployable = deployability_index.is_deployable(snapshot)
 228
 229        wap_id = self.snapshot_evaluator.evaluate(
 230            snapshot,
 231            start=start,
 232            end=end,
 233            execution_time=execution_time,
 234            snapshots=snapshots,
 235            allow_destructive_snapshots=allow_destructive_snapshots,
 236            allow_additive_snapshots=allow_additive_snapshots,
 237            deployability_index=deployability_index,
 238            batch_index=batch_index,
 239            target_table_exists=target_table_exists,
 240            **kwargs,
 241        )
 242        audit_results = self._audit_snapshot(
 243            snapshot=snapshot,
 244            environment_naming_info=environment_naming_info,
 245            start=start,
 246            end=end,
 247            execution_time=execution_time,
 248            snapshots=snapshots,
 249            deployability_index=deployability_index,
 250            wap_id=wap_id,
 251            **kwargs,
 252        )
 253
 254        self.state_sync.add_interval(
 255            snapshot, start, end, is_dev=not is_deployable, last_altered_ts=now_timestamp()
 256        )
 257        return audit_results
 258
 259    def run(
 260        self,
 261        environment: str | EnvironmentNamingInfo,
 262        start: t.Optional[TimeLike] = None,
 263        end: t.Optional[TimeLike] = None,
 264        execution_time: t.Optional[TimeLike] = None,
 265        restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
 266        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 267        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 268        ignore_cron: bool = False,
 269        end_bounded: bool = False,
 270        selected_snapshots: t.Optional[t.Set[str]] = None,
 271        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 272        deployability_index: t.Optional[DeployabilityIndex] = None,
 273        auto_restatement_enabled: bool = False,
 274        run_environment_statements: bool = False,
 275    ) -> CompletionStatus:
 276        return self._run_or_audit(
 277            environment=environment,
 278            start=start,
 279            end=end,
 280            execution_time=execution_time,
 281            remove_intervals=restatements,
 282            start_override_per_model=start_override_per_model,
 283            end_override_per_model=end_override_per_model,
 284            ignore_cron=ignore_cron,
 285            end_bounded=end_bounded,
 286            selected_snapshots=selected_snapshots,
 287            circuit_breaker=circuit_breaker,
 288            deployability_index=deployability_index,
 289            auto_restatement_enabled=auto_restatement_enabled,
 290            run_environment_statements=run_environment_statements,
 291        )
 292
 293    def audit(
 294        self,
 295        environment: str | EnvironmentNamingInfo,
 296        start: TimeLike,
 297        end: TimeLike,
 298        execution_time: t.Optional[TimeLike] = None,
 299        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 300        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 301        ignore_cron: bool = False,
 302        end_bounded: bool = False,
 303        selected_snapshots: t.Optional[t.Set[str]] = None,
 304        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 305        deployability_index: t.Optional[DeployabilityIndex] = None,
 306        run_environment_statements: bool = False,
 307    ) -> CompletionStatus:
 308        # Remove the intervals from the snapshots that will be audited so that they can be recomputed
 309        # by _run_or_audit as "missing intervals" to reuse the rest of it's logic
 310        remove_intervals = {}
 311        for snapshot in self.snapshots.values():
 312            removal_intervals = snapshot.get_removal_interval(
 313                start, end, execution_time, is_preview=True
 314            )
 315            remove_intervals[snapshot.snapshot_id] = removal_intervals
 316
 317        return self._run_or_audit(
 318            environment=environment,
 319            start=start,
 320            end=end,
 321            execution_time=execution_time,
 322            remove_intervals=remove_intervals,
 323            start_override_per_model=start_override_per_model,
 324            end_override_per_model=end_override_per_model,
 325            ignore_cron=ignore_cron,
 326            end_bounded=end_bounded,
 327            selected_snapshots=selected_snapshots,
 328            circuit_breaker=circuit_breaker,
 329            deployability_index=deployability_index,
 330            run_environment_statements=run_environment_statements,
 331            audit_only=True,
 332        )
 333
 334    def batch_intervals(
 335        self,
 336        merged_intervals: SnapshotToIntervals,
 337        deployability_index: t.Optional[DeployabilityIndex],
 338        environment_naming_info: EnvironmentNamingInfo,
 339        dag: t.Optional[DAG[SnapshotId]] = None,
 340        is_restatement: bool = False,
 341    ) -> t.Dict[Snapshot, Intervals]:
 342        dag = dag or snapshots_to_dag(merged_intervals)
 343
 344        snapshot_intervals: t.Dict[SnapshotId, t.Tuple[Snapshot, t.List[Interval]]] = {
 345            snapshot.snapshot_id: (
 346                snapshot,
 347                [
 348                    i
 349                    for interval in intervals
 350                    for i in _expand_range_as_interval(*interval, snapshot.node.interval_unit)
 351                ],
 352            )
 353            for snapshot, intervals in merged_intervals.items()
 354        }
 355        snapshot_batches: t.Dict[Snapshot, Intervals] = {}
 356        all_unready_intervals: t.Dict[str, set[Interval]] = {}
 357        for snapshot_id in dag:
 358            if snapshot_id not in snapshot_intervals:
 359                continue
 360            snapshot, intervals = snapshot_intervals[snapshot_id]
 361            unready = set(intervals)
 362
 363            from sqlmesh.core.context import ExecutionContext
 364
 365            adapter = self.snapshot_evaluator.get_adapter(snapshot.model_gateway)
 366
 367            parent_intervals: Intervals = []
 368            for parent_id in snapshot.parents:
 369                parent_snapshot, _ = snapshot_intervals.get(parent_id, (None, []))
 370                if not parent_snapshot or parent_snapshot.is_external:
 371                    continue
 372
 373                parent_intervals.extend(snapshot_batches[parent_snapshot])
 374
 375            context = ExecutionContext(
 376                adapter,
 377                self.snapshots_by_name,
 378                deployability_index,
 379                default_dialect=adapter.dialect,
 380                default_catalog=self.default_catalog,
 381                is_restatement=is_restatement,
 382                parent_intervals=parent_intervals,
 383            )
 384
 385            intervals = self._check_ready_intervals(
 386                snapshot,
 387                intervals,
 388                context,
 389                environment_naming_info,
 390            )
 391            unready -= set(intervals)
 392
 393            for parent in snapshot.parents:
 394                if parent.name in all_unready_intervals:
 395                    unready.update(all_unready_intervals[parent.name])
 396
 397            all_unready_intervals[snapshot.name] = unready
 398
 399            batches = []
 400            batch_size = snapshot.node.batch_size
 401            next_batch: t.List[t.Tuple[int, int]] = []
 402
 403            for interval in interval_diff(
 404                intervals, merge_intervals(unready), uninterrupted=snapshot.depends_on_past
 405            ):
 406                if (batch_size and len(next_batch) >= batch_size) or (
 407                    next_batch and interval[0] != next_batch[-1][-1]
 408                ):
 409                    batches.append((next_batch[0][0], next_batch[-1][-1]))
 410                    next_batch = []
 411
 412                next_batch.append(interval)
 413
 414            if next_batch:
 415                batches.append((next_batch[0][0], next_batch[-1][-1]))
 416
 417            snapshot_batches[snapshot] = batches
 418
 419        return snapshot_batches
 420
 421    def run_merged_intervals(
 422        self,
 423        *,
 424        merged_intervals: SnapshotToIntervals,
 425        deployability_index: DeployabilityIndex,
 426        environment_naming_info: EnvironmentNamingInfo,
 427        execution_time: t.Optional[TimeLike] = None,
 428        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 429        start: t.Optional[TimeLike] = None,
 430        end: t.Optional[TimeLike] = None,
 431        allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
 432        selected_models: t.Optional[t.Set[str]] = None,
 433        allow_additive_snapshots: t.Optional[t.Set[str]] = None,
 434        selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
 435        run_environment_statements: bool = False,
 436        audit_only: bool = False,
 437        auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {},
 438        is_restatement: bool = False,
 439    ) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
 440        """Runs precomputed batches of missing intervals.
 441
 442        Args:
 443            merged_intervals: The snapshots and contiguous interval ranges to evaluate.
 444            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
 445            environment_naming_info: The environment naming info the user is targeting when applying their change.
 446            execution_time: The date/time reference to use for execution time.
 447            circuit_breaker: An optional handler which checks if the run should be aborted.
 448            start: The start of the run.
 449            end: The end of the run.
 450            allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
 451            allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
 452            selected_snapshot_ids: The snapshots to include in the run DAG. If None, all snapshots with missing intervals will be included.
 453
 454        Returns:
 455            A tuple of errors and skipped intervals.
 456        """
 457        execution_time = execution_time or now_timestamp()
 458
 459        selected_snapshots = [self.snapshots[sid] for sid in (selected_snapshot_ids or set())]
 460        if not selected_snapshots:
 461            selected_snapshots = list(merged_intervals)
 462
 463        # Build the full DAG from all snapshots to preserve transitive dependencies
 464        full_dag = snapshots_to_dag(self.snapshots.values())
 465
 466        # Create a subdag that includes the selected snapshots and all their upstream dependencies
 467        # This ensures that transitive dependencies are preserved even when intermediate nodes are not selected
 468        selected_snapshot_ids_set = {s.snapshot_id for s in selected_snapshots}
 469        snapshot_dag = full_dag.subdag(*selected_snapshot_ids_set)
 470
 471        batched_intervals = self.batch_intervals(
 472            merged_intervals,
 473            deployability_index,
 474            environment_naming_info,
 475            dag=snapshot_dag,
 476            is_restatement=is_restatement,
 477        )
 478        self.console.start_evaluation_progress(
 479            batched_intervals,
 480            environment_naming_info,
 481            self.default_catalog,
 482            audit_only=audit_only,
 483        )
 484
 485        if run_environment_statements:
 486            environment_statements = self.state_sync.get_environment_statements(
 487                environment_naming_info.name
 488            )
 489            execute_environment_statements(
 490                adapter=self.snapshot_evaluator.adapter,
 491                environment_statements=environment_statements,
 492                runtime_stage=RuntimeStage.BEFORE_ALL,
 493                environment_naming_info=environment_naming_info,
 494                default_catalog=self.default_catalog,
 495                snapshots=self.snapshots_by_name,
 496                start=start,
 497                end=end,
 498                execution_time=execution_time,
 499                selected_models=selected_models,
 500            )
 501
 502        # We only need to create physical tables if the snapshot is not representative or if it
 503        # needs backfill
 504        snapshots_to_create_candidates = [
 505            s
 506            for s in selected_snapshots
 507            if not deployability_index.is_representative(s) or s in batched_intervals
 508        ]
 509        snapshots_to_create = {
 510            s.snapshot_id
 511            for s in self.snapshot_evaluator.get_snapshots_to_create(
 512                snapshots_to_create_candidates, deployability_index
 513            )
 514        }
 515
 516        dag = self._dag(
 517            batched_intervals, snapshot_dag=snapshot_dag, snapshots_to_create=snapshots_to_create
 518        )
 519
 520        def run_node(node: SchedulingUnit) -> None:
 521            if circuit_breaker and circuit_breaker():
 522                raise CircuitBreakerError()
 523            if isinstance(node, DummyNode):
 524                return
 525
 526            snapshot = self.snapshots_by_name[node.snapshot_name]
 527
 528            if isinstance(node, EvaluateNode):
 529                self.console.start_snapshot_evaluation_progress(snapshot)
 530                execution_start_ts = now_timestamp()
 531                evaluation_duration_ms: t.Optional[int] = None
 532                start, end = node.interval
 533
 534                audit_results: t.List[AuditResult] = []
 535                try:
 536                    assert execution_time  # mypy
 537                    assert deployability_index  # mypy
 538
 539                    if audit_only:
 540                        audit_results = self._audit_snapshot(
 541                            snapshot=snapshot,
 542                            environment_naming_info=environment_naming_info,
 543                            deployability_index=deployability_index,
 544                            snapshots=self.snapshots_by_name,
 545                            start=start,
 546                            end=end,
 547                            execution_time=execution_time,
 548                        )
 549                    else:
 550                        # If batch_index > 0, then the target table must exist since the first batch would have created it
 551                        target_table_exists = (
 552                            snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0
 553                        )
 554                        audit_results = self.evaluate(
 555                            snapshot=snapshot,
 556                            environment_naming_info=environment_naming_info,
 557                            start=start,
 558                            end=end,
 559                            execution_time=execution_time,
 560                            deployability_index=deployability_index,
 561                            batch_index=node.batch_index,
 562                            allow_destructive_snapshots=allow_destructive_snapshots,
 563                            allow_additive_snapshots=allow_additive_snapshots,
 564                            target_table_exists=target_table_exists,
 565                            selected_models=selected_models,
 566                        )
 567
 568                    evaluation_duration_ms = now_timestamp() - execution_start_ts
 569                finally:
 570                    num_audits = len(audit_results)
 571                    num_audits_failed = sum(1 for result in audit_results if result.count)
 572
 573                    execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats(
 574                        SnapshotIdBatch(snapshot_id=snapshot.snapshot_id, batch_id=node.batch_index)
 575                    )
 576
 577                    self.console.update_snapshot_evaluation_progress(
 578                        snapshot,
 579                        batched_intervals[snapshot][node.batch_index],
 580                        node.batch_index,
 581                        evaluation_duration_ms,
 582                        num_audits - num_audits_failed,
 583                        num_audits_failed,
 584                        execution_stats=execution_stats,
 585                        auto_restatement_triggers=auto_restatement_triggers.get(
 586                            snapshot.snapshot_id
 587                        ),
 588                    )
 589            elif isinstance(node, CreateNode):
 590                self.snapshot_evaluator.create_snapshot(
 591                    snapshot=snapshot,
 592                    snapshots=self.snapshots_by_name,
 593                    deployability_index=deployability_index,
 594                    allow_destructive_snapshots=allow_destructive_snapshots or set(),
 595                    allow_additive_snapshots=allow_additive_snapshots or set(),
 596                )
 597
 598        try:
 599            with self.snapshot_evaluator.concurrent_context():
 600                errors, skipped_intervals = concurrent_apply_to_dag(
 601                    dag,
 602                    run_node,
 603                    self.max_workers,
 604                    raise_on_error=False,
 605                )
 606                self.console.stop_evaluation_progress(success=not errors)
 607
 608                skipped_snapshots = {
 609                    i.snapshot_name for i in skipped_intervals if isinstance(i, EvaluateNode)
 610                }
 611                self.console.log_skipped_models(skipped_snapshots)
 612                for skipped in skipped_snapshots:
 613                    logger.info(f"SKIPPED snapshot {skipped}\n")
 614
 615                for error in errors:
 616                    if isinstance(error.__cause__, CircuitBreakerError):
 617                        raise error.__cause__
 618                    logger.info(str(error), exc_info=error)
 619
 620                self.console.log_failed_models(errors)
 621
 622                return errors, skipped_intervals
 623        finally:
 624            if run_environment_statements:
 625                execute_environment_statements(
 626                    adapter=self.snapshot_evaluator.adapter,
 627                    environment_statements=environment_statements,
 628                    runtime_stage=RuntimeStage.AFTER_ALL,
 629                    environment_naming_info=environment_naming_info,
 630                    default_catalog=self.default_catalog,
 631                    snapshots=self.snapshots_by_name,
 632                    start=start,
 633                    end=end,
 634                    execution_time=execution_time,
 635                    selected_models=selected_models,
 636                )
 637
 638            self.state_sync.recycle()
 639
    def _dag(
        self,
        batches: SnapshotToIntervals,
        snapshot_dag: t.Optional[DAG[SnapshotId]] = None,
        snapshots_to_create: t.Optional[t.Set[SnapshotId]] = None,
    ) -> DAG[SchedulingUnit]:
        """Builds a DAG of scheduling units (create, evaluate and dummy nodes) to be executed.

        For every snapshot in ``snapshot_dag`` that is known to this scheduler (by name):

        * a ``CreateNode`` is added when the snapshot is in ``snapshots_to_create`` and it is
          incremental by time range, or has a ``batch_size`` with ``batch_concurrency`` unset/0
          or greater than 1, or has no intervals to evaluate;
        * one ``EvaluateNode`` is added per batch; it depends on the ``CreateNode`` when one was
          added, otherwise on the upstream units returned by ``_find_upstream_dependencies``;
        * when there is more than one batch, a ``DummyNode`` depending on every batch is added;
        * batch ``i`` additionally depends on batch ``i - batch_concurrency``, which bounds how
          many batches can run at once (``batch_concurrency`` is forced to 1 for snapshots that
          depend on their past).

        Args:
            batches: The batches of snapshots and intervals to evaluate.
            snapshot_dag: The DAG of all snapshots. Built from ``batches`` when not provided (or falsy).
            snapshots_to_create: The snapshots with missing physical tables. NOTE: a non-empty set
                passed here is mutated in place: the IDs of snapshots that get a dedicated
                ``CreateNode`` are removed from it.

        Returns:
            A DAG of scheduling units to be executed.
        """

        intervals_per_snapshot = {
            snapshot.name: intervals for snapshot, intervals in batches.items()
        }
        snapshots_to_create = snapshots_to_create or set()
        # Unmodified copy of the input; used for membership checks because the loop below
        # removes entries from ``snapshots_to_create``.
        original_snapshots_to_create = snapshots_to_create.copy()
        upstream_dependencies_cache: t.Dict[SnapshotId, t.Set[SchedulingUnit]] = {}

        snapshot_dag = snapshot_dag or snapshots_to_dag(batches)
        dag = DAG[SchedulingUnit]()

        for snapshot_id in snapshot_dag:
            if snapshot_id.name not in self.snapshots_by_name:
                continue

            snapshot = self.snapshots_by_name[snapshot_id.name]
            intervals = intervals_per_snapshot.get(snapshot.name, [])

            upstream_dependencies: t.Set[SchedulingUnit] = set()

            for p_sid in snapshot.parents:
                upstream_dependencies.update(
                    self._find_upstream_dependencies(
                        p_sid,
                        intervals_per_snapshot,
                        original_snapshots_to_create,
                        upstream_dependencies_cache,
                    )
                )

            batch_concurrency = snapshot.node.batch_concurrency
            batch_size = snapshot.node.batch_size
            # Batches of a snapshot that depends on its past must run one at a time.
            if snapshot.depends_on_past:
                batch_concurrency = 1

            create_node: t.Optional[CreateNode] = None
            if snapshot.snapshot_id in original_snapshots_to_create and (
                snapshot.is_incremental_by_time_range
                or ((not batch_concurrency or batch_concurrency > 1) and batch_size)
                or not intervals
            ):
                # Add a separate node for table creation when there are multiple concurrent
                # evaluation nodes or when there are no intervals to evaluate.
                create_node = CreateNode(snapshot_name=snapshot.name)
                dag.add(create_node, upstream_dependencies)
                # Mutates the caller's set (when a non-empty one was passed): this table is created
                # by create_node, so the evaluation nodes must not be told to create it.
                snapshots_to_create.remove(snapshot.snapshot_id)

            for i, interval in enumerate(intervals):
                node = EvaluateNode(snapshot_name=snapshot.name, interval=interval, batch_index=i)

                # create_node already depends on the upstream units, so depending on it suffices.
                if create_node:
                    dag.add(node, [create_node])
                else:
                    dag.add(node, upstream_dependencies)

                # A single DummyNode per snapshot gathers every batch as a dependency.
                if len(intervals) > 1:
                    dag.add(DummyNode(snapshot_name=snapshot.name), [node])

                # Bound concurrency: batch i waits for batch (i - batch_concurrency) to finish.
                if batch_concurrency and i >= batch_concurrency:
                    batch_idx_to_wait_for = i - batch_concurrency
                    dag.add(
                        node,
                        [
                            EvaluateNode(
                                snapshot_name=snapshot.name,
                                interval=intervals[batch_idx_to_wait_for],
                                batch_index=batch_idx_to_wait_for,
                            ),
                        ],
                    )
        return dag
 727
 728    def _find_upstream_dependencies(
 729        self,
 730        parent_sid: SnapshotId,
 731        intervals_per_snapshot: t.Dict[str, Intervals],
 732        snapshots_to_create: t.Set[SnapshotId],
 733        cache: t.Dict[SnapshotId, t.Set[SchedulingUnit]],
 734    ) -> t.Set[SchedulingUnit]:
 735        if parent_sid not in self.snapshots:
 736            return set()
 737        if parent_sid in cache:
 738            return cache[parent_sid]
 739
 740        p_intervals = intervals_per_snapshot.get(parent_sid.name, [])
 741
 742        parent_node: t.Optional[SchedulingUnit] = None
 743        if p_intervals:
 744            if len(p_intervals) > 1:
 745                parent_node = DummyNode(snapshot_name=parent_sid.name)
 746            else:
 747                interval = p_intervals[0]
 748                parent_node = EvaluateNode(
 749                    snapshot_name=parent_sid.name, interval=interval, batch_index=0
 750                )
 751        elif parent_sid in snapshots_to_create:
 752            parent_node = CreateNode(snapshot_name=parent_sid.name)
 753
 754        if parent_node is not None:
 755            cache[parent_sid] = {parent_node}
 756            return {parent_node}
 757
 758        # This snapshot has no intervals and doesn't need creation which means
 759        # that it can be a transitive dependency
 760        transitive_deps: t.Set[SchedulingUnit] = set()
 761        parent_snapshot = self.snapshots[parent_sid]
 762        for grandparent_sid in parent_snapshot.parents:
 763            transitive_deps.update(
 764                self._find_upstream_dependencies(
 765                    grandparent_sid, intervals_per_snapshot, snapshots_to_create, cache
 766                )
 767            )
 768        cache[parent_sid] = transitive_deps
 769        return transitive_deps
 770
 771    def _run_or_audit(
 772        self,
 773        environment: str | EnvironmentNamingInfo,
 774        start: t.Optional[TimeLike] = None,
 775        end: t.Optional[TimeLike] = None,
 776        execution_time: t.Optional[TimeLike] = None,
 777        remove_intervals: t.Optional[t.Dict[SnapshotId, Interval]] = None,
 778        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 779        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 780        ignore_cron: bool = False,
 781        end_bounded: bool = False,
 782        selected_snapshots: t.Optional[t.Set[str]] = None,
 783        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 784        deployability_index: t.Optional[DeployabilityIndex] = None,
 785        auto_restatement_enabled: bool = False,
 786        run_environment_statements: bool = False,
 787        audit_only: bool = False,
 788    ) -> CompletionStatus:
 789        """Concurrently runs or audits all snapshots in topological order.
 790
 791        Args:
 792            environment: The environment naming info the user is targeting when applying their change.
 793                Can just be the environment name if the user is targeting a remote environment and wants to get the remote
 794                naming info
 795            start: The start of the run. Defaults to the min node start date.
 796            end: The end of the run. Defaults to now.
 797            execution_time: The date/time time reference to use for execution time. Defaults to now.
 798            remove_intervals: A dict of snapshots to their intervals. For evaluation, these are the intervals that will be restated. For audits,
 799                              these are the intervals that will be reaudited
 800            start_override_per_model: A mapping of model FQNs to target start dates.
 801            end_override_per_model: A mapping of model FQNs to target end dates.
 802            ignore_cron: Whether to ignore the node's cron schedule.
 803            end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
 804                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
 805            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
 806            circuit_breaker: An optional handler which checks if the run should be aborted.
 807            deployability_index: Determines snapshots that are deployable in the context of this render.
 808            auto_restatement_enabled: Whether to enable auto restatements.
 809
 810        Returns:
 811            True if the execution was successful and False otherwise.
 812        """
 813        validate_date_range(start, end)
 814        if isinstance(environment, str):
 815            env = self.state_sync.get_environment(environment)
 816            if not env:
 817                raise SQLMeshError(
 818                    "Was not provided an environment suffix target and the environment doesn't exist."
 819                    "Are you running for the first time and need to run plan/apply first?"
 820                )
 821            environment_naming_info = env.naming_info
 822        else:
 823            environment_naming_info = environment
 824
 825        deployability_index = deployability_index or (
 826            DeployabilityIndex.create(self.snapshots.values(), start=start)
 827            if environment_naming_info.name != c.PROD
 828            else DeployabilityIndex.all_deployable()
 829        )
 830        execution_time = execution_time or now_timestamp()
 831
 832        self.state_sync.refresh_snapshot_intervals(self.snapshots.values())
 833        for s_id, interval in (remove_intervals or {}).items():
 834            self.snapshots[s_id].remove_interval(interval)
 835
 836        all_auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
 837        if auto_restatement_enabled:
 838            auto_restated_intervals, all_auto_restatement_triggers = apply_auto_restatements(
 839                self.snapshots, execution_time
 840            )
 841            self.state_sync.add_snapshots_intervals(auto_restated_intervals)
 842            self.state_sync.update_auto_restatements(
 843                {s.name_version: s.next_auto_restatement_ts for s in self.snapshots.values()}
 844            )
 845
 846        merged_intervals = self.merged_missing_intervals(
 847            start,
 848            end,
 849            execution_time,
 850            deployability_index=deployability_index,
 851            restatements=remove_intervals,
 852            start_override_per_model=start_override_per_model,
 853            end_override_per_model=end_override_per_model,
 854            ignore_cron=ignore_cron,
 855            end_bounded=end_bounded,
 856            selected_snapshots=selected_snapshots,
 857        )
 858        if not merged_intervals:
 859            return CompletionStatus.NOTHING_TO_DO
 860
 861        auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
 862        if all_auto_restatement_triggers:
 863            merged_intervals_snapshots = {snapshot.snapshot_id for snapshot in merged_intervals}
 864            auto_restatement_triggers = {
 865                s_id: all_auto_restatement_triggers.get(s_id, [])
 866                for s_id in merged_intervals_snapshots
 867            }
 868
 869        errors, _ = self.run_merged_intervals(
 870            merged_intervals=merged_intervals,
 871            deployability_index=deployability_index,
 872            environment_naming_info=environment_naming_info,
 873            execution_time=execution_time,
 874            circuit_breaker=circuit_breaker,
 875            start=start,
 876            end=end,
 877            run_environment_statements=run_environment_statements,
 878            audit_only=audit_only,
 879            auto_restatement_triggers=auto_restatement_triggers,
 880            selected_models={
 881                s.node.dbt_unique_id for s in merged_intervals if s.node.dbt_unique_id
 882            },
 883        )
 884
 885        return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS
 886
 887    def _audit_snapshot(
 888        self,
 889        snapshot: Snapshot,
 890        deployability_index: DeployabilityIndex,
 891        snapshots: t.Dict[str, Snapshot],
 892        start: t.Optional[TimeLike] = None,
 893        end: t.Optional[TimeLike] = None,
 894        execution_time: t.Optional[TimeLike] = None,
 895        wap_id: t.Optional[str] = None,
 896        environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
 897        **kwargs: t.Any,
 898    ) -> t.List[AuditResult]:
 899        is_deployable = deployability_index.is_deployable(snapshot)
 900
 901        audit_results = self.snapshot_evaluator.audit(
 902            snapshot=snapshot,
 903            start=start,
 904            end=end,
 905            execution_time=execution_time,
 906            snapshots=snapshots,
 907            deployability_index=deployability_index,
 908            wap_id=wap_id,
 909            **kwargs,
 910        )
 911
 912        audit_errors_to_raise: t.List[AuditError] = []
 913        audit_errors_to_warn: t.List[AuditError] = []
 914        for audit_result in (result for result in audit_results if result.count):
 915            error = AuditError(
 916                audit_name=audit_result.audit.name,
 917                audit_args=audit_result.audit_args,
 918                model=snapshot.model_or_none,
 919                count=t.cast(int, audit_result.count),
 920                query=t.cast(exp.Query, audit_result.query),
 921                adapter_dialect=self.snapshot_evaluator.adapter.dialect,
 922            )
 923            self.notification_target_manager.notify(NotificationEvent.AUDIT_FAILURE, error)
 924            if is_deployable and snapshot.node.owner:
 925                self.notification_target_manager.notify_user(
 926                    NotificationEvent.AUDIT_FAILURE, snapshot.node.owner, error
 927                )
 928            if audit_result.blocking:
 929                audit_errors_to_raise.append(error)
 930            else:
 931                audit_errors_to_warn.append(error)
 932
 933        if audit_errors_to_raise:
 934            raise NodeAuditsErrors(audit_errors_to_raise)
 935
 936        if environment_naming_info:
 937            for audit_error in audit_errors_to_warn:
 938                display_name = snapshot.display_name(
 939                    environment_naming_info,
 940                    self.default_catalog,
 941                    self.snapshot_evaluator.adapter.dialect,
 942                )
 943                self.console.log_warning(
 944                    f"\n{display_name}: {audit_error}.",
 945                    f"{audit_error}. Audit query:\n{audit_error.query.sql(audit_error.adapter_dialect)}",
 946                )
 947
 948        return audit_results
 949
 950    def _check_ready_intervals(
 951        self,
 952        snapshot: Snapshot,
 953        intervals: Intervals,
 954        context: ExecutionContext,
 955        environment_naming_info: EnvironmentNamingInfo,
 956    ) -> Intervals:
 957        """Checks if the intervals are ready for evaluation for the given snapshot.
 958
 959        This implementation also includes the signal progress tracking.
 960        Note that this will handle gaps in the provided intervals. The returned intervals
 961        may introduce new gaps.
 962
 963        Args:
 964            snapshot: The snapshot to check.
 965            intervals: The intervals to check.
 966            context: The context to use.
 967            environment_naming_info: The environment naming info to use.
 968
 969        Returns:
 970            The intervals that are ready for evaluation.
 971        """
 972        signals = snapshot.is_model and snapshot.model.render_signal_calls()
 973
 974        if not (signals and signals.signals_to_kwargs):
 975            return intervals
 976
 977        self.console.start_signal_progress(
 978            snapshot,
 979            self.default_catalog,
 980            environment_naming_info or EnvironmentNamingInfo(),
 981        )
 982
 983        for signal_idx, (signal_name, kwargs) in enumerate(signals.signals_to_kwargs.items()):
 984            # Capture intervals before signal check for display
 985            intervals_to_check = merge_intervals(intervals)
 986
 987            signal_start_ts = time.perf_counter()
 988
 989            try:
 990                intervals = check_ready_intervals(
 991                    signals.prepared_python_env[signal_name],
 992                    intervals,
 993                    context,
 994                    python_env=signals.python_env,
 995                    dialect=snapshot.model.dialect,
 996                    path=snapshot.model._path,
 997                    snapshot=snapshot,
 998                    kwargs=kwargs,
 999                )
1000            except SQLMeshError as e:
1001                raise SignalEvalError(
1002                    f"{e} '{signal_name}' for '{snapshot.model.name}' at {snapshot.model._path}"
1003                )
1004
1005            duration = time.perf_counter() - signal_start_ts
1006
1007            self.console.update_signal_progress(
1008                snapshot=snapshot,
1009                signal_name=signal_name,
1010                signal_idx=signal_idx,
1011                total_signals=len(signals.signals_to_kwargs),
1012                ready_intervals=merge_intervals(intervals),
1013                check_intervals=intervals_to_check,
1014                duration=duration,
1015            )
1016
1017        self.console.stop_signal_progress()
1018
1019        return intervals
1020
1021
def merged_missing_intervals(
    snapshots: t.Collection[Snapshot],
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> SnapshotToIntervals:
    """Find the largest contiguous date interval parameters based only on what is missing.

    For each node name, find all dependencies and look for a stored snapshot from the metastore.
    If a snapshot is found, calculate the missing intervals that need to be processed given the
    passed in start and end intervals. This function fills in defaults for the date range and
    delegates to `compute_interval_params`.

    The result is a superset of what may actually get processed at runtime, which also depends
    on things like batch size and signal readiness.

    Args:
        snapshots: A set of target snapshots for which intervals should be computed.
        start: The start of the run. Defaults to `earliest_start_date(snapshots)`.
        end: The end of the run. Defaults to now.
        execution_time: The date/time reference to use for execution time. Defaults to now.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        restatements: A mapping of snapshot IDs to the intervals being restated.
        start_override_per_model: A mapping of model FQNs to target start dates.
        end_override_per_model: A mapping of model FQNs to target end dates.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date,
            disregarding lookback, allow_partials, and other attributes that could cause the intervals
            to exceed the target end date.

    Returns:
        A dict mapping snapshots to their merged, contiguous missing intervals.
    """
    restatements = restatements or {}
    # Checks the user-supplied range before defaults are filled in.
    validate_date_range(start, end)

    return compute_interval_params(
        snapshots,
        start=start or earliest_start_date(snapshots),
        end=end or now_timestamp(),
        deployability_index=deployability_index,
        execution_time=execution_time or now_timestamp(),
        restatements=restatements,
        start_override_per_model=start_override_per_model,
        end_override_per_model=end_override_per_model,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    )
1069
1070
def compute_interval_params(
    snapshots: t.Collection[Snapshot],
    *,
    start: TimeLike,
    end: TimeLike,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> SnapshotToIntervals:
    """Compute each snapshot's missing intervals and merge adjacent ones into contiguous ranges.

    The missing intervals come from `missing_intervals`. Consecutive intervals where one ends
    exactly where the next begins are collapsed into a single `(start, end)` range, and a gap
    between intervals starts a new range. The result is a superset of what may actually get
    processed at runtime, which also depends on batch size, signal readiness, etc.

    Args:
        snapshots: The target snapshots for which intervals should be computed.
        start: Start of the interval.
        end: End of the interval.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        execution_time: The date/time reference to use for execution time.
        restatements: A mapping of snapshot IDs to the intervals being restated.
        start_override_per_model: A mapping of model FQNs to target start dates.
        end_override_per_model: A mapping of model FQNs to target end dates.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date,
            disregarding lookback, allow_partials, and other attributes that could cause the intervals
            to exceed the target end date.

    Returns:
        A dict mapping each snapshot returned by `missing_intervals` to its list of contiguous ranges.
    """
    missing = missing_intervals(
        snapshots,
        start=start,
        end=end,
        execution_time=execution_time,
        restatements=restatements,
        deployability_index=deployability_index,
        start_override_per_model=start_override_per_model,
        end_override_per_model=end_override_per_model,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    )

    merged: SnapshotToIntervals = {}
    for snapshot, intervals in missing.items():
        ranges: Intervals = []
        for interval_start, interval_end in intervals:
            if ranges and ranges[-1][1] == interval_start:
                # Adjacent to the current range: extend it.
                ranges[-1] = (ranges[-1][0], interval_end)
            else:
                # First interval or a gap: open a new range.
                ranges.append((interval_start, interval_end))
        merged[snapshot] = ranges

    return merged
1134
1135
def interval_diff(
    intervals_a: Intervals, intervals_b: Intervals, uninterrupted: bool = False
) -> Intervals:
    """Return the intervals of `intervals_a` that do not overlap any interval of `intervals_b`.

    Intervals are compared as half-open `[start, end)` pairs, so intervals that only touch at an
    endpoint do not overlap. Intervals of `intervals_a` are kept or dropped whole; an interval
    that partially overlaps `intervals_b` is dropped rather than trimmed. The two-pointer scan
    assumes both inputs are sorted by start.

    Args:
        intervals_a: The intervals to filter.
        intervals_b: The intervals to subtract.
        uninterrupted: If True, stop at the first interval of `intervals_a` that overlaps
            `intervals_b` and return only the non-overlapping intervals that precede it.

    Returns:
        The remaining intervals of `intervals_a`, in their original order. If either input is
        empty, `intervals_a` itself is returned.
    """
    if not intervals_a or not intervals_b:
        return intervals_a

    index_a, index_b = 0, 0
    len_a, len_b = len(intervals_a), len(intervals_b)
    results: Intervals = []

    while index_a < len_a and index_b < len_b:
        start_a, end_a = intervals_a[index_a]
        start_b, end_b = intervals_b[index_b]

        if start_a >= end_b:
            # b ends before a starts, so b cannot overlap this or any later interval of a.
            index_b += 1
        elif start_b >= end_a:
            # a ends before the earliest remaining b starts, so a is disjoint from all of b.
            results.append(intervals_a[index_a])
            index_a += 1
        else:
            # a overlaps b, so a is excluded. Keep b, since it may also overlap later intervals of a.
            if uninterrupted:
                return results
            index_a += 1

    # If anything is left in a, every b was passed, so the remaining intervals all start at or
    # after the end of the last interval in b.
    results.extend(intervals_a[index_a:])
    return results
1172
1173
1174def _resolve_one_snapshot_per_version(
1175    snapshots: t.Iterable[Snapshot],
1176) -> t.Dict[t.Tuple[str, str], Snapshot]:
1177    snapshot_per_version: t.Dict[t.Tuple[str, str], Snapshot] = {}
1178    for snapshot in snapshots:
1179        key = (snapshot.name, snapshot.version_get_or_generate())
1180        if key not in snapshot_per_version:
1181            snapshot_per_version[key] = snapshot
1182        else:
1183            prev_snapshot = snapshot_per_version[key]
1184            if snapshot.unpaused_ts and (
1185                not prev_snapshot.unpaused_ts or snapshot.created_ts > prev_snapshot.created_ts
1186            ):
1187                snapshot_per_version[key] = snapshot
1188
1189    return snapshot_per_version
1190
1191
def _expand_range_as_interval(
    start_ts: int, end_ts: int, interval_unit: IntervalUnit
) -> t.List[Interval]:
    """Turn the timestamps produced by `expand_range` into a list of intervals.

    Each timestamp is paired with the one that follows it, so N timestamps yield N - 1
    intervals (none when fewer than two timestamps are produced).
    """
    boundaries = expand_range(start_ts, end_ts, interval_unit)
    return list(zip(boundaries[:-1], boundaries[1:]))
logger = <Logger sqlmesh.core.scheduler (WARNING)>
SnapshotToIntervals = typing.Dict[sqlmesh.core.snapshot.definition.Snapshot, typing.List[typing.Tuple[int, int]]]
class SchedulingUnit(abc.ABC):
class SchedulingUnit(abc.ABC):
    """Base class for the nodes of the scheduler's execution DAG.

    Every unit belongs to one snapshot, identified by `snapshot_name`. Units are ordered by
    their concrete class name first and by snapshot name second.
    """

    snapshot_name: str

    def __lt__(self, other: SchedulingUnit) -> bool:
        own_key = (type(self).__name__, self.snapshot_name)
        other_key = (type(other).__name__, other.snapshot_name)
        return own_key < other_key

Helper class that provides a standard way to create an ABC using inheritance.

snapshot_name: str
@dataclass(frozen=True)
class EvaluateNode(SchedulingUnit):
@dataclass(frozen=True)
class EvaluateNode(SchedulingUnit):
    """A DAG node that evaluates one interval (batch) of a snapshot.

    Compared with another EvaluateNode, ordering uses class name, snapshot name, interval and
    batch index. Compared with any other unit, it falls back to the base-class ordering.
    """

    snapshot_name: str
    interval: Interval
    batch_index: int

    def __lt__(self, other: SchedulingUnit) -> bool:
        if isinstance(other, EvaluateNode):
            mine = (self.__class__.__name__, self.snapshot_name, self.interval, self.batch_index)
            theirs = (
                other.__class__.__name__,
                other.snapshot_name,
                other.interval,
                other.batch_index,
            )
            return mine < theirs
        return super().__lt__(other)
EvaluateNode(snapshot_name: str, interval: Tuple[int, int], batch_index: int)
snapshot_name: str
interval: Tuple[int, int]
batch_index: int
@dataclass(frozen=True)
class CreateNode(SchedulingUnit):
@dataclass(frozen=True)
class CreateNode(SchedulingUnit):
    """A DAG node that creates a snapshot's physical table.

    When present, the snapshot's evaluation nodes depend on it.
    """

    snapshot_name: str
CreateNode(snapshot_name: str)
snapshot_name: str
@dataclass(frozen=True)
class DummyNode(SchedulingUnit):
@dataclass(frozen=True)
class DummyNode(SchedulingUnit):
    """A placeholder DAG node that depends on all evaluation nodes of a multi-interval snapshot.

    Downstream snapshots depend on this single node instead of on every batch.
    """

    snapshot_name: str
DummyNode(snapshot_name: str)
snapshot_name: str
class Scheduler:
 100class Scheduler:
 101    """Schedules and manages the evaluation of snapshots.
 102
 103    The scheduler evaluates multiple snapshots with date intervals in the correct
 104    topological order. It consults the state sync to understand what intervals for each
 105    snapshot needs to be backfilled.
 106
 107    The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine.
 108
 109    Args:
 110        snapshots: A collection of snapshots.
 111        snapshot_evaluator: The snapshot evaluator to execute queries.
 112        state_sync: The state sync to pull saved snapshots.
 113        max_workers: The maximum number of parallel queries to run.
 114        console: The rich instance used for printing scheduling information.
 115    """
 116
 117    def __init__(
 118        self,
 119        snapshots: t.Iterable[Snapshot],
 120        snapshot_evaluator: SnapshotEvaluator,
 121        state_sync: StateSync,
 122        default_catalog: t.Optional[str],
 123        max_workers: int = 1,
 124        console: t.Optional[Console] = None,
 125        notification_target_manager: t.Optional[NotificationTargetManager] = None,
 126    ):
 127        self.state_sync = state_sync
 128        self.snapshots = {s.snapshot_id: s for s in snapshots}
 129        self.snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()}
 130        self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values())
 131        self.default_catalog = default_catalog
 132        self.snapshot_evaluator = snapshot_evaluator
 133        self.max_workers = max_workers
 134        self.console = console or get_console()
 135        self.notification_target_manager = (
 136            notification_target_manager or NotificationTargetManager()
 137        )
 138
 139    def merged_missing_intervals(
 140        self,
 141        start: t.Optional[TimeLike] = None,
 142        end: t.Optional[TimeLike] = None,
 143        execution_time: t.Optional[TimeLike] = None,
 144        deployability_index: t.Optional[DeployabilityIndex] = None,
 145        restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
 146        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 147        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 148        ignore_cron: bool = False,
 149        end_bounded: bool = False,
 150        selected_snapshots: t.Optional[t.Set[str]] = None,
 151    ) -> SnapshotToIntervals:
 152        """Find the largest contiguous date interval parameters based only on what is missing.
 153
 154        For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
 155        calculate the missing intervals that need to be processed given the passed in start and end intervals.
 156
 157        This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.
 158
 159        Args:
 160            start: The start of the run. Defaults to the min node start date.
 161            end: The end of the run. Defaults to now.
 162            execution_time: The date/time reference to use for execution time. Defaults to now.
 163            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
 164            restatements: A set of snapshot names being restated.
 165            start_override_per_model: A mapping of model FQNs to target start dates.
 166            end_override_per_model: A mapping of model FQNs to target end dates.
 167            ignore_cron: Whether to ignore the node's cron schedule.
 168            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
 169                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
 170            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
 171        """
 172        snapshots_to_intervals = merged_missing_intervals(
 173            snapshots=self.snapshot_per_version.values(),
 174            start=start,
 175            end=end,
 176            execution_time=execution_time,
 177            deployability_index=deployability_index,
 178            restatements=restatements,
 179            start_override_per_model=start_override_per_model,
 180            end_override_per_model=end_override_per_model,
 181            ignore_cron=ignore_cron,
 182            end_bounded=end_bounded,
 183        )
 184        # Filtering snapshots after computing missing intervals because we need all snapshots in order
 185        # to correctly infer start dates.
 186        if selected_snapshots is not None:
 187            snapshots_to_intervals = {
 188                s: i for s, i in snapshots_to_intervals.items() if s.name in selected_snapshots
 189            }
 190        return snapshots_to_intervals
 191
 192    def evaluate(
 193        self,
 194        snapshot: Snapshot,
 195        start: TimeLike,
 196        end: TimeLike,
 197        execution_time: TimeLike,
 198        deployability_index: DeployabilityIndex,
 199        batch_index: int,
 200        environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
 201        allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
 202        allow_additive_snapshots: t.Optional[t.Set[str]] = None,
 203        target_table_exists: t.Optional[bool] = None,
 204        **kwargs: t.Any,
 205    ) -> t.List[AuditResult]:
 206        """Evaluate a snapshot and add the processed interval to the state sync.
 207
 208        Args:
 209            snapshot: Snapshot to evaluate.
 210            start: The start datetime to render.
 211            end: The end datetime to render.
 212            execution_time: The date/time time reference to use for execution time. Defaults to now.
 213            allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
 214            allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
 215            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
 216            batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it
 217            auto_restatement_enabled: Whether to enable auto restatements.
 218            target_table_exists: Whether the target table exists. If None, the table will be checked for existence.
 219            kwargs: Additional kwargs to pass to the renderer.
 220
 221        Returns:
 222            Tuple of list of all audit results from the evaluation and list of non-blocking audit errors to warn.
 223        """
 224        validate_date_range(start, end)
 225
 226        snapshots = parent_snapshots_by_name(snapshot, self.snapshots)
 227
 228        is_deployable = deployability_index.is_deployable(snapshot)
 229
 230        wap_id = self.snapshot_evaluator.evaluate(
 231            snapshot,
 232            start=start,
 233            end=end,
 234            execution_time=execution_time,
 235            snapshots=snapshots,
 236            allow_destructive_snapshots=allow_destructive_snapshots,
 237            allow_additive_snapshots=allow_additive_snapshots,
 238            deployability_index=deployability_index,
 239            batch_index=batch_index,
 240            target_table_exists=target_table_exists,
 241            **kwargs,
 242        )
 243        audit_results = self._audit_snapshot(
 244            snapshot=snapshot,
 245            environment_naming_info=environment_naming_info,
 246            start=start,
 247            end=end,
 248            execution_time=execution_time,
 249            snapshots=snapshots,
 250            deployability_index=deployability_index,
 251            wap_id=wap_id,
 252            **kwargs,
 253        )
 254
 255        self.state_sync.add_interval(
 256            snapshot, start, end, is_dev=not is_deployable, last_altered_ts=now_timestamp()
 257        )
 258        return audit_results
 259
 260    def run(
 261        self,
 262        environment: str | EnvironmentNamingInfo,
 263        start: t.Optional[TimeLike] = None,
 264        end: t.Optional[TimeLike] = None,
 265        execution_time: t.Optional[TimeLike] = None,
 266        restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
 267        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 268        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 269        ignore_cron: bool = False,
 270        end_bounded: bool = False,
 271        selected_snapshots: t.Optional[t.Set[str]] = None,
 272        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 273        deployability_index: t.Optional[DeployabilityIndex] = None,
 274        auto_restatement_enabled: bool = False,
 275        run_environment_statements: bool = False,
 276    ) -> CompletionStatus:
 277        return self._run_or_audit(
 278            environment=environment,
 279            start=start,
 280            end=end,
 281            execution_time=execution_time,
 282            remove_intervals=restatements,
 283            start_override_per_model=start_override_per_model,
 284            end_override_per_model=end_override_per_model,
 285            ignore_cron=ignore_cron,
 286            end_bounded=end_bounded,
 287            selected_snapshots=selected_snapshots,
 288            circuit_breaker=circuit_breaker,
 289            deployability_index=deployability_index,
 290            auto_restatement_enabled=auto_restatement_enabled,
 291            run_environment_statements=run_environment_statements,
 292        )
 293
 294    def audit(
 295        self,
 296        environment: str | EnvironmentNamingInfo,
 297        start: TimeLike,
 298        end: TimeLike,
 299        execution_time: t.Optional[TimeLike] = None,
 300        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 301        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 302        ignore_cron: bool = False,
 303        end_bounded: bool = False,
 304        selected_snapshots: t.Optional[t.Set[str]] = None,
 305        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 306        deployability_index: t.Optional[DeployabilityIndex] = None,
 307        run_environment_statements: bool = False,
 308    ) -> CompletionStatus:
 309        # Remove the intervals from the snapshots that will be audited so that they can be recomputed
 310        # by _run_or_audit as "missing intervals" to reuse the rest of it's logic
 311        remove_intervals = {}
 312        for snapshot in self.snapshots.values():
 313            removal_intervals = snapshot.get_removal_interval(
 314                start, end, execution_time, is_preview=True
 315            )
 316            remove_intervals[snapshot.snapshot_id] = removal_intervals
 317
 318        return self._run_or_audit(
 319            environment=environment,
 320            start=start,
 321            end=end,
 322            execution_time=execution_time,
 323            remove_intervals=remove_intervals,
 324            start_override_per_model=start_override_per_model,
 325            end_override_per_model=end_override_per_model,
 326            ignore_cron=ignore_cron,
 327            end_bounded=end_bounded,
 328            selected_snapshots=selected_snapshots,
 329            circuit_breaker=circuit_breaker,
 330            deployability_index=deployability_index,
 331            run_environment_statements=run_environment_statements,
 332            audit_only=True,
 333        )
 334
 335    def batch_intervals(
 336        self,
 337        merged_intervals: SnapshotToIntervals,
 338        deployability_index: t.Optional[DeployabilityIndex],
 339        environment_naming_info: EnvironmentNamingInfo,
 340        dag: t.Optional[DAG[SnapshotId]] = None,
 341        is_restatement: bool = False,
 342    ) -> t.Dict[Snapshot, Intervals]:
 343        dag = dag or snapshots_to_dag(merged_intervals)
 344
 345        snapshot_intervals: t.Dict[SnapshotId, t.Tuple[Snapshot, t.List[Interval]]] = {
 346            snapshot.snapshot_id: (
 347                snapshot,
 348                [
 349                    i
 350                    for interval in intervals
 351                    for i in _expand_range_as_interval(*interval, snapshot.node.interval_unit)
 352                ],
 353            )
 354            for snapshot, intervals in merged_intervals.items()
 355        }
 356        snapshot_batches: t.Dict[Snapshot, Intervals] = {}
 357        all_unready_intervals: t.Dict[str, set[Interval]] = {}
 358        for snapshot_id in dag:
 359            if snapshot_id not in snapshot_intervals:
 360                continue
 361            snapshot, intervals = snapshot_intervals[snapshot_id]
 362            unready = set(intervals)
 363
 364            from sqlmesh.core.context import ExecutionContext
 365
 366            adapter = self.snapshot_evaluator.get_adapter(snapshot.model_gateway)
 367
 368            parent_intervals: Intervals = []
 369            for parent_id in snapshot.parents:
 370                parent_snapshot, _ = snapshot_intervals.get(parent_id, (None, []))
 371                if not parent_snapshot or parent_snapshot.is_external:
 372                    continue
 373
 374                parent_intervals.extend(snapshot_batches[parent_snapshot])
 375
 376            context = ExecutionContext(
 377                adapter,
 378                self.snapshots_by_name,
 379                deployability_index,
 380                default_dialect=adapter.dialect,
 381                default_catalog=self.default_catalog,
 382                is_restatement=is_restatement,
 383                parent_intervals=parent_intervals,
 384            )
 385
 386            intervals = self._check_ready_intervals(
 387                snapshot,
 388                intervals,
 389                context,
 390                environment_naming_info,
 391            )
 392            unready -= set(intervals)
 393
 394            for parent in snapshot.parents:
 395                if parent.name in all_unready_intervals:
 396                    unready.update(all_unready_intervals[parent.name])
 397
 398            all_unready_intervals[snapshot.name] = unready
 399
 400            batches = []
 401            batch_size = snapshot.node.batch_size
 402            next_batch: t.List[t.Tuple[int, int]] = []
 403
 404            for interval in interval_diff(
 405                intervals, merge_intervals(unready), uninterrupted=snapshot.depends_on_past
 406            ):
 407                if (batch_size and len(next_batch) >= batch_size) or (
 408                    next_batch and interval[0] != next_batch[-1][-1]
 409                ):
 410                    batches.append((next_batch[0][0], next_batch[-1][-1]))
 411                    next_batch = []
 412
 413                next_batch.append(interval)
 414
 415            if next_batch:
 416                batches.append((next_batch[0][0], next_batch[-1][-1]))
 417
 418            snapshot_batches[snapshot] = batches
 419
 420        return snapshot_batches
 421
 422    def run_merged_intervals(
 423        self,
 424        *,
 425        merged_intervals: SnapshotToIntervals,
 426        deployability_index: DeployabilityIndex,
 427        environment_naming_info: EnvironmentNamingInfo,
 428        execution_time: t.Optional[TimeLike] = None,
 429        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 430        start: t.Optional[TimeLike] = None,
 431        end: t.Optional[TimeLike] = None,
 432        allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
 433        selected_models: t.Optional[t.Set[str]] = None,
 434        allow_additive_snapshots: t.Optional[t.Set[str]] = None,
 435        selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
 436        run_environment_statements: bool = False,
 437        audit_only: bool = False,
 438        auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {},
 439        is_restatement: bool = False,
 440    ) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
 441        """Runs precomputed batches of missing intervals.
 442
 443        Args:
 444            merged_intervals: The snapshots and contiguous interval ranges to evaluate.
 445            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
 446            environment_naming_info: The environment naming info the user is targeting when applying their change.
 447            execution_time: The date/time reference to use for execution time.
 448            circuit_breaker: An optional handler which checks if the run should be aborted.
 449            start: The start of the run.
 450            end: The end of the run.
 451            allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
 452            allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
 453            selected_snapshot_ids: The snapshots to include in the run DAG. If None, all snapshots with missing intervals will be included.
 454
 455        Returns:
 456            A tuple of errors and skipped intervals.
 457        """
 458        execution_time = execution_time or now_timestamp()
 459
 460        selected_snapshots = [self.snapshots[sid] for sid in (selected_snapshot_ids or set())]
 461        if not selected_snapshots:
 462            selected_snapshots = list(merged_intervals)
 463
 464        # Build the full DAG from all snapshots to preserve transitive dependencies
 465        full_dag = snapshots_to_dag(self.snapshots.values())
 466
 467        # Create a subdag that includes the selected snapshots and all their upstream dependencies
 468        # This ensures that transitive dependencies are preserved even when intermediate nodes are not selected
 469        selected_snapshot_ids_set = {s.snapshot_id for s in selected_snapshots}
 470        snapshot_dag = full_dag.subdag(*selected_snapshot_ids_set)
 471
 472        batched_intervals = self.batch_intervals(
 473            merged_intervals,
 474            deployability_index,
 475            environment_naming_info,
 476            dag=snapshot_dag,
 477            is_restatement=is_restatement,
 478        )
 479        self.console.start_evaluation_progress(
 480            batched_intervals,
 481            environment_naming_info,
 482            self.default_catalog,
 483            audit_only=audit_only,
 484        )
 485
 486        if run_environment_statements:
 487            environment_statements = self.state_sync.get_environment_statements(
 488                environment_naming_info.name
 489            )
 490            execute_environment_statements(
 491                adapter=self.snapshot_evaluator.adapter,
 492                environment_statements=environment_statements,
 493                runtime_stage=RuntimeStage.BEFORE_ALL,
 494                environment_naming_info=environment_naming_info,
 495                default_catalog=self.default_catalog,
 496                snapshots=self.snapshots_by_name,
 497                start=start,
 498                end=end,
 499                execution_time=execution_time,
 500                selected_models=selected_models,
 501            )
 502
 503        # We only need to create physical tables if the snapshot is not representative or if it
 504        # needs backfill
 505        snapshots_to_create_candidates = [
 506            s
 507            for s in selected_snapshots
 508            if not deployability_index.is_representative(s) or s in batched_intervals
 509        ]
 510        snapshots_to_create = {
 511            s.snapshot_id
 512            for s in self.snapshot_evaluator.get_snapshots_to_create(
 513                snapshots_to_create_candidates, deployability_index
 514            )
 515        }
 516
 517        dag = self._dag(
 518            batched_intervals, snapshot_dag=snapshot_dag, snapshots_to_create=snapshots_to_create
 519        )
 520
 521        def run_node(node: SchedulingUnit) -> None:
 522            if circuit_breaker and circuit_breaker():
 523                raise CircuitBreakerError()
 524            if isinstance(node, DummyNode):
 525                return
 526
 527            snapshot = self.snapshots_by_name[node.snapshot_name]
 528
 529            if isinstance(node, EvaluateNode):
 530                self.console.start_snapshot_evaluation_progress(snapshot)
 531                execution_start_ts = now_timestamp()
 532                evaluation_duration_ms: t.Optional[int] = None
 533                start, end = node.interval
 534
 535                audit_results: t.List[AuditResult] = []
 536                try:
 537                    assert execution_time  # mypy
 538                    assert deployability_index  # mypy
 539
 540                    if audit_only:
 541                        audit_results = self._audit_snapshot(
 542                            snapshot=snapshot,
 543                            environment_naming_info=environment_naming_info,
 544                            deployability_index=deployability_index,
 545                            snapshots=self.snapshots_by_name,
 546                            start=start,
 547                            end=end,
 548                            execution_time=execution_time,
 549                        )
 550                    else:
 551                        # If batch_index > 0, then the target table must exist since the first batch would have created it
 552                        target_table_exists = (
 553                            snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0
 554                        )
 555                        audit_results = self.evaluate(
 556                            snapshot=snapshot,
 557                            environment_naming_info=environment_naming_info,
 558                            start=start,
 559                            end=end,
 560                            execution_time=execution_time,
 561                            deployability_index=deployability_index,
 562                            batch_index=node.batch_index,
 563                            allow_destructive_snapshots=allow_destructive_snapshots,
 564                            allow_additive_snapshots=allow_additive_snapshots,
 565                            target_table_exists=target_table_exists,
 566                            selected_models=selected_models,
 567                        )
 568
 569                    evaluation_duration_ms = now_timestamp() - execution_start_ts
 570                finally:
 571                    num_audits = len(audit_results)
 572                    num_audits_failed = sum(1 for result in audit_results if result.count)
 573
 574                    execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats(
 575                        SnapshotIdBatch(snapshot_id=snapshot.snapshot_id, batch_id=node.batch_index)
 576                    )
 577
 578                    self.console.update_snapshot_evaluation_progress(
 579                        snapshot,
 580                        batched_intervals[snapshot][node.batch_index],
 581                        node.batch_index,
 582                        evaluation_duration_ms,
 583                        num_audits - num_audits_failed,
 584                        num_audits_failed,
 585                        execution_stats=execution_stats,
 586                        auto_restatement_triggers=auto_restatement_triggers.get(
 587                            snapshot.snapshot_id
 588                        ),
 589                    )
 590            elif isinstance(node, CreateNode):
 591                self.snapshot_evaluator.create_snapshot(
 592                    snapshot=snapshot,
 593                    snapshots=self.snapshots_by_name,
 594                    deployability_index=deployability_index,
 595                    allow_destructive_snapshots=allow_destructive_snapshots or set(),
 596                    allow_additive_snapshots=allow_additive_snapshots or set(),
 597                )
 598
 599        try:
 600            with self.snapshot_evaluator.concurrent_context():
 601                errors, skipped_intervals = concurrent_apply_to_dag(
 602                    dag,
 603                    run_node,
 604                    self.max_workers,
 605                    raise_on_error=False,
 606                )
 607                self.console.stop_evaluation_progress(success=not errors)
 608
 609                skipped_snapshots = {
 610                    i.snapshot_name for i in skipped_intervals if isinstance(i, EvaluateNode)
 611                }
 612                self.console.log_skipped_models(skipped_snapshots)
 613                for skipped in skipped_snapshots:
 614                    logger.info(f"SKIPPED snapshot {skipped}\n")
 615
 616                for error in errors:
 617                    if isinstance(error.__cause__, CircuitBreakerError):
 618                        raise error.__cause__
 619                    logger.info(str(error), exc_info=error)
 620
 621                self.console.log_failed_models(errors)
 622
 623                return errors, skipped_intervals
 624        finally:
 625            if run_environment_statements:
 626                execute_environment_statements(
 627                    adapter=self.snapshot_evaluator.adapter,
 628                    environment_statements=environment_statements,
 629                    runtime_stage=RuntimeStage.AFTER_ALL,
 630                    environment_naming_info=environment_naming_info,
 631                    default_catalog=self.default_catalog,
 632                    snapshots=self.snapshots_by_name,
 633                    start=start,
 634                    end=end,
 635                    execution_time=execution_time,
 636                    selected_models=selected_models,
 637                )
 638
 639            self.state_sync.recycle()
 640
 641    def _dag(
 642        self,
 643        batches: SnapshotToIntervals,
 644        snapshot_dag: t.Optional[DAG[SnapshotId]] = None,
 645        snapshots_to_create: t.Optional[t.Set[SnapshotId]] = None,
 646    ) -> DAG[SchedulingUnit]:
 647        """Builds a DAG of snapshot intervals to be evaluated.
 648
 649        Args:
 650            batches: The batches of snapshots and intervals to evaluate.
 651            snapshot_dag: The DAG of all snapshots.
 652            snapshots_to_create: The snapshots with missing physical tables.
 653
 654        Returns:
 655            A DAG of snapshot intervals to be evaluated.
 656        """
 657
 658        intervals_per_snapshot = {
 659            snapshot.name: intervals for snapshot, intervals in batches.items()
 660        }
 661        snapshots_to_create = snapshots_to_create or set()
 662        original_snapshots_to_create = snapshots_to_create.copy()
 663        upstream_dependencies_cache: t.Dict[SnapshotId, t.Set[SchedulingUnit]] = {}
 664
 665        snapshot_dag = snapshot_dag or snapshots_to_dag(batches)
 666        dag = DAG[SchedulingUnit]()
 667
 668        for snapshot_id in snapshot_dag:
 669            if snapshot_id.name not in self.snapshots_by_name:
 670                continue
 671
 672            snapshot = self.snapshots_by_name[snapshot_id.name]
 673            intervals = intervals_per_snapshot.get(snapshot.name, [])
 674
 675            upstream_dependencies: t.Set[SchedulingUnit] = set()
 676
 677            for p_sid in snapshot.parents:
 678                upstream_dependencies.update(
 679                    self._find_upstream_dependencies(
 680                        p_sid,
 681                        intervals_per_snapshot,
 682                        original_snapshots_to_create,
 683                        upstream_dependencies_cache,
 684                    )
 685                )
 686
 687            batch_concurrency = snapshot.node.batch_concurrency
 688            batch_size = snapshot.node.batch_size
 689            if snapshot.depends_on_past:
 690                batch_concurrency = 1
 691
 692            create_node: t.Optional[CreateNode] = None
 693            if snapshot.snapshot_id in original_snapshots_to_create and (
 694                snapshot.is_incremental_by_time_range
 695                or ((not batch_concurrency or batch_concurrency > 1) and batch_size)
 696                or not intervals
 697            ):
 698                # Add a separate node for table creation in case when there multiple concurrent
 699                # evaluation nodes or when there are no intervals to evaluate.
 700                create_node = CreateNode(snapshot_name=snapshot.name)
 701                dag.add(create_node, upstream_dependencies)
 702                snapshots_to_create.remove(snapshot.snapshot_id)
 703
 704            for i, interval in enumerate(intervals):
 705                node = EvaluateNode(snapshot_name=snapshot.name, interval=interval, batch_index=i)
 706
 707                if create_node:
 708                    dag.add(node, [create_node])
 709                else:
 710                    dag.add(node, upstream_dependencies)
 711
 712                if len(intervals) > 1:
 713                    dag.add(DummyNode(snapshot_name=snapshot.name), [node])
 714
 715                if batch_concurrency and i >= batch_concurrency:
 716                    batch_idx_to_wait_for = i - batch_concurrency
 717                    dag.add(
 718                        node,
 719                        [
 720                            EvaluateNode(
 721                                snapshot_name=snapshot.name,
 722                                interval=intervals[batch_idx_to_wait_for],
 723                                batch_index=batch_idx_to_wait_for,
 724                            ),
 725                        ],
 726                    )
 727        return dag
 728
 729    def _find_upstream_dependencies(
 730        self,
 731        parent_sid: SnapshotId,
 732        intervals_per_snapshot: t.Dict[str, Intervals],
 733        snapshots_to_create: t.Set[SnapshotId],
 734        cache: t.Dict[SnapshotId, t.Set[SchedulingUnit]],
 735    ) -> t.Set[SchedulingUnit]:
 736        if parent_sid not in self.snapshots:
 737            return set()
 738        if parent_sid in cache:
 739            return cache[parent_sid]
 740
 741        p_intervals = intervals_per_snapshot.get(parent_sid.name, [])
 742
 743        parent_node: t.Optional[SchedulingUnit] = None
 744        if p_intervals:
 745            if len(p_intervals) > 1:
 746                parent_node = DummyNode(snapshot_name=parent_sid.name)
 747            else:
 748                interval = p_intervals[0]
 749                parent_node = EvaluateNode(
 750                    snapshot_name=parent_sid.name, interval=interval, batch_index=0
 751                )
 752        elif parent_sid in snapshots_to_create:
 753            parent_node = CreateNode(snapshot_name=parent_sid.name)
 754
 755        if parent_node is not None:
 756            cache[parent_sid] = {parent_node}
 757            return {parent_node}
 758
 759        # This snapshot has no intervals and doesn't need creation which means
 760        # that it can be a transitive dependency
 761        transitive_deps: t.Set[SchedulingUnit] = set()
 762        parent_snapshot = self.snapshots[parent_sid]
 763        for grandparent_sid in parent_snapshot.parents:
 764            transitive_deps.update(
 765                self._find_upstream_dependencies(
 766                    grandparent_sid, intervals_per_snapshot, snapshots_to_create, cache
 767                )
 768            )
 769        cache[parent_sid] = transitive_deps
 770        return transitive_deps
 771
 772    def _run_or_audit(
 773        self,
 774        environment: str | EnvironmentNamingInfo,
 775        start: t.Optional[TimeLike] = None,
 776        end: t.Optional[TimeLike] = None,
 777        execution_time: t.Optional[TimeLike] = None,
 778        remove_intervals: t.Optional[t.Dict[SnapshotId, Interval]] = None,
 779        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 780        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
 781        ignore_cron: bool = False,
 782        end_bounded: bool = False,
 783        selected_snapshots: t.Optional[t.Set[str]] = None,
 784        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 785        deployability_index: t.Optional[DeployabilityIndex] = None,
 786        auto_restatement_enabled: bool = False,
 787        run_environment_statements: bool = False,
 788        audit_only: bool = False,
 789    ) -> CompletionStatus:
 790        """Concurrently runs or audits all snapshots in topological order.
 791
 792        Args:
 793            environment: The environment naming info the user is targeting when applying their change.
 794                Can just be the environment name if the user is targeting a remote environment and wants to get the remote
 795                naming info
 796            start: The start of the run. Defaults to the min node start date.
 797            end: The end of the run. Defaults to now.
 798            execution_time: The date/time time reference to use for execution time. Defaults to now.
 799            remove_intervals: A dict of snapshots to their intervals. For evaluation, these are the intervals that will be restated. For audits,
 800                              these are the intervals that will be reaudited
 801            start_override_per_model: A mapping of model FQNs to target start dates.
 802            end_override_per_model: A mapping of model FQNs to target end dates.
 803            ignore_cron: Whether to ignore the node's cron schedule.
 804            end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
 805                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
 806            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
 807            circuit_breaker: An optional handler which checks if the run should be aborted.
 808            deployability_index: Determines snapshots that are deployable in the context of this render.
 809            auto_restatement_enabled: Whether to enable auto restatements.
 810
 811        Returns:
 812            True if the execution was successful and False otherwise.
 813        """
 814        validate_date_range(start, end)
 815        if isinstance(environment, str):
 816            env = self.state_sync.get_environment(environment)
 817            if not env:
 818                raise SQLMeshError(
 819                    "Was not provided an environment suffix target and the environment doesn't exist."
 820                    "Are you running for the first time and need to run plan/apply first?"
 821                )
 822            environment_naming_info = env.naming_info
 823        else:
 824            environment_naming_info = environment
 825
 826        deployability_index = deployability_index or (
 827            DeployabilityIndex.create(self.snapshots.values(), start=start)
 828            if environment_naming_info.name != c.PROD
 829            else DeployabilityIndex.all_deployable()
 830        )
 831        execution_time = execution_time or now_timestamp()
 832
 833        self.state_sync.refresh_snapshot_intervals(self.snapshots.values())
 834        for s_id, interval in (remove_intervals or {}).items():
 835            self.snapshots[s_id].remove_interval(interval)
 836
 837        all_auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
 838        if auto_restatement_enabled:
 839            auto_restated_intervals, all_auto_restatement_triggers = apply_auto_restatements(
 840                self.snapshots, execution_time
 841            )
 842            self.state_sync.add_snapshots_intervals(auto_restated_intervals)
 843            self.state_sync.update_auto_restatements(
 844                {s.name_version: s.next_auto_restatement_ts for s in self.snapshots.values()}
 845            )
 846
 847        merged_intervals = self.merged_missing_intervals(
 848            start,
 849            end,
 850            execution_time,
 851            deployability_index=deployability_index,
 852            restatements=remove_intervals,
 853            start_override_per_model=start_override_per_model,
 854            end_override_per_model=end_override_per_model,
 855            ignore_cron=ignore_cron,
 856            end_bounded=end_bounded,
 857            selected_snapshots=selected_snapshots,
 858        )
 859        if not merged_intervals:
 860            return CompletionStatus.NOTHING_TO_DO
 861
 862        auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
 863        if all_auto_restatement_triggers:
 864            merged_intervals_snapshots = {snapshot.snapshot_id for snapshot in merged_intervals}
 865            auto_restatement_triggers = {
 866                s_id: all_auto_restatement_triggers.get(s_id, [])
 867                for s_id in merged_intervals_snapshots
 868            }
 869
 870        errors, _ = self.run_merged_intervals(
 871            merged_intervals=merged_intervals,
 872            deployability_index=deployability_index,
 873            environment_naming_info=environment_naming_info,
 874            execution_time=execution_time,
 875            circuit_breaker=circuit_breaker,
 876            start=start,
 877            end=end,
 878            run_environment_statements=run_environment_statements,
 879            audit_only=audit_only,
 880            auto_restatement_triggers=auto_restatement_triggers,
 881            selected_models={
 882                s.node.dbt_unique_id for s in merged_intervals if s.node.dbt_unique_id
 883            },
 884        )
 885
 886        return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS
 887
 888    def _audit_snapshot(
 889        self,
 890        snapshot: Snapshot,
 891        deployability_index: DeployabilityIndex,
 892        snapshots: t.Dict[str, Snapshot],
 893        start: t.Optional[TimeLike] = None,
 894        end: t.Optional[TimeLike] = None,
 895        execution_time: t.Optional[TimeLike] = None,
 896        wap_id: t.Optional[str] = None,
 897        environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
 898        **kwargs: t.Any,
 899    ) -> t.List[AuditResult]:
 900        is_deployable = deployability_index.is_deployable(snapshot)
 901
 902        audit_results = self.snapshot_evaluator.audit(
 903            snapshot=snapshot,
 904            start=start,
 905            end=end,
 906            execution_time=execution_time,
 907            snapshots=snapshots,
 908            deployability_index=deployability_index,
 909            wap_id=wap_id,
 910            **kwargs,
 911        )
 912
 913        audit_errors_to_raise: t.List[AuditError] = []
 914        audit_errors_to_warn: t.List[AuditError] = []
 915        for audit_result in (result for result in audit_results if result.count):
 916            error = AuditError(
 917                audit_name=audit_result.audit.name,
 918                audit_args=audit_result.audit_args,
 919                model=snapshot.model_or_none,
 920                count=t.cast(int, audit_result.count),
 921                query=t.cast(exp.Query, audit_result.query),
 922                adapter_dialect=self.snapshot_evaluator.adapter.dialect,
 923            )
 924            self.notification_target_manager.notify(NotificationEvent.AUDIT_FAILURE, error)
 925            if is_deployable and snapshot.node.owner:
 926                self.notification_target_manager.notify_user(
 927                    NotificationEvent.AUDIT_FAILURE, snapshot.node.owner, error
 928                )
 929            if audit_result.blocking:
 930                audit_errors_to_raise.append(error)
 931            else:
 932                audit_errors_to_warn.append(error)
 933
 934        if audit_errors_to_raise:
 935            raise NodeAuditsErrors(audit_errors_to_raise)
 936
 937        if environment_naming_info:
 938            for audit_error in audit_errors_to_warn:
 939                display_name = snapshot.display_name(
 940                    environment_naming_info,
 941                    self.default_catalog,
 942                    self.snapshot_evaluator.adapter.dialect,
 943                )
 944                self.console.log_warning(
 945                    f"\n{display_name}: {audit_error}.",
 946                    f"{audit_error}. Audit query:\n{audit_error.query.sql(audit_error.adapter_dialect)}",
 947                )
 948
 949        return audit_results
 950
 951    def _check_ready_intervals(
 952        self,
 953        snapshot: Snapshot,
 954        intervals: Intervals,
 955        context: ExecutionContext,
 956        environment_naming_info: EnvironmentNamingInfo,
 957    ) -> Intervals:
 958        """Checks if the intervals are ready for evaluation for the given snapshot.
 959
 960        This implementation also includes the signal progress tracking.
 961        Note that this will handle gaps in the provided intervals. The returned intervals
 962        may introduce new gaps.
 963
 964        Args:
 965            snapshot: The snapshot to check.
 966            intervals: The intervals to check.
 967            context: The context to use.
 968            environment_naming_info: The environment naming info to use.
 969
 970        Returns:
 971            The intervals that are ready for evaluation.
 972        """
 973        signals = snapshot.is_model and snapshot.model.render_signal_calls()
 974
 975        if not (signals and signals.signals_to_kwargs):
 976            return intervals
 977
 978        self.console.start_signal_progress(
 979            snapshot,
 980            self.default_catalog,
 981            environment_naming_info or EnvironmentNamingInfo(),
 982        )
 983
 984        for signal_idx, (signal_name, kwargs) in enumerate(signals.signals_to_kwargs.items()):
 985            # Capture intervals before signal check for display
 986            intervals_to_check = merge_intervals(intervals)
 987
 988            signal_start_ts = time.perf_counter()
 989
 990            try:
 991                intervals = check_ready_intervals(
 992                    signals.prepared_python_env[signal_name],
 993                    intervals,
 994                    context,
 995                    python_env=signals.python_env,
 996                    dialect=snapshot.model.dialect,
 997                    path=snapshot.model._path,
 998                    snapshot=snapshot,
 999                    kwargs=kwargs,
1000                )
1001            except SQLMeshError as e:
1002                raise SignalEvalError(
1003                    f"{e} '{signal_name}' for '{snapshot.model.name}' at {snapshot.model._path}"
1004                )
1005
1006            duration = time.perf_counter() - signal_start_ts
1007
1008            self.console.update_signal_progress(
1009                snapshot=snapshot,
1010                signal_name=signal_name,
1011                signal_idx=signal_idx,
1012                total_signals=len(signals.signals_to_kwargs),
1013                ready_intervals=merge_intervals(intervals),
1014                check_intervals=intervals_to_check,
1015                duration=duration,
1016            )
1017
1018        self.console.stop_signal_progress()
1019
1020        return intervals

Schedules and manages the evaluation of snapshots.

The scheduler evaluates multiple snapshots with date intervals in the correct topological order. It consults the state sync to determine which intervals of each snapshot need to be backfilled.

The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine.

Arguments:
  • snapshots: A collection of snapshots.
  • snapshot_evaluator: The snapshot evaluator to execute queries.
  • state_sync: The state sync to pull saved snapshots.
  • max_workers: The maximum number of parallel queries to run.
  • console: The rich instance used for printing scheduling information.
Scheduler( snapshots: Iterable[sqlmesh.core.snapshot.definition.Snapshot], snapshot_evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, state_sync: sqlmesh.core.state_sync.base.StateSync, default_catalog: Optional[str], max_workers: int = 1, console: Optional[sqlmesh.core.console.Console] = None, notification_target_manager: Optional[sqlmesh.core.notification_target.NotificationTargetManager] = None)
117    def __init__(
118        self,
119        snapshots: t.Iterable[Snapshot],
120        snapshot_evaluator: SnapshotEvaluator,
121        state_sync: StateSync,
122        default_catalog: t.Optional[str],
123        max_workers: int = 1,
124        console: t.Optional[Console] = None,
125        notification_target_manager: t.Optional[NotificationTargetManager] = None,
126    ):
127        self.state_sync = state_sync
128        self.snapshots = {s.snapshot_id: s for s in snapshots}
129        self.snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()}
130        self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values())
131        self.default_catalog = default_catalog
132        self.snapshot_evaluator = snapshot_evaluator
133        self.max_workers = max_workers
134        self.console = console or get_console()
135        self.notification_target_manager = (
136            notification_target_manager or NotificationTargetManager()
137        )
state_sync
snapshots
snapshots_by_name
snapshot_per_version
default_catalog
snapshot_evaluator
max_workers
console
notification_target_manager
def merged_missing_intervals( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, restatements: Optional[Dict[sqlmesh.core.snapshot.definition.SnapshotId, Tuple[int, int]]] = None, start_override_per_model: Optional[Dict[str, datetime.datetime]] = None, end_override_per_model: Optional[Dict[str, datetime.datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False, selected_snapshots: Optional[Set[str]] = None) -> Dict[sqlmesh.core.snapshot.definition.Snapshot, List[Tuple[int, int]]]:
139    def merged_missing_intervals(
140        self,
141        start: t.Optional[TimeLike] = None,
142        end: t.Optional[TimeLike] = None,
143        execution_time: t.Optional[TimeLike] = None,
144        deployability_index: t.Optional[DeployabilityIndex] = None,
145        restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
146        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
147        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
148        ignore_cron: bool = False,
149        end_bounded: bool = False,
150        selected_snapshots: t.Optional[t.Set[str]] = None,
151    ) -> SnapshotToIntervals:
152        """Find the largest contiguous date interval parameters based only on what is missing.
153
154        For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
155        calculate the missing intervals that need to be processed given the passed in start and end intervals.
156
157        This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.
158
159        Args:
160            start: The start of the run. Defaults to the min node start date.
161            end: The end of the run. Defaults to now.
162            execution_time: The date/time reference to use for execution time. Defaults to now.
163            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
164            restatements: A set of snapshot names being restated.
165            start_override_per_model: A mapping of model FQNs to target start dates.
166            end_override_per_model: A mapping of model FQNs to target end dates.
167            ignore_cron: Whether to ignore the node's cron schedule.
168            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
169                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
170            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
171        """
172        snapshots_to_intervals = merged_missing_intervals(
173            snapshots=self.snapshot_per_version.values(),
174            start=start,
175            end=end,
176            execution_time=execution_time,
177            deployability_index=deployability_index,
178            restatements=restatements,
179            start_override_per_model=start_override_per_model,
180            end_override_per_model=end_override_per_model,
181            ignore_cron=ignore_cron,
182            end_bounded=end_bounded,
183        )
184        # Filtering snapshots after computing missing intervals because we need all snapshots in order
185        # to correctly infer start dates.
186        if selected_snapshots is not None:
187            snapshots_to_intervals = {
188                s: i for s, i in snapshots_to_intervals.items() if s.name in selected_snapshots
189            }
190        return snapshots_to_intervals

Find the largest contiguous date interval parameters based only on what is missing.

For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, calculate the missing intervals that need to be processed given the passed in start and end intervals.

This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.

Arguments:
  • start: The start of the run. Defaults to the min node start date.
  • end: The end of the run. Defaults to now.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • restatements: A mapping of snapshot IDs to the intervals being restated.
  • start_override_per_model: A mapping of model FQNs to target start dates.
  • end_override_per_model: A mapping of model FQNs to target end dates.
  • ignore_cron: Whether to ignore the node's cron schedule.
  • end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
  • selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
def evaluate( self, snapshot: sqlmesh.core.snapshot.definition.Snapshot, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], execution_time: Union[datetime.date, datetime.datetime, str, int, float], deployability_index: sqlmesh.core.snapshot.definition.DeployabilityIndex, batch_index: int, environment_naming_info: Optional[sqlmesh.core.environment.EnvironmentNamingInfo] = None, allow_destructive_snapshots: Optional[Set[str]] = None, allow_additive_snapshots: Optional[Set[str]] = None, target_table_exists: Optional[bool] = None, **kwargs: Any) -> List[sqlmesh.core.model.definition.AuditResult]:
192    def evaluate(
193        self,
194        snapshot: Snapshot,
195        start: TimeLike,
196        end: TimeLike,
197        execution_time: TimeLike,
198        deployability_index: DeployabilityIndex,
199        batch_index: int,
200        environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
201        allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
202        allow_additive_snapshots: t.Optional[t.Set[str]] = None,
203        target_table_exists: t.Optional[bool] = None,
204        **kwargs: t.Any,
205    ) -> t.List[AuditResult]:
206        """Evaluate a snapshot and add the processed interval to the state sync.
207
208        Args:
209            snapshot: Snapshot to evaluate.
210            start: The start datetime to render.
211            end: The end datetime to render.
212            execution_time: The date/time time reference to use for execution time. Defaults to now.
213            allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
214            allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
215            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
216            batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it
217            auto_restatement_enabled: Whether to enable auto restatements.
218            target_table_exists: Whether the target table exists. If None, the table will be checked for existence.
219            kwargs: Additional kwargs to pass to the renderer.
220
221        Returns:
222            Tuple of list of all audit results from the evaluation and list of non-blocking audit errors to warn.
223        """
224        validate_date_range(start, end)
225
226        snapshots = parent_snapshots_by_name(snapshot, self.snapshots)
227
228        is_deployable = deployability_index.is_deployable(snapshot)
229
230        wap_id = self.snapshot_evaluator.evaluate(
231            snapshot,
232            start=start,
233            end=end,
234            execution_time=execution_time,
235            snapshots=snapshots,
236            allow_destructive_snapshots=allow_destructive_snapshots,
237            allow_additive_snapshots=allow_additive_snapshots,
238            deployability_index=deployability_index,
239            batch_index=batch_index,
240            target_table_exists=target_table_exists,
241            **kwargs,
242        )
243        audit_results = self._audit_snapshot(
244            snapshot=snapshot,
245            environment_naming_info=environment_naming_info,
246            start=start,
247            end=end,
248            execution_time=execution_time,
249            snapshots=snapshots,
250            deployability_index=deployability_index,
251            wap_id=wap_id,
252            **kwargs,
253        )
254
255        self.state_sync.add_interval(
256            snapshot, start, end, is_dev=not is_deployable, last_altered_ts=now_timestamp()
257        )
258        return audit_results

Evaluate a snapshot and add the processed interval to the state sync.

Arguments:
  • snapshot: Snapshot to evaluate.
  • start: The start datetime to render.
  • end: The end datetime to render.
  • execution_time: The date/time time reference to use for execution time. Defaults to now.
  • allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
  • allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it
  • auto_restatement_enabled: Whether to enable auto restatements.
  • target_table_exists: Whether the target table exists. If None, the table will be checked for existence.
  • kwargs: Additional kwargs to pass to the renderer.
Returns:

A list of all audit results from the evaluation.

def run( self, environment: str | sqlmesh.core.environment.EnvironmentNamingInfo, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, restatements: Optional[Dict[sqlmesh.core.snapshot.definition.SnapshotId, Tuple[int, int]]] = None, start_override_per_model: Optional[Dict[str, datetime.datetime]] = None, end_override_per_model: Optional[Dict[str, datetime.datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False, selected_snapshots: Optional[Set[str]] = None, circuit_breaker: Optional[Callable[[], bool]] = None, deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, auto_restatement_enabled: bool = False, run_environment_statements: bool = False) -> sqlmesh.utils.CompletionStatus:
260    def run(
261        self,
262        environment: str | EnvironmentNamingInfo,
263        start: t.Optional[TimeLike] = None,
264        end: t.Optional[TimeLike] = None,
265        execution_time: t.Optional[TimeLike] = None,
266        restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
267        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
268        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
269        ignore_cron: bool = False,
270        end_bounded: bool = False,
271        selected_snapshots: t.Optional[t.Set[str]] = None,
272        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
273        deployability_index: t.Optional[DeployabilityIndex] = None,
274        auto_restatement_enabled: bool = False,
275        run_environment_statements: bool = False,
276    ) -> CompletionStatus:
277        return self._run_or_audit(
278            environment=environment,
279            start=start,
280            end=end,
281            execution_time=execution_time,
282            remove_intervals=restatements,
283            start_override_per_model=start_override_per_model,
284            end_override_per_model=end_override_per_model,
285            ignore_cron=ignore_cron,
286            end_bounded=end_bounded,
287            selected_snapshots=selected_snapshots,
288            circuit_breaker=circuit_breaker,
289            deployability_index=deployability_index,
290            auto_restatement_enabled=auto_restatement_enabled,
291            run_environment_statements=run_environment_statements,
292        )
def audit( self, environment: str | sqlmesh.core.environment.EnvironmentNamingInfo, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, start_override_per_model: Optional[Dict[str, datetime.datetime]] = None, end_override_per_model: Optional[Dict[str, datetime.datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False, selected_snapshots: Optional[Set[str]] = None, circuit_breaker: Optional[Callable[[], bool]] = None, deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, run_environment_statements: bool = False) -> sqlmesh.utils.CompletionStatus:
294    def audit(
295        self,
296        environment: str | EnvironmentNamingInfo,
297        start: TimeLike,
298        end: TimeLike,
299        execution_time: t.Optional[TimeLike] = None,
300        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
301        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
302        ignore_cron: bool = False,
303        end_bounded: bool = False,
304        selected_snapshots: t.Optional[t.Set[str]] = None,
305        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
306        deployability_index: t.Optional[DeployabilityIndex] = None,
307        run_environment_statements: bool = False,
308    ) -> CompletionStatus:
309        # Remove the intervals from the snapshots that will be audited so that they can be recomputed
310        # by _run_or_audit as "missing intervals" to reuse the rest of it's logic
311        remove_intervals = {}
312        for snapshot in self.snapshots.values():
313            removal_intervals = snapshot.get_removal_interval(
314                start, end, execution_time, is_preview=True
315            )
316            remove_intervals[snapshot.snapshot_id] = removal_intervals
317
318        return self._run_or_audit(
319            environment=environment,
320            start=start,
321            end=end,
322            execution_time=execution_time,
323            remove_intervals=remove_intervals,
324            start_override_per_model=start_override_per_model,
325            end_override_per_model=end_override_per_model,
326            ignore_cron=ignore_cron,
327            end_bounded=end_bounded,
328            selected_snapshots=selected_snapshots,
329            circuit_breaker=circuit_breaker,
330            deployability_index=deployability_index,
331            run_environment_statements=run_environment_statements,
332            audit_only=True,
333        )
def batch_intervals( self, merged_intervals: Dict[sqlmesh.core.snapshot.definition.Snapshot, List[Tuple[int, int]]], deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex], environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, dag: Optional[sqlmesh.utils.dag.DAG[sqlmesh.core.snapshot.definition.SnapshotId]] = None, is_restatement: bool = False) -> Dict[sqlmesh.core.snapshot.definition.Snapshot, List[Tuple[int, int]]]:
def batch_intervals(
    self,
    merged_intervals: SnapshotToIntervals,
    deployability_index: t.Optional[DeployabilityIndex],
    environment_naming_info: EnvironmentNamingInfo,
    dag: t.Optional[DAG[SnapshotId]] = None,
    is_restatement: bool = False,
) -> t.Dict[Snapshot, Intervals]:
    """Split each snapshot's merged intervals into the batches that are ready to be evaluated.

    Snapshots are visited in the order yielded by ``dag`` (built from ``merged_intervals`` when
    not supplied); DAG nodes without an entry in ``merged_intervals`` are skipped. For each
    visited snapshot:

    1. Its merged ranges are expanded into intervals of the node's interval unit.
    2. ``_check_ready_intervals`` filters them using an ``ExecutionContext`` whose
       ``parent_intervals`` are the batches already computed for its non-external parents.
    3. The intervals that were filtered out, plus every interval recorded as unready for one of
       its parents, are subtracted with ``interval_diff`` (asking it to stop at the first
       conflict when the snapshot ``depends_on_past``).
    4. What remains is grouped into ``(start, end)`` batches, starting a new batch on every gap
       and whenever ``node.batch_size`` intervals have been collected.

    Args:
        merged_intervals: The snapshots and their contiguous interval ranges.
        deployability_index: Determines snapshots that are deployable in this evaluation.
        environment_naming_info: The environment naming info targeted by this run.
        dag: Optional DAG controlling the visiting order.
        is_restatement: Forwarded to the ``ExecutionContext`` used for readiness checks.

    Returns:
        A mapping from each visited snapshot to its list of ``(start, end)`` batches.
    """
    # Deferred import (kept local, as in the rest of this module's usage) but done once per call.
    from sqlmesh.core.context import ExecutionContext

    dag = dag or snapshots_to_dag(merged_intervals)

    # Expand every merged range into unit-sized intervals, keyed by snapshot id.
    expanded: t.Dict[SnapshotId, t.Tuple[Snapshot, t.List[Interval]]] = {}
    for snapshot, ranges in merged_intervals.items():
        expanded[snapshot.snapshot_id] = (
            snapshot,
            [
                unit_interval
                for merged_range in ranges
                for unit_interval in _expand_range_as_interval(
                    *merged_range, snapshot.node.interval_unit
                )
            ],
        )

    batches_by_snapshot: t.Dict[Snapshot, Intervals] = {}
    unready_by_name: t.Dict[str, t.Set[Interval]] = {}

    for snapshot_id in dag:
        entry = expanded.get(snapshot_id)
        if entry is None:
            continue

        snapshot, candidates = entry
        unready = set(candidates)

        adapter = self.snapshot_evaluator.get_adapter(snapshot.model_gateway)

        # NOTE(review): assumes iterating `dag` yields parents before children; otherwise the
        # batches_by_snapshot lookup below raises KeyError.
        parent_batches: Intervals = []
        for parent_id in snapshot.parents:
            parent_snapshot = expanded.get(parent_id, (None, []))[0]
            if parent_snapshot and not parent_snapshot.is_external:
                parent_batches.extend(batches_by_snapshot[parent_snapshot])

        context = ExecutionContext(
            adapter,
            self.snapshots_by_name,
            deployability_index,
            default_dialect=adapter.dialect,
            default_catalog=self.default_catalog,
            is_restatement=is_restatement,
            parent_intervals=parent_batches,
        )

        ready = self._check_ready_intervals(
            snapshot,
            candidates,
            context,
            environment_naming_info,
        )
        unready -= set(ready)

        # Anything a parent could not process is also unready for this snapshot.
        for parent in snapshot.parents:
            unready.update(unready_by_name.get(parent.name, ()))

        unready_by_name[snapshot.name] = unready

        batch_size = snapshot.node.batch_size
        batches: Intervals = []
        pending: t.List[t.Tuple[int, int]] = []

        for interval in interval_diff(
            ready, merge_intervals(unready), uninterrupted=snapshot.depends_on_past
        ):
            batch_is_full = bool(batch_size) and len(pending) >= batch_size
            has_gap = bool(pending) and interval[0] != pending[-1][-1]
            if batch_is_full or has_gap:
                batches.append((pending[0][0], pending[-1][-1]))
                pending = []

            pending.append(interval)

        if pending:
            batches.append((pending[0][0], pending[-1][-1]))

        batches_by_snapshot[snapshot] = batches

    return batches_by_snapshot
def run_merged_intervals( self, *, merged_intervals: Dict[sqlmesh.core.snapshot.definition.Snapshot, List[Tuple[int, int]]], deployability_index: sqlmesh.core.snapshot.definition.DeployabilityIndex, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, circuit_breaker: Optional[Callable[[], bool]] = None, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, allow_destructive_snapshots: Optional[Set[str]] = None, selected_models: Optional[Set[str]] = None, allow_additive_snapshots: Optional[Set[str]] = None, selected_snapshot_ids: Optional[Set[sqlmesh.core.snapshot.definition.SnapshotId]] = None, run_environment_statements: bool = False, audit_only: bool = False, auto_restatement_triggers: Dict[sqlmesh.core.snapshot.definition.SnapshotId, List[sqlmesh.core.snapshot.definition.SnapshotId]] = {}, is_restatement: bool = False) -> Tuple[List[sqlmesh.utils.concurrency.NodeExecutionFailedError[SchedulingUnit]], List[SchedulingUnit]]:
def run_merged_intervals(
    self,
    *,
    merged_intervals: SnapshotToIntervals,
    deployability_index: DeployabilityIndex,
    environment_naming_info: EnvironmentNamingInfo,
    execution_time: t.Optional[TimeLike] = None,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
    selected_models: t.Optional[t.Set[str]] = None,
    allow_additive_snapshots: t.Optional[t.Set[str]] = None,
    selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
    run_environment_statements: bool = False,
    audit_only: bool = False,
    auto_restatement_triggers: t.Optional[t.Dict[SnapshotId, t.List[SnapshotId]]] = None,
    is_restatement: bool = False,
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
    """Runs precomputed batches of missing intervals.

    The intervals are split into batches with ``batch_intervals`` and then executed
    concurrently over a DAG of scheduling units via ``concurrent_apply_to_dag``. Node
    failures are collected and returned rather than raised, except for a tripped circuit
    breaker.

    Args:
        merged_intervals: The snapshots and contiguous interval ranges to evaluate.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        environment_naming_info: The environment naming info the user is targeting when applying their change.
        execution_time: The date/time reference to use for execution time. Defaults to now.
        circuit_breaker: An optional handler which checks if the run should be aborted. It is
            consulted before each scheduling unit runs.
        start: The start of the run (forwarded to the environment statements).
        end: The end of the run (forwarded to the environment statements).
        allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
        selected_models: Model names selected for this run; forwarded to the environment
            statements and to ``evaluate``.
        allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
        selected_snapshot_ids: The snapshots to include in the run DAG, together with all of
            their upstream dependencies. If None or empty, every snapshot in
            ``merged_intervals`` is used.
        run_environment_statements: If True, the environment's statements are executed with the
            ``BEFORE_ALL`` stage before evaluation starts and with the ``AFTER_ALL`` stage after
            the concurrent evaluation finishes (including when it raises).
        audit_only: If True, evaluation nodes only run the snapshot's audits instead of
            evaluating it.
        auto_restatement_triggers: Per-snapshot lists of snapshot IDs that are passed through to
            the console progress update (presumably the snapshots that triggered an
            auto-restatement). Defaults to an empty mapping.
        is_restatement: Forwarded to ``batch_intervals``.

    Returns:
        A tuple ``(errors, skipped)`` holding the node execution errors and the scheduling units
        reported as skipped by ``concurrent_apply_to_dag``.

    Raises:
        CircuitBreakerError: If ``circuit_breaker`` returns True while the run is in progress.
    """
    execution_time = execution_time or now_timestamp()
    # A None default avoids sharing one mutable dict object across every call.
    auto_restatement_triggers = auto_restatement_triggers or {}

    selected_snapshots = [self.snapshots[sid] for sid in (selected_snapshot_ids or set())]
    if not selected_snapshots:
        selected_snapshots = list(merged_intervals)

    # Build the full DAG from all snapshots to preserve transitive dependencies
    full_dag = snapshots_to_dag(self.snapshots.values())

    # Create a subdag that includes the selected snapshots and all their upstream dependencies
    # This ensures that transitive dependencies are preserved even when intermediate nodes are not selected
    selected_snapshot_ids_set = {s.snapshot_id for s in selected_snapshots}
    snapshot_dag = full_dag.subdag(*selected_snapshot_ids_set)

    batched_intervals = self.batch_intervals(
        merged_intervals,
        deployability_index,
        environment_naming_info,
        dag=snapshot_dag,
        is_restatement=is_restatement,
    )
    self.console.start_evaluation_progress(
        batched_intervals,
        environment_naming_info,
        self.default_catalog,
        audit_only=audit_only,
    )

    if run_environment_statements:
        environment_statements = self.state_sync.get_environment_statements(
            environment_naming_info.name
        )
        execute_environment_statements(
            adapter=self.snapshot_evaluator.adapter,
            environment_statements=environment_statements,
            runtime_stage=RuntimeStage.BEFORE_ALL,
            environment_naming_info=environment_naming_info,
            default_catalog=self.default_catalog,
            snapshots=self.snapshots_by_name,
            start=start,
            end=end,
            execution_time=execution_time,
            selected_models=selected_models,
        )

    # We only need to create physical tables if the snapshot is not representative or if it
    # needs backfill
    snapshots_to_create_candidates = [
        s
        for s in selected_snapshots
        if not deployability_index.is_representative(s) or s in batched_intervals
    ]
    snapshots_to_create = {
        s.snapshot_id
        for s in self.snapshot_evaluator.get_snapshots_to_create(
            snapshots_to_create_candidates, deployability_index
        )
    }

    dag = self._dag(
        batched_intervals, snapshot_dag=snapshot_dag, snapshots_to_create=snapshots_to_create
    )

    def run_node(node: SchedulingUnit) -> None:
        if circuit_breaker and circuit_breaker():
            raise CircuitBreakerError()
        if isinstance(node, DummyNode):
            return

        snapshot = self.snapshots_by_name[node.snapshot_name]

        if isinstance(node, EvaluateNode):
            self.console.start_snapshot_evaluation_progress(snapshot)
            execution_start_ts = now_timestamp()
            evaluation_duration_ms: t.Optional[int] = None
            # Distinct names so the run-level `start`/`end` (used for the environment
            # statements) are not shadowed by this batch's bounds.
            batch_start, batch_end = node.interval

            audit_results: t.List[AuditResult] = []
            try:
                assert execution_time  # mypy
                assert deployability_index  # mypy

                if audit_only:
                    audit_results = self._audit_snapshot(
                        snapshot=snapshot,
                        environment_naming_info=environment_naming_info,
                        deployability_index=deployability_index,
                        snapshots=self.snapshots_by_name,
                        start=batch_start,
                        end=batch_end,
                        execution_time=execution_time,
                    )
                else:
                    # If batch_index > 0, then the target table must exist since the first batch would have created it
                    target_table_exists = (
                        snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0
                    )
                    audit_results = self.evaluate(
                        snapshot=snapshot,
                        environment_naming_info=environment_naming_info,
                        start=batch_start,
                        end=batch_end,
                        execution_time=execution_time,
                        deployability_index=deployability_index,
                        batch_index=node.batch_index,
                        allow_destructive_snapshots=allow_destructive_snapshots,
                        allow_additive_snapshots=allow_additive_snapshots,
                        target_table_exists=target_table_exists,
                        selected_models=selected_models,
                    )

                evaluation_duration_ms = now_timestamp() - execution_start_ts
            finally:
                # Progress is reported even when evaluation raises; the duration stays None then.
                num_audits = len(audit_results)
                num_audits_failed = sum(1 for result in audit_results if result.count)

                execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats(
                    SnapshotIdBatch(snapshot_id=snapshot.snapshot_id, batch_id=node.batch_index)
                )

                self.console.update_snapshot_evaluation_progress(
                    snapshot,
                    batched_intervals[snapshot][node.batch_index],
                    node.batch_index,
                    evaluation_duration_ms,
                    num_audits - num_audits_failed,
                    num_audits_failed,
                    execution_stats=execution_stats,
                    auto_restatement_triggers=auto_restatement_triggers.get(
                        snapshot.snapshot_id
                    ),
                )
        elif isinstance(node, CreateNode):
            self.snapshot_evaluator.create_snapshot(
                snapshot=snapshot,
                snapshots=self.snapshots_by_name,
                deployability_index=deployability_index,
                allow_destructive_snapshots=allow_destructive_snapshots or set(),
                allow_additive_snapshots=allow_additive_snapshots or set(),
            )

    try:
        with self.snapshot_evaluator.concurrent_context():
            errors, skipped_intervals = concurrent_apply_to_dag(
                dag,
                run_node,
                self.max_workers,
                raise_on_error=False,
            )
            self.console.stop_evaluation_progress(success=not errors)

            skipped_snapshots = {
                i.snapshot_name for i in skipped_intervals if isinstance(i, EvaluateNode)
            }
            self.console.log_skipped_models(skipped_snapshots)
            for skipped in skipped_snapshots:
                logger.info(f"SKIPPED snapshot {skipped}\n")

            for error in errors:
                # A tripped circuit breaker aborts the whole run instead of being reported.
                if isinstance(error.__cause__, CircuitBreakerError):
                    raise error.__cause__
                logger.info(str(error), exc_info=error)

            self.console.log_failed_models(errors)

            return errors, skipped_intervals
    finally:
        if run_environment_statements:
            execute_environment_statements(
                adapter=self.snapshot_evaluator.adapter,
                environment_statements=environment_statements,
                runtime_stage=RuntimeStage.AFTER_ALL,
                environment_naming_info=environment_naming_info,
                default_catalog=self.default_catalog,
                snapshots=self.snapshots_by_name,
                start=start,
                end=end,
                execution_time=execution_time,
                selected_models=selected_models,
            )

        self.state_sync.recycle()

Runs precomputed batches of missing intervals.

Arguments:
  • merged_intervals: The snapshots and contiguous interval ranges to evaluate.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • environment_naming_info: The environment naming info the user is targeting when applying their change.
  • execution_time: The date/time reference to use for execution time.
  • circuit_breaker: An optional handler which checks if the run should be aborted.
  • start: The start of the run.
  • end: The end of the run.
  • allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
  • allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
  • selected_snapshot_ids: The snapshots to include in the run DAG, together with their upstream dependencies. If None or empty, all snapshots in merged_intervals will be included.
Returns:

A tuple of errors and skipped intervals.

def merged_missing_intervals( snapshots: Collection[sqlmesh.core.snapshot.definition.Snapshot], start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, restatements: Optional[Dict[sqlmesh.core.snapshot.definition.SnapshotId, Tuple[int, int]]] = None, start_override_per_model: Optional[Dict[str, datetime.datetime]] = None, end_override_per_model: Optional[Dict[str, datetime.datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False) -> Dict[sqlmesh.core.snapshot.definition.Snapshot, List[Tuple[int, int]]]:
def merged_missing_intervals(
    snapshots: t.Collection[Snapshot],
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> SnapshotToIntervals:
    """Find the largest contiguous date interval parameters based only on what is missing.

    Validates ``start``/``end`` with ``validate_date_range``, fills in defaults for values that
    were not provided and delegates to ``compute_interval_params``.

    This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.

    Args:
        snapshots: The target snapshots for which intervals should be computed.
        start: The start of the run. When falsy, defaults to the earliest start date across
            ``snapshots``.
        end: The end of the run. When falsy, defaults to now.
        execution_time: The date/time reference to use for execution time. When falsy,
            defaults to now.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        restatements: A mapping from the IDs of snapshots being restated to the interval being
            restated for each. Defaults to an empty mapping.
        start_override_per_model: A mapping of model FQNs to target start dates.
        end_override_per_model: A mapping of model FQNs to target end dates.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
            allow_partials, and other attributes that could cause the intervals to exceed the target end date.

    Returns:
        The result of ``compute_interval_params``: a dict mapping snapshots to their lists of
        contiguous ``(start, end)`` ranges.
    """
    restatements = restatements or {}
    # Validate the caller-supplied bounds before any defaults are substituted.
    validate_date_range(start, end)

    return compute_interval_params(
        snapshots,
        start=start or earliest_start_date(snapshots),
        end=end or now_timestamp(),
        deployability_index=deployability_index,
        execution_time=execution_time or now_timestamp(),
        restatements=restatements,
        start_override_per_model=start_override_per_model,
        end_override_per_model=end_override_per_model,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    )

Find the largest contiguous date interval parameters based only on what is missing.

For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, calculate the missing intervals that need to be processed given the passed in start and end intervals.

This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.

Arguments:
  • snapshots: A set of target snapshots for which intervals should be computed.
  • start: The start of the run. Defaults to the min node start date.
  • end: The end of the run. Defaults to now.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • restatements: A mapping from the IDs of snapshots being restated to the interval being restated for each.
  • start_override_per_model: A mapping of model FQNs to target start dates.
  • end_override_per_model: A mapping of model FQNs to target end dates.
  • ignore_cron: Whether to ignore the node's cron schedule.
  • end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
def compute_interval_params( snapshots: Collection[sqlmesh.core.snapshot.definition.Snapshot], *, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, restatements: Optional[Dict[sqlmesh.core.snapshot.definition.SnapshotId, Tuple[int, int]]] = None, start_override_per_model: Optional[Dict[str, datetime.datetime]] = None, end_override_per_model: Optional[Dict[str, datetime.datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False) -> Dict[sqlmesh.core.snapshot.definition.Snapshot, List[Tuple[int, int]]]:
def compute_interval_params(
    snapshots: t.Collection[Snapshot],
    *,
    start: TimeLike,
    end: TimeLike,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> SnapshotToIntervals:
    """Compute each snapshot's missing intervals and merge adjacent ones into contiguous ranges.

    All arguments are forwarded to ``missing_intervals``. For every snapshot it reports, runs of
    intervals where each interval starts exactly where the previous one ended are collapsed into
    a single ``(first_start, last_end)`` range; any gap starts a new range.

    This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.

    Args:
        snapshots: The target snapshots for which intervals should be computed.
        start: Start of the interval.
        end: End of the interval.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        execution_time: The date/time reference to use for execution time.
        restatements: A mapping from the IDs of snapshots being restated to their intervals.
        start_override_per_model: A mapping of model FQNs to target start dates.
        end_override_per_model: A mapping of model FQNs to target end dates.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
            allow_partials, and other attributes that could cause the intervals to exceed the target end date.

    Returns:
        A dict mapping each snapshot reported by ``missing_intervals`` to its list of contiguous
        ``(start, end)`` ranges.
    """
    missing = missing_intervals(
        snapshots,
        start=start,
        end=end,
        execution_time=execution_time,
        restatements=restatements,
        deployability_index=deployability_index,
        start_override_per_model=start_override_per_model,
        end_override_per_model=end_override_per_model,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    )

    merged_by_snapshot: SnapshotToIntervals = {}
    for snapshot, snapshot_intervals in missing.items():
        ranges: Intervals = []
        for interval in snapshot_intervals:
            interval_start, interval_end = interval[0], interval[-1]
            if ranges and ranges[-1][1] == interval_start:
                # Touches the current range: stretch it instead of opening a new one.
                ranges[-1] = (ranges[-1][0], interval_end)
            else:
                ranges.append((interval_start, interval_end))
        merged_by_snapshot[snapshot] = ranges

    return merged_by_snapshot

Find the largest contiguous date interval parameters based only on what is missing.

For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, calculate the missing intervals that need to be processed given the passed in start and end intervals.

This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.

Arguments:
  • snapshots: A set of target snapshots for which intervals should be computed.
  • start: Start of the interval.
  • end: End of the interval.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • execution_time: The date/time reference to use for execution time.
  • restatements: A dict mapping the IDs of snapshots being restated to their intervals.
  • start_override_per_model: A mapping of model FQNs to target start dates.
  • end_override_per_model: A mapping of model FQNs to target end dates.
  • ignore_cron: Whether to ignore the node's cron schedule.
  • end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
Returns:

A dict containing all snapshots needing to be run with their associated interval params.

def interval_diff( intervals_a: List[Tuple[int, int]], intervals_b: List[Tuple[int, int]], uninterrupted: bool = False) -> List[Tuple[int, int]]:
def interval_diff(
    intervals_a: Intervals, intervals_b: Intervals, uninterrupted: bool = False
) -> Intervals:
    """Return the intervals of ``intervals_a`` that do not overlap any interval of ``intervals_b``.

    Intervals are half-open ``(start, end)`` pairs: two intervals that only touch at an
    endpoint do not overlap. Both inputs are assumed to be sorted by start with no overlaps
    inside either list; the single forward sweep below depends on that ordering.

    Args:
        intervals_a: Candidate intervals.
        intervals_b: Intervals to subtract. Any candidate overlapping one of them is dropped,
            whether it is contained in it, contains it, or straddles one of its ends.
        uninterrupted: If True, stop at the first candidate that overlaps ``intervals_b`` and
            return only the non-overlapping candidates that precede it.

    Returns:
        A new list with the surviving candidates, in their original order.
    """
    if not intervals_a or not intervals_b:
        return list(intervals_a)

    results: Intervals = []
    index_b = 0
    len_b = len(intervals_b)

    for index_a, interval_a in enumerate(intervals_a):
        # Skip intervals of b that end at or before this candidate starts; since candidates
        # are sorted, they cannot overlap any later candidate either.
        while index_b < len_b and intervals_b[index_b][1] <= interval_a[0]:
            index_b += 1

        if index_b == len_b:
            # Every remaining candidate lies after the last interval of b.
            results.extend(intervals_a[index_a:])
            break

        if intervals_b[index_b][0] >= interval_a[1]:
            # The nearest remaining interval of b starts after this candidate ends.
            results.append(interval_a)
        elif uninterrupted:
            return results
        # Otherwise the candidate overlaps intervals_b[index_b] and is dropped. index_b is not
        # advanced because that interval may overlap the next candidate as well.

    return results