Edit on GitHub

sqlmesh.core.plan.definition

  1from __future__ import annotations
  2
  3import typing as t
  4from dataclasses import dataclass
  5from datetime import datetime
  6from enum import Enum
  7from functools import cached_property
  8from pydantic import Field
  9
 10from sqlmesh.core.context_diff import ContextDiff
 11from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
 12from sqlmesh.utils.metaprogramming import Executable  # noqa
 13from sqlmesh.core.node import IntervalUnit
 14from sqlmesh.core.snapshot import (
 15    DeployabilityIndex,
 16    Intervals,
 17    Snapshot,
 18    earliest_start_date,
 19    merge_intervals,
 20    missing_intervals,
 21)
 22from sqlmesh.core.snapshot.definition import (
 23    Interval,
 24    SnapshotId,
 25    SnapshotTableInfo,
 26    format_intervals,
 27)
 28from sqlmesh.utils.date import TimeLike, now, to_datetime, to_timestamp
 29from sqlmesh.utils.pydantic import PydanticModel
 30
 31SnapshotMapping = t.Dict[SnapshotId, t.Set[SnapshotId]]
 32UserProvidedFlags = t.Union[TimeLike, str, bool, t.List[str]]
 33
 34
 35class Plan(PydanticModel, frozen=True):
 36    context_diff: ContextDiff
 37    plan_id: str
 38    provided_start: t.Optional[TimeLike] = None
 39    provided_end: t.Optional[TimeLike] = None
 40
 41    is_dev: bool
 42    skip_backfill: bool
 43    empty_backfill: bool
 44    no_gaps: bool
 45    forward_only: bool
 46    allow_destructive_models: t.Set[str]
 47    allow_additive_models: t.Set[str]
 48    include_unmodified: bool
 49    end_bounded: bool
 50    ensure_finalized_snapshots: bool
 51    explain: bool
 52    ignore_cron: bool = False
 53
 54    environment_ttl: t.Optional[str] = None
 55    environment_naming_info: EnvironmentNamingInfo
 56
 57    directly_modified: t.Set[SnapshotId]
 58    indirectly_modified: t.Dict[SnapshotId, t.Set[SnapshotId]]
 59
 60    deployability_index: DeployabilityIndex
 61    selected_models_to_restate: t.Optional[t.Set[str]] = None
 62    """Models that have been explicitly selected for restatement by a user"""
 63    restatements: t.Dict[SnapshotId, Interval]
 64    """
 65    All models being restated, which are typically the explicitly selected ones + their downstream dependencies.
 66
 67    Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty
 68    while :restatements is still populated with dev previews
 69    """
 70    restate_all_snapshots: bool
 71    """Whether or not to clear intervals from state for other versions of the models listed in :restatements"""
 72
 73    start_override_per_model: t.Optional[t.Dict[str, datetime]]
 74    end_override_per_model: t.Optional[t.Dict[str, datetime]]
 75
 76    selected_models_to_backfill: t.Optional[t.Set[str]] = None
 77    """Models that have been explicitly selected for backfill by a user."""
 78    models_to_backfill: t.Optional[t.Set[str]] = None
 79    """All models that should be backfilled as part of this plan."""
 80    effective_from: t.Optional[TimeLike] = None
 81    execution_time_: t.Optional[TimeLike] = Field(default=None, alias="execution_time")
 82
 83    user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
 84    selected_models: t.Optional[t.Set[str]] = None
 85    """Models that have been selected for this plan (used for dbt selected_resources)"""
 86
 87    @cached_property
 88    def start(self) -> TimeLike:
 89        if self.provided_start is not None:
 90            return self.provided_start
 91
 92        missing_intervals = self.missing_intervals
 93        if missing_intervals:
 94            return min(si.intervals[0][0] for si in missing_intervals)
 95
 96        return self._earliest_interval_start
 97
 98    @cached_property
 99    def end(self) -> TimeLike:
