sqlmesh.core.scheduler
from __future__ import annotations

import logging
import traceback
import typing as t
from datetime import datetime

from sqlmesh.core import constants as c
from sqlmesh.core.console import Console, get_console
from sqlmesh.core.environment import EnvironmentNamingInfo
from sqlmesh.core.model import SeedModel
from sqlmesh.core.notification_target import (
    NotificationEvent,
    NotificationTargetManager,
)
from sqlmesh.core.snapshot import (
    DeployabilityIndex,
    Snapshot,
    SnapshotEvaluator,
    earliest_start_date,
    missing_intervals,
)
from sqlmesh.core.snapshot.definition import Interval as SnapshotInterval
from sqlmesh.core.snapshot.definition import SnapshotId
from sqlmesh.core.state_sync import StateSync
from sqlmesh.utils import format_exception
from sqlmesh.utils.concurrency import concurrent_apply_to_dag
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.date import (
    TimeLike,
    now,
    now_timestamp,
    to_datetime,
    validate_date_range,
)
from sqlmesh.utils.errors import AuditError, CircuitBreakerError, SQLMeshError

logger = logging.getLogger(__name__)
Interval = t.Tuple[datetime, datetime]
Batch = t.List[Interval]
SnapshotToBatches = t.Dict[Snapshot, Batch]
# we store snapshot name instead of snapshots/snapshotids because pydantic
# is extremely slow to hash. snapshot names should be unique within a dag run
SchedulingUnit = t.Tuple[str, t.Tuple[Interval, int]]


class Scheduler:
    """Schedules and manages the evaluation of snapshots.

    The scheduler evaluates multiple snapshots with date intervals in the correct
    topological order. It consults the state sync to understand which intervals of each
    snapshot need to be backfilled.

    The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine.

    Args:
        snapshots: A collection of snapshots.
        snapshot_evaluator: The snapshot evaluator to execute queries.
        state_sync: The state sync to pull saved snapshots.
        default_catalog: The default catalog, if any, passed to the console when reporting
            evaluation progress.
        max_workers: The maximum number of parallel queries to run.
        console: The console used for printing scheduling information. Defaults to
            `get_console()`.
        notification_target_manager: The manager used to send audit failure notifications.
            Defaults to `NotificationTargetManager()`.
    """

    def __init__(
        self,
        snapshots: t.Iterable[Snapshot],
        snapshot_evaluator: SnapshotEvaluator,
        state_sync: StateSync,
        default_catalog: t.Optional[str],
        max_workers: int = 1,
        console: t.Optional[Console] = None,
        notification_target_manager: t.Optional[NotificationTargetManager] = None,
    ):
        self.state_sync = state_sync
        self.snapshots = {s.snapshot_id: s for s in snapshots}
        # Only one snapshot per (name, version) pair is used when computing intervals.
        self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values())
        self.default_catalog = default_catalog
        self.snapshot_evaluator = snapshot_evaluator
        self.max_workers = max_workers
        self.console = console or get_console()
        self.notification_target_manager = (
            notification_target_manager or NotificationTargetManager()
        )

    def batches(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
        ignore_cron: bool = False,
        end_bounded: bool = False,
        selected_snapshots: t.Optional[t.Set[str]] = None,
    ) -> SnapshotToBatches:
        """Find the optimal date interval parameters based on what needs processing and maximal batch size.

        Refresh the intervals of the target snapshots (one per name and version, optionally filtered by
        `selected_snapshots`) from the state sync, then calculate the missing intervals that need to be
        processed given the passed in start and end.

        If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than
        or equal to the configured one. If no batch size is specified, all consecutive missing intervals are merged into a
        single batch. Non-contiguous intervals are never merged into the same batch.
        For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs
        with 30 days and 1 job with 10.

        Args:
            start: The start of the run. Defaults to the min node start date.
            end: The end of the run. Defaults to now.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            restatements: A dict mapping the IDs of snapshots being restated to the intervals to restate.
            ignore_cron: Whether to ignore the node's cron schedule.
            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.

        Returns:
            A dict mapping each snapshot that needs processing to its list of (start, end) batches.
        """
        restatements = restatements or {}
        validate_date_range(start, end)

        snapshots: t.Collection[Snapshot] = self.snapshot_per_version.values()
        if selected_snapshots is not None:
            snapshots = [s for s in snapshots if s.name in selected_snapshots]

        # Make sure interval computation works off the latest stored intervals.
        self.state_sync.refresh_snapshot_intervals(snapshots)

        return compute_interval_params(
            snapshots,
            start=start or earliest_start_date(snapshots),
            end=end or now(),
            deployability_index=deployability_index,
            execution_time=execution_time or now(),
            restatements=restatements,
            ignore_cron=ignore_cron,
            end_bounded=end_bounded,
        )

    def evaluate(
        self,
        snapshot: Snapshot,
        start: TimeLike,
        end: TimeLike,
        execution_time: TimeLike,
        deployability_index: DeployabilityIndex,
        **kwargs: t.Any,
    ) -> None:
        """Evaluate a snapshot and add the processed interval to the state sync.

        The snapshot is evaluated and then audited; the interval is recorded in the state sync
        only if the audits pass.

        Args:
            snapshot: Snapshot to evaluate.
            start: The start datetime to render.
            end: The end datetime to render.
            execution_time: The date/time reference to use for execution time.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            kwargs: Additional kwargs to pass to the renderer.

        Raises:
            AuditError: If an audit fails. Notification targets (and the node owner, for deployable
                snapshots) are notified before the error is re-raised.
        """
        validate_date_range(start, end)

        # Parent snapshots keyed by name, plus the snapshot itself; passed to both evaluation and audit.
        snapshots = {
            self.snapshots[p_sid].name: self.snapshots[p_sid] for p_sid in snapshot.parents
        }
        snapshots[snapshot.name] = snapshot

        # Seed content is not necessarily loaded; fetch a hydrated copy before evaluating.
        if isinstance(snapshot.node, SeedModel) and not snapshot.node.is_hydrated:
            snapshot = self.state_sync.get_snapshots([snapshot], hydrate_seeds=True)[
                snapshot.snapshot_id
            ]

        is_deployable = deployability_index.is_deployable(snapshot)

        wap_id = self.snapshot_evaluator.evaluate(
            snapshot,
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=snapshots,
            deployability_index=deployability_index,
            **kwargs,
        )
        try:
            self.snapshot_evaluator.audit(
                snapshot=snapshot,
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                deployability_index=deployability_index,
                wap_id=wap_id,
                **kwargs,
            )
        except AuditError as e:
            self.notification_target_manager.notify(NotificationEvent.AUDIT_FAILURE, e)
            if is_deployable and snapshot.node.owner:
                self.notification_target_manager.notify_user(
                    NotificationEvent.AUDIT_FAILURE, snapshot.node.owner, e
                )
            logger.error(f"Audit Failure: {traceback.format_exc()}")
            # Bare raise re-raises the active exception with its original traceback.
            raise

        # Non-deployable snapshots record the interval as a dev interval.
        self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable)

    def run(
        self,
        environment: str | EnvironmentNamingInfo,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
        ignore_cron: bool = False,
        end_bounded: bool = False,
        selected_snapshots: t.Optional[t.Set[str]] = None,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
    ) -> bool:
        """Concurrently runs all snapshots in topological order.

        Args:
            environment: The environment naming info the user is targeting when applying their change.
                Can just be the environment name if the user is targeting a remote environment and wants to get the remote
                naming info
            start: The start of the run. Defaults to the min node start date.
            end: The end of the run. Defaults to now.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            restatements: A dict of snapshots to restate and their intervals.
            ignore_cron: Whether to ignore the node's cron schedule.
            end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
            circuit_breaker: An optional handler which checks if the run should be aborted.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
                Defaults to all-deployable for prod, and to an index created from this scheduler's
                snapshots otherwise.

        Returns:
            True if the execution was successful and False otherwise.

        Raises:
            SQLMeshError: If `environment` is a name and that environment does not exist.
            CircuitBreakerError: If `circuit_breaker` signals that the run should be aborted.
        """
        restatements = restatements or {}
        validate_date_range(start, end)
        if isinstance(environment, str):
            env = self.state_sync.get_environment(environment)
            if not env:
                raise SQLMeshError(
                    "Was not provided an environment suffix target and the environment doesn't exist. "
                    "Are you running for the first time and need to run plan/apply first?"
                )
            environment_naming_info = env.naming_info
        else:
            environment_naming_info = environment

        deployability_index = deployability_index or (
            DeployabilityIndex.create(self.snapshots.values())
            if environment_naming_info.name != c.PROD
            else DeployabilityIndex.all_deployable()
        )
        execution_time = execution_time or now()
        batches = self.batches(
            start,
            end,
            execution_time,
            deployability_index=deployability_index,
            restatements=restatements,
            ignore_cron=ignore_cron,
            end_bounded=end_bounded,
            selected_snapshots=selected_snapshots,
        )
        if not batches:
            return True

        dag = self._dag(batches)

        self.console.start_evaluation_progress(
            {snapshot: len(intervals) for snapshot, intervals in batches.items()},
            environment_naming_info,
            self.default_catalog,
        )

        snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()}

        def evaluate_node(node: SchedulingUnit) -> None:
            if circuit_breaker and circuit_breaker():
                raise CircuitBreakerError()

            snapshot_name, ((start, end), batch_idx) = node
            # Batch index -1 marks a synthetic terminal node built by `_dag`; nothing to evaluate.
            if batch_idx == -1:
                return
            snapshot = snapshots_by_name[snapshot_name]

            self.console.start_snapshot_evaluation_progress(snapshot)

            execution_start_ts = now_timestamp()
            evaluation_duration_ms: t.Optional[int] = None

            try:
                assert execution_time  # mypy
                assert deployability_index  # mypy
                self.evaluate(snapshot, start, end, execution_time, deployability_index)
                evaluation_duration_ms = now_timestamp() - execution_start_ts
            finally:
                # Progress is updated even on failure; the duration stays None in that case.
                self.console.update_snapshot_evaluation_progress(
                    snapshot, batch_idx, evaluation_duration_ms
                )

        try:
            with self.snapshot_evaluator.concurrent_context():
                errors, skipped_intervals = concurrent_apply_to_dag(
                    dag,
                    evaluate_node,
                    self.max_workers,
                    raise_on_error=False,
                )
        finally:
            self.state_sync.recycle()

        self.console.stop_evaluation_progress(success=not errors)

        skipped_snapshots = {i[0] for i in skipped_intervals}
        for skipped in skipped_snapshots:
            log_message = f"SKIPPED snapshot {skipped}\n"
            self.console.log_status_update(log_message)
            logger.info(log_message)

        for error in errors:
            if isinstance(error.__cause__, CircuitBreakerError):
                raise error.__cause__
            sid = error.node[0]
            formatted_exception = "".join(format_exception(error.__cause__ or error))
            log_message = f"FAILED processing snapshot {sid}\n{formatted_exception}"
            self.console.log_error(log_message)
            # Log with INFO level to prevent duplicate messages in the console.
            logger.info(log_message)

        return not errors

    def _dag(self, batches: SnapshotToBatches) -> DAG[SchedulingUnit]:
        """Builds a DAG of snapshot intervals to be evaluated.

        Each (snapshot name, (interval, batch index)) pair becomes a node. When a snapshot has more
        than one batch, a synthetic terminal node (batch index -1) is added that depends on all of
        its batches, so downstream snapshots need only one edge to wait for every batch of that parent.

        Args:
            batches: The batches of snapshots and intervals to evaluate.

        Returns:
            A DAG of snapshot intervals to be evaluated.
        """

        intervals_per_snapshot = {
            snapshot.name: intervals for snapshot, intervals in batches.items()
        }

        dag = DAG[SchedulingUnit]()
        # Placeholder interval with batch index -1; `run` skips nodes with this index.
        terminal_node = ((to_datetime(0), to_datetime(0)), -1)

        for snapshot, intervals in batches.items():
            if not intervals:
                continue

            upstream_dependencies = []

            for p_sid in snapshot.parents:
                if p_sid in self.snapshots:
                    p_intervals = intervals_per_snapshot.get(p_sid.name, [])

                    if len(p_intervals) > 1:
                        upstream_dependencies.append((p_sid.name, terminal_node))
                    else:
                        for i, interval in enumerate(p_intervals):
                            upstream_dependencies.append((p_sid.name, (interval, i)))

            batch_concurrency = snapshot.node.batch_concurrency
            if snapshot.depends_on_past:
                # Batches must run strictly one after another.
                batch_concurrency = 1

            for i, interval in enumerate(intervals):
                node = (snapshot.name, (interval, i))
                dag.add(node, upstream_dependencies)

                if len(intervals) > 1:
                    dag.add((snapshot.name, terminal_node), [node])

                if batch_concurrency and i >= batch_concurrency:
                    # Cap in-flight batches: batch i waits for batch i - batch_concurrency.
                    batch_idx_to_wait_for = i - batch_concurrency
                    dag.add(
                        node,
                        [
                            (
                                snapshot.name,
                                (intervals[batch_idx_to_wait_for], batch_idx_to_wait_for),
                            )
                        ],
                    )
        return dag


def compute_interval_params(
    snapshots: t.Collection[Snapshot],
    *,
    start: TimeLike,
    end: TimeLike,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> SnapshotToBatches:
    """Find the optimal date interval parameters based on what needs processing and maximal batch size.

    Calculate the missing intervals of each target snapshot given the passed in start and end, then
    group them into batches.

    If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than
    or equal to the configured one. If no batch size is specified, all consecutive missing intervals are merged into a
    single batch. Non-contiguous intervals are never merged into the same batch.
    For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs
    with 30 days and 1 job with 10.

    Args:
        snapshots: A collection of target snapshots for which intervals should be computed.
        start: Start of the interval.
        end: End of the interval.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        execution_time: The date/time reference to use for execution time.
        restatements: A dict mapping the IDs of snapshots being restated to the intervals to restate.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
            allow_partials, and other attributes that could cause the intervals to exceed the target end date.

    Returns:
        A dict containing all snapshots needing to be run with their associated interval params.
    """
    snapshot_batches = {}

    for snapshot, intervals in missing_intervals(
        snapshots,
        start=start,
        end=end,
        execution_time=execution_time,
        restatements=restatements,
        deployability_index=deployability_index,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    ).items():
        batches = []
        batch_size = snapshot.node.batch_size
        next_batch: t.List[t.Tuple[int, int]] = []

        for interval in intervals:
            # Close the current batch when it is full or when this interval is not contiguous with it.
            if (batch_size and len(next_batch) >= batch_size) or (
                next_batch and interval[0] != next_batch[-1][-1]
            ):
                batches.append((next_batch[0][0], next_batch[-1][-1]))
                next_batch = []
            next_batch.append(interval)
        if next_batch:
            batches.append((next_batch[0][0], next_batch[-1][-1]))
        snapshot_batches[snapshot] = [(to_datetime(s), to_datetime(e)) for s, e in batches]

    return snapshot_batches


def _resolve_one_snapshot_per_version(
    snapshots: t.Iterable[Snapshot],
) -> t.Dict[t.Tuple[str, str], Snapshot]:
    """Pick a single snapshot for every (name, version) pair.

    When several snapshots share a name and version, a snapshot with `unpaused_ts` set replaces one
    without it, and among snapshots that both have `unpaused_ts` set, the one with the later
    `created_ts` wins. A snapshot without `unpaused_ts` never replaces an earlier pick, so otherwise
    the first snapshot encountered is kept.

    Args:
        snapshots: The snapshots to deduplicate.

    Returns:
        A dict keyed by (name, version) with the chosen snapshot for each key.
    """
    snapshot_per_version: t.Dict[t.Tuple[str, str], Snapshot] = {}
    for snapshot in snapshots:
        key = (snapshot.name, snapshot.version_get_or_generate())
        if key not in snapshot_per_version:
            snapshot_per_version[key] = snapshot
        else:
            prev_snapshot = snapshot_per_version[key]
            if snapshot.unpaused_ts and (
                not prev_snapshot.unpaused_ts or snapshot.created_ts > prev_snapshot.created_ts
            ):
                snapshot_per_version[key] = snapshot

    return snapshot_per_version
class Scheduler:
    """Schedules and manages the evaluation of snapshots.

    The scheduler evaluates multiple snapshots with date intervals in the correct
    topological order. It consults the state sync to understand which intervals of each
    snapshot need to be backfilled.

    The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine.

    Args:
        snapshots: A collection of snapshots.
        snapshot_evaluator: The snapshot evaluator to execute queries.
        state_sync: The state sync to pull saved snapshots.
        default_catalog: The default catalog, if any, passed to the console when reporting
            evaluation progress.
        max_workers: The maximum number of parallel queries to run.
        console: The console used for printing scheduling information. Defaults to
            `get_console()`.
        notification_target_manager: The manager used to send audit failure notifications.
            Defaults to `NotificationTargetManager()`.
    """

    def __init__(
        self,
        snapshots: t.Iterable[Snapshot],
        snapshot_evaluator: SnapshotEvaluator,
        state_sync: StateSync,
        default_catalog: t.Optional[str],
        max_workers: int = 1,
        console: t.Optional[Console] = None,
        notification_target_manager: t.Optional[NotificationTargetManager] = None,
    ):
        self.state_sync = state_sync
        self.snapshots = {s.snapshot_id: s for s in snapshots}
        # Only one snapshot per (name, version) pair is used when computing intervals.
        self.snapshot_per_version = _resolve_one_snapshot_per_version(self.snapshots.values())
        self.default_catalog = default_catalog
        self.snapshot_evaluator = snapshot_evaluator
        self.max_workers = max_workers
        self.console = console or get_console()
        self.notification_target_manager = (
            notification_target_manager or NotificationTargetManager()
        )

    def batches(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
        ignore_cron: bool = False,
        end_bounded: bool = False,
        selected_snapshots: t.Optional[t.Set[str]] = None,
    ) -> SnapshotToBatches:
        """Find the optimal date interval parameters based on what needs processing and maximal batch size.

        Refresh the intervals of the target snapshots (one per name and version, optionally filtered by
        `selected_snapshots`) from the state sync, then calculate the missing intervals that need to be
        processed given the passed in start and end.

        If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than
        or equal to the configured one. If no batch size is specified, all consecutive missing intervals are merged into a
        single batch. Non-contiguous intervals are never merged into the same batch.
        For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs
        with 30 days and 1 job with 10.

        Args:
            start: The start of the run. Defaults to the min node start date.
            end: The end of the run. Defaults to now.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            restatements: A dict mapping the IDs of snapshots being restated to the intervals to restate.
            ignore_cron: Whether to ignore the node's cron schedule.
            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.

        Returns:
            A dict mapping each snapshot that needs processing to its list of (start, end) batches.
        """
        restatements = restatements or {}
        validate_date_range(start, end)

        snapshots: t.Collection[Snapshot] = self.snapshot_per_version.values()
        if selected_snapshots is not None:
            snapshots = [s for s in snapshots if s.name in selected_snapshots]

        # Make sure interval computation works off the latest stored intervals.
        self.state_sync.refresh_snapshot_intervals(snapshots)

        return compute_interval_params(
            snapshots,
            start=start or earliest_start_date(snapshots),
            end=end or now(),
            deployability_index=deployability_index,
            execution_time=execution_time or now(),
            restatements=restatements,
            ignore_cron=ignore_cron,
            end_bounded=end_bounded,
        )

    def evaluate(
        self,
        snapshot: Snapshot,
        start: TimeLike,
        end: TimeLike,
        execution_time: TimeLike,
        deployability_index: DeployabilityIndex,
        **kwargs: t.Any,
    ) -> None:
        """Evaluate a snapshot and add the processed interval to the state sync.

        The snapshot is evaluated and then audited; the interval is recorded in the state sync
        only if the audits pass.

        Args:
            snapshot: Snapshot to evaluate.
            start: The start datetime to render.
            end: The end datetime to render.
            execution_time: The date/time reference to use for execution time.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            kwargs: Additional kwargs to pass to the renderer.

        Raises:
            AuditError: If an audit fails. Notification targets (and the node owner, for deployable
                snapshots) are notified before the error is re-raised.
        """
        validate_date_range(start, end)

        # Parent snapshots keyed by name, plus the snapshot itself; passed to both evaluation and audit.
        snapshots = {
            self.snapshots[p_sid].name: self.snapshots[p_sid] for p_sid in snapshot.parents
        }
        snapshots[snapshot.name] = snapshot

        # Seed content is not necessarily loaded; fetch a hydrated copy before evaluating.
        if isinstance(snapshot.node, SeedModel) and not snapshot.node.is_hydrated:
            snapshot = self.state_sync.get_snapshots([snapshot], hydrate_seeds=True)[
                snapshot.snapshot_id
            ]

        is_deployable = deployability_index.is_deployable(snapshot)

        wap_id = self.snapshot_evaluator.evaluate(
            snapshot,
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=snapshots,
            deployability_index=deployability_index,
            **kwargs,
        )
        try:
            self.snapshot_evaluator.audit(
                snapshot=snapshot,
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                deployability_index=deployability_index,
                wap_id=wap_id,
                **kwargs,
            )
        except AuditError as e:
            self.notification_target_manager.notify(NotificationEvent.AUDIT_FAILURE, e)
            if is_deployable and snapshot.node.owner:
                self.notification_target_manager.notify_user(
                    NotificationEvent.AUDIT_FAILURE, snapshot.node.owner, e
                )
            logger.error(f"Audit Failure: {traceback.format_exc()}")
            # Bare raise re-raises the active exception with its original traceback.
            raise

        # Non-deployable snapshots record the interval as a dev interval.
        self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable)

    def run(
        self,
        environment: str | EnvironmentNamingInfo,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
        ignore_cron: bool = False,
        end_bounded: bool = False,
        selected_snapshots: t.Optional[t.Set[str]] = None,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
    ) -> bool:
        """Concurrently runs all snapshots in topological order.

        Args:
            environment: The environment naming info the user is targeting when applying their change.
                Can just be the environment name if the user is targeting a remote environment and wants to get the remote
                naming info
            start: The start of the run. Defaults to the min node start date.
            end: The end of the run. Defaults to now.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            restatements: A dict of snapshots to restate and their intervals.
            ignore_cron: Whether to ignore the node's cron schedule.
            end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
            selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
            circuit_breaker: An optional handler which checks if the run should be aborted.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
                Defaults to all-deployable for prod, and to an index created from this scheduler's
                snapshots otherwise.

        Returns:
            True if the execution was successful and False otherwise.

        Raises:
            SQLMeshError: If `environment` is a name and that environment does not exist.
            CircuitBreakerError: If `circuit_breaker` signals that the run should be aborted.
        """
        restatements = restatements or {}
        validate_date_range(start, end)
        if isinstance(environment, str):
            env = self.state_sync.get_environment(environment)
            if not env:
                raise SQLMeshError(
                    "Was not provided an environment suffix target and the environment doesn't exist. "
                    "Are you running for the first time and need to run plan/apply first?"
                )
            environment_naming_info = env.naming_info
        else:
            environment_naming_info = environment

        deployability_index = deployability_index or (
            DeployabilityIndex.create(self.snapshots.values())
            if environment_naming_info.name != c.PROD
            else DeployabilityIndex.all_deployable()
        )
        execution_time = execution_time or now()
        batches = self.batches(
            start,
            end,
            execution_time,
            deployability_index=deployability_index,
            restatements=restatements,
            ignore_cron=ignore_cron,
            end_bounded=end_bounded,
            selected_snapshots=selected_snapshots,
        )
        if not batches:
            return True

        dag = self._dag(batches)

        self.console.start_evaluation_progress(
            {snapshot: len(intervals) for snapshot, intervals in batches.items()},
            environment_naming_info,
            self.default_catalog,
        )

        snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()}

        def evaluate_node(node: SchedulingUnit) -> None:
            if circuit_breaker and circuit_breaker():
                raise CircuitBreakerError()

            snapshot_name, ((start, end), batch_idx) = node
            # Batch index -1 marks a synthetic terminal node built by `_dag`; nothing to evaluate.
            if batch_idx == -1:
                return
            snapshot = snapshots_by_name[snapshot_name]

            self.console.start_snapshot_evaluation_progress(snapshot)

            execution_start_ts = now_timestamp()
            evaluation_duration_ms: t.Optional[int] = None

            try:
                assert execution_time  # mypy
                assert deployability_index  # mypy
                self.evaluate(snapshot, start, end, execution_time, deployability_index)
                evaluation_duration_ms = now_timestamp() - execution_start_ts
            finally:
                # Progress is updated even on failure; the duration stays None in that case.
                self.console.update_snapshot_evaluation_progress(
                    snapshot, batch_idx, evaluation_duration_ms
                )

        try:
            with self.snapshot_evaluator.concurrent_context():
                errors, skipped_intervals = concurrent_apply_to_dag(
                    dag,
                    evaluate_node,
                    self.max_workers,
                    raise_on_error=False,
                )
        finally:
            self.state_sync.recycle()

        self.console.stop_evaluation_progress(success=not errors)

        skipped_snapshots = {i[0] for i in skipped_intervals}
        for skipped in skipped_snapshots:
            log_message = f"SKIPPED snapshot {skipped}\n"
            self.console.log_status_update(log_message)
            logger.info(log_message)

        for error in errors:
            if isinstance(error.__cause__, CircuitBreakerError):
                raise error.__cause__
            sid = error.node[0]
            formatted_exception = "".join(format_exception(error.__cause__ or error))
            log_message = f"FAILED processing snapshot {sid}\n{formatted_exception}"
            self.console.log_error(log_message)
            # Log with INFO level to prevent duplicate messages in the console.
            logger.info(log_message)

        return not errors

    def _dag(self, batches: SnapshotToBatches) -> DAG[SchedulingUnit]:
        """Builds a DAG of snapshot intervals to be evaluated.

        Each (snapshot name, (interval, batch index)) pair becomes a node. When a snapshot has more
        than one batch, a synthetic terminal node (batch index -1) is added that depends on all of
        its batches, so downstream snapshots need only one edge to wait for every batch of that parent.

        Args:
            batches: The batches of snapshots and intervals to evaluate.

        Returns:
            A DAG of snapshot intervals to be evaluated.
        """

        intervals_per_snapshot = {
            snapshot.name: intervals for snapshot, intervals in batches.items()
        }

        dag = DAG[SchedulingUnit]()
        # Placeholder interval with batch index -1; `run` skips nodes with this index.
        terminal_node = ((to_datetime(0), to_datetime(0)), -1)

        for snapshot, intervals in batches.items():
            if not intervals:
                continue

            upstream_dependencies = []

            for p_sid in snapshot.parents:
                if p_sid in self.snapshots:
                    p_intervals = intervals_per_snapshot.get(p_sid.name, [])

                    if len(p_intervals) > 1:
                        upstream_dependencies.append((p_sid.name, terminal_node))
                    else:
                        for i, interval in enumerate(p_intervals):
                            upstream_dependencies.append((p_sid.name, (interval, i)))

            batch_concurrency = snapshot.node.batch_concurrency
            if snapshot.depends_on_past:
                # Batches must run strictly one after another.
                batch_concurrency = 1

            for i, interval in enumerate(intervals):
                node = (snapshot.name, (interval, i))
                dag.add(node, upstream_dependencies)

                if len(intervals) > 1:
                    dag.add((snapshot.name, terminal_node), [node])

                if batch_concurrency and i >= batch_concurrency:
                    # Cap in-flight batches: batch i waits for batch i - batch_concurrency.
                    batch_idx_to_wait_for = i - batch_concurrency
                    dag.add(
                        node,
                        [
                            (
                                snapshot.name,
                                (intervals[batch_idx_to_wait_for], batch_idx_to_wait_for),
                            )
                        ],
                    )
        return dag
Schedules and manages the evaluation of snapshots.
The scheduler evaluates multiple snapshots with date intervals in the correct topological order. It consults the state sync to understand which intervals of each snapshot need to be backfilled.
The scheduler comes equipped with a simple ThreadPoolExecutor based evaluation engine.
Arguments:
- snapshots: A collection of snapshots.
- snapshot_evaluator: The snapshot evaluator to execute queries.
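- state_sync: The state sync to pull saved snapshots.
- default_catalog: The default catalog, if any, passed to the console when reporting evaluation progress.
- max_workers: The maximum number of parallel queries to run.
- console: The console used for printing scheduling information. Defaults to `get_console()`.
- notification_target_manager: The manager used to send audit failure notifications. Defaults to `NotificationTargetManager()`.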
def __init__(
    self,
    snapshots: t.Iterable[Snapshot],
    snapshot_evaluator: SnapshotEvaluator,
    state_sync: StateSync,
    default_catalog: t.Optional[str],
    max_workers: int = 1,
    console: t.Optional[Console] = None,
    notification_target_manager: t.Optional[NotificationTargetManager] = None,
):
    """Wire up the scheduler's collaborators and index the given snapshots.

    Args:
        snapshots: The snapshots this scheduler may evaluate. They are indexed by snapshot ID,
            and one snapshot per (name, version) pair is selected for interval computation.
        snapshot_evaluator: Evaluator used to evaluate and audit snapshots.
        state_sync: State sync used to look up environments and to read and record intervals.
        default_catalog: Default catalog, if any, forwarded to the console's progress display.
        max_workers: The maximum number of parallel queries to run.
        console: Console for progress and log output; `get_console()` is used when not given.
        notification_target_manager: Target for audit failure notifications; a fresh
            `NotificationTargetManager()` is created when not given.
    """
    self.state_sync = state_sync

    by_id = {}
    for snap in snapshots:
        by_id[snap.snapshot_id] = snap
    self.snapshots = by_id
    self.snapshot_per_version = _resolve_one_snapshot_per_version(by_id.values())

    self.default_catalog = default_catalog
    self.snapshot_evaluator = snapshot_evaluator
    self.max_workers = max_workers
    self.console = console if console else get_console()
    self.notification_target_manager = (
        notification_target_manager
        if notification_target_manager
        else NotificationTargetManager()
    )
def batches(
    self,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
    selected_snapshots: t.Optional[t.Set[str]] = None,
) -> SnapshotToBatches:
    """Find the optimal date interval parameters based on what needs processing and maximal batch size.

    Refresh the intervals of the target snapshots (one per name and version, optionally filtered by
    `selected_snapshots`) from the state sync, then calculate the missing intervals that need to be
    processed given the passed in start and end.

    If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than
    or equal to the configured one. If no batch size is specified, all consecutive missing intervals are merged into a
    single batch. Non-contiguous intervals are never merged into the same batch.
    For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs
    with 30 days and 1 job with 10.

    Args:
        start: The start of the run. Defaults to the min node start date.
        end: The end of the run. Defaults to now.
        execution_time: The date/time reference to use for execution time. Defaults to now.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        restatements: A dict mapping the IDs of snapshots being restated to the intervals to restate.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
            allow_partials, and other attributes that could cause the intervals to exceed the target end date.
        selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.

    Returns:
        A dict mapping each snapshot that needs processing to its list of (start, end) batches.
    """
    restatements = restatements or {}
    validate_date_range(start, end)

    snapshots: t.Collection[Snapshot] = self.snapshot_per_version.values()
    if selected_snapshots is not None:
        snapshots = [s for s in snapshots if s.name in selected_snapshots]

    # Make sure interval computation works off the latest stored intervals.
    self.state_sync.refresh_snapshot_intervals(snapshots)

    return compute_interval_params(
        snapshots,
        start=start or earliest_start_date(snapshots),
        end=end or now(),
        deployability_index=deployability_index,
        execution_time=execution_time or now(),
        restatements=restatements,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    )
Find the optimal date interval parameters based on what needs processing and maximal batch size.
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, calculate the missing intervals that need to be processed given the passed in start and end intervals.
If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than or equal to the configured one. If no batch size is specified, then it uses the intervals that correspond to the node's cron expression. For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs with 30 days and 1 job with 10.
Arguments:
- start: The start of the run. Defaults to the min node start date.
- end: The end of the run. Defaults to now.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- restatements: A dict of snapshot IDs being restated, mapped to the intervals to restate.
- ignore_cron: Whether to ignore the node's cron schedule.
- end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
- selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
138 def evaluate( 139 self, 140 snapshot: Snapshot, 141 start: TimeLike, 142 end: TimeLike, 143 execution_time: TimeLike, 144 deployability_index: DeployabilityIndex, 145 **kwargs: t.Any, 146 ) -> None: 147 """Evaluate a snapshot and add the processed interval to the state sync. 148 149 Args: 150 snapshot: Snapshot to evaluate. 151 start: The start datetime to render. 152 end: The end datetime to render. 153 execution_time: The date/time time reference to use for execution time. Defaults to now. 154 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 155 kwargs: Additional kwargs to pass to the renderer. 156 """ 157 validate_date_range(start, end) 158 159 snapshots = { 160 self.snapshots[p_sid].name: self.snapshots[p_sid] for p_sid in snapshot.parents 161 } 162 snapshots[snapshot.name] = snapshot 163 164 if isinstance(snapshot.node, SeedModel) and not snapshot.node.is_hydrated: 165 snapshot = self.state_sync.get_snapshots([snapshot], hydrate_seeds=True)[ 166 snapshot.snapshot_id 167 ] 168 169 is_deployable = deployability_index.is_deployable(snapshot) 170 171 wap_id = self.snapshot_evaluator.evaluate( 172 snapshot, 173 start=start, 174 end=end, 175 execution_time=execution_time, 176 snapshots=snapshots, 177 deployability_index=deployability_index, 178 **kwargs, 179 ) 180 try: 181 self.snapshot_evaluator.audit( 182 snapshot=snapshot, 183 start=start, 184 end=end, 185 execution_time=execution_time, 186 snapshots=snapshots, 187 deployability_index=deployability_index, 188 wap_id=wap_id, 189 **kwargs, 190 ) 191 except AuditError as e: 192 self.notification_target_manager.notify(NotificationEvent.AUDIT_FAILURE, e) 193 if is_deployable and snapshot.node.owner: 194 self.notification_target_manager.notify_user( 195 NotificationEvent.AUDIT_FAILURE, snapshot.node.owner, e 196 ) 197 logger.error(f"Audit Failure: {traceback.format_exc()}") 198 raise e 199 200 self.state_sync.add_interval(snapshot, start, end, is_dev=not 
is_deployable)
Evaluate a snapshot and add the processed interval to the state sync.
Arguments:
- snapshot: Snapshot to evaluate.
- start: The start datetime to render.
- end: The end datetime to render.
- execution_time: The date/time reference to use for execution time.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- kwargs: Additional kwargs to pass to the renderer.
202 def run( 203 self, 204 environment: str | EnvironmentNamingInfo, 205 start: t.Optional[TimeLike] = None, 206 end: t.Optional[TimeLike] = None, 207 execution_time: t.Optional[TimeLike] = None, 208 restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None, 209 ignore_cron: bool = False, 210 end_bounded: bool = False, 211 selected_snapshots: t.Optional[t.Set[str]] = None, 212 circuit_breaker: t.Optional[t.Callable[[], bool]] = None, 213 deployability_index: t.Optional[DeployabilityIndex] = None, 214 ) -> bool: 215 """Concurrently runs all snapshots in topological order. 216 217 Args: 218 environment: The environment naming info the user is targeting when applying their change. 219 Can just be the environment name if the user is targeting a remote environment and wants to get the remote 220 naming info 221 start: The start of the run. Defaults to the min node start date. 222 end: The end of the run. Defaults to now. 223 execution_time: The date/time time reference to use for execution time. Defaults to now. 224 restatements: A dict of snapshots to restate and their intervals. 225 ignore_cron: Whether to ignore the node's cron schedule. 226 end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback, 227 allow_partials, and other attributes that could cause the intervals to exceed the target end date. 228 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run. 229 circuit_breaker: An optional handler which checks if the run should be aborted. 230 deployability_index: Determines snapshots that are deployable in the context of this render. 231 232 Returns: 233 True if the execution was successful and False otherwise. 
234 """ 235 restatements = restatements or {} 236 validate_date_range(start, end) 237 if isinstance(environment, str): 238 env = self.state_sync.get_environment(environment) 239 if not env: 240 raise SQLMeshError( 241 "Was not provided an environment suffix target and the environment doesn't exist." 242 "Are you running for the first time and need to run plan/apply first?" 243 ) 244 environment_naming_info = env.naming_info 245 else: 246 environment_naming_info = environment 247 248 deployability_index = deployability_index or ( 249 DeployabilityIndex.create(self.snapshots.values()) 250 if environment_naming_info.name != c.PROD 251 else DeployabilityIndex.all_deployable() 252 ) 253 execution_time = execution_time or now() 254 batches = self.batches( 255 start, 256 end, 257 execution_time, 258 deployability_index=deployability_index, 259 restatements=restatements, 260 ignore_cron=ignore_cron, 261 end_bounded=end_bounded, 262 selected_snapshots=selected_snapshots, 263 ) 264 if not batches: 265 return True 266 267 dag = self._dag(batches) 268 269 self.console.start_evaluation_progress( 270 {snapshot: len(intervals) for snapshot, intervals in batches.items()}, 271 environment_naming_info, 272 self.default_catalog, 273 ) 274 275 snapshots_by_name = {snapshot.name: snapshot for snapshot in self.snapshots.values()} 276 277 def evaluate_node(node: SchedulingUnit) -> None: 278 if circuit_breaker and circuit_breaker(): 279 raise CircuitBreakerError() 280 281 snapshot_name, ((start, end), batch_idx) = node 282 if batch_idx == -1: 283 return 284 snapshot = snapshots_by_name[snapshot_name] 285 286 self.console.start_snapshot_evaluation_progress(snapshot) 287 288 execution_start_ts = now_timestamp() 289 evaluation_duration_ms: t.Optional[int] = None 290 291 try: 292 assert execution_time # mypy 293 assert deployability_index # mypy 294 self.evaluate(snapshot, start, end, execution_time, deployability_index) 295 evaluation_duration_ms = now_timestamp() - execution_start_ts 296 
finally: 297 self.console.update_snapshot_evaluation_progress( 298 snapshot, batch_idx, evaluation_duration_ms 299 ) 300 301 try: 302 with self.snapshot_evaluator.concurrent_context(): 303 errors, skipped_intervals = concurrent_apply_to_dag( 304 dag, 305 evaluate_node, 306 self.max_workers, 307 raise_on_error=False, 308 ) 309 finally: 310 self.state_sync.recycle() 311 312 self.console.stop_evaluation_progress(success=not errors) 313 314 skipped_snapshots = {i[0] for i in skipped_intervals} 315 for skipped in skipped_snapshots: 316 log_message = f"SKIPPED snapshot {skipped}\n" 317 self.console.log_status_update(log_message) 318 logger.info(log_message) 319 320 for error in errors: 321 if isinstance(error.__cause__, CircuitBreakerError): 322 raise error.__cause__ 323 sid = error.node[0] 324 formatted_exception = "".join(format_exception(error.__cause__ or error)) 325 log_message = f"FAILED processing snapshot {sid}\n{formatted_exception}" 326 self.console.log_error(log_message) 327 # Log with INFO level to prevent duplicate messages in the console. 328 logger.info(log_message) 329 330 return not errors
Concurrently runs all snapshots in topological order.
Arguments:
- environment: The environment naming info the user is targeting when applying their change. Can just be the environment name if the user is targeting a remote environment and wants to get the remote naming info.
- start: The start of the run. Defaults to the min node start date.
- end: The end of the run. Defaults to now.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- restatements: A dict of snapshots to restate and their intervals.
- ignore_cron: Whether to ignore the node's cron schedule.
- end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
- selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
- circuit_breaker: An optional handler which checks if the run should be aborted.
- deployability_index: Determines snapshots that are deployable in the context of this render.
Returns:
True if the execution was successful and False otherwise.
def compute_interval_params(
    snapshots: t.Collection[Snapshot],
    *,
    start: TimeLike,
    end: TimeLike,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, SnapshotInterval]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> SnapshotToBatches:
    """Find the optimal date interval parameters based on what needs processing and maximal batch size.

    Compute the missing intervals for each target snapshot between ``start`` and ``end`` using
    ``missing_intervals``, then group them into batches.

    If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than
    or equal to the configured one. If no batch size is specified, then it uses the intervals that correspond to the node's cron expression.
    Non-contiguous intervals always start a new batch.
    For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs
    with 30 days and 1 job with 10.

    Args:
        snapshots: A set of target snapshots for which intervals should be computed.
        start: Start of the interval.
        end: End of the interval.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        execution_time: The date/time reference to use for execution time.
        restatements: A dict of snapshot IDs being restated and their intervals.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
            allow_partials, and other attributes that could cause the intervals to exceed the target end date.

    Returns:
        A dict mapping every snapshot with missing intervals to its list of (start, end) datetime batches.
    """
    snapshot_batches: SnapshotToBatches = {}

    for snapshot, intervals in missing_intervals(
        snapshots,
        start=start,
        end=end,
        execution_time=execution_time,
        restatements=restatements,
        deployability_index=deployability_index,
        ignore_cron=ignore_cron,
        end_bounded=end_bounded,
    ).items():
        batch_size = snapshot.node.batch_size
        batches: t.List[t.Tuple[int, int]] = []
        next_batch: t.List[t.Tuple[int, int]] = []

        for interval in intervals:
            # Close the current batch when it is full or when this interval is not contiguous
            # with it. The `next_batch and` guard ensures an empty batch is never flushed, even
            # for a non-positive batch size.
            if next_batch and (
                (batch_size and len(next_batch) >= batch_size)
                or interval[0] != next_batch[-1][-1]
            ):
                batches.append((next_batch[0][0], next_batch[-1][-1]))
                next_batch = []
            next_batch.append(interval)
        if next_batch:
            batches.append((next_batch[0][0], next_batch[-1][-1]))
        snapshot_batches[snapshot] = [(to_datetime(s), to_datetime(e)) for s, e in batches]

    return snapshot_batches
Find the optimal date interval parameters based on what needs processing and maximal batch size.
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found, calculate the missing intervals that need to be processed given the passed in start and end intervals.
If a snapshot's node specifies a batch size, consecutive intervals are merged into batches of a size that is less than or equal to the configured one. If no batch size is specified, then it uses the intervals that correspond to the node's cron expression. For example, if a node is supposed to run daily and has 70 days to backfill with a batch size set to 30, there would be 2 jobs with 30 days and 1 job with 10.
Arguments:
- snapshots: A set of target snapshots for which intervals should be computed.
- intervals: (Stale entry — this function has no `intervals` parameter; missing intervals are computed from `snapshots` via `missing_intervals`.)
- start: Start of the interval.
- end: End of the interval.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- execution_time: The date/time reference to use for execution time.
- restatements: A dict of snapshot IDs being restated and their intervals.
- ignore_cron: Whether to ignore the node's cron schedule.
- end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
Returns:
A dict containing all snapshots needing to be run with their associated interval params.