Edit on GitHub

sqlmesh.core.plan.builder

  1from __future__ import annotations
  2
  3import logging
  4import re
  5import typing as t
  6from collections import defaultdict
  7from functools import cached_property
  8from datetime import datetime
  9
 10
 11from sqlmesh.core.console import PlanBuilderConsole, get_console
 12from sqlmesh.core.config import (
 13    AutoCategorizationMode,
 14    CategorizerConfig,
 15    EnvironmentSuffixTarget,
 16)
 17from sqlmesh.core.context_diff import ContextDiff
 18from sqlmesh.core.environment import EnvironmentNamingInfo
 19from sqlmesh.core.plan.common import should_force_rebuild, is_breaking_kind_change
 20from sqlmesh.core.plan.definition import (
 21    Plan,
 22    SnapshotMapping,
 23    UserProvidedFlags,
 24    earliest_interval_start,
 25)
 26from sqlmesh.core.schema_diff import (
 27    get_schema_differ,
 28    has_drop_alteration,
 29    has_additive_alteration,
 30    TableAlterOperation,
 31)
 32from sqlmesh.core.snapshot import (
 33    DeployabilityIndex,
 34    Snapshot,
 35    SnapshotChangeCategory,
 36)
 37from sqlmesh.core.snapshot.categorizer import categorize_change
 38from sqlmesh.core.snapshot.definition import Interval, SnapshotId
 39from sqlmesh.utils import columns_to_types_all_known, random_id
 40from sqlmesh.utils.dag import DAG
 41from sqlmesh.utils.date import (
 42    TimeLike,
 43    now,
 44    to_datetime,
 45    yesterday_ds,
 46    to_timestamp,
 47    time_like_to_str,
 48    is_relative,
 49)
 50from sqlmesh.utils.errors import NoChangesPlanError, PlanError
 51
 52logger = logging.getLogger(__name__)
 53
 54
 55class PlanBuilder:
 56    """Plan Builder constructs a Plan based on user choices for how they want to backfill, preview, etc. their changes.
 57
 58    Args:
 59        context_diff: The context diff that the plan is based on.
 60        start: The start time to backfill data.
 61        end: The end time to backfill data.
 62        execution_time: The date/time time reference to use for execution time. Defaults to now.
 63            If :start or :end are relative time expressions, they are interpreted as relative to the :execution_time
 64        apply: The callback to apply the plan.
 65        restate_models: A list of models for which the data should be restated for the time range
 66            specified in this plan. Note: models defined outside SQLMesh (external) won't be a part
 67            of the restatement.
 68        restate_all_snapshots: If restatements are present, this flag indicates whether or not the intervals
 69            being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments).
 70            If set to None, the default behaviour is to not clear anything unless the target environment is prod.
 71        backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan.
 72        no_gaps:  Whether to ensure that new snapshots for nodes that are already a
 73            part of the target environment have no data gaps when compared against previous
 74            snapshots for same nodes.
 75        skip_backfill: Whether to skip the backfill step.
 76        empty_backfill: Like skip_backfill, but also records processed intervals.
 77        is_dev: Whether this plan is for development purposes.
 78        forward_only: Whether the purpose of the plan is to make forward only changes.
 79        allow_destructive_models: A list of fully qualified model names whose forward-only changes are allowed to be destructive.
 80        allow_additive_models: A list of fully qualified model names whose forward-only changes are allowed to be additive.
 81        environment_ttl: The period of time that a development environment should exist before being deleted.
 82        categorizer_config: Auto categorization settings.
 83        auto_categorization_enabled: Whether to apply auto categorization.
 84        effective_from: The effective date from which to apply forward-only changes on production.
 85        include_unmodified: Indicates whether to include unmodified nodes in the target development environment.
 86        environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
 87        default_start: The default plan start to use if not specified.
 88        default_end: The default plan end to use if not specified.
 89        enable_preview: Whether to enable preview for forward-only models in development environments.
 90        end_bounded: If set to true, the missing intervals will be bounded by the target end date, disregarding lookback,
 91            allow_partials, and other attributes that could cause the intervals to exceed the target end date.
 92        ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
 93            environment state, or to use whatever snapshots are in the current environment state even if
 94            the environment is not finalized.
 95        start_override_per_model: A mapping of model FQNs to target start dates.
 96        end_override_per_model: A mapping of model FQNs to target end dates.
 97        ignore_cron: Whether to ignore the node's cron schedule when computing missing intervals.
 98        explain: Whether to explain the plan instead of applying it.
 99    """