100        return self.provided_end or self.execution_time
101
102    @cached_property
103    def execution_time(self) -> TimeLike:
104        # note: property is cached so that it returns a consistent timestamp for now()
105        return self.execution_time_ or now()
106
107    @property
108    def previous_plan_id(self) -> t.Optional[str]:
109        return self.context_diff.previous_plan_id
110
111    @property
112    def requires_backfill(self) -> bool:
113        return (
114            not self.skip_backfill
115            and not self.empty_backfill
116            and (bool(self.restatements) or bool(self.missing_intervals))
117        )
118
119    @property
120    def has_changes(self) -> bool:
121        return self.context_diff.has_changes
122
123    @property
124    def has_unmodified_unpromoted(self) -> bool:
125        """Is the plan for an existing dev environment, has the include unmodified flag, and contains unmodified nodes that have not been promoted."""
126        return (
127            self.is_dev
128            and not self.context_diff.is_new_environment
129            and self.include_unmodified
130            and bool(self.context_diff.unpromoted_models)
131        )
132
133    @property
134    def categorized(self) -> t.List[Snapshot]:
135        """Returns the already categorized snapshots."""
136        return [
137            self.context_diff.snapshots[s_id]
138            for s_id in sorted({*self.directly_modified, *self.metadata_updated})
139            if self.context_diff.snapshots[s_id].version
140        ]
141
142    @property
143    def uncategorized(self) -> t.List[Snapshot]:
144        """Returns the uncategorized snapshots."""
145        return [
146            self.context_diff.snapshots[s_id]
147            for s_id in sorted(self.directly_modified)
148            if not self.context_diff.snapshots[s_id].version
149        ]
150
151    @property
152    def snapshots(self) -> t.Dict[SnapshotId, Snapshot]:
153        return self.context_diff.snapshots
154
155    @cached_property
156    def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]:
157        """Returns the modified (either directly or indirectly) snapshots."""
158        return {
159            **{s_id: self.context_diff.snapshots[s_id] for s_id in sorted(self.directly_modified)},
160            **{
161                s_id: self.context_diff.snapshots[s_id]
162                for downstream_s_ids in self.indirectly_modified.values()
163                for s_id in sorted(downstream_s_ids)
164            },
165            **self.context_diff.removed_snapshots,
166            **{s_id: self.context_diff.snapshots[s_id] for s_id in sorted(self.metadata_updated)},
167        }
168
169    @cached_property
170    def metadata_updated(self) -> t.Set[SnapshotId]:
171        return {
172            snapshot.snapshot_id
173            for snapshot, _ in self.context_diff.modified_snapshots.values()
174            if self.context_diff.metadata_updated(snapshot.name)
175        }
176
177    @property
178    def new_snapshots(self) -> t.List[Snapshot]:
179        """Gets only new snapshots in the plan/environment."""
180        return list(self.context_diff.new_snapshots.values())
181
182    @property
183    def missing_intervals(self) -> t.List[SnapshotIntervals]:
184        """Returns the missing intervals for this plan."""
185        # NOTE: Even though the plan is immutable, snapshots that are part of it are not. Since snapshot intervals
186        # may change over time, we should avoid caching missing intervals within the plan instance.
187        intervals = [
188            SnapshotIntervals(snapshot_id=snapshot.snapshot_id, intervals=missing)
189            for snapshot, missing in missing_intervals(
190                [s for s in self.snapshots.values() if self.is_selected_for_backfill(s.name)],
191                start=self.provided_start or self._earliest_interval_start,
192                end=self.provided_end,
193                execution_time=self.execution_time,
194                restatements=self.restatements,
195                deployability_index=self.deployability_index,
196                start_override_per_model=self.start_override_per_model,
197                end_override_per_model=self.end_override_per_model,
198                end_bounded=self.end_bounded,
199                ignore_cron=self.ignore_cron,
200            ).items()
201            if snapshot.is_model and missing
202        ]
203        return sorted(intervals, key=lambda i: i.snapshot_id)
204
205    @cached_property
206    def environment(self) -> Environment:
207        """The environment of this plan."""
208        expiration_ts = (
209            to_timestamp(self.environment_ttl, relative_base=now())
210            if self.is_dev and self.environment_ttl is not None
211            else None
212        )
213
214        snapshots_by_name = self.context_diff.snapshots_by_name
215        snapshots = [s.table_info for s in self.snapshots.values()]
216        promotable_snapshot_ids = None
217        if self.is_dev:
218            if self.selected_models_to_backfill is not None:
219                # Only promote models that have been explicitly selected for backfill.
220                promotable_snapshot_ids = {
221                    *self.context_diff.previously_promoted_snapshot_ids,
222                    *[
223                        snapshots_by_name[m].snapshot_id
224                        for m in self.selected_models_to_backfill
225                        if m in snapshots_by_name
226                    ],
227                }
228            elif not self.include_unmodified:
229                promotable_snapshot_ids = self.context_diff.promotable_snapshot_ids.copy()
230
231        promoted_snapshot_ids = (
232            [s.snapshot_id for s in snapshots if s.snapshot_id in promotable_snapshot_ids]
233            if promotable_snapshot_ids is not None
234            else None
235        )
236
237        previous_finalized_snapshots = (
238            self.context_diff.environment_snapshots
239            if not self.context_diff.is_unfinalized_environment
240            else self.context_diff.previous_finalized_snapshots
241        )
242
243        return Environment(
244            snapshots=snapshots,
245            start_at=self.provided_start or self._earliest_interval_start,
246            end_at=self.provided_end,
247            plan_id=self.plan_id,
248            previous_plan_id=self.previous_plan_id,
249            expiration_ts=expiration_ts,
250            promoted_snapshot_ids=promoted_snapshot_ids,
251            previous_finalized_snapshots=previous_finalized_snapshots,
252            requirements=self.context_diff.requirements,
253            **self.environment_naming_info.dict(),
254        )
255
256    def is_new_snapshot(self, snapshot: Snapshot) -> bool:
257        """Returns True if the given snapshot is a new snapshot in this plan."""
258        snapshot_id = snapshot.snapshot_id
259        return snapshot_id in self.context_diff.new_snapshots
260
261    def is_selected_for_backfill(self, model_fqn: str) -> bool:
262        """Returns True if a model with the given FQN should be backfilled as part of this plan."""
263        return self.models_to_backfill is None or model_fqn in self.models_to_backfill
264
265    def to_evaluatable(self) -> EvaluatablePlan:
266        return EvaluatablePlan(
267            start=self.start,
268            end=self.end,
269            new_snapshots=self.new_snapshots,
270            environment=self.environment,
271            no_gaps=self.no_gaps,
272            skip_backfill=self.skip_backfill,
273            empty_backfill=self.empty_backfill,
274            restatements={s.name: i for s, i in self.restatements.items()},
275            restate_all_snapshots=self.restate_all_snapshots,
276            is_dev=self.is_dev,
277            allow_destructive_models=self.allow_destructive_models,
278            allow_additive_models=self.allow_additive_models,
279            forward_only=self.forward_only,
280            end_bounded=self.end_bounded,
281            ensure_finalized_snapshots=self.ensure_finalized_snapshots,
282            ignore_cron=self.ignore_cron,
283            directly_modified_snapshots=sorted(self.directly_modified),
284            indirectly_modified_snapshots={
285                s.name: sorted(snapshot_ids) for s, snapshot_ids in self.indirectly_modified.items()
286            },
287            metadata_updated_snapshots=sorted(self.metadata_updated),
288            removed_snapshots=sorted(self.context_diff.removed_snapshots),
289            requires_backfill=self.requires_backfill,
290            models_to_backfill=self.models_to_backfill,
291            start_override_per_model=self.start_override_per_model,
292            end_override_per_model=self.end_override_per_model,
293            execution_time=self.execution_time,
294            disabled_restatement_models={
295                s.name
296                for s in self.snapshots.values()
297                if s.is_model and s.model.disable_restatement
298            },
299            environment_statements=self.context_diff.environment_statements,
300            user_provided_flags=self.user_provided_flags,
301            selected_models=self.selected_models,
302        )
303
304    @cached_property
305    def _earliest_interval_start(self) -> datetime:
306        return earliest_interval_start(self.snapshots.values(), self.execution_time)
307
308
class EvaluatablePlan(PydanticModel):
    """A serializable version of a plan that can be evaluated.

    Restatements and indirectly modified snapshots are keyed by name (str) here.
    """

    start: TimeLike
    end: TimeLike
    new_snapshots: t.List[Snapshot]
    environment: Environment
    no_gaps: bool
    skip_backfill: bool
    empty_backfill: bool
    restatements: t.Dict[str, Interval]
    restate_all_snapshots: bool
    is_dev: bool
    allow_destructive_models: t.Set[str]
    allow_additive_models: t.Set[str]
    forward_only: bool
    end_bounded: bool
    ensure_finalized_snapshots: bool
    ignore_cron: bool = False
    directly_modified_snapshots: t.List[SnapshotId]
    indirectly_modified_snapshots: t.Dict[str, t.List[SnapshotId]]
    metadata_updated_snapshots: t.List[SnapshotId]
    removed_snapshots: t.List[SnapshotId]
    requires_backfill: bool
    models_to_backfill: t.Optional[t.Set[str]] = None
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None
    execution_time: t.Optional[TimeLike] = None
    disabled_restatement_models: t.Set[str]
    environment_statements: t.Optional[t.List[EnvironmentStatements]] = None
    user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
    selected_models: t.Optional[t.Set[str]] = None

    def is_selected_for_backfill(self, model_fqn: str) -> bool:
        """Return True when no backfill selection exists or when it contains the given FQN."""
        selection = self.models_to_backfill
        if selection is None:
            return True
        return model_fqn in selection

    @property
    def plan_id(self) -> str:
        """The plan ID, read from the plan's environment."""
        target_environment = self.environment
        return target_environment.plan_id

    @property
    def is_prod(self) -> bool:
        """True exactly when the plan is not a dev plan."""
        if self.is_dev:
            return False
        return True
