sqlmesh.core.plan.builder
1from __future__ import annotations 2 3import logging 4import re 5import typing as t 6from collections import defaultdict 7from functools import cached_property 8from datetime import datetime 9 10 11from sqlmesh.core.console import PlanBuilderConsole, get_console 12from sqlmesh.core.config import ( 13 AutoCategorizationMode, 14 CategorizerConfig, 15 EnvironmentSuffixTarget, 16) 17from sqlmesh.core.context_diff import ContextDiff 18from sqlmesh.core.environment import EnvironmentNamingInfo 19from sqlmesh.core.plan.common import should_force_rebuild, is_breaking_kind_change 20from sqlmesh.core.plan.definition import ( 21 Plan, 22 SnapshotMapping, 23 UserProvidedFlags, 24 earliest_interval_start, 25) 26from sqlmesh.core.schema_diff import ( 27 get_schema_differ, 28 has_drop_alteration, 29 has_additive_alteration, 30 TableAlterOperation, 31) 32from sqlmesh.core.snapshot import ( 33 DeployabilityIndex, 34 Snapshot, 35 SnapshotChangeCategory, 36) 37from sqlmesh.core.snapshot.categorizer import categorize_change 38from sqlmesh.core.snapshot.definition import Interval, SnapshotId 39from sqlmesh.utils import columns_to_types_all_known, random_id 40from sqlmesh.utils.dag import DAG 41from sqlmesh.utils.date import ( 42 TimeLike, 43 now, 44 to_datetime, 45 yesterday_ds, 46 to_timestamp, 47 time_like_to_str, 48 is_relative, 49) 50from sqlmesh.utils.errors import NoChangesPlanError, PlanError 51 52logger = logging.getLogger(__name__) 53 54 55class PlanBuilder: 56 """Plan Builder constructs a Plan based on user choices for how they want to backfill, preview, etc. their changes. 57 58 Args: 59 context_diff: The context diff that the plan is based on. 60 start: The start time to backfill data. 61 end: The end time to backfill data. 62 execution_time: The date/time time reference to use for execution time. Defaults to now. 63 If :start or :end are relative time expressions, they are interpreted as relative to the :execution_time 64 apply: The callback to apply the plan. 65 restate_models: A list of models for which the data should be restated for the time range 66 specified in this plan. Note: models defined outside SQLMesh (external) won't be a part 67 of the restatement. 68 restate_all_snapshots: If restatements are present, this flag indicates whether or not the intervals 69 being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments). 70 If set to None, the default behaviour is to not clear anything unless the target environment is prod. 71 backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan. 72 no_gaps: Whether to ensure that new snapshots for nodes that are already a 73 part of the target environment have no data gaps when compared against previous 74 snapshots for same nodes. 75 skip_backfill: Whether to skip the backfill step. 76 empty_backfill: Like skip_backfill, but also records processed intervals. 77 is_dev: Whether this plan is for development purposes. 78 forward_only: Whether the purpose of the plan is to make forward only changes. 79 allow_destructive_models: A list of fully qualified model names whose forward-only changes are allowed to be destructive. 80 allow_additive_models: A list of fully qualified model names whose forward-only changes are allowed to be additive. 81 environment_ttl: The period of time that a development environment should exist before being deleted. 82 categorizer_config: Auto categorization settings. 83 auto_categorization_enabled: Whether to apply auto categorization. 84 effective_from: The effective date from which to apply forward-only changes on production. 85 include_unmodified: Indicates whether to include unmodified nodes in the target development environment. 86 environment_suffix_target: Indicates whether to append the environment name to the schema or table name. 87 default_start: The default plan start to use if not specified. 88 default_end: The default plan end to use if not specified. 89 enable_preview: Whether to enable preview for forward-only models in development environments. 90 end_bounded: If set to true, the missing intervals will be bounded by the target end date, disregarding lookback, 91 allow_partials, and other attributes that could cause the intervals to exceed the target end date. 92 ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized 93 environment state, or to use whatever snapshots are in the current environment state even if 94 the environment is not finalized. 95 start_override_per_model: A mapping of model FQNs to target start dates. 96 end_override_per_model: A mapping of model FQNs to target end dates. 97 ignore_cron: Whether to ignore the node's cron schedule when computing missing intervals. 98 explain: Whether to explain the plan instead of applying it. 99 """ 100 101 def __init__( 102 self, 103 context_diff: ContextDiff, 104 start: t.Optional[TimeLike] = None, 105 end: t.Optional[TimeLike] = None, 106 execution_time: t.Optional[TimeLike] = None, 107 apply: t.Optional[t.Callable[[Plan], None]] = None, 108 restate_models: t.Optional[t.Iterable[str]] = None, 109 restate_all_snapshots: bool = False, 110 backfill_models: t.Optional[t.Iterable[str]] = None, 111 no_gaps: bool = False, 112 skip_backfill: bool = False, 113 empty_backfill: bool = False, 114 is_dev: bool = False, 115 forward_only: bool = False, 116 allow_destructive_models: t.Optional[t.Iterable[str]] = None, 117 allow_additive_models: t.Optional[t.Iterable[str]] = None, 118 environment_ttl: t.Optional[str] = None, 119 environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default, 120 environment_catalog_mapping: t.Optional[t.Dict[re.Pattern, str]] = None, 121 categorizer_config: t.Optional[CategorizerConfig] = None, 122 auto_categorization_enabled: bool = True, 123 effective_from: t.Optional[TimeLike] = None, 124 include_unmodified: bool = False, 125 default_start: t.Optional[TimeLike] = None, 126 default_end: t.Optional[TimeLike] = None, 127 enable_preview: bool = False, 128 end_bounded: bool = False, 129 ensure_finalized_snapshots: bool = False, 130 explain: bool = False, 131 ignore_cron: bool = False, 132 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 133 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 134 console: t.Optional[PlanBuilderConsole] = None, 135 user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None, 136 selected_models: t.Optional[t.Set[str]] = None, 137 ): 138 self._context_diff = context_diff 139 self._no_gaps = no_gaps 140 self._skip_backfill = skip_backfill 141 self._empty_backfill = empty_backfill 142 self._is_dev = is_dev 143 self._forward_only = forward_only 144 self._allow_destructive_models = set( 145 allow_destructive_models if allow_destructive_models is not None else [] 146 ) 147 self._allow_additive_models = set( 148 allow_additive_models if allow_additive_models is not None else [] 149 ) 150 self._enable_preview = enable_preview 151 self._end_bounded = end_bounded 152 self._ensure_finalized_snapshots = ensure_finalized_snapshots 153 self._ignore_cron = ignore_cron 154 self._start_override_per_model = start_override_per_model 155 self._end_override_per_model = end_override_per_model 156 self._environment_ttl = environment_ttl 157 self._categorizer_config = categorizer_config or CategorizerConfig() 158 self._auto_categorization_enabled = auto_categorization_enabled 159 self._include_unmodified = include_unmodified 160 self._restate_models = set(restate_models) if restate_models is not None else None 161 self._restate_all_snapshots = restate_all_snapshots 162 self._effective_from = effective_from 163 164 # note: this deliberately doesnt default to now() here. 165 # There may be an significant delay between the PlanBuilder producing a Plan and the Plan actually being run 166 # so if execution_time=None is passed to the PlanBuilder, then the resulting Plan should also have execution_time=None 167 # in order to prevent the Plan that was intended to run "as at now" from having "now" fixed to some time in the past 168 # ref: https://github.com/SQLMesh/sqlmesh/pull/4702#discussion_r2140696156 169 self._execution_time = execution_time 170 171 self._backfill_models = backfill_models 172 self._end = end or default_end 173 self._default_start = default_start 174 self._apply = apply 175 self._console = console or get_console() 176 self._choices: t.Dict[SnapshotId, SnapshotChangeCategory] = {} 177 self._user_provided_flags = user_provided_flags 178 self._selected_models = selected_models 179 self._explain = explain 180 181 self._start = start 182 if not self._start and ( 183 self._forward_only_preview_needed or self._non_forward_only_preview_needed 184 ): 185 self._start = default_start or yesterday_ds() 186 187 self._plan_id: str = random_id() 188 self._model_fqn_to_snapshot = {s.name: s for s in self._context_diff.snapshots.values()} 189 190 self.override_start = start is not None 191 self.override_end = end is not None 192 self.environment_naming_info = EnvironmentNamingInfo.from_environment_catalog_mapping( 193 environment_catalog_mapping or {}, 194 name=self._context_diff.environment, 195 suffix_target=environment_suffix_target, 196 normalize_name=self._context_diff.normalize_environment_name, 197 gateway_managed=self._context_diff.gateway_managed_virtual_layer, 198 ) 199 200 self._latest_plan: t.Optional[Plan] = None 201 202 @property 203 def is_start_and_end_allowed(self) -> bool: 204 """Indicates whether this plan allows to set the start and end dates.""" 205 return self._is_dev or bool(self._restate_models) 206 207 @property 208 def start(self) -> t.Optional[TimeLike]: 209 if self._start and is_relative(self._start): 210 # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00' 211 return to_datetime(self._start, relative_base=to_datetime(self.execution_time)) 212 return self._start 213 214 @property 215 def end(self) -> t.Optional[TimeLike]: 216 if self._end and is_relative(self._end): 217 # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00' 218 return to_datetime(self._end, relative_base=to_datetime(self.execution_time)) 219 return self._end 220 221 @cached_property 222 def execution_time(self) -> TimeLike: 223 # this is cached to return a stable value from now() in the places where the execution time matters for resolving relative date strings 224 # during the plan building process 225 return self._execution_time or now() 226 227 def set_start(self, new_start: TimeLike) -> PlanBuilder: 228 self._start = new_start 229 self.override_start = True 230 self._latest_plan = None 231 return self 232 233 def set_end(self, new_end: TimeLike) -> PlanBuilder: 234 self._end = new_end 235 self.override_end = True 236 self._latest_plan = None 237 return self 238 239 def set_effective_from(self, effective_from: t.Optional[TimeLike]) -> PlanBuilder: 240 """Sets the effective date for all new snapshots in the plan. 241 242 Note: this is only applicable for forward-only plans. 243 244 Args: 245 effective_from: The effective date to set. 246 """ 247 self._effective_from = effective_from 248 if effective_from and self._is_dev and not self.override_start: 249 self._start = effective_from 250 self._latest_plan = None 251 return self 252 253 def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> PlanBuilder: 254 """Sets a snapshot version based on the user choice. 255 256 Args: 257 snapshot: The target snapshot. 258 choice: The user decision on how to version the target snapshot and its children. 259 """ 260 if not self._is_new_snapshot(snapshot): 261 raise PlanError( 262 f"A choice can't be changed for the existing version of {snapshot.name}." 263 ) 264 if ( 265 not self._context_diff.directly_modified(snapshot.name) 266 and snapshot.snapshot_id not in self._context_diff.added 267 ): 268 raise PlanError(f"Only directly modified models can be categorized ({snapshot.name}).") 269 270 self._choices[snapshot.snapshot_id] = choice 271 self._latest_plan = None 272 return self 273 274 def apply(self) -> None: 275 """Builds and applies the plan.""" 276 if not self._apply: 277 raise PlanError("Plan was not initialized with an applier.") 278 self._apply(self.build()) 279 280 def build(self) -> Plan: 281 """Builds the plan.""" 282 if self._latest_plan: 283 return self._latest_plan 284 285 self._ensure_new_env_with_changes() 286 self._ensure_valid_date_range() 287 self._ensure_no_broken_references() 288 289 self._apply_effective_from() 290 291 dag = self._build_dag() 292 directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag) 293 294 self._check_destructive_additive_changes(directly_modified) 295 self._categorize_snapshots(dag, indirectly_modified) 296 self._adjust_snapshot_intervals() 297 298 deployability_index = ( 299 DeployabilityIndex.create( 300 self._context_diff.snapshots.values(), 301 start=self._start, 302 start_override_per_model=self._start_override_per_model, 303 ) 304 if self._is_dev 305 else DeployabilityIndex.all_deployable() 306 ) 307 308 restatements = self._build_restatements( 309 dag, 310 earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time), 311 ) 312 models_to_backfill = self._build_models_to_backfill(dag, restatements) 313 314 end_override_per_model = self._end_override_per_model 315 if end_override_per_model and self.override_end: 316 # If the end date was provided explicitly by a user, then interval end for each individual 317 # model should be ignored. 318 end_override_per_model = None 319 320 # this deliberately uses the passed in self._execution_time and not self.execution_time cached property 321 # the reason is because that there can be a delay between the Plan being built and the Plan being actually run, 322 # so this ensures that an _execution_time of None can be propagated to the Plan and thus be re-resolved to 323 # the current timestamp of when the Plan is eventually run 324 plan_execution_time = self._execution_time 325 326 plan = Plan( 327 context_diff=self._context_diff, 328 plan_id=self._plan_id, 329 provided_start=self.start, 330 provided_end=self.end, 331 is_dev=self._is_dev, 332 skip_backfill=self._skip_backfill, 333 empty_backfill=self._empty_backfill, 334 no_gaps=self._no_gaps, 335 forward_only=self._forward_only, 336 explain=self._explain, 337 allow_destructive_models=t.cast(t.Set, self._allow_destructive_models), 338 allow_additive_models=t.cast(t.Set, self._allow_additive_models), 339 include_unmodified=self._include_unmodified, 340 environment_ttl=self._environment_ttl, 341 environment_naming_info=self.environment_naming_info, 342 directly_modified=directly_modified, 343 indirectly_modified=indirectly_modified, 344 deployability_index=deployability_index, 345 selected_models_to_restate=self._restate_models, 346 restatements=restatements, 347 restate_all_snapshots=self._restate_all_snapshots, 348 start_override_per_model=self._start_override_per_model, 349 end_override_per_model=end_override_per_model, 350 selected_models_to_backfill=self._backfill_models, 351 models_to_backfill=models_to_backfill, 352 effective_from=self._effective_from, 353 execution_time=plan_execution_time, 354 end_bounded=self._end_bounded, 355 ensure_finalized_snapshots=self._ensure_finalized_snapshots, 356 ignore_cron=self._ignore_cron, 357 user_provided_flags=self._user_provided_flags, 358 selected_models=self._selected_models, 359 ) 360 self._latest_plan = plan 361 return plan 362 363 def _build_dag(self) -> DAG[SnapshotId]: 364 dag: DAG[SnapshotId] = DAG() 365 for s_id, context_snapshot in self._context_diff.snapshots.items(): 366 dag.add(s_id, context_snapshot.parents) 367 return dag 368 369 def _build_restatements( 370 self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike 371 ) -> t.Dict[SnapshotId, Interval]: 372 restate_models = self._restate_models 373 if restate_models == set(): 374 # This is a warning but we print this as error since the Console is lacking API for warnings. 375 self._console.log_error( 376 "Provided restated models do not match any models. No models will be included in plan." 377 ) 378 return {} 379 380 restatements: t.Dict[SnapshotId, Interval] = {} 381 forward_only_preview_needed = self._forward_only_preview_needed 382 is_preview = False 383 if not restate_models and forward_only_preview_needed: 384 # Add model names for new forward-only snapshots to the restatement list 385 # in order to compute previews. 386 restate_models = { 387 s.name 388 for s in self._context_diff.new_snapshots.values() 389 if s.is_model 390 and not s.is_symbolic 391 and (s.is_forward_only or s.model.forward_only) 392 and not s.is_no_preview 393 and ( 394 # Metadata changes should not be previewed. 395 self._context_diff.directly_modified(s.name) 396 or self._context_diff.indirectly_modified(s.name) 397 ) 398 } 399 is_preview = True 400 401 if not restate_models: 402 return {} 403 404 start = self._start or earliest_interval_start 405 end = self._end or now() 406 407 # Add restate snapshots and their downstream snapshots 408 for model_fqn in restate_models: 409 if model_fqn not in self._model_fqn_to_snapshot: 410 raise PlanError(f"Cannot restate model '{model_fqn}'. Model does not exist.") 411 412 # Get restatement intervals for all restated snapshots and make sure that if an incremental snapshot expands it's 413 # restatement range that it's downstream dependencies all expand their restatement ranges as well. 414 for s_id in dag: 415 snapshot = self._context_diff.snapshots[s_id] 416 417 if is_preview and snapshot.is_no_preview: 418 continue 419 420 # Since we are traversing the graph in topological order and the largest interval range is pushed down 421 # the graph we just have to check our immediate parents in the graph and not the whole upstream graph. 422 restating_parents = [ 423 self._context_diff.snapshots[s] for s in snapshot.parents if s in restatements 424 ] 425 426 if not restating_parents and snapshot.name not in restate_models: 427 continue 428 429 if not forward_only_preview_needed: 430 if self._is_dev and not snapshot.is_paused: 431 self._console.log_warning( 432 f"Cannot restate model '{snapshot.name}' because the current version is used in production. " 433 "Run the restatement against the production environment instead to restate this model." 434 ) 435 continue 436 elif (not self._is_dev or not snapshot.is_paused) and snapshot.disable_restatement: 437 self._console.log_warning( 438 f"Cannot restate model '{snapshot.name}'. " 439 "Restatement is disabled for this model to prevent possible data loss. " 440 "If you want to restate this model, change the model's `disable_restatement` setting to `false`." 441 ) 442 continue 443 elif snapshot.is_seed: 444 logger.info("Skipping restatement for model '%s'", snapshot.name) 445 continue 446 447 possible_intervals = { 448 restatements[p.snapshot_id] for p in restating_parents if p.is_incremental 449 } 450 possible_intervals.add( 451 snapshot.get_removal_interval( 452 start, 453 end, 454 self._execution_time, 455 strict=False, 456 is_preview=is_preview, 457 ) 458 ) 459 snapshot_start = min(i[0] for i in possible_intervals) 460 snapshot_end = max(i[1] for i in possible_intervals) 461 462 # We may be tasked with restating a time range smaller than the target snapshot interval unit 463 # For example, restating an hour of Hourly Model A, which has a downstream dependency of Daily Model B 464 # we need to ensure the whole affected day in Model B is restated 465 floored_snapshot_start = snapshot.node.interval_unit.cron_floor(snapshot_start) 466 floored_snapshot_end = snapshot.node.interval_unit.cron_floor(snapshot_end) 467 if to_timestamp(floored_snapshot_end) < snapshot_end: 468 snapshot_start = to_timestamp(floored_snapshot_start) 469 snapshot_end = to_timestamp( 470 snapshot.node.interval_unit.cron_next(floored_snapshot_end) 471 ) 472 473 restatements[s_id] = (snapshot_start, snapshot_end) 474 475 return restatements 476 477 def _build_directly_and_indirectly_modified( 478 self, dag: DAG[SnapshotId] 479 ) -> t.Tuple[t.Set[SnapshotId], SnapshotMapping]: 480 """Builds collections of directly and indirectly modified snapshots. 481 482 Returns: 483 The tuple in which the first element contains a list of added and directly modified 484 snapshots while the second element contains a mapping of indirectly modified snapshots. 485 """ 486 directly_modified = set() 487 all_indirectly_modified = set() 488 489 for s_id in dag: 490 if s_id.name in self._context_diff.modified_snapshots: 491 if self._context_diff.directly_modified(s_id.name): 492 directly_modified.add(s_id) 493 else: 494 all_indirectly_modified.add(s_id) 495 elif s_id in self._context_diff.added: 496 directly_modified.add(s_id) 497 498 indirectly_modified: SnapshotMapping = defaultdict(set) 499 for snapshot in directly_modified: 500 for downstream_s_id in dag.downstream(snapshot.snapshot_id): 501 if downstream_s_id in all_indirectly_modified: 502 indirectly_modified[snapshot.snapshot_id].add(downstream_s_id) 503 504 return ( 505 directly_modified, 506 indirectly_modified, 507 ) 508 509 def _build_models_to_backfill( 510 self, dag: DAG[SnapshotId], restatements: t.Collection[SnapshotId] 511 ) -> t.Optional[t.Set[str]]: 512 backfill_models = ( 513 self._backfill_models 514 if self._backfill_models is not None 515 else [r.name for r in restatements] 516 # Only backfill models explicitly marked for restatement. 517 if self._restate_models 518 else None 519 ) 520 if backfill_models is None: 521 return None 522 return { 523 self._context_diff.snapshots[s_id].name 524 for s_id in dag.subdag( 525 *[ 526 self._model_fqn_to_snapshot[m].snapshot_id 527 for m in backfill_models 528 if m in self._model_fqn_to_snapshot 529 ] 530 ).sorted 531 } 532 533 def _adjust_snapshot_intervals(self) -> None: 534 for new, old in self._context_diff.modified_snapshots.values(): 535 if not new.is_model or not old.is_model: 536 continue 537 is_same_version = old.version_get_or_generate() == new.version_get_or_generate() 538 if is_same_version and should_force_rebuild(old, new): 539 # If the difference between 2 snapshots requires a full rebuild, 540 # then clear the intervals for the new snapshot. 541 self._context_diff.snapshots[new.snapshot_id].intervals = [] 542 elif new.snapshot_id in self._context_diff.new_snapshots: 543 new.intervals = [] 544 new.dev_intervals = [] 545 if is_same_version: 546 new.merge_intervals(old) 547 if new.is_forward_only: 548 new.dev_intervals = new.intervals.copy() 549 550 def _check_destructive_additive_changes(self, directly_modified: t.Set[SnapshotId]) -> None: 551 for s_id in sorted(directly_modified): 552 if s_id.name not in self._context_diff.modified_snapshots: 553 continue 554 555 snapshot = self._context_diff.snapshots[s_id] 556 needs_destructive_check = snapshot.needs_destructive_check( 557 self._allow_destructive_models 558 ) 559 needs_additive_check = snapshot.needs_additive_check(self._allow_additive_models) 560 # should we raise/warn if this snapshot has/inherits a destructive change? 561 should_raise_or_warn = (self._is_forward_only_change(s_id) or self._forward_only) and ( 562 needs_destructive_check or needs_additive_check 563 ) 564 565 if not should_raise_or_warn or not snapshot.is_model: 566 continue 567 568 new, old = self._context_diff.modified_snapshots[snapshot.name] 569 570 # we must know all columns_to_types to determine whether a change is destructive 571 old_columns_to_types = old.model.columns_to_types or {} 572 new_columns_to_types = new.model.columns_to_types or {} 573 574 if columns_to_types_all_known(old_columns_to_types) and columns_to_types_all_known( 575 new_columns_to_types 576 ): 577 alter_operations = t.cast( 578 t.List[TableAlterOperation], 579 get_schema_differ(snapshot.model.dialect).compare_columns( 580 new.name, 581 old_columns_to_types, 582 new_columns_to_types, 583 ignore_destructive=new.model.on_destructive_change.is_ignore, 584 ignore_additive=new.model.on_additive_change.is_ignore, 585 ), 586 ) 587 588 snapshot_name = snapshot.name 589 model_dialect = snapshot.model.dialect 590 591 if needs_destructive_check and has_drop_alteration(alter_operations): 592 self._console.log_destructive_change( 593 snapshot_name, 594 alter_operations, 595 model_dialect, 596 error=not snapshot.model.on_destructive_change.is_warn, 597 ) 598 if snapshot.model.on_destructive_change.is_error: 599 raise PlanError( 600 "Plan requires a destructive change to a forward-only model." 601 ) 602 603 if needs_additive_check and has_additive_alteration(alter_operations): 604 self._console.log_additive_change( 605 snapshot_name, 606 alter_operations, 607 model_dialect, 608 error=not snapshot.model.on_additive_change.is_warn, 609 ) 610 if snapshot.model.on_additive_change.is_error: 611 raise PlanError("Plan requires an additive change to a forward-only model.") 612 613 def _categorize_snapshots( 614 self, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping 615 ) -> None: 616 """Automatically categorizes snapshots that can be automatically categorized and 617 returns a list of added and directly modified snapshots as well as the mapping of 618 indirectly modified snapshots. 619 """ 620 621 # Iterating in DAG order since a category for a snapshot may depend on the categories 622 # assigned to its upstream dependencies. 623 for s_id in dag: 624 snapshot = self._context_diff.snapshots.get(s_id) 625 626 if not snapshot or not self._is_new_snapshot(snapshot): 627 continue 628 629 forward_only = self._forward_only or self._is_forward_only_change(s_id) 630 if forward_only and s_id.name in self._context_diff.modified_snapshots: 631 new, old = self._context_diff.modified_snapshots[s_id.name] 632 if is_breaking_kind_change(old, new) or snapshot.is_seed: 633 # Breaking kind changes and seed changes can't be forward-only. 634 forward_only = False 635 636 if s_id in self._choices: 637 snapshot.categorize_as(self._choices[s_id], forward_only) 638 continue 639 640 if s_id in self._context_diff.added: 641 snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only) 642 elif s_id.name in self._context_diff.modified_snapshots: 643 self._categorize_snapshot(snapshot, forward_only, dag, indirectly_modified) 644 645 def _categorize_snapshot( 646 self, 647 snapshot: Snapshot, 648 forward_only: bool, 649 dag: DAG[SnapshotId], 650 indirectly_modified: SnapshotMapping, 651 ) -> None: 652 s_id = snapshot.snapshot_id 653 654 if self._context_diff.directly_modified(s_id.name): 655 if self._auto_categorization_enabled: 656 new, old = self._context_diff.modified_snapshots[s_id.name] 657 if is_breaking_kind_change(old, new): 658 snapshot.categorize_as(SnapshotChangeCategory.BREAKING, False) 659 return 660 661 s_id_with_missing_columns: t.Optional[SnapshotId] = None 662 this_sid_with_downstream = indirectly_modified.get(s_id, set()) | {s_id} 663 for downstream_s_id in this_sid_with_downstream: 664 downstream_snapshot = self._context_diff.snapshots[downstream_s_id] 665 if ( 666 downstream_snapshot.is_model 667 and downstream_snapshot.model.columns_to_types is None 668 ): 669 s_id_with_missing_columns = downstream_s_id 670 break 671 672 if s_id_with_missing_columns is None: 673 change_category = categorize_change(new, old, config=self._categorizer_config) 674 if change_category is not None: 675 snapshot.categorize_as(change_category, forward_only) 676 else: 677 mode = self._categorizer_config.dict().get( 678 new.model.source_type, AutoCategorizationMode.OFF 679 ) 680 if mode == AutoCategorizationMode.FULL: 681 snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only) 682 elif self._context_diff.indirectly_modified(snapshot.name): 683 if snapshot.is_materialized_view and not forward_only: 684 # We categorize changes as breaking to allow for instantaneous switches in a virtual layer. 685 # Otherwise, there might be a potentially long downtime during MVs recreation. 686 # In the case of forward-only changes this optimization is not applicable because we want to continue 687 # using the same (existing) table version. 688 snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only) 689 return 690 691 all_upstream_forward_only = set() 692 all_upstream_categories = set() 693 direct_parent_categories = set() 694 695 for p_id in dag.upstream(s_id): 696 parent = self._context_diff.snapshots.get(p_id) 697 698 if parent and self._is_new_snapshot(parent): 699 all_upstream_categories.add(parent.change_category) 700 all_upstream_forward_only.add(parent.is_forward_only) 701 if p_id in snapshot.parents: 702 direct_parent_categories.add(parent.change_category) 703 704 if all_upstream_forward_only == {True} or ( 705 snapshot.is_model and snapshot.model.forward_only 706 ): 707 forward_only = True 708 709 if direct_parent_categories.intersection( 710 {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING} 711 ): 712 snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only) 713 elif not direct_parent_categories: 714 snapshot.categorize_as( 715 self._get_orphaned_indirect_change_category(snapshot), forward_only 716 ) 717 elif all_upstream_categories == {SnapshotChangeCategory.METADATA}: 718 snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only) 719 else: 720 snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only) 721 else: 722 # Metadata updated. 723 snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only) 724 725 def _get_orphaned_indirect_change_category( 726 self, indirect_snapshot: Snapshot 727 ) -> SnapshotChangeCategory: 728 """Sometimes an indirectly changed downstream snapshot ends up with no directly changed parents introduced in the same plan. 729 This may happen when 2 or more parent models were changed independently in different plans and then the changes were 730 merged together and applied in a single plan. As a result, a combination of 2 or more previously changed parents produces 731 a new downstream snapshot not previously seen. 732 733 This function is used to infer the correct change category for such downstream snapshots based on change categories of their parents. 734 """ 735 previous_snapshot = self._context_diff.modified_snapshots[indirect_snapshot.name][1] 736 previous_parent_snapshot_ids = {p.name: p for p in previous_snapshot.parents} 737 738 current_parent_snapshots = [ 739 self._context_diff.snapshots[p_id] 740 for p_id in indirect_snapshot.parents 741 if p_id in self._context_diff.snapshots 742 ] 743 744 indirect_category: t.Optional[SnapshotChangeCategory] = None 745 for current_parent_snapshot in current_parent_snapshots: 746 if current_parent_snapshot.name not in previous_parent_snapshot_ids: 747 # This is a new parent so falling back to INDIRECT_BREAKING 748 return SnapshotChangeCategory.INDIRECT_BREAKING 749 pevious_parent_snapshot_id = previous_parent_snapshot_ids[current_parent_snapshot.name] 750 751 if current_parent_snapshot.snapshot_id == pevious_parent_snapshot_id: 752 # There were no new versions of this parent since the previous version of this snapshot, 753 # so we can skip it 754 continue 755 756 # Find the previous snapshot ID of the same parent in the historical chain 757 previous_parent_found = False 758 previous_parent_categories = set() 759 for pv in reversed(current_parent_snapshot.all_versions): 760 pv_snapshot_id = pv.snapshot_id(current_parent_snapshot.name) 761 if pv_snapshot_id == pevious_parent_snapshot_id: 762 previous_parent_found = True 763 break 764 previous_parent_categories.add(pv.change_category) 765 766 if not previous_parent_found: 767 # The previous parent is not in the historical chain so falling back to INDIRECT_BREAKING 768 return SnapshotChangeCategory.INDIRECT_BREAKING 769 770 if previous_parent_categories.intersection( 771 {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING} 772 ): 773 # One of the new parents in the chain was breaking so this indirect snapshot is breaking 774 return SnapshotChangeCategory.INDIRECT_BREAKING 775 776 if previous_parent_categories.intersection( 777 { 778 SnapshotChangeCategory.NON_BREAKING, 779 SnapshotChangeCategory.INDIRECT_NON_BREAKING, 780 } 781 ): 782 # All changes in the chain were non-breaking so this indirect snapshot can be non-breaking too 783 indirect_category = SnapshotChangeCategory.INDIRECT_NON_BREAKING 784 elif ( 785 previous_parent_categories == {SnapshotChangeCategory.METADATA} 786 and indirect_category is None 787 ): 788 # All changes in the chain were metadata so this indirect snapshot can be metadata too 789 indirect_category = SnapshotChangeCategory.METADATA 790 791 return indirect_category or SnapshotChangeCategory.INDIRECT_BREAKING 792 793 def _apply_effective_from(self) -> None: 794 if self._effective_from: 795 if not self._forward_only: 796 raise PlanError("Effective date can only be set for a forward-only plan.") 797 if to_datetime(self._effective_from) > now(): 798 raise PlanError("Effective date cannot be in the future.") 799 800 for snapshot in self._context_diff.new_snapshots.values(): 801 if ( 802 snapshot.evaluatable 803 and not snapshot.disable_restatement 804 and (not snapshot.full_history_restatement_only or not snapshot.is_incremental) 805 ): 806 snapshot.effective_from = self._effective_from 807 808 def _is_forward_only_change(self, s_id: SnapshotId) -> bool: 809 if not self._context_diff.directly_modified( 810 s_id.name 811 ) and not self._context_diff.indirectly_modified(s_id.name): 812 return False 813 snapshot = self._context_diff.snapshots[s_id] 814 if snapshot.name in self._context_diff.modified_snapshots: 815 _, old = self._context_diff.modified_snapshots[snapshot.name] 816 # If the model kind has changed in a breaking way, then we can't consider this to be a forward-only change. 817 if snapshot.is_model and is_breaking_kind_change(old, snapshot): 818 return False 819 return ( 820 snapshot.is_model and snapshot.model.forward_only and bool(snapshot.previous_versions) 821 ) 822 823 def _is_new_snapshot(self, snapshot: Snapshot) -> bool: 824 """Returns True if the given snapshot is a new snapshot in this plan.""" 825 return snapshot.snapshot_id in self._context_diff.new_snapshots 826 827 def _ensure_valid_date_range(self) -> None: 828 if (self.override_start or self.override_end) and not self.is_start_and_end_allowed: 829 raise PlanError( 830 "The start and end dates can't be set for a production plan without restatements." 831 ) 832 833 if (start := self.start) and (end := self.end): 834 if to_datetime(start) > to_datetime(end): 835 raise PlanError( 836 f"Plan end date: '{time_like_to_str(end)}' must be after the plan start date: '{time_like_to_str(start)}'" 837 ) 838 839 if end := self.end: 840 if to_datetime(end) > to_datetime(self.execution_time): 841 raise PlanError( 842 f"Plan end date: '{time_like_to_str(end)}' cannot be in the future (execution time: '{time_like_to_str(self.execution_time)}')" 843 ) 844 845 # Validate model-specific start/end dates 846 if (start := self.start or self._default_start) and (end := self.end): 847 start_ts = to_datetime(start) 848 end_ts = to_datetime(end) 849 if start_ts > end_ts: 850 models_to_check: t.Set[str] = ( 851 set(self._backfill_models or []) 852 | set(self._context_diff.modified_snapshots.keys()) 853 | {s.name for s in self._context_diff.added} 854 | set((self._end_override_per_model or {}).keys()) 855 ) 856 for model_name in models_to_check: 857 if snapshot := self._model_fqn_to_snapshot.get(model_name): 858 if snapshot.node.start is None or to_datetime(snapshot.node.start) > end_ts: 859 raise PlanError( 860 f"Model '{model_name}': Start date / time '({time_like_to_str(start_ts)})' can't be greater than end date / time '({time_like_to_str(end_ts)})'.\n" 861 f"Set the `start` attribute in your project config model defaults to avoid this issue." 862 ) 863 864 def _ensure_no_broken_references(self) -> None: 865 for snapshot in self._context_diff.snapshots.values(): 866 broken_references = { 867 x.name for x in self._context_diff.removed_snapshots.values() if not x.is_external 868 } & {x for x in snapshot.node.depends_on} 869 if broken_references: 870 broken_references_msg = ", ".join(f"'{x}'" for x in broken_references) 871 raise PlanError( 872 f"""Removed {broken_references_msg} are referenced in '{snapshot.name}'. Please remove broken references before proceeding.""" 873 ) 874 875 def _ensure_new_env_with_changes(self) -> None: 876 if ( 877 self._is_dev 878 and not self._include_unmodified 879 and self._context_diff.is_new_environment 880 and not self._context_diff.has_snapshot_changes 881 and not self._context_diff.has_environment_statements_changes 882 and not self._backfill_models 883 ): 884 raise NoChangesPlanError( 885 f"Creating a new environment requires a change, but project files match the `{self._context_diff.create_from}` environment. Make a change or use the --include-unmodified flag to create a new environment without changes." 886 ) 887 888 @cached_property 889 def _forward_only_preview_needed(self) -> bool: 890 """Determines whether the plan should compute previews for forward-only changes (if there are any).""" 891 return self._is_dev and ( 892 self._forward_only 893 or ( 894 self._enable_preview 895 and any( 896 snapshot.model.forward_only 897 for snapshot in self._modified_and_added_snapshots 898 if snapshot.is_model 899 ) 900 ) 901 ) 902 903 @cached_property 904 def _non_forward_only_preview_needed(self) -> bool: 905 if not self._is_dev: 906 return False 907 for snapshot in self._modified_and_added_snapshots: 908 if not snapshot.is_model: 909 continue 910 if ( 911 not snapshot.virtual_environment_mode.is_full 912 or snapshot.model.auto_restatement_cron is not None 913 ): 914 return True 915 return False 916 917 @cached_property 918 def _modified_and_added_snapshots(self) -> t.List[Snapshot]: 919 return [ 920 snapshot 921 for snapshot in self._context_diff.snapshots.values() 922 if snapshot.name in self._context_diff.modified_snapshots 923 or snapshot.snapshot_id in self._context_diff.added 924 ]
logger =
<Logger sqlmesh.core.plan.builder (WARNING)>
class
PlanBuilder:
56class PlanBuilder: 57 """Plan Builder constructs a Plan based on user choices for how they want to backfill, preview, etc. their changes. 58 59 Args: 60 context_diff: The context diff that the plan is based on. 61 start: The start time to backfill data. 62 end: The end time to backfill data. 63 execution_time: The date/time time reference to use for execution time. Defaults to now. 64 If :start or :end are relative time expressions, they are interpreted as relative to the :execution_time 65 apply: The callback to apply the plan. 66 restate_models: A list of models for which the data should be restated for the time range 67 specified in this plan. Note: models defined outside SQLMesh (external) won't be a part 68 of the restatement. 69 restate_all_snapshots: If restatements are present, this flag indicates whether or not the intervals 70 being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments). 71 If set to None, the default behaviour is to not clear anything unless the target environment is prod. 72 backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan. 73 no_gaps: Whether to ensure that new snapshots for nodes that are already a 74 part of the target environment have no data gaps when compared against previous 75 snapshots for same nodes. 76 skip_backfill: Whether to skip the backfill step. 77 empty_backfill: Like skip_backfill, but also records processed intervals. 78 is_dev: Whether this plan is for development purposes. 79 forward_only: Whether the purpose of the plan is to make forward only changes. 80 allow_destructive_models: A list of fully qualified model names whose forward-only changes are allowed to be destructive. 81 allow_additive_models: A list of fully qualified model names whose forward-only changes are allowed to be additive. 82 environment_ttl: The period of time that a development environment should exist before being deleted. 83 categorizer_config: Auto categorization settings. 84 auto_categorization_enabled: Whether to apply auto categorization. 85 effective_from: The effective date from which to apply forward-only changes on production. 86 include_unmodified: Indicates whether to include unmodified nodes in the target development environment. 87 environment_suffix_target: Indicates whether to append the environment name to the schema or table name. 88 default_start: The default plan start to use if not specified. 89 default_end: The default plan end to use if not specified. 90 enable_preview: Whether to enable preview for forward-only models in development environments. 91 end_bounded: If set to true, the missing intervals will be bounded by the target end date, disregarding lookback, 92 allow_partials, and other attributes that could cause the intervals to exceed the target end date. 93 ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized 94 environment state, or to use whatever snapshots are in the current environment state even if 95 the environment is not finalized. 96 start_override_per_model: A mapping of model FQNs to target start dates. 97 end_override_per_model: A mapping of model FQNs to target end dates. 98 ignore_cron: Whether to ignore the node's cron schedule when computing missing intervals. 99 explain: Whether to explain the plan instead of applying it. 100 """ 101 102 def __init__( 103 self, 104 context_diff: ContextDiff, 105 start: t.Optional[TimeLike] = None, 106 end: t.Optional[TimeLike] = None, 107 execution_time: t.Optional[TimeLike] = None, 108 apply: t.Optional[t.Callable[[Plan], None]] = None, 109 restate_models: t.Optional[t.Iterable[str]] = None, 110 restate_all_snapshots: bool = False, 111 backfill_models: t.Optional[t.Iterable[str]] = None, 112 no_gaps: bool = False, 113 skip_backfill: bool = False, 114 empty_backfill: bool = False, 115 is_dev: bool = False, 116 forward_only: bool = False, 117 allow_destructive_models: t.Optional[t.Iterable[str]] = None, 118 allow_additive_models: t.Optional[t.Iterable[str]] = None, 119 environment_ttl: t.Optional[str] = None, 120 environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default, 121 environment_catalog_mapping: t.Optional[t.Dict[re.Pattern, str]] = None, 122 categorizer_config: t.Optional[CategorizerConfig] = None, 123 auto_categorization_enabled: bool = True, 124 effective_from: t.Optional[TimeLike] = None, 125 include_unmodified: bool = False, 126 default_start: t.Optional[TimeLike] = None, 127 default_end: t.Optional[TimeLike] = None, 128 enable_preview: bool = False, 129 end_bounded: bool = False, 130 ensure_finalized_snapshots: bool = False, 131 explain: bool = False, 132 ignore_cron: bool = False, 133 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 134 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 135 console: t.Optional[PlanBuilderConsole] = None, 136 user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None, 137 selected_models: t.Optional[t.Set[str]] = None, 138 ): 139 self._context_diff = context_diff 140 self._no_gaps = no_gaps 141 self._skip_backfill = skip_backfill 142 self._empty_backfill = empty_backfill 143 self._is_dev = is_dev 144 self._forward_only = forward_only 145 self._allow_destructive_models = set( 146 allow_destructive_models if allow_destructive_models is not None else [] 147 ) 148 self._allow_additive_models = set( 149 allow_additive_models if allow_additive_models is not None else [] 150 ) 151 self._enable_preview = enable_preview 152 self._end_bounded = end_bounded 153 self._ensure_finalized_snapshots = ensure_finalized_snapshots 154 self._ignore_cron = ignore_cron 155 self._start_override_per_model = start_override_per_model 156 self._end_override_per_model = end_override_per_model 157 self._environment_ttl = environment_ttl 158 self._categorizer_config = categorizer_config or CategorizerConfig() 159 self._auto_categorization_enabled = auto_categorization_enabled 160 self._include_unmodified = include_unmodified 161 self._restate_models = set(restate_models) if restate_models is not None else None 162 self._restate_all_snapshots = restate_all_snapshots 163 self._effective_from = effective_from 164 165 # note: this deliberately doesnt default to now() here. 166 # There may be an significant delay between the PlanBuilder producing a Plan and the Plan actually being run 167 # so if execution_time=None is passed to the PlanBuilder, then the resulting Plan should also have execution_time=None 168 # in order to prevent the Plan that was intended to run "as at now" from having "now" fixed to some time in the past 169 # ref: https://github.com/SQLMesh/sqlmesh/pull/4702#discussion_r2140696156 170 self._execution_time = execution_time 171 172 self._backfill_models = backfill_models 173 self._end = end or default_end 174 self._default_start = default_start 175 self._apply = apply 176 self._console = console or get_console() 177 self._choices: t.Dict[SnapshotId, SnapshotChangeCategory] = {} 178 self._user_provided_flags = user_provided_flags 179 self._selected_models = selected_models 180 self._explain = explain 181 182 self._start = start 183 if not self._start and ( 184 self._forward_only_preview_needed or self._non_forward_only_preview_needed 185 ): 186 self._start = default_start or yesterday_ds() 187 188 self._plan_id: str = random_id() 189 self._model_fqn_to_snapshot = {s.name: s for s in self._context_diff.snapshots.values()} 190 191 self.override_start = start is not None 192 self.override_end = end is not None 193 self.environment_naming_info = EnvironmentNamingInfo.from_environment_catalog_mapping( 194 environment_catalog_mapping or {}, 195 name=self._context_diff.environment, 196 suffix_target=environment_suffix_target, 197 normalize_name=self._context_diff.normalize_environment_name, 198 gateway_managed=self._context_diff.gateway_managed_virtual_layer, 199 ) 200 201 self._latest_plan: t.Optional[Plan] = None 202 203 @property 204 def is_start_and_end_allowed(self) -> bool: 205 """Indicates whether this plan allows to set the start and end dates.""" 206 return self._is_dev or bool(self._restate_models) 207 208 @property 209 def start(self) -> t.Optional[TimeLike]: 210 if self._start and is_relative(self._start): 211 # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00' 212 return to_datetime(self._start, relative_base=to_datetime(self.execution_time)) 213 return self._start 214 215 @property 216 def end(self) -> t.Optional[TimeLike]: 217 if self._end and is_relative(self._end): 218 # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00' 219 return to_datetime(self._end, relative_base=to_datetime(self.execution_time)) 220 return self._end 221 222 @cached_property 223 def execution_time(self) -> TimeLike: 224 # this is cached to return a stable value from now() in the places where the execution time matters for resolving relative date strings 225 # during the plan building process 226 return self._execution_time or now() 227 228 def set_start(self, new_start: TimeLike) -> PlanBuilder: 229 self._start = new_start 230 self.override_start = True 231 self._latest_plan = None 232 return self 233 234 def set_end(self, new_end: TimeLike) -> PlanBuilder: 235 self._end = new_end 236 self.override_end = True 237 self._latest_plan = None 238 return self 239 240 def set_effective_from(self, effective_from: t.Optional[TimeLike]) -> PlanBuilder: 241 """Sets the effective date for all new snapshots in the plan. 242 243 Note: this is only applicable for forward-only plans. 244 245 Args: 246 effective_from: The effective date to set. 247 """ 248 self._effective_from = effective_from 249 if effective_from and self._is_dev and not self.override_start: 250 self._start = effective_from 251 self._latest_plan = None 252 return self 253 254 def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> PlanBuilder: 255 """Sets a snapshot version based on the user choice. 256 257 Args: 258 snapshot: The target snapshot. 259 choice: The user decision on how to version the target snapshot and its children. 260 """ 261 if not self._is_new_snapshot(snapshot): 262 raise PlanError( 263 f"A choice can't be changed for the existing version of {snapshot.name}." 264 ) 265 if ( 266 not self._context_diff.directly_modified(snapshot.name) 267 and snapshot.snapshot_id not in self._context_diff.added 268 ): 269 raise PlanError(f"Only directly modified models can be categorized ({snapshot.name}).") 270 271 self._choices[snapshot.snapshot_id] = choice 272 self._latest_plan = None 273 return self 274 275 def apply(self) -> None: 276 """Builds and applies the plan.""" 277 if not self._apply: 278 raise PlanError("Plan was not initialized with an applier.") 279 self._apply(self.build()) 280 281 def build(self) -> Plan: 282 """Builds the plan.""" 283 if self._latest_plan: 284 return self._latest_plan 285 286 self._ensure_new_env_with_changes() 287 self._ensure_valid_date_range() 288 self._ensure_no_broken_references() 289 290 self._apply_effective_from() 291 292 dag = self._build_dag() 293 directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag) 294 295 self._check_destructive_additive_changes(directly_modified) 296 self._categorize_snapshots(dag, indirectly_modified) 297 self._adjust_snapshot_intervals() 298 299 deployability_index = ( 300 DeployabilityIndex.create( 301 self._context_diff.snapshots.values(), 302 start=self._start, 303 start_override_per_model=self._start_override_per_model, 304 ) 305 if self._is_dev 306 else DeployabilityIndex.all_deployable() 307 ) 308 309 restatements = self._build_restatements( 310 dag, 311 earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time), 312 ) 313 models_to_backfill = self._build_models_to_backfill(dag, restatements) 314 315 end_override_per_model = self._end_override_per_model 316 if end_override_per_model and self.override_end: 317 # If the end date was provided explicitly by a user, then interval end for each individual 318 # model should be ignored. 319 end_override_per_model = None 320 321 # this deliberately uses the passed in self._execution_time and not self.execution_time cached property 322 # the reason is because that there can be a delay between the Plan being built and the Plan being actually run, 323 # so this ensures that an _execution_time of None can be propagated to the Plan and thus be re-resolved to 324 # the current timestamp of when the Plan is eventually run 325 plan_execution_time = self._execution_time 326 327 plan = Plan( 328 context_diff=self._context_diff, 329 plan_id=self._plan_id, 330 provided_start=self.start, 331 provided_end=self.end, 332 is_dev=self._is_dev, 333 skip_backfill=self._skip_backfill, 334 empty_backfill=self._empty_backfill, 335 no_gaps=self._no_gaps, 336 forward_only=self._forward_only, 337 explain=self._explain, 338 allow_destructive_models=t.cast(t.Set, self._allow_destructive_models), 339 allow_additive_models=t.cast(t.Set, self._allow_additive_models), 340 include_unmodified=self._include_unmodified, 341 environment_ttl=self._environment_ttl, 342 environment_naming_info=self.environment_naming_info, 343 directly_modified=directly_modified, 344 indirectly_modified=indirectly_modified, 345 deployability_index=deployability_index, 346 selected_models_to_restate=self._restate_models, 347 restatements=restatements, 348 restate_all_snapshots=self._restate_all_snapshots, 349 start_override_per_model=self._start_override_per_model, 350 end_override_per_model=end_override_per_model, 351 selected_models_to_backfill=self._backfill_models, 352 models_to_backfill=models_to_backfill, 353 effective_from=self._effective_from, 354 execution_time=plan_execution_time, 355 end_bounded=self._end_bounded, 356 ensure_finalized_snapshots=self._ensure_finalized_snapshots, 357 ignore_cron=self._ignore_cron, 358 user_provided_flags=self._user_provided_flags, 359 selected_models=self._selected_models, 360 ) 361 self._latest_plan = plan 362 return plan 363 364 def _build_dag(self) -> DAG[SnapshotId]: 365 dag: DAG[SnapshotId] = DAG() 366 for s_id, context_snapshot in self._context_diff.snapshots.items(): 367 dag.add(s_id, context_snapshot.parents) 368 return dag 369 370 def _build_restatements( 371 self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike 372 ) -> t.Dict[SnapshotId, Interval]: 373 restate_models = self._restate_models 374 if restate_models == set(): 375 # This is a warning but we print this as error since the Console is lacking API for warnings. 376 self._console.log_error( 377 "Provided restated models do not match any models. No models will be included in plan." 378 ) 379 return {} 380 381 restatements: t.Dict[SnapshotId, Interval] = {} 382 forward_only_preview_needed = self._forward_only_preview_needed 383 is_preview = False 384 if not restate_models and forward_only_preview_needed: 385 # Add model names for new forward-only snapshots to the restatement list 386 # in order to compute previews. 387 restate_models = { 388 s.name 389 for s in self._context_diff.new_snapshots.values() 390 if s.is_model 391 and not s.is_symbolic 392 and (s.is_forward_only or s.model.forward_only) 393 and not s.is_no_preview 394 and ( 395 # Metadata changes should not be previewed. 396 self._context_diff.directly_modified(s.name) 397 or self._context_diff.indirectly_modified(s.name) 398 ) 399 } 400 is_preview = True 401 402 if not restate_models: 403 return {} 404 405 start = self._start or earliest_interval_start 406 end = self._end or now() 407 408 # Add restate snapshots and their downstream snapshots 409 for model_fqn in restate_models: 410 if model_fqn not in self._model_fqn_to_snapshot: 411 raise PlanError(f"Cannot restate model '{model_fqn}'. Model does not exist.") 412 413 # Get restatement intervals for all restated snapshots and make sure that if an incremental snapshot expands it's 414 # restatement range that it's downstream dependencies all expand their restatement ranges as well. 415 for s_id in dag: 416 snapshot = self._context_diff.snapshots[s_id] 417 418 if is_preview and snapshot.is_no_preview: 419 continue 420 421 # Since we are traversing the graph in topological order and the largest interval range is pushed down 422 # the graph we just have to check our immediate parents in the graph and not the whole upstream graph. 423 restating_parents = [ 424 self._context_diff.snapshots[s] for s in snapshot.parents if s in restatements 425 ] 426 427 if not restating_parents and snapshot.name not in restate_models: 428 continue 429 430 if not forward_only_preview_needed: 431 if self._is_dev and not snapshot.is_paused: 432 self._console.log_warning( 433 f"Cannot restate model '{snapshot.name}' because the current version is used in production. " 434 "Run the restatement against the production environment instead to restate this model." 435 ) 436 continue 437 elif (not self._is_dev or not snapshot.is_paused) and snapshot.disable_restatement: 438 self._console.log_warning( 439 f"Cannot restate model '{snapshot.name}'. " 440 "Restatement is disabled for this model to prevent possible data loss. " 441 "If you want to restate this model, change the model's `disable_restatement` setting to `false`." 442 ) 443 continue 444 elif snapshot.is_seed: 445 logger.info("Skipping restatement for model '%s'", snapshot.name) 446 continue 447 448 possible_intervals = { 449 restatements[p.snapshot_id] for p in restating_parents if p.is_incremental 450 } 451 possible_intervals.add( 452 snapshot.get_removal_interval( 453 start, 454 end, 455 self._execution_time, 456 strict=False, 457 is_preview=is_preview, 458 ) 459 ) 460 snapshot_start = min(i[0] for i in possible_intervals) 461 snapshot_end = max(i[1] for i in possible_intervals) 462 463 # We may be tasked with restating a time range smaller than the target snapshot interval unit 464 # For example, restating an hour of Hourly Model A, which has a downstream dependency of Daily Model B 465 # we need to ensure the whole affected day in Model B is restated 466 floored_snapshot_start = snapshot.node.interval_unit.cron_floor(snapshot_start) 467 floored_snapshot_end = snapshot.node.interval_unit.cron_floor(snapshot_end) 468 if to_timestamp(floored_snapshot_end) < snapshot_end: 469 snapshot_start = to_timestamp(floored_snapshot_start) 470 snapshot_end = to_timestamp( 471 snapshot.node.interval_unit.cron_next(floored_snapshot_end) 472 ) 473 474 restatements[s_id] = (snapshot_start, snapshot_end) 475 476 return restatements 477 478 def _build_directly_and_indirectly_modified( 479 self, dag: DAG[SnapshotId] 480 ) -> t.Tuple[t.Set[SnapshotId], SnapshotMapping]: 481 """Builds collections of directly and indirectly modified snapshots. 482 483 Returns: 484 The tuple in which the first element contains a list of added and directly modified 485 snapshots while the second element contains a mapping of indirectly modified snapshots. 486 """ 487 directly_modified = set() 488 all_indirectly_modified = set() 489 490 for s_id in dag: 491 if s_id.name in self._context_diff.modified_snapshots: 492 if self._context_diff.directly_modified(s_id.name): 493 directly_modified.add(s_id) 494 else: 495 all_indirectly_modified.add(s_id) 496 elif s_id in self._context_diff.added: 497 directly_modified.add(s_id) 498 499 indirectly_modified: SnapshotMapping = defaultdict(set) 500 for snapshot in directly_modified: 501 for downstream_s_id in dag.downstream(snapshot.snapshot_id): 502 if downstream_s_id in all_indirectly_modified: 503 indirectly_modified[snapshot.snapshot_id].add(downstream_s_id) 504 505 return ( 506 directly_modified, 507 indirectly_modified, 508 ) 509 510 def _build_models_to_backfill( 511 self, dag: DAG[SnapshotId], restatements: t.Collection[SnapshotId] 512 ) -> t.Optional[t.Set[str]]: 513 backfill_models = ( 514 self._backfill_models 515 if self._backfill_models is not None 516 else [r.name for r in restatements] 517 # Only backfill models explicitly marked for restatement. 518 if self._restate_models 519 else None 520 ) 521 if backfill_models is None: 522 return None 523 return { 524 self._context_diff.snapshots[s_id].name 525 for s_id in dag.subdag( 526 *[ 527 self._model_fqn_to_snapshot[m].snapshot_id 528 for m in backfill_models 529 if m in self._model_fqn_to_snapshot 530 ] 531 ).sorted 532 } 533 534 def _adjust_snapshot_intervals(self) -> None: 535 for new, old in self._context_diff.modified_snapshots.values(): 536 if not new.is_model or not old.is_model: 537 continue 538 is_same_version = old.version_get_or_generate() == new.version_get_or_generate() 539 if is_same_version and should_force_rebuild(old, new): 540 # If the difference between 2 snapshots requires a full rebuild, 541 # then clear the intervals for the new snapshot. 542 self._context_diff.snapshots[new.snapshot_id].intervals = [] 543 elif new.snapshot_id in self._context_diff.new_snapshots: 544 new.intervals = [] 545 new.dev_intervals = [] 546 if is_same_version: 547 new.merge_intervals(old) 548 if new.is_forward_only: 549 new.dev_intervals = new.intervals.copy() 550 551 def _check_destructive_additive_changes(self, directly_modified: t.Set[SnapshotId]) -> None: 552 for s_id in sorted(directly_modified): 553 if s_id.name not in self._context_diff.modified_snapshots: 554 continue 555 556 snapshot = self._context_diff.snapshots[s_id] 557 needs_destructive_check = snapshot.needs_destructive_check( 558 self._allow_destructive_models 559 ) 560 needs_additive_check = snapshot.needs_additive_check(self._allow_additive_models) 561 # should we raise/warn if this snapshot has/inherits a destructive change? 562 should_raise_or_warn = (self._is_forward_only_change(s_id) or self._forward_only) and ( 563 needs_destructive_check or needs_additive_check 564 ) 565 566 if not should_raise_or_warn or not snapshot.is_model: 567 continue 568 569 new, old = self._context_diff.modified_snapshots[snapshot.name] 570 571 # we must know all columns_to_types to determine whether a change is destructive 572 old_columns_to_types = old.model.columns_to_types or {} 573 new_columns_to_types = new.model.columns_to_types or {} 574 575 if columns_to_types_all_known(old_columns_to_types) and columns_to_types_all_known( 576 new_columns_to_types 577 ): 578 alter_operations = t.cast( 579 t.List[TableAlterOperation], 580 get_schema_differ(snapshot.model.dialect).compare_columns( 581 new.name, 582 old_columns_to_types, 583 new_columns_to_types, 584 ignore_destructive=new.model.on_destructive_change.is_ignore, 585 ignore_additive=new.model.on_additive_change.is_ignore, 586 ), 587 ) 588 589 snapshot_name = snapshot.name 590 model_dialect = snapshot.model.dialect 591 592 if needs_destructive_check and has_drop_alteration(alter_operations): 593 self._console.log_destructive_change( 594 snapshot_name, 595 alter_operations, 596 model_dialect, 597 error=not snapshot.model.on_destructive_change.is_warn, 598 ) 599 if snapshot.model.on_destructive_change.is_error: 600 raise PlanError( 601 "Plan requires a destructive change to a forward-only model." 602 ) 603 604 if needs_additive_check and has_additive_alteration(alter_operations): 605 self._console.log_additive_change( 606 snapshot_name, 607 alter_operations, 608 model_dialect, 609 error=not snapshot.model.on_additive_change.is_warn, 610 ) 611 if snapshot.model.on_additive_change.is_error: 612 raise PlanError("Plan requires an additive change to a forward-only model.") 613 614 def _categorize_snapshots( 615 self, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping 616 ) -> None: 617 """Automatically categorizes snapshots that can be automatically categorized and 618 returns a list of added and directly modified snapshots as well as the mapping of 619 indirectly modified snapshots. 620 """ 621 622 # Iterating in DAG order since a category for a snapshot may depend on the categories 623 # assigned to its upstream dependencies. 624 for s_id in dag: 625 snapshot = self._context_diff.snapshots.get(s_id) 626 627 if not snapshot or not self._is_new_snapshot(snapshot): 628 continue 629 630 forward_only = self._forward_only or self._is_forward_only_change(s_id) 631 if forward_only and s_id.name in self._context_diff.modified_snapshots: 632 new, old = self._context_diff.modified_snapshots[s_id.name] 633 if is_breaking_kind_change(old, new) or snapshot.is_seed: 634 # Breaking kind changes and seed changes can't be forward-only. 635 forward_only = False 636 637 if s_id in self._choices: 638 snapshot.categorize_as(self._choices[s_id], forward_only) 639 continue 640 641 if s_id in self._context_diff.added: 642 snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only) 643 elif s_id.name in self._context_diff.modified_snapshots: 644 self._categorize_snapshot(snapshot, forward_only, dag, indirectly_modified) 645 646 def _categorize_snapshot( 647 self, 648 snapshot: Snapshot, 649 forward_only: bool, 650 dag: DAG[SnapshotId], 651 indirectly_modified: SnapshotMapping, 652 ) -> None: 653 s_id = snapshot.snapshot_id 654 655 if self._context_diff.directly_modified(s_id.name): 656 if self._auto_categorization_enabled: 657 new, old = self._context_diff.modified_snapshots[s_id.name] 658 if is_breaking_kind_change(old, new): 659 snapshot.categorize_as(SnapshotChangeCategory.BREAKING, False) 660 return 661 662 s_id_with_missing_columns: t.Optional[SnapshotId] = None 663 this_sid_with_downstream = indirectly_modified.get(s_id, set()) | {s_id} 664 for downstream_s_id in this_sid_with_downstream: 665 downstream_snapshot = self._context_diff.snapshots[downstream_s_id] 666 if ( 667 downstream_snapshot.is_model 668 and downstream_snapshot.model.columns_to_types is None 669 ): 670 s_id_with_missing_columns = downstream_s_id 671 break 672 673 if s_id_with_missing_columns is None: 674 change_category = categorize_change(new, old, config=self._categorizer_config) 675 if change_category is not None: 676 snapshot.categorize_as(change_category, forward_only) 677 else: 678 mode = self._categorizer_config.dict().get( 679 new.model.source_type, AutoCategorizationMode.OFF 680 ) 681 if mode == AutoCategorizationMode.FULL: 682 snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only) 683 elif self._context_diff.indirectly_modified(snapshot.name): 684 if snapshot.is_materialized_view and not forward_only: 685 # We categorize changes as breaking to allow for instantaneous switches in a virtual layer. 686 # Otherwise, there might be a potentially long downtime during MVs recreation. 687 # In the case of forward-only changes this optimization is not applicable because we want to continue 688 # using the same (existing) table version. 689 snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only) 690 return 691 692 all_upstream_forward_only = set() 693 all_upstream_categories = set() 694 direct_parent_categories = set() 695 696 for p_id in dag.upstream(s_id): 697 parent = self._context_diff.snapshots.get(p_id) 698 699 if parent and self._is_new_snapshot(parent): 700 all_upstream_categories.add(parent.change_category) 701 all_upstream_forward_only.add(parent.is_forward_only) 702 if p_id in snapshot.parents: 703 direct_parent_categories.add(parent.change_category) 704 705 if all_upstream_forward_only == {True} or ( 706 snapshot.is_model and snapshot.model.forward_only 707 ): 708 forward_only = True 709 710 if direct_parent_categories.intersection( 711 {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING} 712 ): 713 snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only) 714 elif not direct_parent_categories: 715 snapshot.categorize_as( 716 self._get_orphaned_indirect_change_category(snapshot), forward_only 717 ) 718 elif all_upstream_categories == {SnapshotChangeCategory.METADATA}: 719 snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only) 720 else: 721 snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only) 722 else: 723 # Metadata updated. 724 snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only) 725 726 def _get_orphaned_indirect_change_category( 727 self, indirect_snapshot: Snapshot 728 ) -> SnapshotChangeCategory: 729 """Sometimes an indirectly changed downstream snapshot ends up with no directly changed parents introduced in the same plan. 730 This may happen when 2 or more parent models were changed independently in different plans and then the changes were 731 merged together and applied in a single plan. As a result, a combination of 2 or more previously changed parents produces 732 a new downstream snapshot not previously seen. 733 734 This function is used to infer the correct change category for such downstream snapshots based on change categories of their parents. 735 """ 736 previous_snapshot = self._context_diff.modified_snapshots[indirect_snapshot.name][1] 737 previous_parent_snapshot_ids = {p.name: p for p in previous_snapshot.parents} 738 739 current_parent_snapshots = [ 740 self._context_diff.snapshots[p_id] 741 for p_id in indirect_snapshot.parents 742 if p_id in self._context_diff.snapshots 743 ] 744 745 indirect_category: t.Optional[SnapshotChangeCategory] = None 746 for current_parent_snapshot in current_parent_snapshots: 747 if current_parent_snapshot.name not in previous_parent_snapshot_ids: 748 # This is a new parent so falling back to INDIRECT_BREAKING 749 return SnapshotChangeCategory.INDIRECT_BREAKING 750 pevious_parent_snapshot_id = previous_parent_snapshot_ids[current_parent_snapshot.name] 751 752 if current_parent_snapshot.snapshot_id == pevious_parent_snapshot_id: 753 # There were no new versions of this parent since the previous version of this snapshot, 754 # so we can skip it 755 continue 756 757 # Find the previous snapshot ID of the same parent in the historical chain 758 previous_parent_found = False 759 previous_parent_categories = set() 760 for pv in reversed(current_parent_snapshot.all_versions): 761 pv_snapshot_id = pv.snapshot_id(current_parent_snapshot.name) 762 if pv_snapshot_id == pevious_parent_snapshot_id: 763 previous_parent_found = True 764 break 765 previous_parent_categories.add(pv.change_category) 766 767 if not previous_parent_found: 768 # The previous parent is not in the historical chain so falling back to INDIRECT_BREAKING 769 return SnapshotChangeCategory.INDIRECT_BREAKING 770 771 if previous_parent_categories.intersection( 772 {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING} 773 ): 774 # One of the new parents in the chain was breaking so this indirect snapshot is breaking 775 return SnapshotChangeCategory.INDIRECT_BREAKING 776 777 if previous_parent_categories.intersection( 778 { 779 SnapshotChangeCategory.NON_BREAKING, 780 SnapshotChangeCategory.INDIRECT_NON_BREAKING, 781 } 782 ): 783 # All changes in the chain were non-breaking so this indirect snapshot can be non-breaking too 784 indirect_category = SnapshotChangeCategory.INDIRECT_NON_BREAKING 785 elif ( 786 previous_parent_categories == {SnapshotChangeCategory.METADATA} 787 and indirect_category is None 788 ): 789 # All changes in the chain were metadata so this indirect snapshot can be metadata too 790 indirect_category = SnapshotChangeCategory.METADATA 791 792 return indirect_category or SnapshotChangeCategory.INDIRECT_BREAKING 793 794 def _apply_effective_from(self) -> None: 795 if self._effective_from: 796 if not self._forward_only: 797 raise PlanError("Effective date can only be set for a forward-only plan.") 798 if to_datetime(self._effective_from) > now(): 799 raise PlanError("Effective date cannot be in the future.") 800 801 for snapshot in self._context_diff.new_snapshots.values(): 802 if ( 803 snapshot.evaluatable 804 and not snapshot.disable_restatement 805 and (not snapshot.full_history_restatement_only or not snapshot.is_incremental) 806 ): 807 snapshot.effective_from = self._effective_from 808 809 def _is_forward_only_change(self, s_id: SnapshotId) -> bool: 810 if not self._context_diff.directly_modified( 811 s_id.name 812 ) and not self._context_diff.indirectly_modified(s_id.name): 813 return False 814 snapshot = self._context_diff.snapshots[s_id] 815 if snapshot.name in self._context_diff.modified_snapshots: 816 _, old = self._context_diff.modified_snapshots[snapshot.name] 817 # If the model kind has changed in a breaking way, then we can't consider this to be a forward-only change. 818 if snapshot.is_model and is_breaking_kind_change(old, snapshot): 819 return False 820 return ( 821 snapshot.is_model and snapshot.model.forward_only and bool(snapshot.previous_versions) 822 ) 823 824 def _is_new_snapshot(self, snapshot: Snapshot) -> bool: 825 """Returns True if the given snapshot is a new snapshot in this plan.""" 826 return snapshot.snapshot_id in self._context_diff.new_snapshots 827 828 def _ensure_valid_date_range(self) -> None: 829 if (self.override_start or self.override_end) and not self.is_start_and_end_allowed: 830 raise PlanError( 831 "The start and end dates can't be set for a production plan without restatements." 832 ) 833 834 if (start := self.start) and (end := self.end): 835 if to_datetime(start) > to_datetime(end): 836 raise PlanError( 837 f"Plan end date: '{time_like_to_str(end)}' must be after the plan start date: '{time_like_to_str(start)}'" 838 ) 839 840 if end := self.end: 841 if to_datetime(end) > to_datetime(self.execution_time): 842 raise PlanError( 843 f"Plan end date: '{time_like_to_str(end)}' cannot be in the future (execution time: '{time_like_to_str(self.execution_time)}')" 844 ) 845 846 # Validate model-specific start/end dates 847 if (start := self.start or self._default_start) and (end := self.end): 848 start_ts = to_datetime(start) 849 end_ts = to_datetime(end) 850 if start_ts > end_ts: 851 models_to_check: t.Set[str] = ( 852 set(self._backfill_models or []) 853 | set(self._context_diff.modified_snapshots.keys()) 854 | {s.name for s in self._context_diff.added} 855 | set((self._end_override_per_model or {}).keys()) 856 ) 857 for model_name in models_to_check: 858 if snapshot := self._model_fqn_to_snapshot.get(model_name): 859 if snapshot.node.start is None or to_datetime(snapshot.node.start) > end_ts: 860 raise PlanError( 861 f"Model '{model_name}': Start date / time '({time_like_to_str(start_ts)})' can't be greater than end date / time '({time_like_to_str(end_ts)})'.\n" 862 f"Set the `start` attribute in your project config model defaults to avoid this issue." 863 ) 864 865 def _ensure_no_broken_references(self) -> None: 866 for snapshot in self._context_diff.snapshots.values(): 867 broken_references = { 868 x.name for x in self._context_diff.removed_snapshots.values() if not x.is_external 869 } & {x for x in snapshot.node.depends_on} 870 if broken_references: 871 broken_references_msg = ", ".join(f"'{x}'" for x in broken_references) 872 raise PlanError( 873 f"""Removed {broken_references_msg} are referenced in '{snapshot.name}'. Please remove broken references before proceeding.""" 874 ) 875 876 def _ensure_new_env_with_changes(self) -> None: 877 if ( 878 self._is_dev 879 and not self._include_unmodified 880 and self._context_diff.is_new_environment 881 and not self._context_diff.has_snapshot_changes 882 and not self._context_diff.has_environment_statements_changes 883 and not self._backfill_models 884 ): 885 raise NoChangesPlanError( 886 f"Creating a new environment requires a change, but project files match the `{self._context_diff.create_from}` environment. Make a change or use the --include-unmodified flag to create a new environment without changes." 887 ) 888 889 @cached_property 890 def _forward_only_preview_needed(self) -> bool: 891 """Determines whether the plan should compute previews for forward-only changes (if there are any).""" 892 return self._is_dev and ( 893 self._forward_only 894 or ( 895 self._enable_preview 896 and any( 897 snapshot.model.forward_only 898 for snapshot in self._modified_and_added_snapshots 899 if snapshot.is_model 900 ) 901 ) 902 ) 903 904 @cached_property 905 def _non_forward_only_preview_needed(self) -> bool: 906 if not self._is_dev: 907 return False 908 for snapshot in self._modified_and_added_snapshots: 909 if not snapshot.is_model: 910 continue 911 if ( 912 not snapshot.virtual_environment_mode.is_full 913 or snapshot.model.auto_restatement_cron is not None 914 ): 915 return True 916 return False 917 918 @cached_property 919 def _modified_and_added_snapshots(self) -> t.List[Snapshot]: 920 return [ 921 snapshot 922 for snapshot in self._context_diff.snapshots.values() 923 if snapshot.name in self._context_diff.modified_snapshots 924 or snapshot.snapshot_id in self._context_diff.added 925 ]
Plan Builder constructs a Plan based on user choices for how they want to backfill, preview, etc. their changes.
Arguments:
- context_diff: The context diff that the plan is based on.
- start: The start time to backfill data.
- end: The end time to backfill data.
- execution_time: The date/time time reference to use for execution time. Defaults to now. If :start or :end are relative time expressions, they are interpreted as relative to the :execution_time
- apply: The callback to apply the plan.
- restate_models: A list of models for which the data should be restated for the time range specified in this plan. Note: models defined outside SQLMesh (external) won't be a part of the restatement.
- restate_all_snapshots: If restatements are present, this flag indicates whether or not the intervals being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments). If set to None, the default behaviour is to not clear anything unless the target environment is prod.
- backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan.
- no_gaps: Whether to ensure that new snapshots for nodes that are already a part of the target environment have no data gaps when compared against previous snapshots for same nodes.
- skip_backfill: Whether to skip the backfill step.
- empty_backfill: Like skip_backfill, but also records processed intervals.
- is_dev: Whether this plan is for development purposes.
- forward_only: Whether the purpose of the plan is to make forward only changes.
- allow_destructive_models: A list of fully qualified model names whose forward-only changes are allowed to be destructive.
- allow_additive_models: A list of fully qualified model names whose forward-only changes are allowed to be additive.
- environment_ttl: The period of time that a development environment should exist before being deleted.
- categorizer_config: Auto categorization settings.
- auto_categorization_enabled: Whether to apply auto categorization.
- effective_from: The effective date from which to apply forward-only changes on production.
- include_unmodified: Indicates whether to include unmodified nodes in the target development environment.
- environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
- default_start: The default plan start to use if not specified.
- default_end: The default plan end to use if not specified.
- enable_preview: Whether to enable preview for forward-only models in development environments.
- end_bounded: If set to true, the missing intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
- ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
- start_override_per_model: A mapping of model FQNs to target start dates.
- end_override_per_model: A mapping of model FQNs to target end dates.
- ignore_cron: Whether to ignore the node's cron schedule when computing missing intervals.
- explain: Whether to explain the plan instead of applying it.
PlanBuilder( context_diff: sqlmesh.core.context_diff.ContextDiff, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, apply: Optional[Callable[[sqlmesh.core.plan.definition.Plan], NoneType]] = None, restate_models: Optional[Iterable[str]] = None, restate_all_snapshots: bool = False, backfill_models: Optional[Iterable[str]] = None, no_gaps: bool = False, skip_backfill: bool = False, empty_backfill: bool = False, is_dev: bool = False, forward_only: bool = False, allow_destructive_models: Optional[Iterable[str]] = None, allow_additive_models: Optional[Iterable[str]] = None, environment_ttl: Optional[str] = None, environment_suffix_target: sqlmesh.core.config.common.EnvironmentSuffixTarget = SCHEMA, environment_catalog_mapping: Optional[Dict[re.Pattern, str]] = None, categorizer_config: Optional[sqlmesh.core.config.categorizer.CategorizerConfig] = None, auto_categorization_enabled: bool = True, effective_from: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, include_unmodified: bool = False, default_start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, default_end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, enable_preview: bool = False, end_bounded: bool = False, ensure_finalized_snapshots: bool = False, explain: bool = False, ignore_cron: bool = False, start_override_per_model: Optional[Dict[str, datetime.datetime]] = None, end_override_per_model: Optional[Dict[str, datetime.datetime]] = None, console: Optional[sqlmesh.core.console.PlanBuilderConsole] = None, user_provided_flags: Optional[Dict[str, Union[datetime.date, datetime.datetime, str, int, float, bool, List[str]]]] = None, selected_models: Optional[Set[str]] = None)
102 def __init__( 103 self, 104 context_diff: ContextDiff, 105 start: t.Optional[TimeLike] = None, 106 end: t.Optional[TimeLike] = None, 107 execution_time: t.Optional[TimeLike] = None, 108 apply: t.Optional[t.Callable[[Plan], None]] = None, 109 restate_models: t.Optional[t.Iterable[str]] = None, 110 restate_all_snapshots: bool = False, 111 backfill_models: t.Optional[t.Iterable[str]] = None, 112 no_gaps: bool = False, 113 skip_backfill: bool = False, 114 empty_backfill: bool = False, 115 is_dev: bool = False, 116 forward_only: bool = False, 117 allow_destructive_models: t.Optional[t.Iterable[str]] = None, 118 allow_additive_models: t.Optional[t.Iterable[str]] = None, 119 environment_ttl: t.Optional[str] = None, 120 environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default, 121 environment_catalog_mapping: t.Optional[t.Dict[re.Pattern, str]] = None, 122 categorizer_config: t.Optional[CategorizerConfig] = None, 123 auto_categorization_enabled: bool = True, 124 effective_from: t.Optional[TimeLike] = None, 125 include_unmodified: bool = False, 126 default_start: t.Optional[TimeLike] = None, 127 default_end: t.Optional[TimeLike] = None, 128 enable_preview: bool = False, 129 end_bounded: bool = False, 130 ensure_finalized_snapshots: bool = False, 131 explain: bool = False, 132 ignore_cron: bool = False, 133 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 134 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 135 console: t.Optional[PlanBuilderConsole] = None, 136 user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None, 137 selected_models: t.Optional[t.Set[str]] = None, 138 ): 139 self._context_diff = context_diff 140 self._no_gaps = no_gaps 141 self._skip_backfill = skip_backfill 142 self._empty_backfill = empty_backfill 143 self._is_dev = is_dev 144 self._forward_only = forward_only 145 self._allow_destructive_models = set( 146 allow_destructive_models if allow_destructive_models is not None else [] 147 ) 148 self._allow_additive_models = set( 149 allow_additive_models if allow_additive_models is not None else [] 150 ) 151 self._enable_preview = enable_preview 152 self._end_bounded = end_bounded 153 self._ensure_finalized_snapshots = ensure_finalized_snapshots 154 self._ignore_cron = ignore_cron 155 self._start_override_per_model = start_override_per_model 156 self._end_override_per_model = end_override_per_model 157 self._environment_ttl = environment_ttl 158 self._categorizer_config = categorizer_config or CategorizerConfig() 159 self._auto_categorization_enabled = auto_categorization_enabled 160 self._include_unmodified = include_unmodified 161 self._restate_models = set(restate_models) if restate_models is not None else None 162 self._restate_all_snapshots = restate_all_snapshots 163 self._effective_from = effective_from 164 165 # note: this deliberately doesnt default to now() here. 166 # There may be an significant delay between the PlanBuilder producing a Plan and the Plan actually being run 167 # so if execution_time=None is passed to the PlanBuilder, then the resulting Plan should also have execution_time=None 168 # in order to prevent the Plan that was intended to run "as at now" from having "now" fixed to some time in the past 169 # ref: https://github.com/SQLMesh/sqlmesh/pull/4702#discussion_r2140696156 170 self._execution_time = execution_time 171 172 self._backfill_models = backfill_models 173 self._end = end or default_end 174 self._default_start = default_start 175 self._apply = apply 176 self._console = console or get_console() 177 self._choices: t.Dict[SnapshotId, SnapshotChangeCategory] = {} 178 self._user_provided_flags = user_provided_flags 179 self._selected_models = selected_models 180 self._explain = explain 181 182 self._start = start 183 if not self._start and ( 184 self._forward_only_preview_needed or self._non_forward_only_preview_needed 185 ): 186 self._start = default_start or yesterday_ds() 187 188 self._plan_id: str = random_id() 189 self._model_fqn_to_snapshot = {s.name: s for s in self._context_diff.snapshots.values()} 190 191 self.override_start = start is not None 192 self.override_end = end is not None 193 self.environment_naming_info = EnvironmentNamingInfo.from_environment_catalog_mapping( 194 environment_catalog_mapping or {}, 195 name=self._context_diff.environment, 196 suffix_target=environment_suffix_target, 197 normalize_name=self._context_diff.normalize_environment_name, 198 gateway_managed=self._context_diff.gateway_managed_virtual_layer, 199 ) 200 201 self._latest_plan: t.Optional[Plan] = None
is_start_and_end_allowed: bool
203 @property 204 def is_start_and_end_allowed(self) -> bool: 205 """Indicates whether this plan allows to set the start and end dates.""" 206 return self._is_dev or bool(self._restate_models)
Indicates whether this plan allows to set the start and end dates.
start: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
208 @property 209 def start(self) -> t.Optional[TimeLike]: 210 if self._start and is_relative(self._start): 211 # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00' 212 return to_datetime(self._start, relative_base=to_datetime(self.execution_time)) 213 return self._start
end: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
215 @property 216 def end(self) -> t.Optional[TimeLike]: 217 if self._end and is_relative(self._end): 218 # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00' 219 return to_datetime(self._end, relative_base=to_datetime(self.execution_time)) 220 return self._end
def
set_start( self, new_start: Union[datetime.date, datetime.datetime, str, int, float]) -> PlanBuilder:
def
set_end( self, new_end: Union[datetime.date, datetime.datetime, str, int, float]) -> PlanBuilder:
def
set_effective_from( self, effective_from: Union[datetime.date, datetime.datetime, str, int, float, NoneType]) -> PlanBuilder:
240 def set_effective_from(self, effective_from: t.Optional[TimeLike]) -> PlanBuilder: 241 """Sets the effective date for all new snapshots in the plan. 242 243 Note: this is only applicable for forward-only plans. 244 245 Args: 246 effective_from: The effective date to set. 247 """ 248 self._effective_from = effective_from 249 if effective_from and self._is_dev and not self.override_start: 250 self._start = effective_from 251 self._latest_plan = None 252 return self
Sets the effective date for all new snapshots in the plan.
Note: this is only applicable for forward-only plans.
Arguments:
- effective_from: The effective date to set.
def
set_choice( self, snapshot: sqlmesh.core.snapshot.definition.Snapshot, choice: sqlmesh.core.snapshot.definition.SnapshotChangeCategory) -> PlanBuilder:
254 def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> PlanBuilder: 255 """Sets a snapshot version based on the user choice. 256 257 Args: 258 snapshot: The target snapshot. 259 choice: The user decision on how to version the target snapshot and its children. 260 """ 261 if not self._is_new_snapshot(snapshot): 262 raise PlanError( 263 f"A choice can't be changed for the existing version of {snapshot.name}." 264 ) 265 if ( 266 not self._context_diff.directly_modified(snapshot.name) 267 and snapshot.snapshot_id not in self._context_diff.added 268 ): 269 raise PlanError(f"Only directly modified models can be categorized ({snapshot.name}).") 270 271 self._choices[snapshot.snapshot_id] = choice 272 self._latest_plan = None 273 return self
Sets a snapshot version based on the user choice.
Arguments:
- snapshot: The target snapshot.
- choice: The user decision on how to version the target snapshot and its children.
def
apply(self) -> None:
275 def apply(self) -> None: 276 """Builds and applies the plan.""" 277 if not self._apply: 278 raise PlanError("Plan was not initialized with an applier.") 279 self._apply(self.build())
Builds and applies the plan.
281 def build(self) -> Plan: 282 """Builds the plan.""" 283 if self._latest_plan: 284 return self._latest_plan 285 286 self._ensure_new_env_with_changes() 287 self._ensure_valid_date_range() 288 self._ensure_no_broken_references() 289 290 self._apply_effective_from() 291 292 dag = self._build_dag() 293 directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag) 294 295 self._check_destructive_additive_changes(directly_modified) 296 self._categorize_snapshots(dag, indirectly_modified) 297 self._adjust_snapshot_intervals() 298 299 deployability_index = ( 300 DeployabilityIndex.create( 301 self._context_diff.snapshots.values(), 302 start=self._start, 303 start_override_per_model=self._start_override_per_model, 304 ) 305 if self._is_dev 306 else DeployabilityIndex.all_deployable() 307 ) 308 309 restatements = self._build_restatements( 310 dag, 311 earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time), 312 ) 313 models_to_backfill = self._build_models_to_backfill(dag, restatements) 314 315 end_override_per_model = self._end_override_per_model 316 if end_override_per_model and self.override_end: 317 # If the end date was provided explicitly by a user, then interval end for each individual 318 # model should be ignored. 319 end_override_per_model = None 320 321 # this deliberately uses the passed in self._execution_time and not self.execution_time cached property 322 # the reason is because that there can be a delay between the Plan being built and the Plan being actually run, 323 # so this ensures that an _execution_time of None can be propagated to the Plan and thus be re-resolved to 324 # the current timestamp of when the Plan is eventually run 325 plan_execution_time = self._execution_time 326 327 plan = Plan( 328 context_diff=self._context_diff, 329 plan_id=self._plan_id, 330 provided_start=self.start, 331 provided_end=self.end, 332 is_dev=self._is_dev, 333 skip_backfill=self._skip_backfill, 334 empty_backfill=self._empty_backfill, 335 no_gaps=self._no_gaps, 336 forward_only=self._forward_only, 337 explain=self._explain, 338 allow_destructive_models=t.cast(t.Set, self._allow_destructive_models), 339 allow_additive_models=t.cast(t.Set, self._allow_additive_models), 340 include_unmodified=self._include_unmodified, 341 environment_ttl=self._environment_ttl, 342 environment_naming_info=self.environment_naming_info, 343 directly_modified=directly_modified, 344 indirectly_modified=indirectly_modified, 345 deployability_index=deployability_index, 346 selected_models_to_restate=self._restate_models, 347 restatements=restatements, 348 restate_all_snapshots=self._restate_all_snapshots, 349 start_override_per_model=self._start_override_per_model, 350 end_override_per_model=end_override_per_model, 351 selected_models_to_backfill=self._backfill_models, 352 models_to_backfill=models_to_backfill, 353 effective_from=self._effective_from, 354 execution_time=plan_execution_time, 355 end_bounded=self._end_bounded, 356 ensure_finalized_snapshots=self._ensure_finalized_snapshots, 357 ignore_cron=self._ignore_cron, 358 user_provided_flags=self._user_provided_flags, 359 selected_models=self._selected_models, 360 ) 361 self._latest_plan = plan 362 return plan
Builds the plan.