100
101    def __init__(
102        self,
103        context_diff: ContextDiff,
104        start: t.Optional[TimeLike] = None,
105        end: t.Optional[TimeLike] = None,
106        execution_time: t.Optional[TimeLike] = None,
107        apply: t.Optional[t.Callable[[Plan], None]] = None,
108        restate_models: t.Optional[t.Iterable[str]] = None,
109        restate_all_snapshots: bool = False,
110        backfill_models: t.Optional[t.Iterable[str]] = None,
111        no_gaps: bool = False,
112        skip_backfill: bool = False,
113        empty_backfill: bool = False,
114        is_dev: bool = False,
115        forward_only: bool = False,
116        allow_destructive_models: t.Optional[t.Iterable[str]] = None,
117        allow_additive_models: t.Optional[t.Iterable[str]] = None,
118        environment_ttl: t.Optional[str] = None,
119        environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default,
120        environment_catalog_mapping: t.Optional[t.Dict[re.Pattern, str]] = None,
121        categorizer_config: t.Optional[CategorizerConfig] = None,
122        auto_categorization_enabled: bool = True,
123        effective_from: t.Optional[TimeLike] = None,
124        include_unmodified: bool = False,
125        default_start: t.Optional[TimeLike] = None,
126        default_end: t.Optional[TimeLike] = None,
127        enable_preview: bool = False,
128        end_bounded: bool = False,
129        ensure_finalized_snapshots: bool = False,
130        explain: bool = False,
131        ignore_cron: bool = False,
132        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
133        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
134        console: t.Optional[PlanBuilderConsole] = None,
135        user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
136        selected_models: t.Optional[t.Set[str]] = None,
137    ):
138        self._context_diff = context_diff
139        self._no_gaps = no_gaps
140        self._skip_backfill = skip_backfill
141        self._empty_backfill = empty_backfill
142        self._is_dev = is_dev
143        self._forward_only = forward_only
144        self._allow_destructive_models = set(
145            allow_destructive_models if allow_destructive_models is not None else []
146        )
147        self._allow_additive_models = set(
148            allow_additive_models if allow_additive_models is not None else []
149        )
150        self._enable_preview = enable_preview
151        self._end_bounded = end_bounded
152        self._ensure_finalized_snapshots = ensure_finalized_snapshots
153        self._ignore_cron = ignore_cron
154        self._start_override_per_model = start_override_per_model
155        self._end_override_per_model = end_override_per_model
156        self._environment_ttl = environment_ttl
157        self._categorizer_config = categorizer_config or CategorizerConfig()
158        self._auto_categorization_enabled = auto_categorization_enabled
159        self._include_unmodified = include_unmodified
160        self._restate_models = set(restate_models) if restate_models is not None else None
161        self._restate_all_snapshots = restate_all_snapshots
162        self._effective_from = effective_from
163
164        # note: this deliberately doesnt default to now() here.
165        # There may be an significant delay between the PlanBuilder producing a Plan and the Plan actually being run
166        # so if execution_time=None is passed to the PlanBuilder, then the resulting Plan should also have execution_time=None
167        # in order to prevent the Plan that was intended to run "as at now" from having "now" fixed to some time in the past
168        # ref: https://github.com/SQLMesh/sqlmesh/pull/4702#discussion_r2140696156
169        self._execution_time = execution_time
170
171        self._backfill_models = backfill_models
172        self._end = end or default_end
173        self._default_start = default_start
174        self._apply = apply
175        self._console = console or get_console()
176        self._choices: t.Dict[SnapshotId, SnapshotChangeCategory] = {}
177        self._user_provided_flags = user_provided_flags
178        self._selected_models = selected_models
179        self._explain = explain
180
181        self._start = start
182        if not self._start and (
183            self._forward_only_preview_needed or self._non_forward_only_preview_needed
184        ):
185            self._start = default_start or yesterday_ds()
186
187        self._plan_id: str = random_id()
188        self._model_fqn_to_snapshot = {s.name: s for s in self._context_diff.snapshots.values()}
189
190        self.override_start = start is not None
191        self.override_end = end is not None
192        self.environment_naming_info = EnvironmentNamingInfo.from_environment_catalog_mapping(
193            environment_catalog_mapping or {},
194            name=self._context_diff.environment,
195            suffix_target=environment_suffix_target,
196            normalize_name=self._context_diff.normalize_environment_name,
197            gateway_managed=self._context_diff.gateway_managed_virtual_layer,
198        )
199
200        self._latest_plan: t.Optional[Plan] = None
201
202    @property
203    def is_start_and_end_allowed(self) -> bool:
204        """Indicates whether this plan allows to set the start and end dates."""
205        return self._is_dev or bool(self._restate_models)
206
207    @property
208    def start(self) -> t.Optional[TimeLike]:
209        if self._start and is_relative(self._start):
210            # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00'
211            return to_datetime(self._start, relative_base=to_datetime(self.execution_time))
212        return self._start
213
214    @property
215    def end(self) -> t.Optional[TimeLike]:
216        if self._end and is_relative(self._end):
217            # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00'
218            return to_datetime(self._end, relative_base=to_datetime(self.execution_time))
219        return self._end
220
221    @cached_property
222    def execution_time(self) -> TimeLike:
223        # this is cached to return a stable value from now() in the places where the execution time matters for resolving relative date strings
224        # during the plan building process
225        return self._execution_time or now()
226
227    def set_start(self, new_start: TimeLike) -> PlanBuilder:
228        self._start = new_start
229        self.override_start = True
230        self._latest_plan = None
231        return self
232
233    def set_end(self, new_end: TimeLike) -> PlanBuilder:
234        self._end = new_end
235        self.override_end = True
236        self._latest_plan = None
237        return self
238
239    def set_effective_from(self, effective_from: t.Optional[TimeLike]) -> PlanBuilder:
240        """Sets the effective date for all new snapshots in the plan.
241
242        Note: this is only applicable for forward-only plans.
243
244        Args:
245            effective_from: The effective date to set.
246        """
247        self._effective_from = effective_from
248        if effective_from and self._is_dev and not self.override_start:
249            self._start = effective_from
250        self._latest_plan = None
251        return self
252
253    def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> PlanBuilder:
254        """Sets a snapshot version based on the user choice.
255
256        Args:
257            snapshot: The target snapshot.
258            choice: The user decision on how to version the target snapshot and its children.
259        """
260        if not self._is_new_snapshot(snapshot):
261            raise PlanError(
262                f"A choice can't be changed for the existing version of {snapshot.name}."
263            )
264        if (
265            not self._context_diff.directly_modified(snapshot.name)
266            and snapshot.snapshot_id not in self._context_diff.added
267        ):
268            raise PlanError(f"Only directly modified models can be categorized ({snapshot.name}).")
269
270        self._choices[snapshot.snapshot_id] = choice
271        self._latest_plan = None
272        return self
273
274    def apply(self) -> None:
275        """Builds and applies the plan."""
276        if not self._apply:
277            raise PlanError("Plan was not initialized with an applier.")
278        self._apply(self.build())
279
280    def build(self) -> Plan:
281        """Builds the plan."""
282        if self._latest_plan:
283            return self._latest_plan
284
285        self._ensure_new_env_with_changes()
286        self._ensure_valid_date_range()
287        self._ensure_no_broken_references()
288
289        self._apply_effective_from()
290
291        dag = self._build_dag()
292        directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag)
293
294        self._check_destructive_additive_changes(directly_modified)
295        self._categorize_snapshots(dag, indirectly_modified)
296        self._adjust_snapshot_intervals()
297
298        deployability_index = (
299            DeployabilityIndex.create(
300                self._context_diff.snapshots.values(),
301                start=self._start,
302                start_override_per_model=self._start_override_per_model,
303            )
304            if self._is_dev
305            else DeployabilityIndex.all_deployable()
306        )
307
308        restatements = self._build_restatements(
309            dag,
310            earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time),
311        )
312        models_to_backfill = self._build_models_to_backfill(dag, restatements)
313
314        end_override_per_model = self._end_override_per_model
315        if end_override_per_model and self.override_end:
316            # If the end date was provided explicitly by a user, then interval end for each individual
317            # model should be ignored.
318            end_override_per_model = None
319
320        # this deliberately uses the passed in self._execution_time and not self.execution_time cached property
321        # the reason is because that there can be a delay between the Plan being built and the Plan being actually run,
322        # so this ensures that an _execution_time of None can be propagated to the Plan and thus be re-resolved to
323        # the current timestamp of when the Plan is eventually run
324        plan_execution_time = self._execution_time
325
326        plan = Plan(
327            context_diff=self._context_diff,
328            plan_id=self._plan_id,
329            provided_start=self.start,
330            provided_end=self.end,
331            is_dev=self._is_dev,
332            skip_backfill=self._skip_backfill,
333            empty_backfill=self._empty_backfill,
334            no_gaps=self._no_gaps,
335            forward_only=self._forward_only,
336            explain=self._explain,
337            allow_destructive_models=t.cast(t.Set, self._allow_destructive_models),
338            allow_additive_models=t.cast(t.Set, self._allow_additive_models),
339            include_unmodified=self._include_unmodified,
340            environment_ttl=self._environment_ttl,
341            environment_naming_info=self.environment_naming_info,
342            directly_modified=directly_modified,
343            indirectly_modified=indirectly_modified,
344            deployability_index=deployability_index,
345            selected_models_to_restate=self._restate_models,
346            restatements=restatements,
347            restate_all_snapshots=self._restate_all_snapshots,
348            start_override_per_model=self._start_override_per_model,
349            end_override_per_model=end_override_per_model,
350            selected_models_to_backfill=self._backfill_models,
351            models_to_backfill=models_to_backfill,
352            effective_from=self._effective_from,
353            execution_time=plan_execution_time,
354            end_bounded=self._end_bounded,
355            ensure_finalized_snapshots=self._ensure_finalized_snapshots,
356            ignore_cron=self._ignore_cron,
357            user_provided_flags=self._user_provided_flags,
358            selected_models=self._selected_models,
359        )
360        self._latest_plan = plan
361        return plan
362
363    def _build_dag(self) -> DAG[SnapshotId]:
364        dag: DAG[SnapshotId] = DAG()
365        for s_id, context_snapshot in self._context_diff.snapshots.items():
366            dag.add(s_id, context_snapshot.parents)
367        return dag
368
369    def _build_restatements(
370        self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike
371    ) -> t.Dict[SnapshotId, Interval]:
372        restate_models = self._restate_models
373        if restate_models == set():
374            # This is a warning but we print this as error since the Console is lacking API for warnings.
375            self._console.log_error(
376                "Provided restated models do not match any models. No models will be included in plan."
377            )
378            return {}
379
380        restatements: t.Dict[SnapshotId, Interval] = {}
381        forward_only_preview_needed = self._forward_only_preview_needed
382        is_preview = False
383        if not restate_models and forward_only_preview_needed:
384            # Add model names for new forward-only snapshots to the restatement list
385            # in order to compute previews.
386            restate_models = {
387                s.name
388                for s in self._context_diff.new_snapshots.values()
389                if s.is_model
390                and not s.is_symbolic
391                and (s.is_forward_only or s.model.forward_only)
392                and not s.is_no_preview
393                and (
394                    # Metadata changes should not be previewed.
395                    self._context_diff.directly_modified(s.name)
396                    or self._context_diff.indirectly_modified(s.name)
397                )
398            }
399            is_preview = True
400
401        if not restate_models:
402            return {}
403
404        start = self._start or earliest_interval_start
405        end = self._end or now()
406
407        # Add restate snapshots and their downstream snapshots
408        for model_fqn in restate_models:
409            if model_fqn not in self._model_fqn_to_snapshot:
410                raise PlanError(f"Cannot restate model '{model_fqn}'. Model does not exist.")
411
412        # Get restatement intervals for all restated snapshots and make sure that if an incremental snapshot expands it's
413        # restatement range that it's downstream dependencies all expand their restatement ranges as well.
414        for s_id in dag:
415            snapshot = self._context_diff.snapshots[s_id]
416
417            if is_preview and snapshot.is_no_preview:
418                continue
419
420            # Since we are traversing the graph in topological order and the largest interval range is pushed down
421            # the graph we just have to check our immediate parents in the graph and not the whole upstream graph.
422            restating_parents = [
423                self._context_diff.snapshots[s] for s in snapshot.parents if s in restatements
424            ]
425
426            if not restating_parents and snapshot.name not in restate_models:
427                continue
428
429            if not forward_only_preview_needed:
430                if self._is_dev and not snapshot.is_paused:
431                    self._console.log_warning(
432                        f"Cannot restate model '{snapshot.name}' because the current version is used in production. "
433                        "Run the restatement against the production environment instead to restate this model."
434                    )
435                    continue
436                elif (not self._is_dev or not snapshot.is_paused) and snapshot.disable_restatement:
437                    self._console.log_warning(
438                        f"Cannot restate model '{snapshot.name}'. "
439                        "Restatement is disabled for this model to prevent possible data loss. "
440                        "If you want to restate this model, change the model's `disable_restatement` setting to `false`."
441                    )
442                    continue
443                elif snapshot.is_seed:
444                    logger.info("Skipping restatement for model '%s'", snapshot.name)
445                    continue
446
447            possible_intervals = {
448                restatements[p.snapshot_id] for p in restating_parents if p.is_incremental
449            }
450            possible_intervals.add(
451                snapshot.get_removal_interval(
452                    start,
453                    end,
454                    self._execution_time,
455                    strict=False,
456                    is_preview=is_preview,
457                )
458            )
459            snapshot_start = min(i[0] for i in possible_intervals)
460            snapshot_end = max(i[1] for i in possible_intervals)
461
462            # We may be tasked with restating a time range smaller than the target snapshot interval unit
463            # For example, restating an hour of Hourly Model A, which has a downstream dependency of Daily Model B
464            # we need to ensure the whole affected day in Model B is restated
465            floored_snapshot_start = snapshot.node.interval_unit.cron_floor(snapshot_start)
466            floored_snapshot_end = snapshot.node.interval_unit.cron_floor(snapshot_end)
467            if to_timestamp(floored_snapshot_end) < snapshot_end:
468                snapshot_start = to_timestamp(floored_snapshot_start)
469                snapshot_end = to_timestamp(
470                    snapshot.node.interval_unit.cron_next(floored_snapshot_end)
471                )
472
473            restatements[s_id] = (snapshot_start, snapshot_end)
474
475        return restatements
476
477    def _build_directly_and_indirectly_modified(
478        self, dag: DAG[SnapshotId]
479    ) -> t.Tuple[t.Set[SnapshotId], SnapshotMapping]:
480        """Builds collections of directly and indirectly modified snapshots.
481
482        Returns:
483            The tuple in which the first element contains a list of added and directly modified
484            snapshots while the second element contains a mapping of indirectly modified snapshots.
485        """
486        directly_modified = set()
487        all_indirectly_modified = set()
488
489        for s_id in dag:
490            if s_id.name in self._context_diff.modified_snapshots:
491                if self._context_diff.directly_modified(s_id.name):
492                    directly_modified.add(s_id)
493                else:
494                    all_indirectly_modified.add(s_id)
495            elif s_id in self._context_diff.added:
496                directly_modified.add(s_id)
497
498        indirectly_modified: SnapshotMapping = defaultdict(set)
499        for snapshot in directly_modified:
500            for downstream_s_id in dag.downstream(snapshot.snapshot_id):
501                if downstream_s_id in all_indirectly_modified:
502                    indirectly_modified[snapshot.snapshot_id].add(downstream_s_id)
503
504        return (
505            directly_modified,
506            indirectly_modified,
507        )
508
509    def _build_models_to_backfill(
510        self, dag: DAG[SnapshotId], restatements: t.Collection[SnapshotId]
511    ) -> t.Optional[t.Set[str]]:
512        backfill_models = (
513            self._backfill_models
514            if self._backfill_models is not None
515            else [r.name for r in restatements]
516            # Only backfill models explicitly marked for restatement.
517            if self._restate_models
518            else None
519        )
520        if backfill_models is None:
521            return None
522        return {
523            self._context_diff.snapshots[s_id].name
524            for s_id in dag.subdag(
525                *[
526                    self._model_fqn_to_snapshot[m].snapshot_id
527                    for m in backfill_models
528                    if m in self._model_fqn_to_snapshot
529                ]
530            ).sorted
531        }
532
533    def _adjust_snapshot_intervals(self) -> None:
534        for new, old in self._context_diff.modified_snapshots.values():
535            if not new.is_model or not old.is_model:
536                continue
537            is_same_version = old.version_get_or_generate() == new.version_get_or_generate()
538            if is_same_version and should_force_rebuild(old, new):
539                # If the difference between 2 snapshots requires a full rebuild,
540                # then clear the intervals for the new snapshot.
541                self._context_diff.snapshots[new.snapshot_id].intervals = []
542            elif new.snapshot_id in self._context_diff.new_snapshots:
543                new.intervals = []
544                new.dev_intervals = []
545                if is_same_version:
546                    new.merge_intervals(old)
547                    if new.is_forward_only:
548                        new.dev_intervals = new.intervals.copy()
549
550    def _check_destructive_additive_changes(self, directly_modified: t.Set[SnapshotId]) -> None:
551        for s_id in sorted(directly_modified):
552            if s_id.name not in self._context_diff.modified_snapshots:
553                continue
554
555            snapshot = self._context_diff.snapshots[s_id]
556            needs_destructive_check = snapshot.needs_destructive_check(
557                self._allow_destructive_models
558            )
559            needs_additive_check = snapshot.needs_additive_check(self._allow_additive_models)
560            # should we raise/warn if this snapshot has/inherits a destructive change?
561            should_raise_or_warn = (self._is_forward_only_change(s_id) or self._forward_only) and (
562                needs_destructive_check or needs_additive_check
563            )
564
565            if not should_raise_or_warn or not snapshot.is_model:
566                continue
567
568            new, old = self._context_diff.modified_snapshots[snapshot.name]
569
570            # we must know all columns_to_types to determine whether a change is destructive
571            old_columns_to_types = old.model.columns_to_types or {}
572            new_columns_to_types = new.model.columns_to_types or {}
573
574            if columns_to_types_all_known(old_columns_to_types) and columns_to_types_all_known(
575                new_columns_to_types
576            ):
577                alter_operations = t.cast(
578                    t.List[TableAlterOperation],
579                    get_schema_differ(snapshot.model.dialect).compare_columns(
580                        new.name,
581                        old_columns_to_types,
582                        new_columns_to_types,
583                        ignore_destructive=new.model.on_destructive_change.is_ignore,
584                        ignore_additive=new.model.on_additive_change.is_ignore,
585                    ),
586                )
587
588                snapshot_name = snapshot.name
589                model_dialect = snapshot.model.dialect
590
591                if needs_destructive_check and has_drop_alteration(alter_operations):
592                    self._console.log_destructive_change(
593                        snapshot_name,
594                        alter_operations,
595                        model_dialect,
596                        error=not snapshot.model.on_destructive_change.is_warn,
597                    )
598                    if snapshot.model.on_destructive_change.is_error:
599                        raise PlanError(
600                            "Plan requires a destructive change to a forward-only model."
601                        )
602
603                if needs_additive_check and has_additive_alteration(alter_operations):
604                    self._console.log_additive_change(
605                        snapshot_name,
606                        alter_operations,
607                        model_dialect,
608                        error=not snapshot.model.on_additive_change.is_warn,
609                    )
610                    if snapshot.model.on_additive_change.is_error:
611                        raise PlanError("Plan requires an additive change to a forward-only model.")
612
613    def _categorize_snapshots(
614        self, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping
615    ) -> None:
616        """Automatically categorizes snapshots that can be automatically categorized and
617        returns a list of added and directly modified snapshots as well as the mapping of
618        indirectly modified snapshots.
619        """
620
621        # Iterating in DAG order since a category for a snapshot may depend on the categories
622        # assigned to its upstream dependencies.
623        for s_id in dag:
624            snapshot = self._context_diff.snapshots.get(s_id)
625
626            if not snapshot or not self._is_new_snapshot(snapshot):
627                continue
628
629            forward_only = self._forward_only or self._is_forward_only_change(s_id)
630            if forward_only and s_id.name in self._context_diff.modified_snapshots:
631                new, old = self._context_diff.modified_snapshots[s_id.name]
632                if is_breaking_kind_change(old, new) or snapshot.is_seed:
633                    # Breaking kind changes and seed changes can't be forward-only.
634                    forward_only = False
635
636            if s_id in self._choices:
637                snapshot.categorize_as(self._choices[s_id], forward_only)
638                continue
639
640            if s_id in self._context_diff.added:
641                snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
642            elif s_id.name in self._context_diff.modified_snapshots:
643                self._categorize_snapshot(snapshot, forward_only, dag, indirectly_modified)
644
    def _categorize_snapshot(
        self,
        snapshot: Snapshot,
        forward_only: bool,
        dag: DAG[SnapshotId],
        indirectly_modified: SnapshotMapping,
    ) -> None:
        """Assigns a change category to a single modified snapshot, in place.

        Three cases are distinguished using the context diff:

        * Directly modified: only handled when auto categorization is enabled. A breaking
          kind change is always categorized as BREAKING (and never forward-only). Otherwise
          the categorizer decides, unless this snapshot or one of its indirectly modified
          downstream snapshots is a model with unknown column types; in that case the
          snapshot is categorized as BREAKING only if the categorizer mode configured for
          the model's source type is FULL.
        * Indirectly modified: the category is derived from the categories of the new
          upstream snapshots.
        * Neither: the change is treated as a metadata update.

        Args:
            snapshot: The snapshot to categorize.
            forward_only: Whether the change should be treated as forward-only.
            dag: The DAG of snapshot IDs used to look up upstream snapshots.
            indirectly_modified: Mapping from a snapshot ID to the IDs of the snapshots
                that are indirectly modified by it.
        """
        s_id = snapshot.snapshot_id

        if self._context_diff.directly_modified(s_id.name):
            if self._auto_categorization_enabled:
                new, old = self._context_diff.modified_snapshots[s_id.name]
                if is_breaking_kind_change(old, new):
                    snapshot.categorize_as(SnapshotChangeCategory.BREAKING, False)
                    return

                # Look for the first model (this snapshot or anything downstream of it)
                # whose column types are unknown.
                s_id_with_missing_columns: t.Optional[SnapshotId] = None
                this_sid_with_downstream = indirectly_modified.get(s_id, set()) | {s_id}
                for downstream_s_id in this_sid_with_downstream:
                    downstream_snapshot = self._context_diff.snapshots[downstream_s_id]
                    if (
                        downstream_snapshot.is_model
                        and downstream_snapshot.model.columns_to_types is None
                    ):
                        s_id_with_missing_columns = downstream_s_id
                        break

                if s_id_with_missing_columns is None:
                    change_category = categorize_change(new, old, config=self._categorizer_config)
                    # The snapshot stays uncategorized when the categorizer returns None.
                    if change_category is not None:
                        snapshot.categorize_as(change_category, forward_only)
                else:
                    # Column types are unknown somewhere in the affected subgraph, so the
                    # categorizer is not consulted; fall back to BREAKING in FULL mode only.
                    mode = self._categorizer_config.dict().get(
                        new.model.source_type, AutoCategorizationMode.OFF
                    )
                    if mode == AutoCategorizationMode.FULL:
                        snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
        elif self._context_diff.indirectly_modified(snapshot.name):
            if snapshot.is_materialized_view and not forward_only:
                # We categorize changes as breaking to allow for instantaneous switches in a virtual layer.
                # Otherwise, there might be a potentially long downtime during MVs recreation.
                # In the case of forward-only changes this optimization is not applicable because we want to continue
                # using the same (existing) table version.
                snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only)
                return

            all_upstream_forward_only = set()
            all_upstream_categories = set()
            direct_parent_categories = set()

            # Collect categories / forward-only flags of all new upstream snapshots, keeping
            # the categories of direct parents separately.
            for p_id in dag.upstream(s_id):
                parent = self._context_diff.snapshots.get(p_id)

                if parent and self._is_new_snapshot(parent):
                    all_upstream_categories.add(parent.change_category)
                    all_upstream_forward_only.add(parent.is_forward_only)
                    if p_id in snapshot.parents:
                        direct_parent_categories.add(parent.change_category)

            # The change becomes forward-only when every new upstream snapshot is forward-only
            # or when the model itself is declared forward-only.
            if all_upstream_forward_only == {True} or (
                snapshot.is_model and snapshot.model.forward_only
            ):
                forward_only = True

            if direct_parent_categories.intersection(
                {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING}
            ):
                snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only)
            elif not direct_parent_categories:
                # None of the direct parents is new in this plan, so the category has to be
                # inferred from the parents' version history.
                snapshot.categorize_as(
                    self._get_orphaned_indirect_change_category(snapshot), forward_only
                )
            elif all_upstream_categories == {SnapshotChangeCategory.METADATA}:
                snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only)
            else:
                snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only)
        else:
            # Metadata updated.
            snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only)