352
353
class PlanStatus(str, Enum):
    """Status values of a plan; each member is also a plain string."""

    STARTED = "started"
    FINISHED = "finished"
    FAILED = "failed"

    @property
    def is_started(self) -> bool:
        """True only for the STARTED member."""
        return self is PlanStatus.STARTED

    @property
    def is_failed(self) -> bool:
        """True only for the FAILED member."""
        return self is PlanStatus.FAILED

    @property
    def is_finished(self) -> bool:
        """True only for the FINISHED member."""
        return self is PlanStatus.FINISHED
370
371
# A plain dataclass is used here on purpose: millions of these can be created,
# and pydantic has significant overhead.
@dataclass
class SnapshotIntervals:
    """A snapshot ID together with a collection of intervals for that snapshot."""

    snapshot_id: SnapshotId
    intervals: Intervals

    @property
    def merged_intervals(self) -> Intervals:
        """The stored intervals after being passed through ``merge_intervals``."""
        raw_intervals = self.intervals
        return merge_intervals(raw_intervals)

    def format_intervals(self, unit: t.Optional[IntervalUnit] = None) -> str:
        """Format the merged intervals as a string.

        Args:
            unit: Optional interval unit forwarded to the module-level formatter.
        """
        merged = self.merged_intervals
        return format_intervals(merged, unit)
384
385
def earliest_interval_start(
    snapshots: t.Collection[Snapshot], execution_time: t.Optional[TimeLike] = None
) -> datetime:
    """Return the earliest point in time covered by the given snapshots.

    The result is the smaller of the earliest start date of the snapshots
    (computed relative to ``execution_time``) and the earliest start of any
    snapshot's first recorded interval.

    Args:
        snapshots: The snapshots to inspect.
        execution_time: Reference time forwarded to ``earliest_start_date``.

    Returns:
        The earliest start date when no snapshot has intervals, otherwise the
        minimum of that date and the earliest first-interval start.
    """
    baseline_start = earliest_start_date(snapshots, relative_to=execution_time)

    # Only snapshots that already have intervals contribute a first-interval start.
    first_interval_starts = [
        snapshot.intervals[0][0] for snapshot in snapshots if snapshot.intervals
    ]
    if not first_interval_starts:
        return baseline_start

    return min(baseline_start, to_datetime(min(first_interval_starts)))
UserProvidedFlags = typing.Union[datetime.date, datetime.datetime, str, int, float, bool, typing.List[str]]
class Plan(sqlmesh.utils.pydantic.PydanticModel):
 36class Plan(PydanticModel, frozen=True):
 37    context_diff: ContextDiff
 38    plan_id: str
 39    provided_start: t.Optional[TimeLike] = None
 40    provided_end: t.Optional[TimeLike] = None
 41
 42    is_dev: bool
 43    skip_backfill: bool
 44    empty_backfill: bool
 45    no_gaps: bool
 46    forward_only: bool
 47    allow_destructive_models: t.Set[str]
 48    allow_additive_models: t.Set[str]
 49    include_unmodified: bool
 50    end_bounded: bool
 51    ensure_finalized_snapshots: bool
 52    explain: bool
 53    ignore_cron: bool = False
 54
 55    environment_ttl: t.Optional[str] = None
 56    environment_naming_info: EnvironmentNamingInfo
 57
 58    directly_modified: t.Set[SnapshotId]
 59    indirectly_modified: t.Dict[SnapshotId, t.Set[SnapshotId]]
 60
 61    deployability_index: DeployabilityIndex
 62    selected_models_to_restate: t.Optional[t.Set[str]] = None
 63    """Models that have been explicitly selected for restatement by a user"""
 64    restatements: t.Dict[SnapshotId, Interval]
 65    """
 66    All models being restated, which are typically the explicitly selected ones + their downstream dependencies.
 67
 68    Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty
 69    while :restatements is still populated with dev previews
 70    """
 71    restate_all_snapshots: bool
 72    """Whether or not to clear intervals from state for other versions of the models listed in :restatements"""
 73
 74    start_override_per_model: t.Optional[t.Dict[str, datetime]]
 75    end_override_per_model: t.Optional[t.Dict[str, datetime]]
 76
 77    selected_models_to_backfill: t.Optional[t.Set[str]] = None
 78    """Models that have been explicitly selected for backfill by a user."""
 79    models_to_backfill: t.Optional[t.Set[str]] = None
 80    """All models that should be backfilled as part of this plan."""
 81    effective_from: t.Optional[TimeLike] = None
 82    execution_time_: t.Optional[TimeLike] = Field(default=None, alias="execution_time")
 83
 84    user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
 85    selected_models: t.Optional[t.Set[str]] = None
 86    """Models that have been selected for this plan (used for dbt selected_resources)"""
 87
 88    @cached_property
 89    def start(self) -> TimeLike:
 90        if self.provided_start is not None:
 91            return self.provided_start
 92
 93        missing_intervals = self.missing_intervals
 94        if missing_intervals:
 95            return min(si.intervals[0][0] for si in missing_intervals)
 96
 97        return self._earliest_interval_start
 98
 99    @cached_property
