sqlmesh.core.scheduler
1from __future__ import annotations 2from dataclasses import dataclass 3import abc 4import logging 5import typing as t 6import time 7from datetime import datetime 8from sqlglot import exp 9from sqlmesh.core import constants as c 10from sqlmesh.core.console import Console, get_console 11from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements 12from sqlmesh.core.macros import RuntimeStage 13from sqlmesh.core.model.definition import AuditResult 14from sqlmesh.core.node import IntervalUnit 15from sqlmesh.core.notification_target import ( 16 NotificationEvent, 17 NotificationTargetManager, 18) 19from sqlmesh.core.snapshot import ( 20 DeployabilityIndex, 21 Snapshot, 22 SnapshotId, 23 SnapshotIdBatch, 24 SnapshotEvaluator, 25 apply_auto_restatements, 26 earliest_start_date, 27 missing_intervals, 28 merge_intervals, 29 snapshots_to_dag, 30 Intervals, 31) 32from sqlmesh.core.snapshot.definition import check_ready_intervals 33from sqlmesh.core.snapshot.definition import ( 34 Interval, 35 expand_range, 36 parent_snapshots_by_name, 37) 38from sqlmesh.core.state_sync import StateSync 39from sqlmesh.utils import CompletionStatus 40from sqlmesh.utils.concurrency import concurrent_apply_to_dag, NodeExecutionFailedError 41from sqlmesh.utils.dag import DAG 42from sqlmesh.utils.date import ( 43 TimeLike, 44 now_timestamp, 45 validate_date_range, 46) 47from sqlmesh.utils.errors import ( 48 AuditError, 49 NodeAuditsErrors, 50 CircuitBreakerError, 51 SQLMeshError, 52 SignalEvalError, 53) 54 55if t.TYPE_CHECKING: 56 from sqlmesh.core.context import ExecutionContext 57 58logger = logging.getLogger(__name__) 59SnapshotToIntervals = t.Dict[Snapshot, Intervals] 60 61 62class SchedulingUnit(abc.ABC): 63 snapshot_name: str 64 65 def __lt__(self, other: SchedulingUnit) -> bool: 66 return (self.__class__.__name__, self.snapshot_name) < ( 67 other.__class__.__name__, 68 other.snapshot_name, 69 ) 70 71 72@dataclass(frozen=True) 73class 
EvaluateNode(SchedulingUnit): 74 snapshot_name: str 75 interval: Interval 76 batch_index: int 77 78 def __lt__(self, other: SchedulingUnit) -> bool: 79 if not isinstance(other, EvaluateNode): 80 return super().__lt__(other) 81 return (self.__class__.__name__, self.snapshot_name, self.interval, self.batch_index) < ( 82 other.__class__.__name__, 83 other.snapshot_name, 84 other.interval, 85 other.batch_index, 86 ) 87 88 89@dataclass(frozen=True) 90class CreateNode(SchedulingUnit): 91 snapshot_name: str 92 93 94@dataclass(frozen=True) 95class DummyNode(SchedulingUnit): 96 snapshot_name: str 97 98 99class Scheduler: 100 """Schedules and manages the evaluation of snapshots. 101 102 The scheduler evaluates multiple snapshots with date intervals in the correct 103 topological order. It consults the state sync to understand what intervals for each 104 snapshot needs to be backfilled. 105 106 The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine. 107 108 Args: 109 snapshots: A collection of snapshots. 110 snapshot_evaluator: The snapshot evaluator to execute queries. 111 state_sync: The state sync to pull saved snapshots. 112 max_workers: The maximum number of parallel queries to run. 113 console: The rich instance used for printing scheduling information. 
114 """ 115 116 def __init__( 117 self, 118 snapshots: t.Iterable[Snapshot], 119 snapshot_evaluator: SnapshotEvaluator, 120 state_sync: StateSync, 121 default_catalog: t.Optional[str], 122 max_workers: int = 1, 123 console: t.Optional[Console] = None, 124 notification_target_manager: t.Optional[NotificationTargetManager] = None, 125 ): 126 self.state_sync = state_sync 127 self.snapshots = {s.snapshot_id: s for s in snapshots} 128 self.snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()} 129 self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values()) 130 self.default_catalog = default_catalog 131 self.snapshot_evaluator = snapshot_evaluator 132 self.max_workers = max_workers 133 self.console = console or get_console() 134 self.notification_target_manager = ( 135 notification_target_manager or NotificationTargetManager() 136 ) 137 138 def merged_missing_intervals( 139 self, 140 start: t.Optional[TimeLike] = None, 141 end: t.Optional[TimeLike] = None, 142 execution_time: t.Optional[TimeLike] = None, 143 deployability_index: t.Optional[DeployabilityIndex] = None, 144 restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, 145 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 146 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 147 ignore_cron: bool = False, 148 end_bounded: bool = False, 149 selected_snapshots: t.Optional[t.Set[str]] = None, 150 ) -> SnapshotToIntervals: 151 """Find the largest contiguous date interval parameters based only on what is missing. 152 153 For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, 154 calculate the missing intervals that need to be processed given the passed in start and end intervals. 155 156 This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc. 157 158 Args: 159 start: The start of the run. 
Defaults to the min node start date. 160 end: The end of the run. Defaults to now. 161 execution_time: The date/time reference to use for execution time. Defaults to now. 162 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 163 restatements: A set of snapshot names being restated. 164 start_override_per_model: A mapping of model FQNs to target start dates. 165 end_override_per_model: A mapping of model FQNs to target end dates. 166 ignore_cron: Whether to ignore the node's cron schedule. 167 end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, 168 allow_partials, and other attributes that could cause the intervals to exceed the target end date. 169 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run. 170 """ 171 snapshots_to_intervals = merged_missing_intervals( 172 snapshots=self.snapshot_per_version.values(), 173 start=start, 174 end=end, 175 execution_time=execution_time, 176 deployability_index=deployability_index, 177 restatements=restatements, 178 start_override_per_model=start_override_per_model, 179 end_override_per_model=end_override_per_model, 180 ignore_cron=ignore_cron, 181 end_bounded=end_bounded, 182 ) 183 # Filtering snapshots after computing missing intervals because we need all snapshots in order 184 # to correctly infer start dates. 
185 if selected_snapshots is not None: 186 snapshots_to_intervals = { 187 s: i for s, i in snapshots_to_intervals.items() if s.name in selected_snapshots 188 } 189 return snapshots_to_intervals 190 191 def evaluate( 192 self, 193 snapshot: Snapshot, 194 start: TimeLike, 195 end: TimeLike, 196 execution_time: TimeLike, 197 deployability_index: DeployabilityIndex, 198 batch_index: int, 199 environment_naming_info: t.Optional[EnvironmentNamingInfo] = None, 200 allow_destructive_snapshots: t.Optional[t.Set[str]] = None, 201 allow_additive_snapshots: t.Optional[t.Set[str]] = None, 202 target_table_exists: t.Optional[bool] = None, 203 **kwargs: t.Any, 204 ) -> t.List[AuditResult]: 205 """Evaluate a snapshot and add the processed interval to the state sync. 206 207 Args: 208 snapshot: Snapshot to evaluate. 209 start: The start datetime to render. 210 end: The end datetime to render. 211 execution_time: The date/time time reference to use for execution time. Defaults to now. 212 allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed. 213 allow_additive_snapshots: Snapshots for which additive schema changes are allowed. 214 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 215 batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it 216 auto_restatement_enabled: Whether to enable auto restatements. 217 target_table_exists: Whether the target table exists. If None, the table will be checked for existence. 218 kwargs: Additional kwargs to pass to the renderer. 219 220 Returns: 221 Tuple of list of all audit results from the evaluation and list of non-blocking audit errors to warn. 
222 """ 223 validate_date_range(start, end) 224 225 snapshots = parent_snapshots_by_name(snapshot, self.snapshots) 226 227 is_deployable = deployability_index.is_deployable(snapshot) 228 229 wap_id = self.snapshot_evaluator.evaluate( 230 snapshot, 231 start=start, 232 end=end, 233 execution_time=execution_time, 234 snapshots=snapshots, 235 allow_destructive_snapshots=allow_destructive_snapshots, 236 allow_additive_snapshots=allow_additive_snapshots, 237 deployability_index=deployability_index, 238 batch_index=batch_index, 239 target_table_exists=target_table_exists, 240 **kwargs, 241 ) 242 audit_results = self._audit_snapshot( 243 snapshot=snapshot, 244 environment_naming_info=environment_naming_info, 245 start=start, 246 end=end, 247 execution_time=execution_time, 248 snapshots=snapshots, 249 deployability_index=deployability_index, 250 wap_id=wap_id, 251 **kwargs, 252 ) 253 254 self.state_sync.add_interval( 255 snapshot, start, end, is_dev=not is_deployable, last_altered_ts=now_timestamp() 256 ) 257 return audit_results 258 259 def run( 260 self, 261 environment: str | EnvironmentNamingInfo, 262 start: t.Optional[TimeLike] = None, 263 end: t.Optional[TimeLike] = None, 264 execution_time: t.Optional[TimeLike] = None, 265 restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, 266 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 267 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 268 ignore_cron: bool = False, 269 end_bounded: bool = False, 270 selected_snapshots: t.Optional[t.Set[str]] = None, 271 circuit_breaker: t.Optional[t.Callable[[], bool]] = None, 272 deployability_index: t.Optional[DeployabilityIndex] = None, 273 auto_restatement_enabled: bool = False, 274 run_environment_statements: bool = False, 275 ) -> CompletionStatus: 276 return self._run_or_audit( 277 environment=environment, 278 start=start, 279 end=end, 280 execution_time=execution_time, 281 remove_intervals=restatements, 282 
start_override_per_model=start_override_per_model, 283 end_override_per_model=end_override_per_model, 284 ignore_cron=ignore_cron, 285 end_bounded=end_bounded, 286 selected_snapshots=selected_snapshots, 287 circuit_breaker=circuit_breaker, 288 deployability_index=deployability_index, 289 auto_restatement_enabled=auto_restatement_enabled, 290 run_environment_statements=run_environment_statements, 291 ) 292 293 def audit( 294 self, 295 environment: str | EnvironmentNamingInfo, 296 start: TimeLike, 297 end: TimeLike, 298 execution_time: t.Optional[TimeLike] = None, 299 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 300 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 301 ignore_cron: bool = False, 302 end_bounded: bool = False, 303 selected_snapshots: t.Optional[t.Set[str]] = None, 304 circuit_breaker: t.Optional[t.Callable[[], bool]] = None, 305 deployability_index: t.Optional[DeployabilityIndex] = None, 306 run_environment_statements: bool = False, 307 ) -> CompletionStatus: 308 # Remove the intervals from the snapshots that will be audited so that they can be recomputed 309 # by _run_or_audit as "missing intervals" to reuse the rest of it's logic 310 remove_intervals = {} 311 for snapshot in self.snapshots.values(): 312 removal_intervals = snapshot.get_removal_interval( 313 start, end, execution_time, is_preview=True 314 ) 315 remove_intervals[snapshot.snapshot_id] = removal_intervals 316 317 return self._run_or_audit( 318 environment=environment, 319 start=start, 320 end=end, 321 execution_time=execution_time, 322 remove_intervals=remove_intervals, 323 start_override_per_model=start_override_per_model, 324 end_override_per_model=end_override_per_model, 325 ignore_cron=ignore_cron, 326 end_bounded=end_bounded, 327 selected_snapshots=selected_snapshots, 328 circuit_breaker=circuit_breaker, 329 deployability_index=deployability_index, 330 run_environment_statements=run_environment_statements, 331 audit_only=True, 332 ) 333 334 
def batch_intervals( 335 self, 336 merged_intervals: SnapshotToIntervals, 337 deployability_index: t.Optional[DeployabilityIndex], 338 environment_naming_info: EnvironmentNamingInfo, 339 dag: t.Optional[DAG[SnapshotId]] = None, 340 is_restatement: bool = False, 341 ) -> t.Dict[Snapshot, Intervals]: 342 dag = dag or snapshots_to_dag(merged_intervals) 343 344 snapshot_intervals: t.Dict[SnapshotId, t.Tuple[Snapshot, t.List[Interval]]] = { 345 snapshot.snapshot_id: ( 346 snapshot, 347 [ 348 i 349 for interval in intervals 350 for i in _expand_range_as_interval(*interval, snapshot.node.interval_unit) 351 ], 352 ) 353 for snapshot, intervals in merged_intervals.items() 354 } 355 snapshot_batches: t.Dict[Snapshot, Intervals] = {} 356 all_unready_intervals: t.Dict[str, set[Interval]] = {} 357 for snapshot_id in dag: 358 if snapshot_id not in snapshot_intervals: 359 continue 360 snapshot, intervals = snapshot_intervals[snapshot_id] 361 unready = set(intervals) 362 363 from sqlmesh.core.context import ExecutionContext 364 365 adapter = self.snapshot_evaluator.get_adapter(snapshot.model_gateway) 366 367 parent_intervals: Intervals = [] 368 for parent_id in snapshot.parents: 369 parent_snapshot, _ = snapshot_intervals.get(parent_id, (None, [])) 370 if not parent_snapshot or parent_snapshot.is_external: 371 continue 372 373 parent_intervals.extend(snapshot_batches[parent_snapshot]) 374 375 context = ExecutionContext( 376 adapter, 377 self.snapshots_by_name, 378 deployability_index, 379 default_dialect=adapter.dialect, 380 default_catalog=self.default_catalog, 381 is_restatement=is_restatement, 382 parent_intervals=parent_intervals, 383 ) 384 385 intervals = self._check_ready_intervals( 386 snapshot, 387 intervals, 388 context, 389 environment_naming_info, 390 ) 391 unready -= set(intervals) 392 393 for parent in snapshot.parents: 394 if parent.name in all_unready_intervals: 395 unready.update(all_unready_intervals[parent.name]) 396 397 all_unready_intervals[snapshot.name] = 
unready 398 399 batches = [] 400 batch_size = snapshot.node.batch_size 401 next_batch: t.List[t.Tuple[int, int]] = [] 402 403 for interval in interval_diff( 404 intervals, merge_intervals(unready), uninterrupted=snapshot.depends_on_past 405 ): 406 if (batch_size and len(next_batch) >= batch_size) or ( 407 next_batch and interval[0] != next_batch[-1][-1] 408 ): 409 batches.append((next_batch[0][0], next_batch[-1][-1])) 410 next_batch = [] 411 412 next_batch.append(interval) 413 414 if next_batch: 415 batches.append((next_batch[0][0], next_batch[-1][-1])) 416 417 snapshot_batches[snapshot] = batches 418 419 return snapshot_batches 420 421 def run_merged_intervals( 422 self, 423 *, 424 merged_intervals: SnapshotToIntervals, 425 deployability_index: DeployabilityIndex, 426 environment_naming_info: EnvironmentNamingInfo, 427 execution_time: t.Optional[TimeLike] = None, 428 circuit_breaker: t.Optional[t.Callable[[], bool]] = None, 429 start: t.Optional[TimeLike] = None, 430 end: t.Optional[TimeLike] = None, 431 allow_destructive_snapshots: t.Optional[t.Set[str]] = None, 432 selected_models: t.Optional[t.Set[str]] = None, 433 allow_additive_snapshots: t.Optional[t.Set[str]] = None, 434 selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None, 435 run_environment_statements: bool = False, 436 audit_only: bool = False, 437 auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}, 438 is_restatement: bool = False, 439 ) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]: 440 """Runs precomputed batches of missing intervals. 441 442 Args: 443 merged_intervals: The snapshots and contiguous interval ranges to evaluate. 444 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 445 environment_naming_info: The environment naming info the user is targeting when applying their change. 446 execution_time: The date/time reference to use for execution time. 
447 circuit_breaker: An optional handler which checks if the run should be aborted. 448 start: The start of the run. 449 end: The end of the run. 450 allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed. 451 allow_additive_snapshots: Snapshots for which additive schema changes are allowed. 452 selected_snapshot_ids: The snapshots to include in the run DAG. If None, all snapshots with missing intervals will be included. 453 454 Returns: 455 A tuple of errors and skipped intervals. 456 """ 457 execution_time = execution_time or now_timestamp() 458 459 selected_snapshots = [self.snapshots[sid] for sid in (selected_snapshot_ids or set())] 460 if not selected_snapshots: 461 selected_snapshots = list(merged_intervals) 462 463 # Build the full DAG from all snapshots to preserve transitive dependencies 464 full_dag = snapshots_to_dag(self.snapshots.values()) 465 466 # Create a subdag that includes the selected snapshots and all their upstream dependencies 467 # This ensures that transitive dependencies are preserved even when intermediate nodes are not selected 468 selected_snapshot_ids_set = {s.snapshot_id for s in selected_snapshots} 469 snapshot_dag = full_dag.subdag(*selected_snapshot_ids_set) 470 471 batched_intervals = self.batch_intervals( 472 merged_intervals, 473 deployability_index, 474 environment_naming_info, 475 dag=snapshot_dag, 476 is_restatement=is_restatement, 477 ) 478 self.console.start_evaluation_progress( 479 batched_intervals, 480 environment_naming_info, 481 self.default_catalog, 482 audit_only=audit_only, 483 ) 484 485 if run_environment_statements: 486 environment_statements = self.state_sync.get_environment_statements( 487 environment_naming_info.name 488 ) 489 execute_environment_statements( 490 adapter=self.snapshot_evaluator.adapter, 491 environment_statements=environment_statements, 492 runtime_stage=RuntimeStage.BEFORE_ALL, 493 environment_naming_info=environment_naming_info, 494 
default_catalog=self.default_catalog, 495 snapshots=self.snapshots_by_name, 496 start=start, 497 end=end, 498 execution_time=execution_time, 499 selected_models=selected_models, 500 ) 501 502 # We only need to create physical tables if the snapshot is not representative or if it 503 # needs backfill 504 snapshots_to_create_candidates = [ 505 s 506 for s in selected_snapshots 507 if not deployability_index.is_representative(s) or s in batched_intervals 508 ] 509 snapshots_to_create = { 510 s.snapshot_id 511 for s in self.snapshot_evaluator.get_snapshots_to_create( 512 snapshots_to_create_candidates, deployability_index 513 ) 514 } 515 516 dag = self._dag( 517 batched_intervals, snapshot_dag=snapshot_dag, snapshots_to_create=snapshots_to_create 518 ) 519 520 def run_node(node: SchedulingUnit) -> None: 521 if circuit_breaker and circuit_breaker(): 522 raise CircuitBreakerError() 523 if isinstance(node, DummyNode): 524 return 525 526 snapshot = self.snapshots_by_name[node.snapshot_name] 527 528 if isinstance(node, EvaluateNode): 529 self.console.start_snapshot_evaluation_progress(snapshot) 530 execution_start_ts = now_timestamp() 531 evaluation_duration_ms: t.Optional[int] = None 532 start, end = node.interval 533 534 audit_results: t.List[AuditResult] = [] 535 try: 536 assert execution_time # mypy 537 assert deployability_index # mypy 538 539 if audit_only: 540 audit_results = self._audit_snapshot( 541 snapshot=snapshot, 542 environment_naming_info=environment_naming_info, 543 deployability_index=deployability_index, 544 snapshots=self.snapshots_by_name, 545 start=start, 546 end=end, 547 execution_time=execution_time, 548 ) 549 else: 550 # If batch_index > 0, then the target table must exist since the first batch would have created it 551 target_table_exists = ( 552 snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0 553 ) 554 audit_results = self.evaluate( 555 snapshot=snapshot, 556 environment_naming_info=environment_naming_info, 557 start=start, 
558 end=end, 559 execution_time=execution_time, 560 deployability_index=deployability_index, 561 batch_index=node.batch_index, 562 allow_destructive_snapshots=allow_destructive_snapshots, 563 allow_additive_snapshots=allow_additive_snapshots, 564 target_table_exists=target_table_exists, 565 selected_models=selected_models, 566 ) 567 568 evaluation_duration_ms = now_timestamp() - execution_start_ts 569 finally: 570 num_audits = len(audit_results) 571 num_audits_failed = sum(1 for result in audit_results if result.count) 572 573 execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats( 574 SnapshotIdBatch(snapshot_id=snapshot.snapshot_id, batch_id=node.batch_index) 575 ) 576 577 self.console.update_snapshot_evaluation_progress( 578 snapshot, 579 batched_intervals[snapshot][node.batch_index], 580 node.batch_index, 581 evaluation_duration_ms, 582 num_audits - num_audits_failed, 583 num_audits_failed, 584 execution_stats=execution_stats, 585 auto_restatement_triggers=auto_restatement_triggers.get( 586 snapshot.snapshot_id 587 ), 588 ) 589 elif isinstance(node, CreateNode): 590 self.snapshot_evaluator.create_snapshot( 591 snapshot=snapshot, 592 snapshots=self.snapshots_by_name, 593 deployability_index=deployability_index, 594 allow_destructive_snapshots=allow_destructive_snapshots or set(), 595 allow_additive_snapshots=allow_additive_snapshots or set(), 596 ) 597 598 try: 599 with self.snapshot_evaluator.concurrent_context(): 600 errors, skipped_intervals = concurrent_apply_to_dag( 601 dag, 602 run_node, 603 self.max_workers, 604 raise_on_error=False, 605 ) 606 self.console.stop_evaluation_progress(success=not errors) 607 608 skipped_snapshots = { 609 i.snapshot_name for i in skipped_intervals if isinstance(i, EvaluateNode) 610 } 611 self.console.log_skipped_models(skipped_snapshots) 612 for skipped in skipped_snapshots: 613 logger.info(f"SKIPPED snapshot {skipped}\n") 614 615 for error in errors: 616 if isinstance(error.__cause__, 
CircuitBreakerError): 617 raise error.__cause__ 618 logger.info(str(error), exc_info=error) 619 620 self.console.log_failed_models(errors) 621 622 return errors, skipped_intervals 623 finally: 624 if run_environment_statements: 625 execute_environment_statements( 626 adapter=self.snapshot_evaluator.adapter, 627 environment_statements=environment_statements, 628 runtime_stage=RuntimeStage.AFTER_ALL, 629 environment_naming_info=environment_naming_info, 630 default_catalog=self.default_catalog, 631 snapshots=self.snapshots_by_name, 632 start=start, 633 end=end, 634 execution_time=execution_time, 635 selected_models=selected_models, 636 ) 637 638 self.state_sync.recycle() 639 640 def _dag( 641 self, 642 batches: SnapshotToIntervals, 643 snapshot_dag: t.Optional[DAG[SnapshotId]] = None, 644 snapshots_to_create: t.Optional[t.Set[SnapshotId]] = None, 645 ) -> DAG[SchedulingUnit]: 646 """Builds a DAG of snapshot intervals to be evaluated. 647 648 Args: 649 batches: The batches of snapshots and intervals to evaluate. 650 snapshot_dag: The DAG of all snapshots. 651 snapshots_to_create: The snapshots with missing physical tables. 652 653 Returns: 654 A DAG of snapshot intervals to be evaluated. 
655 """ 656 657 intervals_per_snapshot = { 658 snapshot.name: intervals for snapshot, intervals in batches.items() 659 } 660 snapshots_to_create = snapshots_to_create or set() 661 original_snapshots_to_create = snapshots_to_create.copy() 662 upstream_dependencies_cache: t.Dict[SnapshotId, t.Set[SchedulingUnit]] = {} 663 664 snapshot_dag = snapshot_dag or snapshots_to_dag(batches) 665 dag = DAG[SchedulingUnit]() 666 667 for snapshot_id in snapshot_dag: 668 if snapshot_id.name not in self.snapshots_by_name: 669 continue 670 671 snapshot = self.snapshots_by_name[snapshot_id.name] 672 intervals = intervals_per_snapshot.get(snapshot.name, []) 673 674 upstream_dependencies: t.Set[SchedulingUnit] = set() 675 676 for p_sid in snapshot.parents: 677 upstream_dependencies.update( 678 self._find_upstream_dependencies( 679 p_sid, 680 intervals_per_snapshot, 681 original_snapshots_to_create, 682 upstream_dependencies_cache, 683 ) 684 ) 685 686 batch_concurrency = snapshot.node.batch_concurrency 687 batch_size = snapshot.node.batch_size 688 if snapshot.depends_on_past: 689 batch_concurrency = 1 690 691 create_node: t.Optional[CreateNode] = None 692 if snapshot.snapshot_id in original_snapshots_to_create and ( 693 snapshot.is_incremental_by_time_range 694 or ((not batch_concurrency or batch_concurrency > 1) and batch_size) 695 or not intervals 696 ): 697 # Add a separate node for table creation in case when there multiple concurrent 698 # evaluation nodes or when there are no intervals to evaluate. 
699 create_node = CreateNode(snapshot_name=snapshot.name) 700 dag.add(create_node, upstream_dependencies) 701 snapshots_to_create.remove(snapshot.snapshot_id) 702 703 for i, interval in enumerate(intervals): 704 node = EvaluateNode(snapshot_name=snapshot.name, interval=interval, batch_index=i) 705 706 if create_node: 707 dag.add(node, [create_node]) 708 else: 709 dag.add(node, upstream_dependencies) 710 711 if len(intervals) > 1: 712 dag.add(DummyNode(snapshot_name=snapshot.name), [node]) 713 714 if batch_concurrency and i >= batch_concurrency: 715 batch_idx_to_wait_for = i - batch_concurrency 716 dag.add( 717 node, 718 [ 719 EvaluateNode( 720 snapshot_name=snapshot.name, 721 interval=intervals[batch_idx_to_wait_for], 722 batch_index=batch_idx_to_wait_for, 723 ), 724 ], 725 ) 726 return dag 727 728 def _find_upstream_dependencies( 729 self, 730 parent_sid: SnapshotId, 731 intervals_per_snapshot: t.Dict[str, Intervals], 732 snapshots_to_create: t.Set[SnapshotId], 733 cache: t.Dict[SnapshotId, t.Set[SchedulingUnit]], 734 ) -> t.Set[SchedulingUnit]: 735 if parent_sid not in self.snapshots: 736 return set() 737 if parent_sid in cache: 738 return cache[parent_sid] 739 740 p_intervals = intervals_per_snapshot.get(parent_sid.name, []) 741 742 parent_node: t.Optional[SchedulingUnit] = None 743 if p_intervals: 744 if len(p_intervals) > 1: 745 parent_node = DummyNode(snapshot_name=parent_sid.name) 746 else: 747 interval = p_intervals[0] 748 parent_node = EvaluateNode( 749 snapshot_name=parent_sid.name, interval=interval, batch_index=0 750 ) 751 elif parent_sid in snapshots_to_create: 752 parent_node = CreateNode(snapshot_name=parent_sid.name) 753 754 if parent_node is not None: 755 cache[parent_sid] = {parent_node} 756 return {parent_node} 757 758 # This snapshot has no intervals and doesn't need creation which means 759 # that it can be a transitive dependency 760 transitive_deps: t.Set[SchedulingUnit] = set() 761 parent_snapshot = self.snapshots[parent_sid] 762 for 
grandparent_sid in parent_snapshot.parents: 763 transitive_deps.update( 764 self._find_upstream_dependencies( 765 grandparent_sid, intervals_per_snapshot, snapshots_to_create, cache 766 ) 767 ) 768 cache[parent_sid] = transitive_deps 769 return transitive_deps 770 771 def _run_or_audit( 772 self, 773 environment: str | EnvironmentNamingInfo, 774 start: t.Optional[TimeLike] = None, 775 end: t.Optional[TimeLike] = None, 776 execution_time: t.Optional[TimeLike] = None, 777 remove_intervals: t.Optional[t.Dict[SnapshotId, Interval]] = None, 778 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 779 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 780 ignore_cron: bool = False, 781 end_bounded: bool = False, 782 selected_snapshots: t.Optional[t.Set[str]] = None, 783 circuit_breaker: t.Optional[t.Callable[[], bool]] = None, 784 deployability_index: t.Optional[DeployabilityIndex] = None, 785 auto_restatement_enabled: bool = False, 786 run_environment_statements: bool = False, 787 audit_only: bool = False, 788 ) -> CompletionStatus: 789 """Concurrently runs or audits all snapshots in topological order. 790 791 Args: 792 environment: The environment naming info the user is targeting when applying their change. 793 Can just be the environment name if the user is targeting a remote environment and wants to get the remote 794 naming info 795 start: The start of the run. Defaults to the min node start date. 796 end: The end of the run. Defaults to now. 797 execution_time: The date/time time reference to use for execution time. Defaults to now. 798 remove_intervals: A dict of snapshots to their intervals. For evaluation, these are the intervals that will be restated. For audits, 799 these are the intervals that will be reaudited 800 start_override_per_model: A mapping of model FQNs to target start dates. 801 end_override_per_model: A mapping of model FQNs to target end dates. 802 ignore_cron: Whether to ignore the node's cron schedule. 
803 end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback, 804 allow_partials, and other attributes that could cause the intervals to exceed the target end date. 805 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run. 806 circuit_breaker: An optional handler which checks if the run should be aborted. 807 deployability_index: Determines snapshots that are deployable in the context of this render. 808 auto_restatement_enabled: Whether to enable auto restatements. 809 810 Returns: 811 True if the execution was successful and False otherwise. 812 """ 813 validate_date_range(start, end) 814 if isinstance(environment, str): 815 env = self.state_sync.get_environment(environment) 816 if not env: 817 raise SQLMeshError( 818 "Was not provided an environment suffix target and the environment doesn't exist." 819 "Are you running for the first time and need to run plan/apply first?" 820 ) 821 environment_naming_info = env.naming_info 822 else: 823 environment_naming_info = environment 824 825 deployability_index = deployability_index or ( 826 DeployabilityIndex.create(self.snapshots.values(), start=start) 827 if environment_naming_info.name != c.PROD 828 else DeployabilityIndex.all_deployable() 829 ) 830 execution_time = execution_time or now_timestamp() 831 832 self.state_sync.refresh_snapshot_intervals(self.snapshots.values()) 833 for s_id, interval in (remove_intervals or {}).items(): 834 self.snapshots[s_id].remove_interval(interval) 835 836 all_auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {} 837 if auto_restatement_enabled: 838 auto_restated_intervals, all_auto_restatement_triggers = apply_auto_restatements( 839 self.snapshots, execution_time 840 ) 841 self.state_sync.add_snapshots_intervals(auto_restated_intervals) 842 self.state_sync.update_auto_restatements( 843 {s.name_version: s.next_auto_restatement_ts for s in self.snapshots.values()} 844 ) 
845 846 merged_intervals = self.merged_missing_intervals( 847 start, 848 end, 849 execution_time, 850 deployability_index=deployability_index, 851 restatements=remove_intervals, 852 start_override_per_model=start_override_per_model, 853 end_override_per_model=end_override_per_model, 854 ignore_cron=ignore_cron, 855 end_bounded=end_bounded, 856 selected_snapshots=selected_snapshots, 857 ) 858 if not merged_intervals: 859 return CompletionStatus.NOTHING_TO_DO 860 861 auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {} 862 if all_auto_restatement_triggers: 863 merged_intervals_snapshots = {snapshot.snapshot_id for snapshot in merged_intervals} 864 auto_restatement_triggers = { 865 s_id: all_auto_restatement_triggers.get(s_id, []) 866 for s_id in merged_intervals_snapshots 867 } 868 869 errors, _ = self.run_merged_intervals( 870 merged_intervals=merged_intervals, 871 deployability_index=deployability_index, 872 environment_naming_info=environment_naming_info, 873 execution_time=execution_time, 874 circuit_breaker=circuit_breaker, 875 start=start, 876 end=end, 877 run_environment_statements=run_environment_statements, 878 audit_only=audit_only, 879 auto_restatement_triggers=auto_restatement_triggers, 880 selected_models={ 881 s.node.dbt_unique_id for s in merged_intervals if s.node.dbt_unique_id 882 }, 883 ) 884 885 return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS 886 887 def _audit_snapshot( 888 self, 889 snapshot: Snapshot, 890 deployability_index: DeployabilityIndex, 891 snapshots: t.Dict[str, Snapshot], 892 start: t.Optional[TimeLike] = None, 893 end: t.Optional[TimeLike] = None, 894 execution_time: t.Optional[TimeLike] = None, 895 wap_id: t.Optional[str] = None, 896 environment_naming_info: t.Optional[EnvironmentNamingInfo] = None, 897 **kwargs: t.Any, 898 ) -> t.List[AuditResult]: 899 is_deployable = deployability_index.is_deployable(snapshot) 900 901 audit_results = self.snapshot_evaluator.audit( 902 snapshot=snapshot, 
903 start=start, 904 end=end, 905 execution_time=execution_time, 906 snapshots=snapshots, 907 deployability_index=deployability_index, 908 wap_id=wap_id, 909 **kwargs, 910 ) 911 912 audit_errors_to_raise: t.List[AuditError] = [] 913 audit_errors_to_warn: t.List[AuditError] = [] 914 for audit_result in (result for result in audit_results if result.count): 915 error = AuditError( 916 audit_name=audit_result.audit.name, 917 audit_args=audit_result.audit_args, 918 model=snapshot.model_or_none, 919 count=t.cast(int, audit_result.count), 920 query=t.cast(exp.Query, audit_result.query), 921 adapter_dialect=self.snapshot_evaluator.adapter.dialect, 922 ) 923 self.notification_target_manager.notify(NotificationEvent.AUDIT_FAILURE, error) 924 if is_deployable and snapshot.node.owner: 925 self.notification_target_manager.notify_user( 926 NotificationEvent.AUDIT_FAILURE, snapshot.node.owner, error 927 ) 928 if audit_result.blocking: 929 audit_errors_to_raise.append(error) 930 else: 931 audit_errors_to_warn.append(error) 932 933 if audit_errors_to_raise: 934 raise NodeAuditsErrors(audit_errors_to_raise) 935 936 if environment_naming_info: 937 for audit_error in audit_errors_to_warn: 938 display_name = snapshot.display_name( 939 environment_naming_info, 940 self.default_catalog, 941 self.snapshot_evaluator.adapter.dialect, 942 ) 943 self.console.log_warning( 944 f"\n{display_name}: {audit_error}.", 945 f"{audit_error}. Audit query:\n{audit_error.query.sql(audit_error.adapter_dialect)}", 946 ) 947 948 return audit_results 949 950 def _check_ready_intervals( 951 self, 952 snapshot: Snapshot, 953 intervals: Intervals, 954 context: ExecutionContext, 955 environment_naming_info: EnvironmentNamingInfo, 956 ) -> Intervals: 957 """Checks if the intervals are ready for evaluation for the given snapshot. 958 959 This implementation also includes the signal progress tracking. 960 Note that this will handle gaps in the provided intervals. The returned intervals 961 may introduce new gaps. 
962 963 Args: 964 snapshot: The snapshot to check. 965 intervals: The intervals to check. 966 context: The context to use. 967 environment_naming_info: The environment naming info to use. 968 969 Returns: 970 The intervals that are ready for evaluation. 971 """ 972 signals = snapshot.is_model and snapshot.model.render_signal_calls() 973 974 if not (signals and signals.signals_to_kwargs): 975 return intervals 976 977 self.console.start_signal_progress( 978 snapshot, 979 self.default_catalog, 980 environment_naming_info or EnvironmentNamingInfo(), 981 ) 982 983 for signal_idx, (signal_name, kwargs) in enumerate(signals.signals_to_kwargs.items()): 984 # Capture intervals before signal check for display 985 intervals_to_check = merge_intervals(intervals) 986 987 signal_start_ts = time.perf_counter() 988 989 try: 990 intervals = check_ready_intervals( 991 signals.prepared_python_env[signal_name], 992 intervals, 993 context, 994 python_env=signals.python_env, 995 dialect=snapshot.model.dialect, 996 path=snapshot.model._path, 997 snapshot=snapshot, 998 kwargs=kwargs, 999 ) 1000 except SQLMeshError as e: 1001 raise SignalEvalError( 1002 f"{e} '{signal_name}' for '{snapshot.model.name}' at {snapshot.model._path}" 1003 ) 1004 1005 duration = time.perf_counter() - signal_start_ts 1006 1007 self.console.update_signal_progress( 1008 snapshot=snapshot, 1009 signal_name=signal_name, 1010 signal_idx=signal_idx, 1011 total_signals=len(signals.signals_to_kwargs), 1012 ready_intervals=merge_intervals(intervals), 1013 check_intervals=intervals_to_check, 1014 duration=duration, 1015 ) 1016 1017 self.console.stop_signal_progress() 1018 1019 return intervals 1020 1021 1022def merged_missing_intervals( 1023 snapshots: t.Collection[Snapshot], 1024 start: t.Optional[TimeLike] = None, 1025 end: t.Optional[TimeLike] = None, 1026 execution_time: t.Optional[TimeLike] = None, 1027 deployability_index: t.Optional[DeployabilityIndex] = None, 1028 restatements: t.Optional[t.Dict[SnapshotId, 
Interval]] = None, 1029 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 1030 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 1031 ignore_cron: bool = False, 1032 end_bounded: bool = False, 1033) -> SnapshotToIntervals: 1034 """Find the largest contiguous date interval parameters based only on what is missing. 1035 1036 For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, 1037 calculate the missing intervals that need to be processed given the passed in start and end intervals. 1038 1039 This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc. 1040 1041 Args: 1042 snapshots: A set of target snapshots for which intervals should be computed. 1043 start: The start of the run. Defaults to the min node start date. 1044 end: The end of the run. Defaults to now. 1045 execution_time: The date/time reference to use for execution time. Defaults to now. 1046 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 1047 restatements: A set of snapshot names being restated. 1048 start_override_per_model: A mapping of model FQNs to target start dates. 1049 end_override_per_model: A mapping of model FQNs to target end dates. 1050 ignore_cron: Whether to ignore the node's cron schedule. 1051 end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, 1052 allow_partials, and other attributes that could cause the intervals to exceed the target end date. 
1053 """ 1054 restatements = restatements or {} 1055 validate_date_range(start, end) 1056 1057 return compute_interval_params( 1058 snapshots, 1059 start=start or earliest_start_date(snapshots), 1060 end=end or now_timestamp(), 1061 deployability_index=deployability_index, 1062 execution_time=execution_time or now_timestamp(), 1063 restatements=restatements, 1064 start_override_per_model=start_override_per_model, 1065 end_override_per_model=end_override_per_model, 1066 ignore_cron=ignore_cron, 1067 end_bounded=end_bounded, 1068 ) 1069 1070 1071def compute_interval_params( 1072 snapshots: t.Collection[Snapshot], 1073 *, 1074 start: TimeLike, 1075 end: TimeLike, 1076 deployability_index: t.Optional[DeployabilityIndex] = None, 1077 execution_time: t.Optional[TimeLike] = None, 1078 restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, 1079 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 1080 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 1081 ignore_cron: bool = False, 1082 end_bounded: bool = False, 1083) -> SnapshotToIntervals: 1084 """Find the largest contiguous date interval parameters based only on what is missing. 1085 1086 For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, 1087 calculate the missing intervals that need to be processed given the passed in start and end intervals. 1088 1089 This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc. 1090 1091 Args: 1092 snapshots: A set of target snapshots for which intervals should be computed. 1093 start: Start of the interval. 1094 end: End of the interval. 1095 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 1096 execution_time: The date/time reference to use for execution time. 1097 restatements: A dict of snapshot names being restated and their intervals. 
1098 start_override_per_model: A mapping of model FQNs to target start dates. 1099 end_override_per_model: A mapping of model FQNs to target end dates. 1100 ignore_cron: Whether to ignore the node's cron schedule. 1101 end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, 1102 allow_partials, and other attributes that could cause the intervals to exceed the target end date. 1103 1104 Returns: 1105 A dict containing all snapshots needing to be run with their associated interval params. 1106 """ 1107 snapshot_merged_intervals = {} 1108 1109 for snapshot, intervals in missing_intervals( 1110 snapshots, 1111 start=start, 1112 end=end, 1113 execution_time=execution_time, 1114 restatements=restatements, 1115 deployability_index=deployability_index, 1116 start_override_per_model=start_override_per_model, 1117 end_override_per_model=end_override_per_model, 1118 ignore_cron=ignore_cron, 1119 end_bounded=end_bounded, 1120 ).items(): 1121 contiguous_batch = [] 1122 next_batch: Intervals = [] 1123 1124 for interval in intervals: 1125 if next_batch and interval[0] != next_batch[-1][-1]: 1126 contiguous_batch.append((next_batch[0][0], next_batch[-1][-1])) 1127 next_batch = [] 1128 next_batch.append(interval) 1129 if next_batch: 1130 contiguous_batch.append((next_batch[0][0], next_batch[-1][-1])) 1131 snapshot_merged_intervals[snapshot] = contiguous_batch 1132 1133 return snapshot_merged_intervals 1134 1135 1136def interval_diff( 1137 intervals_a: Intervals, intervals_b: Intervals, uninterrupted: bool = False 1138) -> Intervals: 1139 if not intervals_a or not intervals_b: 1140 return intervals_a 1141 1142 index_a, index_b = 0, 0 1143 len_a = len(intervals_a) 1144 len_b = len(intervals_b) 1145 1146 results = [] 1147 1148 while index_a < len_a and index_b < len_b: 1149 interval_a = intervals_a[index_a] 1150 interval_b = intervals_b[index_b] 1151 1152 if interval_a[0] >= interval_b[1]: 1153 index_b += 1 1154 elif 
interval_b[0] >= interval_a[1]: 1155 results.append(interval_a) 1156 index_a += 1 1157 else: 1158 if uninterrupted: 1159 return results 1160 1161 if interval_a[0] >= interval_b[0]: 1162 index_a += 1 1163 else: 1164 index_b += 1 1165 1166 if index_a < len_a: 1167 interval_a = intervals_a[index_a] 1168 if interval_a[0] >= interval_b[1] or interval_b[0] >= interval_a[1]: 1169 results.extend(intervals_a[index_a:]) 1170 1171 return results 1172 1173 1174def _resolve_one_snapshot_per_version( 1175 snapshots: t.Iterable[Snapshot], 1176) -> t.Dict[t.Tuple[str, str], Snapshot]: 1177 snapshot_per_version: t.Dict[t.Tuple[str, str], Snapshot] = {} 1178 for snapshot in snapshots: 1179 key = (snapshot.name, snapshot.version_get_or_generate()) 1180 if key not in snapshot_per_version: 1181 snapshot_per_version[key] = snapshot 1182 else: 1183 prev_snapshot = snapshot_per_version[key] 1184 if snapshot.unpaused_ts and ( 1185 not prev_snapshot.unpaused_ts or snapshot.created_ts > prev_snapshot.created_ts 1186 ): 1187 snapshot_per_version[key] = snapshot 1188 1189 return snapshot_per_version 1190 1191 1192def _expand_range_as_interval( 1193 start_ts: int, end_ts: int, interval_unit: IntervalUnit 1194) -> t.List[Interval]: 1195 values = expand_range(start_ts, end_ts, interval_unit) 1196 return [(values[i], values[i + 1]) for i in range(len(values) - 1)]
class SchedulingUnit(abc.ABC):
    """Base class for the nodes placed in the scheduler's execution DAG.

    Every unit is associated with a snapshot through ``snapshot_name``. Units
    define ``<`` so that a collection of them can be sorted.
    """

    # Name of the snapshot this unit belongs to.
    snapshot_name: str

    def __lt__(self, other: SchedulingUnit) -> bool:
        """Order units by concrete class name first, then by snapshot name.

        Args:
            other: The unit to compare against.

        Returns:
            True if this unit sorts before ``other``. ``NotImplemented`` is
            returned when ``other`` is not a scheduling unit.
        """
        if not isinstance(other, SchedulingUnit):
            # Defer to Python's comparison protocol (reflected operation, then
            # TypeError) instead of failing with an AttributeError when reading
            # ``other.snapshot_name`` on an unrelated object.
            return NotImplemented
        return (self.__class__.__name__, self.snapshot_name) < (
            other.__class__.__name__,
            other.snapshot_name,
        )