724
725    def _get_orphaned_indirect_change_category(
726        self, indirect_snapshot: Snapshot
727    ) -> SnapshotChangeCategory:
728        """Sometimes an indirectly changed downstream snapshot ends up with no directly changed parents introduced in the same plan.
729        This may happen when 2 or more parent models were changed independently in different plans and then the changes were
730        merged together and applied in a single plan. As a result, a combination of 2 or more previously changed parents produces
731        a new downstream snapshot not previously seen.
732
733        This function is used to infer the correct change category for such downstream snapshots based on change categories of their parents.
734        """
735        previous_snapshot = self._context_diff.modified_snapshots[indirect_snapshot.name][1]
736        previous_parent_snapshot_ids = {p.name: p for p in previous_snapshot.parents}
737
738        current_parent_snapshots = [
739            self._context_diff.snapshots[p_id]
740            for p_id in indirect_snapshot.parents
741            if p_id in self._context_diff.snapshots
742        ]
743
744        indirect_category: t.Optional[SnapshotChangeCategory] = None
745        for current_parent_snapshot in current_parent_snapshots:
746            if current_parent_snapshot.name not in previous_parent_snapshot_ids:
747                # This is a new parent so falling back to INDIRECT_BREAKING
748                return SnapshotChangeCategory.INDIRECT_BREAKING
749            pevious_parent_snapshot_id = previous_parent_snapshot_ids[current_parent_snapshot.name]
750
751            if current_parent_snapshot.snapshot_id == pevious_parent_snapshot_id:
752                # There were no new versions of this parent since the previous version of this snapshot,
753                # so we can skip it
754                continue
755
756            # Find the previous snapshot ID of the same parent in the historical chain
757            previous_parent_found = False
758            previous_parent_categories = set()
759            for pv in reversed(current_parent_snapshot.all_versions):
760                pv_snapshot_id = pv.snapshot_id(current_parent_snapshot.name)
761                if pv_snapshot_id == pevious_parent_snapshot_id:
762                    previous_parent_found = True
763                    break
764                previous_parent_categories.add(pv.change_category)
765
766            if not previous_parent_found:
767                # The previous parent is not in the historical chain so falling back to INDIRECT_BREAKING
768                return SnapshotChangeCategory.INDIRECT_BREAKING
769
770            if previous_parent_categories.intersection(
771                {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING}
772            ):
773                # One of the new parents in the chain was breaking so this indirect snapshot is breaking
774                return SnapshotChangeCategory.INDIRECT_BREAKING
775
776            if previous_parent_categories.intersection(
777                {
778                    SnapshotChangeCategory.NON_BREAKING,
779                    SnapshotChangeCategory.INDIRECT_NON_BREAKING,
780                }
781            ):
782                # All changes in the chain were non-breaking so this indirect snapshot can be non-breaking too
783                indirect_category = SnapshotChangeCategory.INDIRECT_NON_BREAKING
784            elif (
785                previous_parent_categories == {SnapshotChangeCategory.METADATA}
786                and indirect_category is None
787            ):
788                # All changes in the chain were metadata so this indirect snapshot can be metadata too
789                indirect_category = SnapshotChangeCategory.METADATA
790
791        return indirect_category or SnapshotChangeCategory.INDIRECT_BREAKING
792
793    def _apply_effective_from(self) -> None:
794        if self._effective_from:
795            if not self._forward_only:
796                raise PlanError("Effective date can only be set for a forward-only plan.")
797            if to_datetime(self._effective_from) > now():
798                raise PlanError("Effective date cannot be in the future.")
799
800        for snapshot in self._context_diff.new_snapshots.values():
801            if (
802                snapshot.evaluatable
803                and not snapshot.disable_restatement
804                and (not snapshot.full_history_restatement_only or not snapshot.is_incremental)
805            ):
806                snapshot.effective_from = self._effective_from
807
808    def _is_forward_only_change(self, s_id: SnapshotId) -> bool:
809        if not self._context_diff.directly_modified(
810            s_id.name
811        ) and not self._context_diff.indirectly_modified(s_id.name):
812            return False
813        snapshot = self._context_diff.snapshots[s_id]
814        if snapshot.name in self._context_diff.modified_snapshots:
815            _, old = self._context_diff.modified_snapshots[snapshot.name]
816            # If the model kind has changed in a breaking way, then we can't consider this to be a forward-only change.
817            if snapshot.is_model and is_breaking_kind_change(old, snapshot):
818                return False
819        return (
820            snapshot.is_model and snapshot.model.forward_only and bool(snapshot.previous_versions)
821        )
822
823    def _is_new_snapshot(self, snapshot: Snapshot) -> bool:
824        """Returns True if the given snapshot is a new snapshot in this plan."""
825        return snapshot.snapshot_id in self._context_diff.new_snapshots
826
827    def _ensure_valid_date_range(self) -> None:
828        if (self.override_start or self.override_end) and not self.is_start_and_end_allowed:
829            raise PlanError(
830                "The start and end dates can't be set for a production plan without restatements."
831            )
832
833        if (start := self.start) and (end := self.end):
834            if to_datetime(start) > to_datetime(end):
835                raise PlanError(
836                    f"Plan end date: '{time_like_to_str(end)}' must be after the plan start date: '{time_like_to_str(start)}'"
837                )
838
839        if end := self.end:
840            if to_datetime(end) > to_datetime(self.execution_time):
841                raise PlanError(
842                    f"Plan end date: '{time_like_to_str(end)}' cannot be in the future (execution time: '{time_like_to_str(self.execution_time)}')"
843                )
844
845        # Validate model-specific start/end dates
846        if (start := self.start or self._default_start) and (end := self.end):
847            start_ts = to_datetime(start)
848            end_ts = to_datetime(end)
849            if start_ts > end_ts:
850                models_to_check: t.Set[str] = (
851                    set(self._backfill_models or [])
852                    | set(self._context_diff.modified_snapshots.keys())
853                    | {s.name for s in self._context_diff.added}
854                    | set((self._end_override_per_model or {}).keys())
855                )
856                for model_name in models_to_check:
857                    if snapshot := self._model_fqn_to_snapshot.get(model_name):
858                        if snapshot.node.start is None or to_datetime(snapshot.node.start) > end_ts:
859                            raise PlanError(
860                                f"Model '{model_name}': Start date / time '({time_like_to_str(start_ts)})' can't be greater than end date / time '({time_like_to_str(end_ts)})'.\n"
861                                f"Set the `start` attribute in your project config model defaults to avoid this issue."
862                            )
863
864    def _ensure_no_broken_references(self) -> None:
865        for snapshot in self._context_diff.snapshots.values():
866            broken_references = {
867                x.name for x in self._context_diff.removed_snapshots.values() if not x.is_external
868            } & {x for x in snapshot.node.depends_on}
869            if broken_references:
870                broken_references_msg = ", ".join(f"'{x}'" for x in broken_references)
871                raise PlanError(
872                    f"""Removed {broken_references_msg} are referenced in '{snapshot.name}'. Please remove broken references before proceeding."""
873                )
874
875    def _ensure_new_env_with_changes(self) -> None:
876        if (
877            self._is_dev
878            and not self._include_unmodified
879            and self._context_diff.is_new_environment
880            and not self._context_diff.has_snapshot_changes
881            and not self._context_diff.has_environment_statements_changes
882            and not self._backfill_models
883        ):
884            raise NoChangesPlanError(
885                f"Creating a new environment requires a change, but project files match the `{self._context_diff.create_from}` environment. Make a change or use the --include-unmodified flag to create a new environment without changes."
886            )
887
888    @cached_property
889    def _forward_only_preview_needed(self) -> bool:
890        """Determines whether the plan should compute previews for forward-only changes (if there are any)."""
891        return self._is_dev and (
892            self._forward_only
893            or (
894                self._enable_preview
895                and any(
896                    snapshot.model.forward_only
897                    for snapshot in self._modified_and_added_snapshots
898                    if snapshot.is_model
899                )
900            )
901        )
902
903    @cached_property
904    def _non_forward_only_preview_needed(self) -> bool:
905        if not self._is_dev:
906            return False
907        for snapshot in self._modified_and_added_snapshots:
908            if not snapshot.is_model:
909                continue
910            if (
911                not snapshot.virtual_environment_mode.is_full
912                or snapshot.model.auto_restatement_cron is not None
913            ):
914                return True
915        return False
916
917    @cached_property
918    def _modified_and_added_snapshots(self) -> t.List[Snapshot]:
919        return [
920            snapshot
921            for snapshot in self._context_diff.snapshots.values()
922            if snapshot.name in self._context_diff.modified_snapshots
923            or snapshot.snapshot_id in self._context_diff.added
924        ]
logger = <Logger sqlmesh.core.plan.builder (WARNING)>
class PlanBuilder:
 56class PlanBuilder:
 57    """Plan Builder constructs a Plan based on user choices for how they want to backfill, preview, etc. their changes.
 58
 59    Args:
 60        context_diff: The context diff that the plan is based on.
 61        start: The start time to backfill data.
 62        end: The end time to backfill data.
 63        execution_time: The date/time reference to use for execution time. Defaults to now.
 64            If :start or :end are relative time expressions, they are interpreted as relative to the :execution_time
 65        apply: The callback to apply the plan.
 66        restate_models: A list of models for which the data should be restated for the time range
 67            specified in this plan. Note: models defined outside SQLMesh (external) won't be a part
 68            of the restatement.
 69        restate_all_snapshots: If restatements are present, this flag indicates whether or not the intervals
 70            being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments).
 71            If set to None, the default behaviour is to not clear anything unless the target environment is prod.
 72        backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan.
 73        no_gaps:  Whether to ensure that new snapshots for nodes that are already a
 74            part of the target environment have no data gaps when compared against previous
 75            snapshots for same nodes.
 76        skip_backfill: Whether to skip the backfill step.
 77        empty_backfill: Like skip_backfill, but also records processed intervals.
 78        is_dev: Whether this plan is for development purposes.
 79        forward_only: Whether the purpose of the plan is to make forward only changes.
 80        allow_destructive_models: A list of fully qualified model names whose forward-only changes are allowed to be destructive.
 81        allow_additive_models: A list of fully qualified model names whose forward-only changes are allowed to be additive.
 82        environment_ttl: The period of time that a development environment should exist before being deleted.
 83        categorizer_config: Auto categorization settings.
 84        auto_categorization_enabled: Whether to apply auto categorization.
 85        effective_from: The effective date from which to apply forward-only changes on production.
 86        include_unmodified: Indicates whether to include unmodified nodes in the target development environment.
 87        environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
 88        default_start: The default plan start to use if not specified.
 89        default_end: The default plan end to use if not specified.
 90        enable_preview: Whether to enable preview for forward-only models in development environments.
 91        end_bounded: If set to true, the missing intervals will be bounded by the target end date, disregarding lookback,
 92            allow_partials, and other attributes that could cause the intervals to exceed the target end date.
 93        ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
 94            environment state, or to use whatever snapshots are in the current environment state even if
 95            the environment is not finalized.
 96        start_override_per_model: A mapping of model FQNs to target start dates.
 97        end_override_per_model: A mapping of model FQNs to target end dates.
 98        ignore_cron: Whether to ignore the node's cron schedule when computing missing intervals.
 99        explain: Whether to explain the plan instead of applying it.
100    """
101
    def __init__(
        self,
        context_diff: ContextDiff,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        apply: t.Optional[t.Callable[[Plan], None]] = None,
        restate_models: t.Optional[t.Iterable[str]] = None,
        restate_all_snapshots: bool = False,
        backfill_models: t.Optional[t.Iterable[str]] = None,
        no_gaps: bool = False,
        skip_backfill: bool = False,
        empty_backfill: bool = False,
        is_dev: bool = False,
        forward_only: bool = False,
        allow_destructive_models: t.Optional[t.Iterable[str]] = None,
        allow_additive_models: t.Optional[t.Iterable[str]] = None,
        environment_ttl: t.Optional[str] = None,
        environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default,
        environment_catalog_mapping: t.Optional[t.Dict[re.Pattern, str]] = None,
        categorizer_config: t.Optional[CategorizerConfig] = None,
        auto_categorization_enabled: bool = True,
        effective_from: t.Optional[TimeLike] = None,
        include_unmodified: bool = False,
        default_start: t.Optional[TimeLike] = None,
        default_end: t.Optional[TimeLike] = None,
        enable_preview: bool = False,
        end_bounded: bool = False,
        ensure_finalized_snapshots: bool = False,
        explain: bool = False,
        ignore_cron: bool = False,
        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
        console: t.Optional[PlanBuilderConsole] = None,
        user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
        selected_models: t.Optional[t.Set[str]] = None,
    ):
        # Stores the provided options on the instance and derives the initial start date and
        # the environment naming info from them.
        self._context_diff = context_diff
        self._no_gaps = no_gaps
        self._skip_backfill = skip_backfill
        self._empty_backfill = empty_backfill
        self._is_dev = is_dev
        self._forward_only = forward_only
        self._allow_destructive_models = set(
            allow_destructive_models if allow_destructive_models is not None else []
        )
        self._allow_additive_models = set(
            allow_additive_models if allow_additive_models is not None else []
        )
        self._enable_preview = enable_preview
        self._end_bounded = end_bounded
        self._ensure_finalized_snapshots = ensure_finalized_snapshots
        self._ignore_cron = ignore_cron
        self._start_override_per_model = start_override_per_model
        self._end_override_per_model = end_override_per_model
        self._environment_ttl = environment_ttl
        self._categorizer_config = categorizer_config or CategorizerConfig()
        self._auto_categorization_enabled = auto_categorization_enabled
        self._include_unmodified = include_unmodified
        # None means that no restatement was requested, while an empty set means that the
        # requested restatement didn't match any models.
        self._restate_models = set(restate_models) if restate_models is not None else None
        self._restate_all_snapshots = restate_all_snapshots
        self._effective_from = effective_from

        # Note: this deliberately doesn't default to now() here.
        # There may be a significant delay between the PlanBuilder producing a Plan and the Plan actually being run
        # so if execution_time=None is passed to the PlanBuilder, then the resulting Plan should also have execution_time=None
        # in order to prevent the Plan that was intended to run "as at now" from having "now" fixed to some time in the past
        # ref: https://github.com/SQLMesh/sqlmesh/pull/4702#discussion_r2140696156
        self._execution_time = execution_time

        self._backfill_models = backfill_models
        self._end = end or default_end
        self._default_start = default_start
        self._apply = apply
        self._console = console or get_console()
        self._choices: t.Dict[SnapshotId, SnapshotChangeCategory] = {}
        self._user_provided_flags = user_provided_flags
        self._selected_models = selected_models
        self._explain = explain

        # When no start is provided but a preview is needed, fall back to the default start
        # (or yesterday). The preview properties are cached and read attributes assigned
        # above, so this has to happen after those assignments.
        self._start = start
        if not self._start and (
            self._forward_only_preview_needed or self._non_forward_only_preview_needed
        ):
            self._start = default_start or yesterday_ds()

        self._plan_id: str = random_id()
        self._model_fqn_to_snapshot = {s.name: s for s in self._context_diff.snapshots.values()}

        # Track whether the start / end were explicitly provided by the caller (as opposed
        # to being derived from defaults).
        self.override_start = start is not None
        self.override_end = end is not None
        self.environment_naming_info = EnvironmentNamingInfo.from_environment_catalog_mapping(
            environment_catalog_mapping or {},
            name=self._context_diff.environment,
            suffix_target=environment_suffix_target,
            normalize_name=self._context_diff.normalize_environment_name,
            gateway_managed=self._context_diff.gateway_managed_virtual_layer,
        )

        # Cache of the most recently built plan; reset by every setter.
        self._latest_plan: t.Optional[Plan] = None