100    def end(self) -> TimeLike:
101        return self.provided_end or self.execution_time
102
103    @cached_property
104    def execution_time(self) -> TimeLike:
105        # note: property is cached so that it returns a consistent timestamp for now()
106        return self.execution_time_ or now()
107
108    @property
109    def previous_plan_id(self) -> t.Optional[str]:
110        return self.context_diff.previous_plan_id
111
112    @property
113    def requires_backfill(self) -> bool:
114        return (
115            not self.skip_backfill
116            and not self.empty_backfill
117            and (bool(self.restatements) or bool(self.missing_intervals))
118        )
119
120    @property
121    def has_changes(self) -> bool:
122        return self.context_diff.has_changes
123
124    @property
125    def has_unmodified_unpromoted(self) -> bool:
126        """Is the plan for an existing dev environment, has the include unmodified flag, and contains unmodified nodes that have not been promoted."""
127        return (
128            self.is_dev
129            and not self.context_diff.is_new_environment
130            and self.include_unmodified
131            and bool(self.context_diff.unpromoted_models)
132        )
133
134    @property
135    def categorized(self) -> t.List[Snapshot]:
136        """Returns the already categorized snapshots."""
137        return [
138            self.context_diff.snapshots[s_id]
139            for s_id in sorted({*self.directly_modified, *self.metadata_updated})
140            if self.context_diff.snapshots[s_id].version
141        ]
142
143    @property
144    def uncategorized(self) -> t.List[Snapshot]:
145        """Returns the uncategorized snapshots."""
146        return [
147            self.context_diff.snapshots[s_id]
148            for s_id in sorted(self.directly_modified)
149            if not self.context_diff.snapshots[s_id].version
150        ]
151
152    @property
153    def snapshots(self) -> t.Dict[SnapshotId, Snapshot]:
154        return self.context_diff.snapshots
155
156    @cached_property
157    def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]:
158        """Returns the modified (either directly or indirectly) snapshots."""
159        return {
160            **{s_id: self.context_diff.snapshots[s_id] for s_id in sorted(self.directly_modified)},
161            **{
162                s_id: self.context_diff.snapshots[s_id]
163                for downstream_s_ids in self.indirectly_modified.values()
164                for s_id in sorted(downstream_s_ids)
165            },
166            **self.context_diff.removed_snapshots,
167            **{s_id: self.context_diff.snapshots[s_id] for s_id in sorted(self.metadata_updated)},
168        }
169
170    @cached_property
171    def metadata_updated(self) -> t.Set[SnapshotId]:
172        return {
173            snapshot.snapshot_id
174            for snapshot, _ in self.context_diff.modified_snapshots.values()
175            if self.context_diff.metadata_updated(snapshot.name)
176        }
177
178    @property
179    def new_snapshots(self) -> t.List[Snapshot]:
180        """Gets only new snapshots in the plan/environment."""
181        return list(self.context_diff.new_snapshots.values())
182
183    @property
184    def missing_intervals(self) -> t.List[SnapshotIntervals]:
185        """Returns the missing intervals for this plan."""
186        # NOTE: Even though the plan is immutable, snapshots that are part of it are not. Since snapshot intervals
187        # may change over time, we should avoid caching missing intervals within the plan instance.
188        intervals = [
189            SnapshotIntervals(snapshot_id=snapshot.snapshot_id, intervals=missing)
190            for snapshot, missing in missing_intervals(
191                [s for s in self.snapshots.values() if self.is_selected_for_backfill(s.name)],
192                start=self.provided_start or self._earliest_interval_start,
193                end=self.provided_end,
194                execution_time=self.execution_time,
195                restatements=self.restatements,
196                deployability_index=self.deployability_index,
197                start_override_per_model=self.start_override_per_model,
198                end_override_per_model=self.end_override_per_model,
199                end_bounded=self.end_bounded,
200                ignore_cron=self.ignore_cron,
201            ).items()
202            if snapshot.is_model and missing
203        ]
204        return sorted(intervals, key=lambda i: i.snapshot_id)
205
206    @cached_property
207    def environment(self) -> Environment:
208        """The environment of this plan."""
209        expiration_ts = (
210            to_timestamp(self.environment_ttl, relative_base=now())
211            if self.is_dev and self.environment_ttl is not None
212            else None
213        )
214
215        snapshots_by_name = self.context_diff.snapshots_by_name
216        snapshots = [s.table_info for s in self.snapshots.values()]
217        promotable_snapshot_ids = None
218        if self.is_dev:
219            if self.selected_models_to_backfill is not None:
220                # Only promote models that have been explicitly selected for backfill.
221                promotable_snapshot_ids = {
222                    *self.context_diff.previously_promoted_snapshot_ids,
223                    *[
224                        snapshots_by_name[m].snapshot_id
225                        for m in self.selected_models_to_backfill
226                        if m in snapshots_by_name
227                    ],
228                }
229            elif not self.include_unmodified:
230                promotable_snapshot_ids = self.context_diff.promotable_snapshot_ids.copy()
231
232        promoted_snapshot_ids = (
233            [s.snapshot_id for s in snapshots if s.snapshot_id in promotable_snapshot_ids]
234            if promotable_snapshot_ids is not None
235            else None
236        )
237
238        previous_finalized_snapshots = (
239            self.context_diff.environment_snapshots
240            if not self.context_diff.is_unfinalized_environment
241            else self.context_diff.previous_finalized_snapshots
242        )
243
244        return Environment(
245            snapshots=snapshots,
246            start_at=self.provided_start or self._earliest_interval_start,
247            end_at=self.provided_end,
248            plan_id=self.plan_id,
249            previous_plan_id=self.previous_plan_id,
250            expiration_ts=expiration_ts,
251            promoted_snapshot_ids=promoted_snapshot_ids,
252            previous_finalized_snapshots=previous_finalized_snapshots,
253            requirements=self.context_diff.requirements,
254            **self.environment_naming_info.dict(),
255        )
256
257    def is_new_snapshot(self, snapshot: Snapshot) -> bool:
258        """Returns True if the given snapshot is a new snapshot in this plan."""
259        snapshot_id = snapshot.snapshot_id
260        return snapshot_id in self.context_diff.new_snapshots
261
262    def is_selected_for_backfill(self, model_fqn: str) -> bool:
263        """Returns True if a model with the given FQN should be backfilled as part of this plan."""
264        return self.models_to_backfill is None or model_fqn in self.models_to_backfill
265
266    def to_evaluatable(self) -> EvaluatablePlan:
267        return EvaluatablePlan(
268            start=self.start,
269            end=self.end,
270            new_snapshots=self.new_snapshots,
271            environment=self.environment,
272            no_gaps=self.no_gaps,
273            skip_backfill=self.skip_backfill,
274            empty_backfill=self.empty_backfill,
275            restatements={s.name: i for s, i in self.restatements.items()},
276            restate_all_snapshots=self.restate_all_snapshots,
277            is_dev=self.is_dev,
278            allow_destructive_models=self.allow_destructive_models,
279            allow_additive_models=self.allow_additive_models,
280            forward_only=self.forward_only,
281            end_bounded=self.end_bounded,
282            ensure_finalized_snapshots=self.ensure_finalized_snapshots,
283            ignore_cron=self.ignore_cron,
284            directly_modified_snapshots=sorted(self.directly_modified),
285            indirectly_modified_snapshots={
286                s.name: sorted(snapshot_ids) for s, snapshot_ids in self.indirectly_modified.items()
287            },
288            metadata_updated_snapshots=sorted(self.metadata_updated),
289            removed_snapshots=sorted(self.context_diff.removed_snapshots),
290            requires_backfill=self.requires_backfill,
291            models_to_backfill=self.models_to_backfill,
292            start_override_per_model=self.start_override_per_model,
293            end_override_per_model=self.end_override_per_model,
294            execution_time=self.execution_time,
295            disabled_restatement_models={
296                s.name
297                for s in self.snapshots.values()
298                if s.is_model and s.model.disable_restatement
299            },
300            environment_statements=self.context_diff.environment_statements,
301            user_provided_flags=self.user_provided_flags,
302            selected_models=self.selected_models,
303        )
304
305    @cached_property
306    def _earliest_interval_start(self) -> datetime:
307        return earliest_interval_start(self.snapshots.values(), self.execution_time)