Helper class that provides a standard way to create an ABC using inheritance.
@dataclass(frozen=True)
class EvaluateNode(SchedulingUnit):
    """Scheduling unit that stands for one interval (batch) of a snapshot."""

    # Name of the snapshot this unit belongs to.
    snapshot_name: str
    # The interval covered by this unit.
    interval: Interval
    # Index of this interval among the batches of the same snapshot.
    batch_index: int

    def __lt__(self, other: SchedulingUnit) -> bool:
        """Order evaluation units by class name, snapshot name, interval and batch index.

        When ``other`` is not an ``EvaluateNode`` the comparison is delegated
        to the base class ordering.
        """
        if isinstance(other, EvaluateNode):
            own_key = (
                self.__class__.__name__,
                self.snapshot_name,
                self.interval,
                self.batch_index,
            )
            other_key = (
                other.__class__.__name__,
                other.snapshot_name,
                other.interval,
                other.batch_index,
            )
            return own_key < other_key
        return super().__lt__(other)
100class Scheduler: 101 """Schedules and manages the evaluation of snapshots. 102 103 The scheduler evaluates multiple snapshots with date intervals in the correct 104 topological order. It consults the state sync to understand what intervals for each 105 snapshot needs to be backfilled. 106 107 The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine. 108 109 Args: 110 snapshots: A collection of snapshots. 111 snapshot_evaluator: The snapshot evaluator to execute queries. 112 state_sync: The state sync to pull saved snapshots. 113 max_workers: The maximum number of parallel queries to run. 114 console: The rich instance used for printing scheduling information. 115 """ 116 117 def __init__( 118 self, 119 snapshots: t.Iterable[Snapshot], 120 snapshot_evaluator: SnapshotEvaluator, 121 state_sync: StateSync, 122 default_catalog: t.Optional[str], 123 max_workers: int = 1, 124 console: t.Optional[Console] = None, 125 notification_target_manager: t.Optional[NotificationTargetManager] = None, 126 ): 127 self.state_sync = state_sync 128 self.snapshots = {s.snapshot_id: s for s in snapshots} 129 self.snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()} 130 self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values()) 131 self.default_catalog = default_catalog 132 self.snapshot_evaluator = snapshot_evaluator 133 self.max_workers = max_workers 134 self.console = console or get_console() 135 self.notification_target_manager = ( 136 notification_target_manager or NotificationTargetManager() 137 ) 138 139 def merged_missing_intervals( 140 self, 141 start: t.Optional[TimeLike] = None, 142 end: t.Optional[TimeLike] = None, 143 execution_time: t.Optional[TimeLike] = None, 144 deployability_index: t.Optional[DeployabilityIndex] = None, 145 restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, 146 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 147 
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 148 ignore_cron: bool = False, 149 end_bounded: bool = False, 150 selected_snapshots: t.Optional[t.Set[str]] = None, 151 ) -> SnapshotToIntervals: 152 """Find the largest contiguous date interval parameters based only on what is missing. 153 154 For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, 155 calculate the missing intervals that need to be processed given the passed in start and end intervals. 156 157 This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc. 158 159 Args: 160 start: The start of the run. Defaults to the min node start date. 161 end: The end of the run. Defaults to now. 162 execution_time: The date/time reference to use for execution time. Defaults to now. 163 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 164 restatements: A set of snapshot names being restated. 165 start_override_per_model: A mapping of model FQNs to target start dates. 166 end_override_per_model: A mapping of model FQNs to target end dates. 167 ignore_cron: Whether to ignore the node's cron schedule. 168 end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, 169 allow_partials, and other attributes that could cause the intervals to exceed the target end date. 170 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run. 
171 """ 172 snapshots_to_intervals = merged_missing_intervals( 173 snapshots=self.snapshot_per_version.values(), 174 start=start, 175 end=end, 176 execution_time=execution_time, 177 deployability_index=deployability_index, 178 restatements=restatements, 179 start_override_per_model=start_override_per_model, 180 end_override_per_model=end_override_per_model, 181 ignore_cron=ignore_cron, 182 end_bounded=end_bounded, 183 ) 184 # Filtering snapshots after computing missing intervals because we need all snapshots in order 185 # to correctly infer start dates. 186 if selected_snapshots is not None: 187 snapshots_to_intervals = { 188 s: i for s, i in snapshots_to_intervals.items() if s.name in selected_snapshots 189 } 190 return snapshots_to_intervals 191 192 def evaluate( 193 self, 194 snapshot: Snapshot, 195 start: TimeLike, 196 end: TimeLike, 197 execution_time: TimeLike, 198 deployability_index: DeployabilityIndex, 199 batch_index: int, 200 environment_naming_info: t.Optional[EnvironmentNamingInfo] = None, 201 allow_destructive_snapshots: t.Optional[t.Set[str]] = None, 202 allow_additive_snapshots: t.Optional[t.Set[str]] = None, 203 target_table_exists: t.Optional[bool] = None, 204 **kwargs: t.Any, 205 ) -> t.List[AuditResult]: 206 """Evaluate a snapshot and add the processed interval to the state sync. 207 208 Args: 209 snapshot: Snapshot to evaluate. 210 start: The start datetime to render. 211 end: The end datetime to render. 212 execution_time: The date/time time reference to use for execution time. Defaults to now. 213 allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed. 214 allow_additive_snapshots: Snapshots for which additive schema changes are allowed. 215 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 216 batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it 217 auto_restatement_enabled: Whether to enable auto restatements. 
218 target_table_exists: Whether the target table exists. If None, the table will be checked for existence. 219 kwargs: Additional kwargs to pass to the renderer. 220 221 Returns: 222 Tuple of list of all audit results from the evaluation and list of non-blocking audit errors to warn. 223 """ 224 validate_date_range(start, end) 225 226 snapshots = parent_snapshots_by_name(snapshot, self.snapshots) 227 228 is_deployable = deployability_index.is_deployable(snapshot) 229 230 wap_id = self.snapshot_evaluator.evaluate( 231 snapshot, 232 start=start, 233 end=end, 234 execution_time=execution_time, 235 snapshots=snapshots, 236 allow_destructive_snapshots=allow_destructive_snapshots, 237 allow_additive_snapshots=allow_additive_snapshots, 238 deployability_index=deployability_index, 239 batch_index=batch_index, 240 target_table_exists=target_table_exists, 241 **kwargs, 242 ) 243 audit_results = self._audit_snapshot( 244 snapshot=snapshot, 245 environment_naming_info=environment_naming_info, 246 start=start, 247 end=end, 248 execution_time=execution_time, 249 snapshots=snapshots, 250 deployability_index=deployability_index, 251 wap_id=wap_id, 252 **kwargs, 253 ) 254 255 self.state_sync.add_interval( 256 snapshot, start, end, is_dev=not is_deployable, last_altered_ts=now_timestamp() 257 ) 258 return audit_results 259 260 def run( 261 self, 262 environment: str | EnvironmentNamingInfo, 263 start: t.Optional[TimeLike] = None, 264 end: t.Optional[TimeLike] = None, 265 execution_time: t.Optional[TimeLike] = None, 266 restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, 267 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 268 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 269 ignore_cron: bool = False, 270 end_bounded: bool = False, 271 selected_snapshots: t.Optional[t.Set[str]] = None, 272 circuit_breaker: t.Optional[t.Callable[[], bool]] = None, 273 deployability_index: t.Optional[DeployabilityIndex] = None, 274 
auto_restatement_enabled: bool = False, 275 run_environment_statements: bool = False, 276 ) -> CompletionStatus: 277 return self._run_or_audit( 278 environment=environment, 279 start=start, 280 end=end, 281 execution_time=execution_time, 282 remove_intervals=restatements, 283 start_override_per_model=start_override_per_model, 284 end_override_per_model=end_override_per_model, 285 ignore_cron=ignore_cron, 286 end_bounded=end_bounded, 287 selected_snapshots=selected_snapshots, 288 circuit_breaker=circuit_breaker, 289 deployability_index=deployability_index, 290 auto_restatement_enabled=auto_restatement_enabled, 291 run_environment_statements=run_environment_statements, 292 ) 293 294 def audit( 295 self, 296 environment: str | EnvironmentNamingInfo, 297 start: TimeLike, 298 end: TimeLike, 299 execution_time: t.Optional[TimeLike] = None, 300 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 301 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 302 ignore_cron: bool = False, 303 end_bounded: bool = False, 304 selected_snapshots: t.Optional[t.Set[str]] = None, 305 circuit_breaker: t.Optional[t.Callable[[], bool]] = None, 306 deployability_index: t.Optional[DeployabilityIndex] = None, 307 run_environment_statements: bool = False, 308 ) -> CompletionStatus: 309 # Remove the intervals from the snapshots that will be audited so that they can be recomputed 310 # by _run_or_audit as "missing intervals" to reuse the rest of it's logic 311 remove_intervals = {} 312 for snapshot in self.snapshots.values(): 313 removal_intervals = snapshot.get_removal_interval( 314 start, end, execution_time, is_preview=True 315 ) 316 remove_intervals[snapshot.snapshot_id] = removal_intervals 317 318 return self._run_or_audit( 319 environment=environment, 320 start=start, 321 end=end, 322 execution_time=execution_time, 323 remove_intervals=remove_intervals, 324 start_override_per_model=start_override_per_model, 325 end_override_per_model=end_override_per_model, 
326 ignore_cron=ignore_cron, 327 end_bounded=end_bounded, 328 selected_snapshots=selected_snapshots, 329 circuit_breaker=circuit_breaker, 330 deployability_index=deployability_index, 331 run_environment_statements=run_environment_statements, 332 audit_only=True, 333 ) 334 335 def batch_intervals( 336 self, 337 merged_intervals: SnapshotToIntervals, 338 deployability_index: t.Optional[DeployabilityIndex], 339 environment_naming_info: EnvironmentNamingInfo, 340 dag: t.Optional[DAG[SnapshotId]] = None, 341 is_restatement: bool = False, 342 ) -> t.Dict[Snapshot, Intervals]: 343 dag = dag or snapshots_to_dag(merged_intervals) 344 345 snapshot_intervals: t.Dict[SnapshotId, t.Tuple[Snapshot, t.List[Interval]]] = { 346 snapshot.snapshot_id: ( 347 snapshot, 348 [ 349 i 350 for interval in intervals 351 for i in _expand_range_as_interval(*interval, snapshot.node.interval_unit) 352 ], 353 ) 354 for snapshot, intervals in merged_intervals.items() 355 } 356 snapshot_batches: t.Dict[Snapshot, Intervals] = {} 357 all_unready_intervals: t.Dict[str, set[Interval]] = {} 358 for snapshot_id in dag: 359 if snapshot_id not in snapshot_intervals: 360 continue 361 snapshot, intervals = snapshot_intervals[snapshot_id] 362 unready = set(intervals) 363 364 from sqlmesh.core.context import ExecutionContext 365 366 adapter = self.snapshot_evaluator.get_adapter(snapshot.model_gateway) 367 368 parent_intervals: Intervals = [] 369 for parent_id in snapshot.parents: 370 parent_snapshot, _ = snapshot_intervals.get(parent_id, (None, [])) 371 if not parent_snapshot or parent_snapshot.is_external: 372 continue 373 374 parent_intervals.extend(snapshot_batches[parent_snapshot]) 375 376 context = ExecutionContext( 377 adapter, 378 self.snapshots_by_name, 379 deployability_index, 380 default_dialect=adapter.dialect, 381 default_catalog=self.default_catalog, 382 is_restatement=is_restatement, 383 parent_intervals=parent_intervals, 384 ) 385 386 intervals = self._check_ready_intervals( 387 snapshot, 388 
intervals, 389 context, 390 environment_naming_info, 391 ) 392 unready -= set(intervals) 393 394 for parent in snapshot.parents: 395 if parent.name in all_unready_intervals: 396 unready.update(all_unready_intervals[parent.name]) 397 398 all_unready_intervals[snapshot.name] = unready 399 400 batches = [] 401 batch_size = snapshot.node.batch_size 402 next_batch: t.List[t.Tuple[int, int]] = [] 403 404 for interval in interval_diff( 405 intervals, merge_intervals(unready), uninterrupted=snapshot.depends_on_past 406 ): 407 if (batch_size and len(next_batch) >= batch_size) or ( 408 next_batch and interval[0] != next_batch[-1][-1] 409 ): 410 batches.append((next_batch[0][0], next_batch[-1][-1])) 411 next_batch = [] 412 413 next_batch.append(interval) 414 415 if next_batch: 416 batches.append((next_batch[0][0], next_batch[-1][-1])) 417 418 snapshot_batches[snapshot] = batches 419 420 return snapshot_batches 421 422 def run_merged_intervals( 423 self, 424 *, 425 merged_intervals: SnapshotToIntervals, 426 deployability_index: DeployabilityIndex, 427 environment_naming_info: EnvironmentNamingInfo, 428 execution_time: t.Optional[TimeLike] = None, 429 circuit_breaker: t.Optional[t.Callable[[], bool]] = None, 430 start: t.Optional[TimeLike] = None, 431 end: t.Optional[TimeLike] = None, 432 allow_destructive_snapshots: t.Optional[t.Set[str]] = None, 433 selected_models: t.Optional[t.Set[str]] = None, 434 allow_additive_snapshots: t.Optional[t.Set[str]] = None, 435 selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None, 436 run_environment_statements: bool = False, 437 audit_only: bool = False, 438 auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}, 439 is_restatement: bool = False, 440 ) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]: 441 """Runs precomputed batches of missing intervals. 442 443 Args: 444 merged_intervals: The snapshots and contiguous interval ranges to evaluate. 
445 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 446 environment_naming_info: The environment naming info the user is targeting when applying their change. 447 execution_time: The date/time reference to use for execution time. 448 circuit_breaker: An optional handler which checks if the run should be aborted. 449 start: The start of the run. 450 end: The end of the run. 451 allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed. 452 allow_additive_snapshots: Snapshots for which additive schema changes are allowed. 453 selected_snapshot_ids: The snapshots to include in the run DAG. If None, all snapshots with missing intervals will be included. 454 455 Returns: 456 A tuple of errors and skipped intervals. 457 """ 458 execution_time = execution_time or now_timestamp() 459 460 selected_snapshots = [self.snapshots[sid] for sid in (selected_snapshot_ids or set())] 461 if not selected_snapshots: 462 selected_snapshots = list(merged_intervals) 463 464 # Build the full DAG from all snapshots to preserve transitive dependencies 465 full_dag = snapshots_to_dag(self.snapshots.values()) 466 467 # Create a subdag that includes the selected snapshots and all their upstream dependencies 468 # This ensures that transitive dependencies are preserved even when intermediate nodes are not selected 469 selected_snapshot_ids_set = {s.snapshot_id for s in selected_snapshots} 470 snapshot_dag = full_dag.subdag(*selected_snapshot_ids_set) 471 472 batched_intervals = self.batch_intervals( 473 merged_intervals, 474 deployability_index, 475 environment_naming_info, 476 dag=snapshot_dag, 477 is_restatement=is_restatement, 478 ) 479 self.console.start_evaluation_progress( 480 batched_intervals, 481 environment_naming_info, 482 self.default_catalog, 483 audit_only=audit_only, 484 ) 485 486 if run_environment_statements: 487 environment_statements = self.state_sync.get_environment_statements( 488 
environment_naming_info.name 489 ) 490 execute_environment_statements( 491 adapter=self.snapshot_evaluator.adapter, 492 environment_statements=environment_statements, 493 runtime_stage=RuntimeStage.BEFORE_ALL, 494 environment_naming_info=environment_naming_info, 495 default_catalog=self.default_catalog, 496 snapshots=self.snapshots_by_name, 497 start=start, 498 end=end, 499 execution_time=execution_time, 500 selected_models=selected_models, 501 ) 502 503 # We only need to create physical tables if the snapshot is not representative or if it 504 # needs backfill 505 snapshots_to_create_candidates = [ 506 s 507 for s in selected_snapshots 508 if not deployability_index.is_representative(s) or s in batched_intervals 509 ] 510 snapshots_to_create = { 511 s.snapshot_id 512 for s in self.snapshot_evaluator.get_snapshots_to_create( 513 snapshots_to_create_candidates, deployability_index 514 ) 515 } 516 517 dag = self._dag( 518 batched_intervals, snapshot_dag=snapshot_dag, snapshots_to_create=snapshots_to_create 519 ) 520 521 def run_node(node: SchedulingUnit) -> None: 522 if circuit_breaker and circuit_breaker(): 523 raise CircuitBreakerError() 524 if isinstance(node, DummyNode): 525 return 526 527 snapshot = self.snapshots_by_name[node.snapshot_name] 528 529 if isinstance(node, EvaluateNode): 530 self.console.start_snapshot_evaluation_progress(snapshot) 531 execution_start_ts = now_timestamp() 532 evaluation_duration_ms: t.Optional[int] = None 533 start, end = node.interval 534 535 audit_results: t.List[AuditResult] = [] 536 try: 537 assert execution_time # mypy 538 assert deployability_index # mypy 539 540 if audit_only: 541 audit_results = self._audit_snapshot( 542 snapshot=snapshot, 543 environment_naming_info=environment_naming_info, 544 deployability_index=deployability_index, 545 snapshots=self.snapshots_by_name, 546 start=start, 547 end=end, 548 execution_time=execution_time, 549 ) 550 else: 551 # If batch_index > 0, then the target table must exist since the 
first batch would have created it 552 target_table_exists = ( 553 snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0 554 ) 555 audit_results = self.evaluate( 556 snapshot=snapshot, 557 environment_naming_info=environment_naming_info, 558 start=start, 559 end=end, 560 execution_time=execution_time, 561 deployability_index=deployability_index, 562 batch_index=node.batch_index, 563 allow_destructive_snapshots=allow_destructive_snapshots, 564 allow_additive_snapshots=allow_additive_snapshots, 565 target_table_exists=target_table_exists, 566 selected_models=selected_models, 567 ) 568 569 evaluation_duration_ms = now_timestamp() - execution_start_ts 570 finally: 571 num_audits = len(audit_results) 572 num_audits_failed = sum(1 for result in audit_results if result.count) 573 574 execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats( 575 SnapshotIdBatch(snapshot_id=snapshot.snapshot_id, batch_id=node.batch_index) 576 ) 577 578 self.console.update_snapshot_evaluation_progress( 579 snapshot, 580 batched_intervals[snapshot][node.batch_index], 581 node.batch_index, 582 evaluation_duration_ms, 583 num_audits - num_audits_failed, 584 num_audits_failed, 585 execution_stats=execution_stats, 586 auto_restatement_triggers=auto_restatement_triggers.get( 587 snapshot.snapshot_id 588 ), 589 ) 590 elif isinstance(node, CreateNode): 591 self.snapshot_evaluator.create_snapshot( 592 snapshot=snapshot, 593 snapshots=self.snapshots_by_name, 594 deployability_index=deployability_index, 595 allow_destructive_snapshots=allow_destructive_snapshots or set(), 596 allow_additive_snapshots=allow_additive_snapshots or set(), 597 ) 598 599 try: 600 with self.snapshot_evaluator.concurrent_context(): 601 errors, skipped_intervals = concurrent_apply_to_dag( 602 dag, 603 run_node, 604 self.max_workers, 605 raise_on_error=False, 606 ) 607 self.console.stop_evaluation_progress(success=not errors) 608 609 skipped_snapshots = { 610 i.snapshot_name for i in 
skipped_intervals if isinstance(i, EvaluateNode) 611 } 612 self.console.log_skipped_models(skipped_snapshots) 613 for skipped in skipped_snapshots: 614 logger.info(f"SKIPPED snapshot {skipped}\n") 615 616 for error in errors: 617 if isinstance(error.__cause__, CircuitBreakerError): 618 raise error.__cause__ 619 logger.info(str(error), exc_info=error) 620 621 self.console.log_failed_models(errors) 622 623 return errors, skipped_intervals 624 finally: 625 if run_environment_statements: 626 execute_environment_statements( 627 adapter=self.snapshot_evaluator.adapter, 628 environment_statements=environment_statements, 629 runtime_stage=RuntimeStage.AFTER_ALL, 630 environment_naming_info=environment_naming_info, 631 default_catalog=self.default_catalog, 632 snapshots=self.snapshots_by_name, 633 start=start, 634 end=end, 635 execution_time=execution_time, 636 selected_models=selected_models, 637 ) 638 639 self.state_sync.recycle() 640 641 def _dag( 642 self, 643 batches: SnapshotToIntervals, 644 snapshot_dag: t.Optional[DAG[SnapshotId]] = None, 645 snapshots_to_create: t.Optional[t.Set[SnapshotId]] = None, 646 ) -> DAG[SchedulingUnit]: 647 """Builds a DAG of snapshot intervals to be evaluated. 648 649 Args: 650 batches: The batches of snapshots and intervals to evaluate. 651 snapshot_dag: The DAG of all snapshots. 652 snapshots_to_create: The snapshots with missing physical tables. 653 654 Returns: 655 A DAG of snapshot intervals to be evaluated. 
656 """ 657 658 intervals_per_snapshot = { 659 snapshot.name: intervals for snapshot, intervals in batches.items() 660 } 661 snapshots_to_create = snapshots_to_create or set() 662 original_snapshots_to_create = snapshots_to_create.copy() 663 upstream_dependencies_cache: t.Dict[SnapshotId, t.Set[SchedulingUnit]] = {} 664 665 snapshot_dag = snapshot_dag or snapshots_to_dag(batches) 666 dag = DAG[SchedulingUnit]() 667 668 for snapshot_id in snapshot_dag: 669 if snapshot_id.name not in self.snapshots_by_name: 670 continue 671 672 snapshot = self.snapshots_by_name[snapshot_id.name] 673 intervals = intervals_per_snapshot.get(snapshot.name, []) 674 675 upstream_dependencies: t.Set[SchedulingUnit] = set() 676 677 for p_sid in snapshot.parents: 678 upstream_dependencies.update( 679 self._find_upstream_dependencies( 680 p_sid, 681 intervals_per_snapshot, 682 original_snapshots_to_create, 683 upstream_dependencies_cache, 684 ) 685 ) 686 687 batch_concurrency = snapshot.node.batch_concurrency 688 batch_size = snapshot.node.batch_size 689 if snapshot.depends_on_past: 690 batch_concurrency = 1 691 692 create_node: t.Optional[CreateNode] = None 693 if snapshot.snapshot_id in original_snapshots_to_create and ( 694 snapshot.is_incremental_by_time_range 695 or ((not batch_concurrency or batch_concurrency > 1) and batch_size) 696 or not intervals 697 ): 698 # Add a separate node for table creation in case when there multiple concurrent 699 # evaluation nodes or when there are no intervals to evaluate. 
700 create_node = CreateNode(snapshot_name=snapshot.name) 701 dag.add(create_node, upstream_dependencies) 702 snapshots_to_create.remove(snapshot.snapshot_id) 703 704 for i, interval in enumerate(intervals): 705 node = EvaluateNode(snapshot_name=snapshot.name, interval=interval, batch_index=i) 706 707 if create_node: 708 dag.add(node, [create_node]) 709 else: 710 dag.add(node, upstream_dependencies) 711 712 if len(intervals) > 1: 713 dag.add(DummyNode(snapshot_name=snapshot.name), [node]) 714 715 if batch_concurrency and i >= batch_concurrency: 716 batch_idx_to_wait_for = i - batch_concurrency 717 dag.add( 718 node, 719 [ 720 EvaluateNode( 721 snapshot_name=snapshot.name, 722 interval=intervals[batch_idx_to_wait_for], 723 batch_index=batch_idx_to_wait_for, 724 ), 725 ], 726 ) 727 return dag 728 729 def _find_upstream_dependencies( 730 self, 731 parent_sid: SnapshotId, 732 intervals_per_snapshot: t.Dict[str, Intervals], 733 snapshots_to_create: t.Set[SnapshotId], 734 cache: t.Dict[SnapshotId, t.Set[SchedulingUnit]], 735 ) -> t.Set[SchedulingUnit]: 736 if parent_sid not in self.snapshots: 737 return set() 738 if parent_sid in cache: 739 return cache[parent_sid] 740 741 p_intervals = intervals_per_snapshot.get(parent_sid.name, []) 742 743 parent_node: t.Optional[SchedulingUnit] = None 744 if p_intervals: 745 if len(p_intervals) > 1: 746 parent_node = DummyNode(snapshot_name=parent_sid.name) 747 else: 748 interval = p_intervals[0] 749 parent_node = EvaluateNode( 750 snapshot_name=parent_sid.name, interval=interval, batch_index=0 751 ) 752 elif parent_sid in snapshots_to_create: 753 parent_node = CreateNode(snapshot_name=parent_sid.name) 754 755 if parent_node is not None: 756 cache[parent_sid] = {parent_node} 757 return {parent_node} 758 759 # This snapshot has no intervals and doesn't need creation which means 760 # that it can be a transitive dependency 761 transitive_deps: t.Set[SchedulingUnit] = set() 762 parent_snapshot = self.snapshots[parent_sid] 763 for 
grandparent_sid in parent_snapshot.parents: 764 transitive_deps.update( 765 self._find_upstream_dependencies( 766 grandparent_sid, intervals_per_snapshot, snapshots_to_create, cache 767 ) 768 ) 769 cache[parent_sid] = transitive_deps 770 return transitive_deps 771 772 def _run_or_audit( 773 self, 774 environment: str | EnvironmentNamingInfo, 775 start: t.Optional[TimeLike] = None, 776 end: t.Optional[TimeLike] = None, 777 execution_time: t.Optional[TimeLike] = None, 778 remove_intervals: t.Optional[t.Dict[SnapshotId, Interval]] = None, 779 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 780 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 781 ignore_cron: bool = False, 782 end_bounded: bool = False, 783 selected_snapshots: t.Optional[t.Set[str]] = None, 784 circuit_breaker: t.Optional[t.Callable[[], bool]] = None, 785 deployability_index: t.Optional[DeployabilityIndex] = None, 786 auto_restatement_enabled: bool = False, 787 run_environment_statements: bool = False, 788 audit_only: bool = False, 789 ) -> CompletionStatus: 790 """Concurrently runs or audits all snapshots in topological order. 791 792 Args: 793 environment: The environment naming info the user is targeting when applying their change. 794 Can just be the environment name if the user is targeting a remote environment and wants to get the remote 795 naming info 796 start: The start of the run. Defaults to the min node start date. 797 end: The end of the run. Defaults to now. 798 execution_time: The date/time time reference to use for execution time. Defaults to now. 799 remove_intervals: A dict of snapshots to their intervals. For evaluation, these are the intervals that will be restated. For audits, 800 these are the intervals that will be reaudited 801 start_override_per_model: A mapping of model FQNs to target start dates. 802 end_override_per_model: A mapping of model FQNs to target end dates. 803 ignore_cron: Whether to ignore the node's cron schedule. 
804 end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback, 805 allow_partials, and other attributes that could cause the intervals to exceed the target end date. 806 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run. 807 circuit_breaker: An optional handler which checks if the run should be aborted. 808 deployability_index: Determines snapshots that are deployable in the context of this render. 809 auto_restatement_enabled: Whether to enable auto restatements. 810 811 Returns: 812 True if the execution was successful and False otherwise. 813 """ 814 validate_date_range(start, end) 815 if isinstance(environment, str): 816 env = self.state_sync.get_environment(environment) 817 if not env: 818 raise SQLMeshError( 819 "Was not provided an environment suffix target and the environment doesn't exist." 820 "Are you running for the first time and need to run plan/apply first?" 821 ) 822 environment_naming_info = env.naming_info 823 else: 824 environment_naming_info = environment 825 826 deployability_index = deployability_index or ( 827 DeployabilityIndex.create(self.snapshots.values(), start=start) 828 if environment_naming_info.name != c.PROD 829 else DeployabilityIndex.all_deployable() 830 ) 831 execution_time = execution_time or now_timestamp() 832 833 self.state_sync.refresh_snapshot_intervals(self.snapshots.values()) 834 for s_id, interval in (remove_intervals or {}).items(): 835 self.snapshots[s_id].remove_interval(interval) 836 837 all_auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {} 838 if auto_restatement_enabled: 839 auto_restated_intervals, all_auto_restatement_triggers = apply_auto_restatements( 840 self.snapshots, execution_time 841 ) 842 self.state_sync.add_snapshots_intervals(auto_restated_intervals) 843 self.state_sync.update_auto_restatements( 844 {s.name_version: s.next_auto_restatement_ts for s in self.snapshots.values()} 845 ) 
846 847 merged_intervals = self.merged_missing_intervals( 848 start, 849 end, 850 execution_time, 851 deployability_index=deployability_index, 852 restatements=remove_intervals, 853 start_override_per_model=start_override_per_model, 854 end_override_per_model=end_override_per_model, 855 ignore_cron=ignore_cron, 856 end_bounded=end_bounded, 857 selected_snapshots=selected_snapshots, 858 ) 859 if not merged_intervals: 860 return CompletionStatus.NOTHING_TO_DO 861 862 auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {} 863 if all_auto_restatement_triggers: 864 merged_intervals_snapshots = {snapshot.snapshot_id for snapshot in merged_intervals} 865 auto_restatement_triggers = { 866 s_id: all_auto_restatement_triggers.get(s_id, []) 867 for s_id in merged_intervals_snapshots 868 } 869 870 errors, _ = self.run_merged_intervals( 871 merged_intervals=merged_intervals, 872 deployability_index=deployability_index, 873 environment_naming_info=environment_naming_info, 874 execution_time=execution_time, 875 circuit_breaker=circuit_breaker, 876 start=start, 877 end=end, 878 run_environment_statements=run_environment_statements, 879 audit_only=audit_only, 880 auto_restatement_triggers=auto_restatement_triggers, 881 selected_models={ 882 s.node.dbt_unique_id for s in merged_intervals if s.node.dbt_unique_id 883 }, 884 ) 885 886 return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS 887 888 def _audit_snapshot( 889 self, 890 snapshot: Snapshot, 891 deployability_index: DeployabilityIndex, 892 snapshots: t.Dict[str, Snapshot], 893 start: t.Optional[TimeLike] = None, 894 end: t.Optional[TimeLike] = None, 895 execution_time: t.Optional[TimeLike] = None, 896 wap_id: t.Optional[str] = None, 897 environment_naming_info: t.Optional[EnvironmentNamingInfo] = None, 898 **kwargs: t.Any, 899 ) -> t.List[AuditResult]: 900 is_deployable = deployability_index.is_deployable(snapshot) 901 902 audit_results = self.snapshot_evaluator.audit( 903 snapshot=snapshot, 
904 start=start, 905 end=end, 906 execution_time=execution_time, 907 snapshots=snapshots, 908 deployability_index=deployability_index, 909 wap_id=wap_id, 910 **kwargs, 911 ) 912 913 audit_errors_to_raise: t.List[AuditError] = [] 914 audit_errors_to_warn: t.List[AuditError] = [] 915 for audit_result in (result for result in audit_results if result.count): 916 error = AuditError( 917 audit_name=audit_result.audit.name, 918 audit_args=audit_result.audit_args, 919 model=snapshot.model_or_none, 920 count=t.cast(int, audit_result.count), 921 query=t.cast(exp.Query, audit_result.query), 922 adapter_dialect=self.snapshot_evaluator.adapter.dialect, 923 ) 924 self.notification_target_manager.notify(NotificationEvent.AUDIT_FAILURE, error) 925 if is_deployable and snapshot.node.owner: 926 self.notification_target_manager.notify_user( 927 NotificationEvent.AUDIT_FAILURE, snapshot.node.owner, error 928 ) 929 if audit_result.blocking: 930 audit_errors_to_raise.append(error) 931 else: 932 audit_errors_to_warn.append(error) 933 934 if audit_errors_to_raise: 935 raise NodeAuditsErrors(audit_errors_to_raise) 936 937 if environment_naming_info: 938 for audit_error in audit_errors_to_warn: 939 display_name = snapshot.display_name( 940 environment_naming_info, 941 self.default_catalog, 942 self.snapshot_evaluator.adapter.dialect, 943 ) 944 self.console.log_warning( 945 f"\n{display_name}: {audit_error}.", 946 f"{audit_error}. Audit query:\n{audit_error.query.sql(audit_error.adapter_dialect)}", 947 ) 948 949 return audit_results 950 951 def _check_ready_intervals( 952 self, 953 snapshot: Snapshot, 954 intervals: Intervals, 955 context: ExecutionContext, 956 environment_naming_info: EnvironmentNamingInfo, 957 ) -> Intervals: 958 """Checks if the intervals are ready for evaluation for the given snapshot. 959 960 This implementation also includes the signal progress tracking. 961 Note that this will handle gaps in the provided intervals. The returned intervals 962 may introduce new gaps. 
963 964 Args: 965 snapshot: The snapshot to check. 966 intervals: The intervals to check. 967 context: The context to use. 968 environment_naming_info: The environment naming info to use. 969 970 Returns: 971 The intervals that are ready for evaluation. 972 """ 973 signals = snapshot.is_model and snapshot.model.render_signal_calls() 974 975 if not (signals and signals.signals_to_kwargs): 976 return intervals 977 978 self.console.start_signal_progress( 979 snapshot, 980 self.default_catalog, 981 environment_naming_info or EnvironmentNamingInfo(), 982 ) 983 984 for signal_idx, (signal_name, kwargs) in enumerate(signals.signals_to_kwargs.items()): 985 # Capture intervals before signal check for display 986 intervals_to_check = merge_intervals(intervals) 987 988 signal_start_ts = time.perf_counter() 989 990 try: 991 intervals = check_ready_intervals( 992 signals.prepared_python_env[signal_name], 993 intervals, 994 context, 995 python_env=signals.python_env, 996 dialect=snapshot.model.dialect, 997 path=snapshot.model._path, 998 snapshot=snapshot, 999 kwargs=kwargs, 1000 ) 1001 except SQLMeshError as e: 1002 raise SignalEvalError( 1003 f"{e} '{signal_name}' for '{snapshot.model.name}' at {snapshot.model._path}" 1004 ) 1005 1006 duration = time.perf_counter() - signal_start_ts 1007 1008 self.console.update_signal_progress( 1009 snapshot=snapshot, 1010 signal_name=signal_name, 1011 signal_idx=signal_idx, 1012 total_signals=len(signals.signals_to_kwargs), 1013 ready_intervals=merge_intervals(intervals), 1014 check_intervals=intervals_to_check, 1015 duration=duration, 1016 ) 1017 1018 self.console.stop_signal_progress() 1019 1020 return intervals
Schedules and manages the evaluation of snapshots.
The scheduler evaluates multiple snapshots with date intervals in the correct topological order. It consults the state sync to understand which intervals need to be backfilled for each snapshot.
The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine.
Arguments:
- snapshots: A collection of snapshots.
- snapshot_evaluator: The snapshot evaluator to execute queries.
- state_sync: The state sync to pull saved snapshots.
- max_workers: The maximum number of parallel queries to run.
- console: The rich instance used for printing scheduling information.
def __init__(
    self,
    snapshots: t.Iterable[Snapshot],
    snapshot_evaluator: SnapshotEvaluator,
    state_sync: StateSync,
    default_catalog: t.Optional[str],
    max_workers: int = 1,
    console: t.Optional[Console] = None,
    notification_target_manager: t.Optional[NotificationTargetManager] = None,
):
    """Stores the collaborators and indexes the given snapshots.

    Args:
        snapshots: The snapshots to schedule. They are indexed by snapshot ID and by name.
        snapshot_evaluator: The snapshot evaluator to execute queries.
        state_sync: The state sync to pull saved snapshots.
        default_catalog: The default catalog, stored as-is for later use.
        max_workers: The maximum number of parallel queries to run.
        console: The console used for printing scheduling information. Falls back to
            `get_console()` when not provided.
        notification_target_manager: The manager used to send notifications. A new
            `NotificationTargetManager` is created when not provided.
    """
    self.state_sync = state_sync

    # Two lookup tables over the same snapshots: by ID and by name.
    by_id = {s.snapshot_id: s for s in snapshots}
    self.snapshots = by_id
    self.snapshots_by_name = {s.name: s for s in by_id.values()}
    self.snapshot_per_version = _resolve_one_snapshot_per_version(by_id.values())

    self.default_catalog = default_catalog
    self.snapshot_evaluator = snapshot_evaluator
    self.max_workers = max_workers

    if not console:
        console = get_console()
    self.console = console

    if not notification_target_manager:
        notification_target_manager = NotificationTargetManager()
    self.notification_target_manager = notification_target_manager
def merged_missing_intervals(
    self,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
    selected_snapshots: t.Optional[t.Set[str]] = None,
) -> SnapshotToIntervals:
    """Find the largest contiguous date interval parameters based only on what is missing.

    Delegates to the module-level `merged_missing_intervals` over one snapshot per
    version, then optionally narrows the result to the selected snapshot names.

    This is a superset of what may actually get processed at runtime based on things like
    batch size, signal readiness, etc.

    Args:
        start: The start of the run. Defaults to the min node start date.
        end: The end of the run. Defaults to now.
        execution_time: The date/time reference to use for execution time. Defaults to now.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        restatements: A mapping of snapshot IDs to the intervals being restated.
        start_override_per_model: A mapping of model FQNs to target start dates.
        end_override_per_model: A mapping of model FQNs to target end dates.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date,
            disregarding lookback, allow_partials, and other attributes that could cause the
            intervals to exceed the target end date.
        selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.

    Returns:
        A mapping of snapshots to their merged missing intervals.
    """
    all_missing = merged_missing_intervals(
        snapshots=self.snapshot_per_version.values(),
        start=start,
        end=end,
        execution_time=execution_time,
        deployability_index=deployability_index,
        restatements=restatements,
        start_override_per_model=start_override_per_model,
        end_override_per_model=end_override_per_model,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    )

    if selected_snapshots is None:
        return all_missing

    # The selection is applied only after the missing intervals have been computed because
    # all snapshots are needed in order to correctly infer start dates.
    return {
        snapshot: intervals
        for snapshot, intervals in all_missing.items()
        if snapshot.name in selected_snapshots
    }
Find the largest contiguous date interval parameters based only on what is missing.
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, calculate the missing intervals that need to be processed given the passed in start and end intervals.
This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.
Arguments:
- start: The start of the run. Defaults to the min node start date.
- end: The end of the run. Defaults to now.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- restatements: A mapping of snapshot IDs to the intervals being restated.
- start_override_per_model: A mapping of model FQNs to target start dates.
- end_override_per_model: A mapping of model FQNs to target end dates.
- ignore_cron: Whether to ignore the node's cron schedule.
- end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
- selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
def evaluate(
    self,
    snapshot: Snapshot,
    start: TimeLike,
    end: TimeLike,
    execution_time: TimeLike,
    deployability_index: DeployabilityIndex,
    batch_index: int,
    environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
    allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
    allow_additive_snapshots: t.Optional[t.Set[str]] = None,
    target_table_exists: t.Optional[bool] = None,
    **kwargs: t.Any,
) -> t.List[AuditResult]:
    """Evaluate a snapshot, audit it, and add the processed interval to the state sync.

    Args:
        snapshot: Snapshot to evaluate.
        start: The start datetime to render.
        end: The end datetime to render.
        execution_time: The date/time reference to use for execution time.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        batch_index: The index of this batch among the snapshot's batches.
        environment_naming_info: The environment naming info, forwarded to the audit step.
        allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
        allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
        target_table_exists: Whether the target table exists. If None, the table will be checked for existence.
        kwargs: Additional kwargs forwarded to both the evaluation and the audit calls.

    Returns:
        The list of all audit results from the evaluation.
    """
    validate_date_range(start, end)

    parents_by_name = parent_snapshots_by_name(snapshot, self.snapshots)
    is_dev = not deployability_index.is_deployable(snapshot)

    wap_id = self.snapshot_evaluator.evaluate(
        snapshot,
        start=start,
        end=end,
        execution_time=execution_time,
        snapshots=parents_by_name,
        allow_destructive_snapshots=allow_destructive_snapshots,
        allow_additive_snapshots=allow_additive_snapshots,
        deployability_index=deployability_index,
        batch_index=batch_index,
        target_table_exists=target_table_exists,
        **kwargs,
    )

    results = self._audit_snapshot(
        snapshot=snapshot,
        environment_naming_info=environment_naming_info,
        start=start,
        end=end,
        execution_time=execution_time,
        snapshots=parents_by_name,
        deployability_index=deployability_index,
        wap_id=wap_id,
        **kwargs,
    )

    # The interval is recorded only once both the evaluation and the audits have returned
    # without raising.
    self.state_sync.add_interval(
        snapshot,
        start,
        end,
        is_dev=is_dev,
        last_altered_ts=now_timestamp(),
    )
    return results
Evaluate a snapshot and add the processed interval to the state sync.
Arguments:
- snapshot: Snapshot to evaluate.
- start: The start datetime to render.
- end: The end datetime to render.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
- allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- batch_index: If the snapshot is part of a batch of related snapshots, its index within the batch.
- auto_restatement_enabled: Whether to enable auto restatements.
- target_table_exists: Whether the target table exists. If None, the table will be checked for existence.
- kwargs: Additional kwargs to pass to the renderer.
Returns:
A list of all audit results from the evaluation.
def run(
    self,
    environment: str | EnvironmentNamingInfo,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
    selected_snapshots: t.Optional[t.Set[str]] = None,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    auto_restatement_enabled: bool = False,
    run_environment_statements: bool = False,
) -> CompletionStatus:
    """Runs the snapshots by delegating to `_run_or_audit`.

    Every argument is forwarded unchanged, except that `restatements` is passed on as
    `remove_intervals`. `audit_only` is not passed and is left at its default.

    Returns:
        The completion status reported by `_run_or_audit`.
    """
    status = self._run_or_audit(
        # Target and time window.
        environment=environment,
        start=start,
        end=end,
        execution_time=execution_time,
        # Intervals being restated are handed over as the intervals to remove.
        remove_intervals=restatements,
        start_override_per_model=start_override_per_model,
        end_override_per_model=end_override_per_model,
        # Scheduling behaviour.
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
        selected_snapshots=selected_snapshots,
        circuit_breaker=circuit_breaker,
        deployability_index=deployability_index,
        auto_restatement_enabled=auto_restatement_enabled,
        run_environment_statements=run_environment_statements,
    )
    return status
def audit(
    self,
    environment: str | EnvironmentNamingInfo,
    start: TimeLike,
    end: TimeLike,
    execution_time: t.Optional[TimeLike] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
    selected_snapshots: t.Optional[t.Set[str]] = None,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    run_environment_statements: bool = False,
) -> CompletionStatus:
    """Audits the snapshots by delegating to `_run_or_audit` with `audit_only=True`.

    Returns:
        The completion status reported by `_run_or_audit`.
    """
    # Remove the intervals from the snapshots that will be audited so that they can be recomputed
    # by _run_or_audit as "missing intervals" to reuse the rest of its logic
    intervals_to_reaudit = {
        snapshot.snapshot_id: snapshot.get_removal_interval(
            start, end, execution_time, is_preview=True
        )
        for snapshot in self.snapshots.values()
    }

    return self._run_or_audit(
        environment=environment,
        start=start,
        end=end,
        execution_time=execution_time,
        remove_intervals=intervals_to_reaudit,
        start_override_per_model=start_override_per_model,
        end_override_per_model=end_override_per_model,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
        selected_snapshots=selected_snapshots,
        circuit_breaker=circuit_breaker,
        deployability_index=deployability_index,
        run_environment_statements=run_environment_statements,
        audit_only=True,
    )
def batch_intervals(
    self,
    merged_intervals: SnapshotToIntervals,
    deployability_index: t.Optional[DeployabilityIndex],
    environment_naming_info: EnvironmentNamingInfo,
    dag: t.Optional[DAG[SnapshotId]] = None,
    is_restatement: bool = False,
) -> t.Dict[Snapshot, Intervals]:
    """Splits each snapshot's merged intervals into batches that are ready to evaluate.

    Snapshots are visited in the iteration order of the DAG. For each one, the merged ranges
    are expanded via `_expand_range_as_interval`, filtered through the readiness check, and
    then grouped into contiguous batches of at most `batch_size` intervals. Intervals that are
    not ready are carried over, keyed by snapshot name, to the snapshots that list this one as
    a parent.

    Args:
        merged_intervals: The snapshots and contiguous interval ranges to batch.
        deployability_index: Passed to the execution context used for the readiness check.
        environment_naming_info: The environment naming info, forwarded to the readiness check.
        dag: The snapshot DAG to iterate. Built from `merged_intervals` when not provided.
        is_restatement: Passed to the execution context used for the readiness check.

    Returns:
        A mapping of each processed snapshot to its list of (start, end) batches.
    """
    dag = dag or snapshots_to_dag(merged_intervals)

    expanded_by_id: t.Dict[SnapshotId, t.Tuple[Snapshot, t.List[Interval]]] = {}
    for snap, ranges in merged_intervals.items():
        expanded: t.List[Interval] = []
        for rng in ranges:
            expanded.extend(_expand_range_as_interval(*rng, snap.node.interval_unit))
        expanded_by_id[snap.snapshot_id] = (snap, expanded)

    snapshot_batches: t.Dict[Snapshot, Intervals] = {}
    unready_by_name: t.Dict[str, set[Interval]] = {}

    for snapshot_id in dag:
        entry = expanded_by_id.get(snapshot_id)
        if entry is None:
            continue
        snapshot, intervals = entry
        unready = set(intervals)

        # Imported here rather than at module level (only the type-checking import exists there).
        from sqlmesh.core.context import ExecutionContext

        adapter = self.snapshot_evaluator.get_adapter(snapshot.model_gateway)

        # Collect the batches already computed for the non-external parents that have intervals.
        parent_intervals: Intervals = []
        for parent_id in snapshot.parents:
            parent_snapshot, _ = expanded_by_id.get(parent_id, (None, []))
            if parent_snapshot and not parent_snapshot.is_external:
                parent_intervals.extend(snapshot_batches[parent_snapshot])

        context = ExecutionContext(
            adapter,
            self.snapshots_by_name,
            deployability_index,
            default_dialect=adapter.dialect,
            default_catalog=self.default_catalog,
            is_restatement=is_restatement,
            parent_intervals=parent_intervals,
        )

        intervals = self._check_ready_intervals(
            snapshot,
            intervals,
            context,
            environment_naming_info,
        )
        unready -= set(intervals)

        # Intervals that are unready for a parent are unready for this snapshot too.
        for parent in snapshot.parents:
            inherited = unready_by_name.get(parent.name)
            if inherited is not None:
                unready.update(inherited)

        unready_by_name[snapshot.name] = unready

        batches = []
        batch_size = snapshot.node.batch_size
        pending: t.List[t.Tuple[int, int]] = []

        for interval in interval_diff(
            intervals, merge_intervals(unready), uninterrupted=snapshot.depends_on_past
        ):
            batch_is_full = batch_size and len(pending) >= batch_size
            has_gap = pending and interval[0] != pending[-1][-1]
            # Close the current batch when it is full or the next interval is not adjacent.
            if batch_is_full or has_gap:
                batches.append((pending[0][0], pending[-1][-1]))
                pending = []

            pending.append(interval)

        if pending:
            batches.append((pending[0][0], pending[-1][-1]))

        snapshot_batches[snapshot] = batches

    return snapshot_batches
def run_merged_intervals(
    self,
    *,
    merged_intervals: SnapshotToIntervals,
    deployability_index: DeployabilityIndex,
    environment_naming_info: EnvironmentNamingInfo,
    execution_time: t.Optional[TimeLike] = None,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
    selected_models: t.Optional[t.Set[str]] = None,
    allow_additive_snapshots: t.Optional[t.Set[str]] = None,
    selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
    run_environment_statements: bool = False,
    audit_only: bool = False,
    auto_restatement_triggers: t.Optional[t.Dict[SnapshotId, t.List[SnapshotId]]] = None,
    is_restatement: bool = False,
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
    """Runs precomputed batches of missing intervals.

    Args:
        merged_intervals: The snapshots and contiguous interval ranges to evaluate.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        environment_naming_info: The environment naming info the user is targeting when applying their change.
        execution_time: The date/time reference to use for execution time. Defaults to now.
        circuit_breaker: An optional handler which checks if the run should be aborted.
        start: The start of the run.
        end: The end of the run.
        allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
        selected_models: Forwarded to the environment statements and to each evaluation.
        allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
        selected_snapshot_ids: The snapshots to include in the run DAG. If None, all snapshots with missing intervals will be included.
        run_environment_statements: Whether to execute the environment's before_all / after_all statements
            around the run.
        audit_only: If True, each interval is only audited instead of being evaluated.
        auto_restatement_triggers: A mapping of snapshot IDs to the snapshot IDs recorded as their auto
            restatement triggers. Only reported to the console. Defaults to an empty mapping.
        is_restatement: Forwarded to the interval batching step.

    Returns:
        A tuple of errors and skipped intervals.

    Raises:
        CircuitBreakerError: If the circuit breaker aborted the run.
    """
    execution_time = execution_time or now_timestamp()
    # Normalised here rather than in the signature: a mutable default would be one dict
    # shared across every call.
    triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = auto_restatement_triggers or {}

    selected_snapshots = [self.snapshots[sid] for sid in (selected_snapshot_ids or set())]
    if not selected_snapshots:
        selected_snapshots = list(merged_intervals)

    # Build the full DAG from all snapshots to preserve transitive dependencies
    full_dag = snapshots_to_dag(self.snapshots.values())

    # Create a subdag that includes the selected snapshots and all their upstream dependencies
    # This ensures that transitive dependencies are preserved even when intermediate nodes are not selected
    selected_snapshot_ids_set = {s.snapshot_id for s in selected_snapshots}
    snapshot_dag = full_dag.subdag(*selected_snapshot_ids_set)

    batched_intervals = self.batch_intervals(
        merged_intervals,
        deployability_index,
        environment_naming_info,
        dag=snapshot_dag,
        is_restatement=is_restatement,
    )
    self.console.start_evaluation_progress(
        batched_intervals,
        environment_naming_info,
        self.default_catalog,
        audit_only=audit_only,
    )

    if run_environment_statements:
        environment_statements = self.state_sync.get_environment_statements(
            environment_naming_info.name
        )
        execute_environment_statements(
            adapter=self.snapshot_evaluator.adapter,
            environment_statements=environment_statements,
            runtime_stage=RuntimeStage.BEFORE_ALL,
            environment_naming_info=environment_naming_info,
            default_catalog=self.default_catalog,
            snapshots=self.snapshots_by_name,
            start=start,
            end=end,
            execution_time=execution_time,
            selected_models=selected_models,
        )

    # We only need to create physical tables if the snapshot is not representative or if it
    # needs backfill
    snapshots_to_create_candidates = [
        s
        for s in selected_snapshots
        if not deployability_index.is_representative(s) or s in batched_intervals
    ]
    snapshots_to_create = {
        s.snapshot_id
        for s in self.snapshot_evaluator.get_snapshots_to_create(
            snapshots_to_create_candidates, deployability_index
        )
    }

    dag = self._dag(
        batched_intervals, snapshot_dag=snapshot_dag, snapshots_to_create=snapshots_to_create
    )

    def run_node(node: SchedulingUnit) -> None:
        # Executes one scheduling unit; called concurrently for nodes of the scheduling DAG.
        if circuit_breaker and circuit_breaker():
            raise CircuitBreakerError()
        if isinstance(node, DummyNode):
            return

        snapshot = self.snapshots_by_name[node.snapshot_name]

        if isinstance(node, EvaluateNode):
            self.console.start_snapshot_evaluation_progress(snapshot)
            execution_start_ts = now_timestamp()
            evaluation_duration_ms: t.Optional[int] = None
            # These shadow the outer start / end: a node covers only its own interval.
            start, end = node.interval

            audit_results: t.List[AuditResult] = []
            try:
                assert execution_time  # mypy
                assert deployability_index  # mypy

                if audit_only:
                    audit_results = self._audit_snapshot(
                        snapshot=snapshot,
                        environment_naming_info=environment_naming_info,
                        deployability_index=deployability_index,
                        snapshots=self.snapshots_by_name,
                        start=start,
                        end=end,
                        execution_time=execution_time,
                    )
                else:
                    # If batch_index > 0, then the target table must exist since the first batch would have created it
                    target_table_exists = (
                        snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0
                    )
                    audit_results = self.evaluate(
                        snapshot=snapshot,
                        environment_naming_info=environment_naming_info,
                        start=start,
                        end=end,
                        execution_time=execution_time,
                        deployability_index=deployability_index,
                        batch_index=node.batch_index,
                        allow_destructive_snapshots=allow_destructive_snapshots,
                        allow_additive_snapshots=allow_additive_snapshots,
                        target_table_exists=target_table_exists,
                        selected_models=selected_models,
                    )

                evaluation_duration_ms = now_timestamp() - execution_start_ts
            finally:
                # Progress is reported even when the evaluation raised; in that case the
                # duration stays None and no audit results are counted.
                num_audits = len(audit_results)
                num_audits_failed = sum(1 for result in audit_results if result.count)

                execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats(
                    SnapshotIdBatch(snapshot_id=snapshot.snapshot_id, batch_id=node.batch_index)
                )

                self.console.update_snapshot_evaluation_progress(
                    snapshot,
                    batched_intervals[snapshot][node.batch_index],
                    node.batch_index,
                    evaluation_duration_ms,
                    num_audits - num_audits_failed,
                    num_audits_failed,
                    execution_stats=execution_stats,
                    auto_restatement_triggers=triggers.get(snapshot.snapshot_id),
                )
        elif isinstance(node, CreateNode):
            self.snapshot_evaluator.create_snapshot(
                snapshot=snapshot,
                snapshots=self.snapshots_by_name,
                deployability_index=deployability_index,
                allow_destructive_snapshots=allow_destructive_snapshots or set(),
                allow_additive_snapshots=allow_additive_snapshots or set(),
            )

    try:
        with self.snapshot_evaluator.concurrent_context():
            errors, skipped_intervals = concurrent_apply_to_dag(
                dag,
                run_node,
                self.max_workers,
                raise_on_error=False,
            )
        self.console.stop_evaluation_progress(success=not errors)

        skipped_snapshots = {
            i.snapshot_name for i in skipped_intervals if isinstance(i, EvaluateNode)
        }
        self.console.log_skipped_models(skipped_snapshots)
        for skipped in skipped_snapshots:
            logger.info(f"SKIPPED snapshot {skipped}\n")

        for error in errors:
            # A tripped circuit breaker aborts the whole run instead of being reported as
            # a per-node failure.
            if isinstance(error.__cause__, CircuitBreakerError):
                raise error.__cause__
            logger.info(str(error), exc_info=error)

        self.console.log_failed_models(errors)

        return errors, skipped_intervals
    finally:
        if run_environment_statements:
            execute_environment_statements(
                adapter=self.snapshot_evaluator.adapter,
                environment_statements=environment_statements,
                runtime_stage=RuntimeStage.AFTER_ALL,
                environment_naming_info=environment_naming_info,
                default_catalog=self.default_catalog,
                snapshots=self.snapshots_by_name,
                start=start,
                end=end,
                execution_time=execution_time,
                selected_models=selected_models,
            )

        self.state_sync.recycle()
Runs precomputed batches of missing intervals.
Arguments:
- merged_intervals: The snapshots and contiguous interval ranges to evaluate.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- environment_naming_info: The environment naming info the user is targeting when applying their change.
- execution_time: The date/time reference to use for execution time.
- circuit_breaker: An optional handler which checks if the run should be aborted.
- start: The start of the run.
- end: The end of the run.
- allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
- allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
- selected_snapshot_ids: The snapshots to include in the run DAG. If None, all snapshots with missing intervals will be included.
Returns:
A tuple of errors and skipped intervals.
def merged_missing_intervals(
    snapshots: t.Collection[Snapshot],
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> SnapshotToIntervals:
    """Resolve the run defaults and compute merged missing intervals for the given snapshots.

    This is a convenience wrapper around `compute_interval_params`: it validates the requested
    date range, fills in any omitted start / end / execution time, and then delegates.

    The result is a superset of what may actually get processed at runtime based on things like
    batch size, signal readiness, etc.

    Args:
        snapshots: The target snapshots for which intervals should be computed.
        start: The start of the run. Falls back to `earliest_start_date(snapshots)` when falsy.
        end: The end of the run. Falls back to `now_timestamp()` when falsy.
        execution_time: The date/time reference to use for execution time. Falls back to
            `now_timestamp()` when falsy.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        restatements: A mapping of the IDs of snapshots being restated to their intervals.
            An empty mapping is used when omitted.
        start_override_per_model: A mapping of model FQNs to target start dates.
        end_override_per_model: A mapping of model FQNs to target end dates.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date,
            disregarding lookback, allow_partials, and other attributes that could cause the
            intervals to exceed the target end date.

    Returns:
        The mapping produced by `compute_interval_params` for the resolved arguments.
    """
    if not restatements:
        restatements = {}

    # The range is validated as provided by the caller, before any defaults are applied.
    validate_date_range(start, end)

    # Defaults are resolved in the same order in which they are passed on below.
    resolved_start = start or earliest_start_date(snapshots)
    resolved_end = end or now_timestamp()
    resolved_execution_time = execution_time or now_timestamp()

    return compute_interval_params(
        snapshots,
        start=resolved_start,
        end=resolved_end,
        deployability_index=deployability_index,
        execution_time=resolved_execution_time,
        restatements=restatements,
        start_override_per_model=start_override_per_model,
        end_override_per_model=end_override_per_model,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    )
Find the largest contiguous date interval parameters based only on what is missing.
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, calculate the missing intervals that need to be processed given the passed in start and end intervals.
This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.
Arguments:
- snapshots: A set of target snapshots for which intervals should be computed.
- start: The start of the run. Defaults to the min node start date.
- end: The end of the run. Defaults to now.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- restatements: A mapping of the IDs of snapshots being restated to their restatement intervals.
- start_override_per_model: A mapping of model FQNs to target start dates.
- end_override_per_model: A mapping of model FQNs to target end dates.
- ignore_cron: Whether to ignore the node's cron schedule.
- end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
def compute_interval_params(
    snapshots: t.Collection[Snapshot],
    *,
    start: TimeLike,
    end: TimeLike,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> SnapshotToIntervals:
    """Collapse each snapshot's missing intervals into the largest contiguous ranges.

    The missing intervals are obtained from `missing_intervals` using the arguments given here.
    Consecutive intervals in which one begins exactly where the previous one ended are merged
    into a single (start, end) range; any gap starts a new range.

    This is a superset of what may actually get processed at runtime based on things like
    batch size, signal readiness, etc.

    Args:
        snapshots: The target snapshots for which intervals should be computed.
        start: Start of the interval.
        end: End of the interval.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        execution_time: The date/time reference to use for execution time.
        restatements: A mapping of the IDs of snapshots being restated to their intervals.
        start_override_per_model: A mapping of model FQNs to target start dates.
        end_override_per_model: A mapping of model FQNs to target end dates.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date,
            disregarding lookback, allow_partials, and other attributes that could cause the
            intervals to exceed the target end date.

    Returns:
        A dict containing every snapshot reported by `missing_intervals`, mapped to its list of
        merged contiguous ranges.
    """
    missing_by_snapshot = missing_intervals(
        snapshots,
        start=start,
        end=end,
        execution_time=execution_time,
        restatements=restatements,
        deployability_index=deployability_index,
        start_override_per_model=start_override_per_model,
        end_override_per_model=end_override_per_model,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    )

    merged_by_snapshot = {}
    for snapshot, intervals in missing_by_snapshot.items():
        merged_ranges = []
        # (start, end) of the contiguous run currently being built; None until the first interval.
        current_run = None

        for interval in intervals:
            if current_run is not None and interval[0] == current_run[1]:
                # The interval begins exactly where the run ends, so the run grows.
                current_run = (current_run[0], interval[-1])
                continue

            # Either this is the first interval or there is a gap: close the previous run, if any.
            if current_run is not None:
                merged_ranges.append(current_run)
            current_run = (interval[0], interval[-1])

        if current_run is not None:
            merged_ranges.append(current_run)

        merged_by_snapshot[snapshot] = merged_ranges

    return merged_by_snapshot
Find the largest contiguous date interval parameters based only on what is missing.
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, calculate the missing intervals that need to be processed given the passed in start and end intervals.
This is a superset of what may actually get processed at runtime based on things like batch size, signal readiness, etc.
Arguments:
- snapshots: A set of target snapshots for which intervals should be computed.
- start: Start of the interval.
- end: End of the interval.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- execution_time: The date/time reference to use for execution time.
- restatements: A dict mapping the IDs of snapshots being restated to their restatement intervals.
- start_override_per_model: A mapping of model FQNs to target start dates.
- end_override_per_model: A mapping of model FQNs to target end dates.
- ignore_cron: Whether to ignore the node's cron schedule.
- end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
Returns:
A dict containing all snapshots needing to be run with their associated interval params.
def interval_diff(
    intervals_a: Intervals, intervals_b: Intervals, uninterrupted: bool = False
) -> Intervals:
    """Return the intervals of `intervals_a` that do not overlap any interval of `intervals_b`.

    Intervals are treated as half-open: an interval that begins exactly where another one ends
    does not overlap it. Both inputs are walked in lockstep with one cursor each.

    NOTE(review): the single-pass walk assumes both lists are sorted in ascending order and that
    the intervals within each list do not overlap one another -- confirm against callers.

    Args:
        intervals_a: The intervals to filter.
        intervals_b: The intervals to subtract.
        uninterrupted: If true, stop at the first interval of `intervals_a` that overlaps
            `intervals_b` and return only the non-overlapping intervals found before it.

    Returns:
        The non-overlapping intervals of `intervals_a`, in their original order. When either
        input is empty, `intervals_a` itself is returned unchanged.
    """
    if not intervals_a or not intervals_b:
        return intervals_a

    index_a, index_b = 0, 0
    len_a = len(intervals_a)
    len_b = len(intervals_b)

    results = []

    while index_a < len_a and index_b < len_b:
        interval_a = intervals_a[index_a]
        interval_b = intervals_b[index_b]

        if interval_a[0] >= interval_b[1]:
            # interval_b ends before interval_a starts, so it can't affect this or any later
            # interval of intervals_a.
            index_b += 1
        elif interval_b[0] >= interval_a[1]:
            # interval_a ends before interval_b starts, so nothing in intervals_b overlaps it.
            results.append(interval_a)
            index_a += 1
        else:
            if uninterrupted:
                return results

            # The intervals overlap, so interval_a is excluded. Only the cursor of intervals_a
            # moves: interval_b may still overlap the next interval of intervals_a, and moving
            # past it instead could let the already overlapped interval_a slip into the results.
            index_a += 1

    # The cursor of intervals_a can only be short of the end here if intervals_b was exhausted,
    # which means every remaining interval starts at or after the end of the last interval_b.
    results.extend(intervals_a[index_a:])

    return results