202
203    @property
204    def is_start_and_end_allowed(self) -> bool:
205        """Indicates whether this plan allows to set the start and end dates."""
206        return self._is_dev or bool(self._restate_models)
207
208    @property
209    def start(self) -> t.Optional[TimeLike]:
210        if self._start and is_relative(self._start):
211            # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00'
212            return to_datetime(self._start, relative_base=to_datetime(self.execution_time))
213        return self._start
214
215    @property
216    def end(self) -> t.Optional[TimeLike]:
217        if self._end and is_relative(self._end):
218            # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00'
219            return to_datetime(self._end, relative_base=to_datetime(self.execution_time))
220        return self._end
221
222    @cached_property
223    def execution_time(self) -> TimeLike:
224        # this is cached to return a stable value from now() in the places where the execution time matters for resolving relative date strings
225        # during the plan building process
226        return self._execution_time or now()
227
228    def set_start(self, new_start: TimeLike) -> PlanBuilder:
229        self._start = new_start
230        self.override_start = True
231        self._latest_plan = None
232        return self
233
234    def set_end(self, new_end: TimeLike) -> PlanBuilder:
235        self._end = new_end
236        self.override_end = True
237        self._latest_plan = None
238        return self
239
240    def set_effective_from(self, effective_from: t.Optional[TimeLike]) -> PlanBuilder:
241        """Sets the effective date for all new snapshots in the plan.
242
243        Note: this is only applicable for forward-only plans.
244
245        Args:
246            effective_from: The effective date to set.
247        """
248        self._effective_from = effective_from
249        if effective_from and self._is_dev and not self.override_start:
250            self._start = effective_from
251        self._latest_plan = None
252        return self
253
254    def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> PlanBuilder:
255        """Sets a snapshot version based on the user choice.
256
257        Args:
258            snapshot: The target snapshot.
259            choice: The user decision on how to version the target snapshot and its children.
260        """
261        if not self._is_new_snapshot(snapshot):
262            raise PlanError(
263                f"A choice can't be changed for the existing version of {snapshot.name}."
264            )
265        if (
266            not self._context_diff.directly_modified(snapshot.name)
267            and snapshot.snapshot_id not in self._context_diff.added
268        ):
269            raise PlanError(f"Only directly modified models can be categorized ({snapshot.name}).")
270
271        self._choices[snapshot.snapshot_id] = choice
272        self._latest_plan = None
273        return self
274
275    def apply(self) -> None:
276        """Builds and applies the plan."""
277        if not self._apply:
278            raise PlanError("Plan was not initialized with an applier.")
279        self._apply(self.build())
280
    def build(self) -> Plan:
        """Builds the plan.

        The most recently built plan is cached and returned as is until one of the setters
        invalidates it. Building validates the inputs, categorizes the new snapshots in
        place and computes the restatements and the models to backfill.

        Returns:
            The built (or cached) plan.
        """
        if self._latest_plan:
            return self._latest_plan

        # Validation steps; each of them raises when the plan can't be built.
        self._ensure_new_env_with_changes()
        self._ensure_valid_date_range()
        self._ensure_no_broken_references()

        self._apply_effective_from()

        dag = self._build_dag()
        directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag)

        self._check_destructive_additive_changes(directly_modified)
        self._categorize_snapshots(dag, indirectly_modified)
        self._adjust_snapshot_intervals()

        # Outside of development every snapshot is considered deployable.
        deployability_index = (
            DeployabilityIndex.create(
                self._context_diff.snapshots.values(),
                start=self._start,
                start_override_per_model=self._start_override_per_model,
            )
            if self._is_dev
            else DeployabilityIndex.all_deployable()
        )

        restatements = self._build_restatements(
            dag,
            earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time),
        )
        models_to_backfill = self._build_models_to_backfill(dag, restatements)

        end_override_per_model = self._end_override_per_model
        if end_override_per_model and self.override_end:
            # If the end date was provided explicitly by a user, then interval end for each individual
            # model should be ignored.
            end_override_per_model = None

        # this deliberately uses the passed in self._execution_time and not self.execution_time cached property
        # the reason is because that there can be a delay between the Plan being built and the Plan being actually run,
        # so this ensures that an _execution_time of None can be propagated to the Plan and thus be re-resolved to
        # the current timestamp of when the Plan is eventually run
        plan_execution_time = self._execution_time

        plan = Plan(
            context_diff=self._context_diff,
            plan_id=self._plan_id,
            provided_start=self.start,
            provided_end=self.end,
            is_dev=self._is_dev,
            skip_backfill=self._skip_backfill,
            empty_backfill=self._empty_backfill,
            no_gaps=self._no_gaps,
            forward_only=self._forward_only,
            explain=self._explain,
            allow_destructive_models=t.cast(t.Set, self._allow_destructive_models),
            allow_additive_models=t.cast(t.Set, self._allow_additive_models),
            include_unmodified=self._include_unmodified,
            environment_ttl=self._environment_ttl,
            environment_naming_info=self.environment_naming_info,
            directly_modified=directly_modified,
            indirectly_modified=indirectly_modified,
            deployability_index=deployability_index,
            selected_models_to_restate=self._restate_models,
            restatements=restatements,
            restate_all_snapshots=self._restate_all_snapshots,
            start_override_per_model=self._start_override_per_model,
            end_override_per_model=end_override_per_model,
            selected_models_to_backfill=self._backfill_models,
            models_to_backfill=models_to_backfill,
            effective_from=self._effective_from,
            execution_time=plan_execution_time,
            end_bounded=self._end_bounded,
            ensure_finalized_snapshots=self._ensure_finalized_snapshots,
            ignore_cron=self._ignore_cron,
            user_provided_flags=self._user_provided_flags,
            selected_models=self._selected_models,
        )
        # Cache the plan so that repeated calls return the same instance.
        self._latest_plan = plan
        return plan