Usage documentation: see "Models" in the Pydantic documentation.

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
plan_id: str
provided_start: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
provided_end: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
is_dev: bool
skip_backfill: bool
empty_backfill: bool
no_gaps: bool
forward_only: bool
allow_destructive_models: Set[str]
allow_additive_models: Set[str]
include_unmodified: bool
end_bounded: bool
ensure_finalized_snapshots: bool
explain: bool
ignore_cron: bool
environment_ttl: Optional[str]
selected_models_to_restate: Optional[Set[str]]

Models that have been explicitly selected for restatement by a user

restatements: Dict[sqlmesh.core.snapshot.definition.SnapshotId, Tuple[int, int]]

All models being restated, which are typically the explicitly selected ones + their downstream dependencies.

Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty while :restatements is still populated with dev previews

restate_all_snapshots: bool

Whether or not to clear intervals from state for other versions of the models listed in :restatements

start_override_per_model: Optional[Dict[str, datetime.datetime]]
end_override_per_model: Optional[Dict[str, datetime.datetime]]
selected_models_to_backfill: Optional[Set[str]]

Models that have been explicitly selected for backfill by a user.

models_to_backfill: Optional[Set[str]]

All models that should be backfilled as part of this plan.

effective_from: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
execution_time_: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
user_provided_flags: Optional[Dict[str, Union[datetime.date, datetime.datetime, str, int, float, bool, List[str]]]]
selected_models: Optional[Set[str]]

Models that have been selected for this plan (used for dbt selected_resources)

start: Union[datetime.date, datetime.datetime, str, int, float]
@cached_property
def start(self) -> TimeLike:
    """Return the start of the plan's time range.

    Resolution order:
      1. the user-provided start, when it is not None;
      2. the earliest first-interval start among the plan's missing intervals;
      3. the plan's earliest known interval start.
    """
    if self.provided_start is not None:
        return self.provided_start

    # Named `pending` so the local does not shadow the imported
    # `missing_intervals` helper function.
    pending = self.missing_intervals
    if pending:
        return min(si.intervals[0][0] for si in pending)

    return self._earliest_interval_start
end: Union[datetime.date, datetime.datetime, str, int, float]
 99    @cached_property
100    def end(self) -> TimeLike:
101        return self.provided_end or self.execution_time
execution_time: Union[datetime.date, datetime.datetime, str, int, float]
@cached_property
def execution_time(self) -> TimeLike:
    """Return the explicitly configured execution time, or the current time if none was set."""
    # NOTE: the property is cached so that the now() fallback returns one
    # consistent timestamp for the lifetime of the plan instance.
    return self.execution_time_ or now()
previous_plan_id: Optional[str]
@property
def previous_plan_id(self) -> t.Optional[str]:
    """Return the previous plan id recorded on the context diff (may be None)."""
    return self.context_diff.previous_plan_id
requires_backfill: bool
@property
def requires_backfill(self) -> bool:
    """Return True when this plan has data to backfill.

    That is the case when backfill is neither skipped nor emptied, and the
    plan has restatements or missing intervals.
    """
    return (
        not self.skip_backfill
        and not self.empty_backfill
        and (bool(self.restatements) or bool(self.missing_intervals))
    )
has_changes: bool
@property
def has_changes(self) -> bool:
    """Return whether the underlying context diff reports any changes."""
    return self.context_diff.has_changes
has_unmodified_unpromoted: bool
@property
def has_unmodified_unpromoted(self) -> bool:
    """Whether the plan is for an existing dev environment, has the include-unmodified flag set, and contains unmodified nodes that have not been promoted."""
    return (
        self.is_dev
        and not self.context_diff.is_new_environment
        and self.include_unmodified
        and bool(self.context_diff.unpromoted_models)
    )

Whether the plan is for an existing dev environment, has the include-unmodified flag set, and contains unmodified nodes that have not been promoted.

categorized: List[sqlmesh.core.snapshot.definition.Snapshot]
@property
def categorized(self) -> t.List[Snapshot]:
    """Returns the already categorized snapshots.

    Considers directly modified and metadata-updated snapshots, ordered by
    snapshot id, and keeps those whose `version` is truthy.
    """
    snapshots = self.context_diff.snapshots
    candidate_ids = sorted({*self.directly_modified, *self.metadata_updated})
    return [snapshots[s_id] for s_id in candidate_ids if snapshots[s_id].version]

Returns the already categorized snapshots.

uncategorized: List[sqlmesh.core.snapshot.definition.Snapshot]
@property
def uncategorized(self) -> t.List[Snapshot]:
    """Returns the uncategorized snapshots.

    These are the directly modified snapshots, ordered by snapshot id, whose
    `version` is falsy.
    """
    snapshots = self.context_diff.snapshots
    return [
        snapshots[s_id]
        for s_id in sorted(self.directly_modified)
        if not snapshots[s_id].version
    ]

Returns the uncategorized snapshots.

@property
def snapshots(self) -> t.Dict[SnapshotId, Snapshot]:
    """Return the context diff's snapshots mapping, keyed by snapshot id."""
    return self.context_diff.snapshots
@cached_property
def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]:
    """Returns the modified (either directly or indirectly) snapshots.

    The result merges, in this order: directly modified snapshots, the
    downstream snapshots listed in `indirectly_modified`, the context diff's
    removed snapshots, and metadata-updated snapshots. Later groups overwrite
    earlier entries that share the same snapshot id.
    """
    all_snapshots = self.context_diff.snapshots
    return {
        **{s_id: all_snapshots[s_id] for s_id in sorted(self.directly_modified)},
        **{
            s_id: all_snapshots[s_id]
            for downstream_s_ids in self.indirectly_modified.values()
            for s_id in sorted(downstream_s_ids)
        },
        **self.context_diff.removed_snapshots,
        **{s_id: all_snapshots[s_id] for s_id in sorted(self.metadata_updated)},
    }

Returns the modified (either directly or indirectly) snapshots.

metadata_updated: Set[sqlmesh.core.snapshot.definition.SnapshotId]
@cached_property
def metadata_updated(self) -> t.Set[SnapshotId]:
    """Return the ids of modified snapshots for which the context diff reports a metadata update."""
    diff = self.context_diff
    return {
        # Each modified_snapshots value is a pair; only its first element is used here.
        snapshot.snapshot_id
        for snapshot, _ in diff.modified_snapshots.values()
        if diff.metadata_updated(snapshot.name)
    }
new_snapshots: List[sqlmesh.core.snapshot.definition.Snapshot]
@property
def new_snapshots(self) -> t.List[Snapshot]:
    """Gets only new snapshots in the plan/environment."""
    return list(self.context_diff.new_snapshots.values())

Gets only new snapshots in the plan/environment.

missing_intervals: List[SnapshotIntervals]
@property
def missing_intervals(self) -> t.List[SnapshotIntervals]:
    """Returns the missing intervals for this plan.

    Only snapshots selected for backfill are considered, and only model
    snapshots that actually have missing intervals are reported. The result
    is sorted by snapshot id.
    """
    # NOTE: Even though the plan is immutable, snapshots that are part of it are not. Since snapshot intervals
    # may change over time, we should avoid caching missing intervals within the plan instance.
    selected = [s for s in self.snapshots.values() if self.is_selected_for_backfill(s.name)]

    # `missing_intervals(...)` here is the helper imported from sqlmesh.core.snapshot,
    # not this property of the same name.
    missing_by_snapshot = missing_intervals(
        selected,
        start=self.provided_start or self._earliest_interval_start,
        end=self.provided_end,
        execution_time=self.execution_time,
        restatements=self.restatements,
        deployability_index=self.deployability_index,
        start_override_per_model=self.start_override_per_model,
        end_override_per_model=self.end_override_per_model,
        end_bounded=self.end_bounded,
        ignore_cron=self.ignore_cron,
    )

    intervals = [
        SnapshotIntervals(snapshot_id=snapshot.snapshot_id, intervals=missing)
        for snapshot, missing in missing_by_snapshot.items()
        if snapshot.is_model and missing
    ]
    return sorted(intervals, key=lambda i: i.snapshot_id)

Returns the missing intervals for this plan.