363
364    def _build_dag(self) -> DAG[SnapshotId]:
365        dag: DAG[SnapshotId] = DAG()
366        for s_id, context_snapshot in self._context_diff.snapshots.items():
367            dag.add(s_id, context_snapshot.parents)
368        return dag
369
    def _build_restatements(
        self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike
    ) -> t.Dict[SnapshotId, Interval]:
        """Computes the restatement intervals for snapshots that should be restated (or previewed).

        Args:
            dag: The snapshot DAG. It is iterated so that parents are processed before their children.
            earliest_interval_start: The start to use when no explicit plan start (`self._start`) is set.

        Returns:
            A mapping of snapshot IDs to their `(start, end)` restatement interval. Empty if
            there is nothing to restate or preview.

        Raises:
            PlanError: If a model to restate is not present in `self._model_fqn_to_snapshot`.
        """
        restate_models = self._restate_models
        # An explicitly empty set (as opposed to None) means that models were provided,
        # but none of them matched.
        if restate_models == set():
            # This is a warning but we print this as error since the Console is lacking API for warnings.
            self._console.log_error(
                "Provided restated models do not match any models. No models will be included in plan."
            )
            return {}

        restatements: t.Dict[SnapshotId, Interval] = {}
        forward_only_preview_needed = self._forward_only_preview_needed
        is_preview = False
        if not restate_models and forward_only_preview_needed:
            # Add model names for new forward-only snapshots to the restatement list
            # in order to compute previews.
            restate_models = {
                s.name
                for s in self._context_diff.new_snapshots.values()
                if s.is_model
                and not s.is_symbolic
                and (s.is_forward_only or s.model.forward_only)
                and not s.is_no_preview
                and (
                    # Metadata changes should not be previewed.
                    self._context_diff.directly_modified(s.name)
                    or self._context_diff.indirectly_modified(s.name)
                )
            }
            is_preview = True

        if not restate_models:
            return {}

        # Fall back to the earliest interval start and the current time when the plan
        # has no explicit start / end.
        start = self._start or earliest_interval_start
        end = self._end or now()

        # Validate that every model selected for restatement is known to this plan.
        for model_fqn in restate_models:
            if model_fqn not in self._model_fqn_to_snapshot:
                raise PlanError(f"Cannot restate model '{model_fqn}'. Model does not exist.")

        # Get restatement intervals for all restated snapshots and make sure that if an incremental snapshot expands its
        # restatement range that its downstream dependencies all expand their restatement ranges as well.
        for s_id in dag:
            snapshot = self._context_diff.snapshots[s_id]

            if is_preview and snapshot.is_no_preview:
                continue

            # Since we are traversing the graph in topological order and the largest interval range is pushed down
            # the graph we just have to check our immediate parents in the graph and not the whole upstream graph.
            restating_parents = [
                self._context_diff.snapshots[s] for s in snapshot.parents if s in restatements
            ]

            # Skip snapshots that are neither selected themselves nor downstream of a restated parent.
            if not restating_parents and snapshot.name not in restate_models:
                continue

            # The checks below are only applied when no forward-only preview is needed.
            if not forward_only_preview_needed:
                if self._is_dev and not snapshot.is_paused:
                    self._console.log_warning(
                        f"Cannot restate model '{snapshot.name}' because the current version is used in production. "
                        "Run the restatement against the production environment instead to restate this model."
                    )
                    continue
                elif (not self._is_dev or not snapshot.is_paused) and snapshot.disable_restatement:
                    self._console.log_warning(
                        f"Cannot restate model '{snapshot.name}'. "
                        "Restatement is disabled for this model to prevent possible data loss. "
                        "If you want to restate this model, change the model's `disable_restatement` setting to `false`."
                    )
                    continue
                elif snapshot.is_seed:
                    logger.info("Skipping restatement for model '%s'", snapshot.name)
                    continue

            # Candidate intervals: the intervals of incremental restating parents plus this
            # snapshot's own removal interval. The widest range across them is used.
            possible_intervals = {
                restatements[p.snapshot_id] for p in restating_parents if p.is_incremental
            }
            possible_intervals.add(
                snapshot.get_removal_interval(
                    start,
                    end,
                    self._execution_time,
                    strict=False,
                    is_preview=is_preview,
                )
            )
            snapshot_start = min(i[0] for i in possible_intervals)
            snapshot_end = max(i[1] for i in possible_intervals)

            # We may be tasked with restating a time range smaller than the target snapshot interval unit
            # For example, restating an hour of Hourly Model A, which has a downstream dependency of Daily Model B
            # we need to ensure the whole affected day in Model B is restated
            floored_snapshot_start = snapshot.node.interval_unit.cron_floor(snapshot_start)
            floored_snapshot_end = snapshot.node.interval_unit.cron_floor(snapshot_end)
            # Only widen the range when the end does not already fall on an interval boundary.
            if to_timestamp(floored_snapshot_end) < snapshot_end:
                snapshot_start = to_timestamp(floored_snapshot_start)
                snapshot_end = to_timestamp(
                    snapshot.node.interval_unit.cron_next(floored_snapshot_end)
                )

            restatements[s_id] = (snapshot_start, snapshot_end)

        return restatements
477
478    def _build_directly_and_indirectly_modified(
479        self, dag: DAG[SnapshotId]
480    ) -> t.Tuple[t.Set[SnapshotId], SnapshotMapping]:
481        """Builds collections of directly and indirectly modified snapshots.
482
483        Returns:
484            The tuple in which the first element contains a list of added and directly modified
485            snapshots while the second element contains a mapping of indirectly modified snapshots.
486        """
487        directly_modified = set()
488        all_indirectly_modified = set()
489
490        for s_id in dag:
491            if s_id.name in self._context_diff.modified_snapshots:
492                if self._context_diff.directly_modified(s_id.name):
493                    directly_modified.add(s_id)
494                else:
495                    all_indirectly_modified.add(s_id)
496            elif s_id in self._context_diff.added:
497                directly_modified.add(s_id)
498
499        indirectly_modified: SnapshotMapping = defaultdict(set)
500        for snapshot in directly_modified:
501            for downstream_s_id in dag.downstream(snapshot.snapshot_id):
502                if downstream_s_id in all_indirectly_modified:
503                    indirectly_modified[snapshot.snapshot_id].add(downstream_s_id)
504
505        return (
506            directly_modified,
507            indirectly_modified,
508        )
509
510    def _build_models_to_backfill(
511        self, dag: DAG[SnapshotId], restatements: t.Collection[SnapshotId]
512    ) -> t.Optional[t.Set[str]]:
513        backfill_models = (
514            self._backfill_models
515            if self._backfill_models is not None
516            else [r.name for r in restatements]
517            # Only backfill models explicitly marked for restatement.
518            if self._restate_models
519            else None
520        )
521        if backfill_models is None:
522            return None
523        return {
524            self._context_diff.snapshots[s_id].name
525            for s_id in dag.subdag(
526                *[
527                    self._model_fqn_to_snapshot[m].snapshot_id
528                    for m in backfill_models
529                    if m in self._model_fqn_to_snapshot
530                ]
531            ).sorted
532        }
533
534    def _adjust_snapshot_intervals(self) -> None:
535        for new, old in self._context_diff.modified_snapshots.values():
536            if not new.is_model or not old.is_model:
537                continue
538            is_same_version = old.version_get_or_generate() == new.version_get_or_generate()
539            if is_same_version and should_force_rebuild(old, new):
540                # If the difference between 2 snapshots requires a full rebuild,
541                # then clear the intervals for the new snapshot.
542                self._context_diff.snapshots[new.snapshot_id].intervals = []
543            elif new.snapshot_id in self._context_diff.new_snapshots:
544                new.intervals = []
545                new.dev_intervals = []
546                if is_same_version:
547                    new.merge_intervals(old)
548                    if new.is_forward_only:
549                        new.dev_intervals = new.intervals.copy()
550
551    def _check_destructive_additive_changes(self, directly_modified: t.Set[SnapshotId]) -> None:
552        for s_id in sorted(directly_modified):
553            if s_id.name not in self._context_diff.modified_snapshots:
554                continue
555
556            snapshot = self._context_diff.snapshots[s_id]
557            needs_destructive_check = snapshot.needs_destructive_check(
558                self._allow_destructive_models
559            )
560            needs_additive_check = snapshot.needs_additive_check(self._allow_additive_models)
561            # should we raise/warn if this snapshot has/inherits a destructive change?
562            should_raise_or_warn = (self._is_forward_only_change(s_id) or self._forward_only) and (
563                needs_destructive_check or needs_additive_check
564            )
565
566            if not should_raise_or_warn or not snapshot.is_model:
567                continue
568
569            new, old = self._context_diff.modified_snapshots[snapshot.name]
570
571            # we must know all columns_to_types to determine whether a change is destructive
572            old_columns_to_types = old.model.columns_to_types or {}
573            new_columns_to_types = new.model.columns_to_types or {}
574
575            if columns_to_types_all_known(old_columns_to_types) and columns_to_types_all_known(
576                new_columns_to_types
577            ):
578                alter_operations = t.cast(
579                    t.List[TableAlterOperation],
580                    get_schema_differ(snapshot.model.dialect).compare_columns(
581                        new.name,
582                        old_columns_to_types,
583                        new_columns_to_types,
584                        ignore_destructive=new.model.on_destructive_change.is_ignore,
585                        ignore_additive=new.model.on_additive_change.is_ignore,
586                    ),
587                )
588
589                snapshot_name = snapshot.name
590                model_dialect = snapshot.model.dialect
591
592                if needs_destructive_check and has_drop_alteration(alter_operations):
593                    self._console.log_destructive_change(
594                        snapshot_name,
595                        alter_operations,
596                        model_dialect,
597                        error=not snapshot.model.on_destructive_change.is_warn,
598                    )
599                    if snapshot.model.on_destructive_change.is_error:
600                        raise PlanError(
601                            "Plan requires a destructive change to a forward-only model."
602                        )
603
604                if needs_additive_check and has_additive_alteration(alter_operations):
605                    self._console.log_additive_change(
606                        snapshot_name,
607                        alter_operations,
608                        model_dialect,
609                        error=not snapshot.model.on_additive_change.is_warn,
610                    )
611                    if snapshot.model.on_additive_change.is_error:
612                        raise PlanError("Plan requires an additive change to a forward-only model.")
613
614    def _categorize_snapshots(
615        self, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping
616    ) -> None:
617        """Automatically categorizes snapshots that can be automatically categorized and
618        returns a list of added and directly modified snapshots as well as the mapping of
619        indirectly modified snapshots.
620        """
621
622        # Iterating in DAG order since a category for a snapshot may depend on the categories
623        # assigned to its upstream dependencies.
624        for s_id in dag:
625            snapshot = self._context_diff.snapshots.get(s_id)
626
627            if not snapshot or not self._is_new_snapshot(snapshot):
628                continue
629
630            forward_only = self._forward_only or self._is_forward_only_change(s_id)
631            if forward_only and s_id.name in self._context_diff.modified_snapshots:
632                new, old = self._context_diff.modified_snapshots[s_id.name]
633                if is_breaking_kind_change(old, new) or snapshot.is_seed:
634                    # Breaking kind changes and seed changes can't be forward-only.
635                    forward_only = False
636
637            if s_id in self._choices:
638                snapshot.categorize_as(self._choices[s_id], forward_only)
639                continue
640
641            if s_id in self._context_diff.added:
642                snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
643            elif s_id.name in self._context_diff.modified_snapshots:
644                self._categorize_snapshot(snapshot, forward_only, dag, indirectly_modified)
645
    def _categorize_snapshot(
        self,
        snapshot: Snapshot,
        forward_only: bool,
        dag: DAG[SnapshotId],
        indirectly_modified: SnapshotMapping,
    ) -> None:
        """Assigns a change category to a single modified snapshot.

        Directly modified snapshots are categorized automatically when auto categorization
        is enabled. Indirectly modified snapshots derive their category from the categories
        of their new upstream snapshots. Anything else is treated as a metadata update.

        Args:
            snapshot: The snapshot to categorize.
            forward_only: Whether the change should be treated as forward-only.
            dag: The snapshot DAG, used to look up upstream snapshots.
            indirectly_modified: A mapping of directly modified snapshot IDs to their
                indirectly modified downstream snapshot IDs.
        """
        s_id = snapshot.snapshot_id

        if self._context_diff.directly_modified(s_id.name):
            # Without auto categorization a directly modified snapshot is left uncategorized here.
            if self._auto_categorization_enabled:
                new, old = self._context_diff.modified_snapshots[s_id.name]
                if is_breaking_kind_change(old, new):
                    # A breaking kind change is always breaking and never forward-only.
                    snapshot.categorize_as(SnapshotChangeCategory.BREAKING, False)
                    return

                # Look for a model without known column types among this snapshot and its
                # indirectly modified downstream snapshots.
                s_id_with_missing_columns: t.Optional[SnapshotId] = None
                this_sid_with_downstream = indirectly_modified.get(s_id, set()) | {s_id}
                for downstream_s_id in this_sid_with_downstream:
                    downstream_snapshot = self._context_diff.snapshots[downstream_s_id]
                    if (
                        downstream_snapshot.is_model
                        and downstream_snapshot.model.columns_to_types is None
                    ):
                        s_id_with_missing_columns = downstream_s_id
                        break

                if s_id_with_missing_columns is None:
                    # The snapshot stays uncategorized if no category can be determined.
                    change_category = categorize_change(new, old, config=self._categorizer_config)
                    if change_category is not None:
                        snapshot.categorize_as(change_category, forward_only)
                else:
                    # With missing column types, only the FULL mode results in a (breaking) category.
                    mode = self._categorizer_config.dict().get(
                        new.model.source_type, AutoCategorizationMode.OFF
                    )
                    if mode == AutoCategorizationMode.FULL:
                        snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
        elif self._context_diff.indirectly_modified(snapshot.name):
            if snapshot.is_materialized_view and not forward_only:
                # We categorize changes as breaking to allow for instantaneous switches in a virtual layer.
                # Otherwise, there might be a potentially long downtime during MVs recreation.
                # In the case of forward-only changes this optimization is not applicable because we want to continue
                # using the same (existing) table version.
                snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only)
                return

            all_upstream_forward_only = set()
            all_upstream_categories = set()
            direct_parent_categories = set()

            # Collect the categories and forward-only flags of all new upstream snapshots,
            # tracking the categories of direct parents separately.
            for p_id in dag.upstream(s_id):
                parent = self._context_diff.snapshots.get(p_id)

                if parent and self._is_new_snapshot(parent):
                    all_upstream_categories.add(parent.change_category)
                    all_upstream_forward_only.add(parent.is_forward_only)
                    if p_id in snapshot.parents:
                        direct_parent_categories.add(parent.change_category)

            # Forward-only is forced if every new upstream snapshot is forward-only
            # or the model itself is declared forward-only.
            if all_upstream_forward_only == {True} or (
                snapshot.is_model and snapshot.model.forward_only
            ):
                forward_only = True

            if direct_parent_categories.intersection(
                {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING}
            ):
                snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only)
            elif not direct_parent_categories:
                # None of the direct parents are new in this plan.
                snapshot.categorize_as(
                    self._get_orphaned_indirect_change_category(snapshot), forward_only
                )
            elif all_upstream_categories == {SnapshotChangeCategory.METADATA}:
                snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only)
            else:
                snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only)
        else:
            # Metadata updated.
            snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only)