environment: sqlmesh.core.environment.Environment
@cached_property
def environment(self) -> Environment:
    """The environment of this plan."""
    # An expiration is only computed for dev plans that define a TTL; it is
    # taken relative to the current time.
    expiration_ts = (
        to_timestamp(self.environment_ttl, relative_base=now())
        if self.is_dev and self.environment_ttl is not None
        else None
    )

    snapshots_by_name = self.context_diff.snapshots_by_name
    snapshots = [s.table_info for s in self.snapshots.values()]

    # Stays None for non-dev plans, and for dev plans that include unmodified
    # models without an explicit backfill selection; promoted_snapshot_ids is
    # then passed to the Environment as None.
    promotable_snapshot_ids = None
    if self.is_dev:
        if self.selected_models_to_backfill is not None:
            # Only promote models that have been explicitly selected for backfill.
            promotable_snapshot_ids = {
                *self.context_diff.previously_promoted_snapshot_ids,
                *[
                    snapshots_by_name[m].snapshot_id
                    for m in self.selected_models_to_backfill
                    if m in snapshots_by_name
                ],
            }
        elif not self.include_unmodified:
            promotable_snapshot_ids = self.context_diff.promotable_snapshot_ids.copy()

    # Filtering `snapshots` keeps the promoted ids in the same order as the snapshot list.
    promoted_snapshot_ids = (
        [s.snapshot_id for s in snapshots if s.snapshot_id in promotable_snapshot_ids]
        if promotable_snapshot_ids is not None
        else None
    )

    previous_finalized_snapshots = (
        self.context_diff.environment_snapshots
        if not self.context_diff.is_unfinalized_environment
        else self.context_diff.previous_finalized_snapshots
    )

    return Environment(
        snapshots=snapshots,
        start_at=self.provided_start or self._earliest_interval_start,
        end_at=self.provided_end,
        plan_id=self.plan_id,
        previous_plan_id=self.previous_plan_id,
        expiration_ts=expiration_ts,
        promoted_snapshot_ids=promoted_snapshot_ids,
        previous_finalized_snapshots=previous_finalized_snapshots,
        requirements=self.context_diff.requirements,
        **self.environment_naming_info.dict(),
    )

The environment of this plan.

def is_new_snapshot(self, snapshot: sqlmesh.core.snapshot.definition.Snapshot) -> bool:
def is_new_snapshot(self, snapshot: Snapshot) -> bool:
    """Returns True if the given snapshot is a new snapshot in this plan."""
    return snapshot.snapshot_id in self.context_diff.new_snapshots

Returns True if the given snapshot is a new snapshot in this plan.

def is_selected_for_backfill(self, model_fqn: str) -> bool:
def is_selected_for_backfill(self, model_fqn: str) -> bool:
    """Returns True if a model with the given FQN should be backfilled as part of this plan.

    When `models_to_backfill` is None, every model counts as selected.
    """
    return self.models_to_backfill is None or model_fqn in self.models_to_backfill

Returns True if a model with the given FQN should be backfilled as part of this plan.

def to_evaluatable(self) -> EvaluatablePlan:
def to_evaluatable(self) -> EvaluatablePlan:
    """Build the EvaluatablePlan counterpart of this plan from its fields and derived properties."""
    return EvaluatablePlan(
        start=self.start,
        end=self.end,
        new_snapshots=self.new_snapshots,
        environment=self.environment,
        no_gaps=self.no_gaps,
        skip_backfill=self.skip_backfill,
        empty_backfill=self.empty_backfill,
        # Re-keyed by snapshot name instead of SnapshotId.
        restatements={s.name: i for s, i in self.restatements.items()},
        restate_all_snapshots=self.restate_all_snapshots,
        is_dev=self.is_dev,
        allow_destructive_models=self.allow_destructive_models,
        allow_additive_models=self.allow_additive_models,
        forward_only=self.forward_only,
        end_bounded=self.end_bounded,
        ensure_finalized_snapshots=self.ensure_finalized_snapshots,
        ignore_cron=self.ignore_cron,
        # Sets are emitted as sorted lists so the output order is deterministic.
        directly_modified_snapshots=sorted(self.directly_modified),
        indirectly_modified_snapshots={
            s.name: sorted(snapshot_ids) for s, snapshot_ids in self.indirectly_modified.items()
        },
        metadata_updated_snapshots=sorted(self.metadata_updated),
        removed_snapshots=sorted(self.context_diff.removed_snapshots),
        requires_backfill=self.requires_backfill,
        models_to_backfill=self.models_to_backfill,
        start_override_per_model=self.start_override_per_model,
        end_override_per_model=self.end_override_per_model,
        execution_time=self.execution_time,
        disabled_restatement_models={
            s.name
            for s in self.snapshots.values()
            if s.is_model and s.model.disable_restatement
        },
        environment_statements=self.context_diff.environment_statements,
        user_provided_flags=self.user_provided_flags,
        selected_models=self.selected_models,
    )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class EvaluatablePlan(sqlmesh.utils.pydantic.PydanticModel):
class EvaluatablePlan(PydanticModel):
    """A serializable version of a plan that can be evaluated."""

    start: TimeLike
    end: TimeLike
    new_snapshots: t.List[Snapshot]
    environment: Environment
    no_gaps: bool
    skip_backfill: bool
    empty_backfill: bool
    # Keyed by snapshot name (Plan.restatements is keyed by SnapshotId).
    restatements: t.Dict[str, Interval]
    restate_all_snapshots: bool
    is_dev: bool
    allow_destructive_models: t.Set[str]
    allow_additive_models: t.Set[str]
    forward_only: bool
    end_bounded: bool
    ensure_finalized_snapshots: bool
    ignore_cron: bool = False
    directly_modified_snapshots: t.List[SnapshotId]
    # Keyed by snapshot name.
    indirectly_modified_snapshots: t.Dict[str, t.List[SnapshotId]]
    metadata_updated_snapshots: t.List[SnapshotId]
    removed_snapshots: t.List[SnapshotId]
    requires_backfill: bool
    models_to_backfill: t.Optional[t.Set[str]] = None
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None
    execution_time: t.Optional[TimeLike] = None
    disabled_restatement_models: t.Set[str]
    environment_statements: t.Optional[t.List[EnvironmentStatements]] = None
    user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
    selected_models: t.Optional[t.Set[str]] = None

    def is_selected_for_backfill(self, model_fqn: str) -> bool:
        """Return True if the model should be backfilled; a None `models_to_backfill` selects every model."""
        return self.models_to_backfill is None or model_fqn in self.models_to_backfill

    @property
    def plan_id(self) -> str:
        """The plan id, read from the plan's environment."""
        return self.environment.plan_id

    @property
    def is_prod(self) -> bool:
        """True when this is not a dev plan."""
        return not self.is_dev