725
726    def _get_orphaned_indirect_change_category(
727        self, indirect_snapshot: Snapshot
728    ) -> SnapshotChangeCategory:
729        """Sometimes an indirectly changed downstream snapshot ends up with no directly changed parents introduced in the same plan.
730        This may happen when 2 or more parent models were changed independently in different plans and then the changes were
731        merged together and applied in a single plan. As a result, a combination of 2 or more previously changed parents produces
732        a new downstream snapshot not previously seen.
733
734        This function is used to infer the correct change category for such downstream snapshots based on change categories of their parents.
735        """
736        previous_snapshot = self._context_diff.modified_snapshots[indirect_snapshot.name][1]
737        previous_parent_snapshot_ids = {p.name: p for p in previous_snapshot.parents}
738
739        current_parent_snapshots = [
740            self._context_diff.snapshots[p_id]
741            for p_id in indirect_snapshot.parents
742            if p_id in self._context_diff.snapshots
743        ]
744
745        indirect_category: t.Optional[SnapshotChangeCategory] = None
746        for current_parent_snapshot in current_parent_snapshots:
747            if current_parent_snapshot.name not in previous_parent_snapshot_ids:
748                # This is a new parent so falling back to INDIRECT_BREAKING
749                return SnapshotChangeCategory.INDIRECT_BREAKING
750            pevious_parent_snapshot_id = previous_parent_snapshot_ids[current_parent_snapshot.name]
751
752            if current_parent_snapshot.snapshot_id == pevious_parent_snapshot_id:
753                # There were no new versions of this parent since the previous version of this snapshot,
754                # so we can skip it
755                continue
756
757            # Find the previous snapshot ID of the same parent in the historical chain
758            previous_parent_found = False
759            previous_parent_categories = set()
760            for pv in reversed(current_parent_snapshot.all_versions):
761                pv_snapshot_id = pv.snapshot_id(current_parent_snapshot.name)
762                if pv_snapshot_id == pevious_parent_snapshot_id:
763                    previous_parent_found = True
764                    break
765                previous_parent_categories.add(pv.change_category)
766
767            if not previous_parent_found:
768                # The previous parent is not in the historical chain so falling back to INDIRECT_BREAKING
769                return SnapshotChangeCategory.INDIRECT_BREAKING
770
771            if previous_parent_categories.intersection(
772                {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING}
773            ):
774                # One of the new parents in the chain was breaking so this indirect snapshot is breaking
775                return SnapshotChangeCategory.INDIRECT_BREAKING
776
777            if previous_parent_categories.intersection(
778                {
779                    SnapshotChangeCategory.NON_BREAKING,
780                    SnapshotChangeCategory.INDIRECT_NON_BREAKING,
781                }
782            ):
783                # All changes in the chain were non-breaking so this indirect snapshot can be non-breaking too
784                indirect_category = SnapshotChangeCategory.INDIRECT_NON_BREAKING
785            elif (
786                previous_parent_categories == {SnapshotChangeCategory.METADATA}
787                and indirect_category is None
788            ):
789                # All changes in the chain were metadata so this indirect snapshot can be metadata too
790                indirect_category = SnapshotChangeCategory.METADATA
791
792        return indirect_category or SnapshotChangeCategory.INDIRECT_BREAKING
793
794    def _apply_effective_from(self) -> None:
795        if self._effective_from:
796            if not self._forward_only:
797                raise PlanError("Effective date can only be set for a forward-only plan.")
798            if to_datetime(self._effective_from) > now():
799                raise PlanError("Effective date cannot be in the future.")
800
801        for snapshot in self._context_diff.new_snapshots.values():
802            if (
803                snapshot.evaluatable
804                and not snapshot.disable_restatement
805                and (not snapshot.full_history_restatement_only or not snapshot.is_incremental)
806            ):
807                snapshot.effective_from = self._effective_from
808
809    def _is_forward_only_change(self, s_id: SnapshotId) -> bool:
810        if not self._context_diff.directly_modified(
811            s_id.name
812        ) and not self._context_diff.indirectly_modified(s_id.name):
813            return False
814        snapshot = self._context_diff.snapshots[s_id]
815        if snapshot.name in self._context_diff.modified_snapshots:
816            _, old = self._context_diff.modified_snapshots[snapshot.name]
817            # If the model kind has changed in a breaking way, then we can't consider this to be a forward-only change.
818            if snapshot.is_model and is_breaking_kind_change(old, snapshot):
819                return False
820        return (
821            snapshot.is_model and snapshot.model.forward_only and bool(snapshot.previous_versions)
822        )
823
824    def _is_new_snapshot(self, snapshot: Snapshot) -> bool:
825        """Returns True if the given snapshot is a new snapshot in this plan."""
826        return snapshot.snapshot_id in self._context_diff.new_snapshots
827
828    def _ensure_valid_date_range(self) -> None:
829        if (self.override_start or self.override_end) and not self.is_start_and_end_allowed:
830            raise PlanError(
831                "The start and end dates can't be set for a production plan without restatements."
832            )
833
834        if (start := self.start) and (end := self.end):
835            if to_datetime(start) > to_datetime(end):
836                raise PlanError(
837                    f"Plan end date: '{time_like_to_str(end)}' must be after the plan start date: '{time_like_to_str(start)}'"
838                )
839
840        if end := self.end:
841            if to_datetime(end) > to_datetime(self.execution_time):
842                raise PlanError(
843                    f"Plan end date: '{time_like_to_str(end)}' cannot be in the future (execution time: '{time_like_to_str(self.execution_time)}')"
844                )
845
846        # Validate model-specific start/end dates
847        if (start := self.start or self._default_start) and (end := self.end):
848            start_ts = to_datetime(start)
849            end_ts = to_datetime(end)
850            if start_ts > end_ts:
851                models_to_check: t.Set[str] = (
852                    set(self._backfill_models or [])
853                    | set(self._context_diff.modified_snapshots.keys())
854                    | {s.name for s in self._context_diff.added}
855                    | set((self._end_override_per_model or {}).keys())
856                )
857                for model_name in models_to_check:
858                    if snapshot := self._model_fqn_to_snapshot.get(model_name):
859                        if snapshot.node.start is None or to_datetime(snapshot.node.start) > end_ts:
860                            raise PlanError(
861                                f"Model '{model_name}': Start date / time '({time_like_to_str(start_ts)})' can't be greater than end date / time '({time_like_to_str(end_ts)})'.\n"
862                                f"Set the `start` attribute in your project config model defaults to avoid this issue."
863                            )
864
865    def _ensure_no_broken_references(self) -> None:
866        for snapshot in self._context_diff.snapshots.values():
867            broken_references = {
868                x.name for x in self._context_diff.removed_snapshots.values() if not x.is_external
869            } & {x for x in snapshot.node.depends_on}
870            if broken_references:
871                broken_references_msg = ", ".join(f"'{x}'" for x in broken_references)
872                raise PlanError(
873                    f"""Removed {broken_references_msg} are referenced in '{snapshot.name}'. Please remove broken references before proceeding."""
874                )
875
876    def _ensure_new_env_with_changes(self) -> None:
877        if (
878            self._is_dev
879            and not self._include_unmodified
880            and self._context_diff.is_new_environment
881            and not self._context_diff.has_snapshot_changes
882            and not self._context_diff.has_environment_statements_changes
883            and not self._backfill_models
884        ):
885            raise NoChangesPlanError(
886                f"Creating a new environment requires a change, but project files match the `{self._context_diff.create_from}` environment. Make a change or use the --include-unmodified flag to create a new environment without changes."
887            )
888
889    @cached_property
890    def _forward_only_preview_needed(self) -> bool:
891        """Determines whether the plan should compute previews for forward-only changes (if there are any)."""
892        return self._is_dev and (
893            self._forward_only
894            or (
895                self._enable_preview
896                and any(
897                    snapshot.model.forward_only
898                    for snapshot in self._modified_and_added_snapshots
899                    if snapshot.is_model
900                )
901            )
902        )
903
904    @cached_property
905    def _non_forward_only_preview_needed(self) -> bool:
906        if not self._is_dev:
907            return False
908        for snapshot in self._modified_and_added_snapshots:
909            if not snapshot.is_model:
910                continue
911            if (
912                not snapshot.virtual_environment_mode.is_full
913                or snapshot.model.auto_restatement_cron is not None
914            ):
915                return True
916        return False
917
918    @cached_property
919    def _modified_and_added_snapshots(self) -> t.List[Snapshot]:
920        return [
921            snapshot
922            for snapshot in self._context_diff.snapshots.values()
923            if snapshot.name in self._context_diff.modified_snapshots
924            or snapshot.snapshot_id in self._context_diff.added
925        ]

Plan Builder constructs a Plan based on user choices for how they want to backfill, preview, etc. their changes.

Arguments:
  • context_diff: The context diff that the plan is based on.
  • start: The start time to backfill data.
  • end: The end time to backfill data.
  • execution_time: The date/time reference to use for execution time. Defaults to now. If :start or :end are relative time expressions, they are interpreted as relative to the :execution_time.
  • apply: The callback to apply the plan.
  • restate_models: A list of models for which the data should be restated for the time range specified in this plan. Note: models defined outside SQLMesh (external) won't be a part of the restatement.
  • restate_all_snapshots: If restatements are present, this flag indicates whether or not the intervals being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments). If set to None, the default behaviour is to not clear anything unless the target environment is prod.
  • backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan.
  • no_gaps: Whether to ensure that new snapshots for nodes that are already a part of the target environment have no data gaps when compared against previous snapshots for same nodes.
  • skip_backfill: Whether to skip the backfill step.
  • empty_backfill: Like skip_backfill, but also records processed intervals.
  • is_dev: Whether this plan is for development purposes.
  • forward_only: Whether the purpose of the plan is to make forward only changes.
  • allow_destructive_models: A list of fully qualified model names whose forward-only changes are allowed to be destructive.
  • allow_additive_models: A list of fully qualified model names whose forward-only changes are allowed to be additive.
  • environment_ttl: The period of time that a development environment should exist before being deleted.
  • categorizer_config: Auto categorization settings.
  • auto_categorization_enabled: Whether to apply auto categorization.
  • effective_from: The effective date from which to apply forward-only changes on production.
  • include_unmodified: Indicates whether to include unmodified nodes in the target development environment.
  • environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
  • default_start: The default plan start to use if not specified.
  • default_end: The default plan end to use if not specified.
  • enable_preview: Whether to enable preview for forward-only models in development environments.
  • end_bounded: If set to true, the missing intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
  • ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
  • start_override_per_model: A mapping of model FQNs to target start dates.
  • end_override_per_model: A mapping of model FQNs to target end dates.
  • ignore_cron: Whether to ignore the node's cron schedule when computing missing intervals.
  • explain: Whether to explain the plan instead of applying it.
PlanBuilder( context_diff: sqlmesh.core.context_diff.ContextDiff, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, apply: Optional[Callable[[sqlmesh.core.plan.definition.Plan], NoneType]] = None, restate_models: Optional[Iterable[str]] = None, restate_all_snapshots: bool = False, backfill_models: Optional[Iterable[str]] = None, no_gaps: bool = False, skip_backfill: bool = False, empty_backfill: bool = False, is_dev: bool = False, forward_only: bool = False, allow_destructive_models: Optional[Iterable[str]] = None, allow_additive_models: Optional[Iterable[str]] = None, environment_ttl: Optional[str] = None, environment_suffix_target: sqlmesh.core.config.common.EnvironmentSuffixTarget = SCHEMA, environment_catalog_mapping: Optional[Dict[re.Pattern, str]] = None, categorizer_config: Optional[sqlmesh.core.config.categorizer.CategorizerConfig] = None, auto_categorization_enabled: bool = True, effective_from: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, include_unmodified: bool = False, default_start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, default_end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, enable_preview: bool = False, end_bounded: bool = False, ensure_finalized_snapshots: bool = False, explain: bool = False, ignore_cron: bool = False, start_override_per_model: Optional[Dict[str, datetime.datetime]] = None, end_override_per_model: Optional[Dict[str, datetime.datetime]] = None, console: Optional[sqlmesh.core.console.PlanBuilderConsole] = None, user_provided_flags: Optional[Dict[str, Union[datetime.date, datetime.datetime, str, int, float, bool, List[str]]]] = None, selected_models: Optional[Set[str]] = None)
102    def __init__(
103        self,
104        context_diff: ContextDiff,
105        start: t.Optional[TimeLike] = None,
106        end: t.Optional[TimeLike] = None,
107        execution_time: t.Optional[TimeLike] = None,
108        apply: t.Optional[t.Callable[[Plan], None]] = None,
109        restate_models: t.Optional[t.Iterable[str]] = None,
110        restate_all_snapshots: bool = False,
111        backfill_models: t.Optional[t.Iterable[str]] = None,
112        no_gaps: bool = False,
113        skip_backfill: bool = False,
114        empty_backfill: bool = False,
115        is_dev: bool = False,
116        forward_only: bool = False,
117        allow_destructive_models: t.Optional[t.Iterable[str]] = None,
118        allow_additive_models: t.Optional[t.Iterable[str]] = None,
119        environment_ttl: t.Optional[str] = None,
120        environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default,
121        environment_catalog_mapping: t.Optional[t.Dict[re.Pattern, str]] = None,
122        categorizer_config: t.Optional[CategorizerConfig] = None,
123        auto_categorization_enabled: bool = True,
124        effective_from: t.Optional[TimeLike] = None,
125        include_unmodified: bool = False,
126        default_start: t.Optional[TimeLike] = None,
127        default_end: t.Optional[TimeLike] = None,
128        enable_preview: bool = False,
129        end_bounded: bool = False,
130        ensure_finalized_snapshots: bool = False,
131        explain: bool = False,
132        ignore_cron: bool = False,
133        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
134        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
135        console: t.Optional[PlanBuilderConsole] = None,
136        user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
137        selected_models: t.Optional[t.Set[str]] = None,
138    ):
139        self._context_diff = context_diff
140        self._no_gaps = no_gaps
141        self._skip_backfill = skip_backfill
142        self._empty_backfill = empty_backfill
143        self._is_dev = is_dev
144        self._forward_only = forward_only
145        self._allow_destructive_models = set(
146            allow_destructive_models if allow_destructive_models is not None else []
147        )
148        self._allow_additive_models = set(
149            allow_additive_models if allow_additive_models is not None else []
150        )
151        self._enable_preview = enable_preview
152        self._end_bounded = end_bounded
153        self._ensure_finalized_snapshots = ensure_finalized_snapshots
154        self._ignore_cron = ignore_cron
155        self._start_override_per_model = start_override_per_model
156        self._end_override_per_model = end_override_per_model
157        self._environment_ttl = environment_ttl
158        self._categorizer_config = categorizer_config or CategorizerConfig()
159        self._auto_categorization_enabled = auto_categorization_enabled
160        self._include_unmodified = include_unmodified
161        self._restate_models = set(restate_models) if restate_models is not None else None
162        self._restate_all_snapshots = restate_all_snapshots
163        self._effective_from = effective_from
164
165        # note: this deliberately doesnt default to now() here.
166        # There may be an significant delay between the PlanBuilder producing a Plan and the Plan actually being run
167        # so if execution_time=None is passed to the PlanBuilder, then the resulting Plan should also have execution_time=None
168        # in order to prevent the Plan that was intended to run "as at now" from having "now" fixed to some time in the past
169        # ref: https://github.com/SQLMesh/sqlmesh/pull/4702#discussion_r2140696156
170        self._execution_time = execution_time
171
172        self._backfill_models = backfill_models
173        self._end = end or default_end
174        self._default_start = default_start
175        self._apply = apply
176        self._console = console or get_console()
177        self._choices: t.Dict[SnapshotId, SnapshotChangeCategory] = {}
178        self._user_provided_flags = user_provided_flags
179        self._selected_models = selected_models
180        self._explain = explain
181
182        self._start = start
183        if not self._start and (
184            self._forward_only_preview_needed or self._non_forward_only_preview_needed
185        ):
186            self._start = default_start or yesterday_ds()
187
188        self._plan_id: str = random_id()
189        self._model_fqn_to_snapshot = {s.name: s for s in self._context_diff.snapshots.values()}
190
191        self.override_start = start is not None
192        self.override_end = end is not None
193        self.environment_naming_info = EnvironmentNamingInfo.from_environment_catalog_mapping(
194            environment_catalog_mapping or {},
195            name=self._context_diff.environment,
196            suffix_target=environment_suffix_target,
197            normalize_name=self._context_diff.normalize_environment_name,
198            gateway_managed=self._context_diff.gateway_managed_virtual_layer,
199        )
200
201        self._latest_plan: t.Optional[Plan] = None
override_start
override_end
environment_naming_info
is_start_and_end_allowed: bool
203    @property
204    def is_start_and_end_allowed(self) -> bool:
205        """Indicates whether this plan allows to set the start and end dates."""
206        return self._is_dev or bool(self._restate_models)

Indicates whether this plan allows setting the start and end dates.

start: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
208    @property
209    def start(self) -> t.Optional[TimeLike]:
210        if self._start and is_relative(self._start):
211            # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00'
212            return to_datetime(self._start, relative_base=to_datetime(self.execution_time))
213        return self._start
end: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
215    @property
216    def end(self) -> t.Optional[TimeLike]:
217        if self._end and is_relative(self._end):
218            # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00'
219            return to_datetime(self._end, relative_base=to_datetime(self.execution_time))
220        return self._end
execution_time: Union[datetime.date, datetime.datetime, str, int, float]
222    @cached_property
223    def execution_time(self) -> TimeLike:
224        # this is cached to return a stable value from now() in the places where the execution time matters for resolving relative date strings
225        # during the plan building process
226        return self._execution_time or now()
def set_start( self, new_start: Union[datetime.date, datetime.datetime, str, int, float]) -> PlanBuilder:
228    def set_start(self, new_start: TimeLike) -> PlanBuilder:
229        self._start = new_start
230        self.override_start = True
231        self._latest_plan = None
232        return self
def set_end( self, new_end: Union[datetime.date, datetime.datetime, str, int, float]) -> PlanBuilder:
234    def set_end(self, new_end: TimeLike) -> PlanBuilder:
235        self._end = new_end
236        self.override_end = True
237        self._latest_plan = None
238        return self
def set_effective_from( self, effective_from: Union[datetime.date, datetime.datetime, str, int, float, NoneType]) -> PlanBuilder:
240    def set_effective_from(self, effective_from: t.Optional[TimeLike]) -> PlanBuilder:
241        """Sets the effective date for all new snapshots in the plan.
242
243        Note: this is only applicable for forward-only plans.
244
245        Args:
246            effective_from: The effective date to set.
247        """
248        self._effective_from = effective_from
249        if effective_from and self._is_dev and not self.override_start:
250            self._start = effective_from
251        self._latest_plan = None
252        return self

Sets the effective date for all new snapshots in the plan.

Note: this is only applicable for forward-only plans.

Arguments:
  • effective_from: The effective date to set.
254    def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> PlanBuilder:
255        """Sets a snapshot version based on the user choice.
256
257        Args:
258            snapshot: The target snapshot.
259            choice: The user decision on how to version the target snapshot and its children.
260        """
261        if not self._is_new_snapshot(snapshot):
262            raise PlanError(
263                f"A choice can't be changed for the existing version of {snapshot.name}."
264            )
265        if (
266            not self._context_diff.directly_modified(snapshot.name)
267            and snapshot.snapshot_id not in self._context_diff.added
268        ):
269            raise PlanError(f"Only directly modified models can be categorized ({snapshot.name}).")
270
271        self._choices[snapshot.snapshot_id] = choice
272        self._latest_plan = None
273        return self

Sets a snapshot version based on the user choice.

Arguments:
  • snapshot: The target snapshot.
  • choice: The user decision on how to version the target snapshot and its children.
def apply(self) -> None:
275    def apply(self) -> None:
276        """Builds and applies the plan."""
277        if not self._apply:
278            raise PlanError("Plan was not initialized with an applier.")
279        self._apply(self.build())

Builds and applies the plan.

def build(self) -> sqlmesh.core.plan.definition.Plan:
281    def build(self) -> Plan:
282        """Builds the plan."""
283        if self._latest_plan:
284            return self._latest_plan
285
286        self._ensure_new_env_with_changes()
287        self._ensure_valid_date_range()
288        self._ensure_no_broken_references()
289
290        self._apply_effective_from()
291
292        dag = self._build_dag()
293        directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag)
294
295        self._check_destructive_additive_changes(directly_modified)
296        self._categorize_snapshots(dag, indirectly_modified)
297        self._adjust_snapshot_intervals()
298
299        deployability_index = (
300            DeployabilityIndex.create(
301                self._context_diff.snapshots.values(),
302                start=self._start,
303                start_override_per_model=self._start_override_per_model,
304            )
305            if self._is_dev
306            else DeployabilityIndex.all_deployable()
307        )
308
309        restatements = self._build_restatements(
310            dag,
311            earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time),
312        )
313        models_to_backfill = self._build_models_to_backfill(dag, restatements)
314
315        end_override_per_model = self._end_override_per_model
316        if end_override_per_model and self.override_end:
317            # If the end date was provided explicitly by a user, then interval end for each individual
318            # model should be ignored.
319            end_override_per_model = None
320
321        # this deliberately uses the passed in self._execution_time and not self.execution_time cached property
322        # the reason is because that there can be a delay between the Plan being built and the Plan being actually run,
323        # so this ensures that an _execution_time of None can be propagated to the Plan and thus be re-resolved to
324        # the current timestamp of when the Plan is eventually run
325        plan_execution_time = self._execution_time
326
327        plan = Plan(
328            context_diff=self._context_diff,
329            plan_id=self._plan_id,
330            provided_start=self.start,
331            provided_end=self.end,
332            is_dev=self._is_dev,
333            skip_backfill=self._skip_backfill,
334            empty_backfill=self._empty_backfill,
335            no_gaps=self._no_gaps,
336            forward_only=self._forward_only,
337            explain=self._explain,
338            allow_destructive_models=t.cast(t.Set, self._allow_destructive_models),
339            allow_additive_models=t.cast(t.Set, self._allow_additive_models),
340            include_unmodified=self._include_unmodified,
341            environment_ttl=self._environment_ttl,
342            environment_naming_info=self.environment_naming_info,
343            directly_modified=directly_modified,
344            indirectly_modified=indirectly_modified,
345            deployability_index=deployability_index,
346            selected_models_to_restate=self._restate_models,
347            restatements=restatements,
348            restate_all_snapshots=self._restate_all_snapshots,
349            start_override_per_model=self._start_override_per_model,
350            end_override_per_model=end_override_per_model,
351            selected_models_to_backfill=self._backfill_models,
352            models_to_backfill=models_to_backfill,
353            effective_from=self._effective_from,
354            execution_time=plan_execution_time,
355            end_bounded=self._end_bounded,
356            ensure_finalized_snapshots=self._ensure_finalized_snapshots,
357            ignore_cron=self._ignore_cron,
358            user_provided_flags=self._user_provided_flags,
359            selected_models=self._selected_models,
360        )
361        self._latest_plan = plan
362        return plan

Builds the plan.