A serializable version of a plan that can be evaluated.

start: Union[datetime.date, datetime.datetime, str, int, float]
end: Union[datetime.date, datetime.datetime, str, int, float]
no_gaps: bool
skip_backfill: bool
empty_backfill: bool
restatements: Dict[str, Tuple[int, int]]
restate_all_snapshots: bool
is_dev: bool
allow_destructive_models: Set[str]
allow_additive_models: Set[str]
forward_only: bool
end_bounded: bool
ensure_finalized_snapshots: bool
ignore_cron: bool
directly_modified_snapshots: List[sqlmesh.core.snapshot.definition.SnapshotId]
indirectly_modified_snapshots: Dict[str, List[sqlmesh.core.snapshot.definition.SnapshotId]]
metadata_updated_snapshots: List[sqlmesh.core.snapshot.definition.SnapshotId]
requires_backfill: bool
models_to_backfill: Optional[Set[str]]
start_override_per_model: Optional[Dict[str, datetime.datetime]]
end_override_per_model: Optional[Dict[str, datetime.datetime]]
execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
disabled_restatement_models: Set[str]
environment_statements: Optional[List[sqlmesh.core.environment.EnvironmentStatements]]
user_provided_flags: Optional[Dict[str, Union[datetime.date, datetime.datetime, str, int, float, bool, List[str]]]]
selected_models: Optional[Set[str]]
def is_selected_for_backfill(self, model_fqn: str) -> bool:
def is_selected_for_backfill(self, model_fqn: str) -> bool:
    """Return True if the model should be backfilled; a None `models_to_backfill` selects every model."""
    return self.models_to_backfill is None or model_fqn in self.models_to_backfill
plan_id: str
@property
def plan_id(self) -> str:
    """The plan id, read from the plan's environment."""
    return self.environment.plan_id
is_prod: bool
@property
def is_prod(self) -> bool:
    """True when this is not a dev plan."""
    return not self.is_dev
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class PlanStatus(builtins.str, enum.Enum):
class PlanStatus(str, Enum):
    """Status of a plan; members are also plain strings equal to their values."""

    STARTED = "started"
    FINISHED = "finished"
    FAILED = "failed"

    @property
    def is_started(self) -> bool:
        """True if this status is STARTED."""
        return self == PlanStatus.STARTED

    @property
    def is_failed(self) -> bool:
        """True if this status is FAILED."""
        return self == PlanStatus.FAILED

    @property
    def is_finished(self) -> bool:
        """True if this status is FINISHED."""
        return self == PlanStatus.FINISHED

An enumeration.

STARTED = <PlanStatus.STARTED: 'started'>
FINISHED = <PlanStatus.FINISHED: 'finished'>
FAILED = <PlanStatus.FAILED: 'failed'>
is_started: bool
@property
def is_started(self) -> bool:
    """True if this status is STARTED."""
    return self == PlanStatus.STARTED
is_failed: bool
@property
def is_failed(self) -> bool:
    """True if this status is FAILED."""
    return self == PlanStatus.FAILED
is_finished: bool
@property
def is_finished(self) -> bool:
    """True if this status is FINISHED."""
    return self == PlanStatus.FINISHED
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
@dataclass
class SnapshotIntervals:
@dataclass
class SnapshotIntervals:
    """Pairs a snapshot id with a list of intervals for that snapshot."""

    snapshot_id: SnapshotId
    intervals: Intervals

    @property
    def merged_intervals(self) -> Intervals:
        """The intervals after passing them through `merge_intervals`."""
        return merge_intervals(self.intervals)

    def format_intervals(self, unit: t.Optional[IntervalUnit] = None) -> str:
        """Render the merged intervals as a string, optionally for the given interval unit."""
        # Resolves to the module-level `format_intervals` helper, not this method.
        return format_intervals(self.merged_intervals, unit)
SnapshotIntervals( snapshot_id: sqlmesh.core.snapshot.definition.SnapshotId, intervals: List[Tuple[int, int]])
intervals: List[Tuple[int, int]]
merged_intervals: List[Tuple[int, int]]
@property
def merged_intervals(self) -> Intervals:
    """The intervals after passing them through `merge_intervals`."""
    return merge_intervals(self.intervals)
def format_intervals(self, unit: Optional[sqlmesh.core.node.IntervalUnit] = None) -> str:
def format_intervals(self, unit: t.Optional[IntervalUnit] = None) -> str:
    """Render the merged intervals as a string, optionally for the given interval unit."""
    # The call below targets the `format_intervals` helper imported from
    # sqlmesh.core.snapshot.definition, not this method of the same name.
    return format_intervals(self.merged_intervals, unit)
def earliest_interval_start( snapshots: Collection[sqlmesh.core.snapshot.definition.Snapshot], execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None) -> datetime.datetime:
def earliest_interval_start(
    snapshots: t.Collection[Snapshot], execution_time: t.Optional[TimeLike] = None
) -> datetime:
    """Return the earliest start across the given snapshots.

    Args:
        snapshots: The snapshots to inspect.
        execution_time: Passed to `earliest_start_date` as its `relative_to` argument.

    Returns:
        The earlier of `earliest_start_date(...)` and the earliest first-interval
        start among snapshots that have intervals; just the former when no
        snapshot has any intervals.
    """
    earliest_start = earliest_start_date(snapshots, relative_to=execution_time)
    earliest_interval_starts = [s.intervals[0][0] for s in snapshots if s.intervals]
    if not earliest_interval_starts:
        return earliest_start
    return min(earliest_start, to_datetime(min(earliest_interval_starts)))