sqlmesh.core.plan.definition
1from __future__ import annotations 2 3import typing as t 4from dataclasses import dataclass 5from datetime import datetime 6from enum import Enum 7from functools import cached_property 8from pydantic import Field 9 10from sqlmesh.core.context_diff import ContextDiff 11from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements 12from sqlmesh.utils.metaprogramming import Executable # noqa 13from sqlmesh.core.node import IntervalUnit 14from sqlmesh.core.snapshot import ( 15 DeployabilityIndex, 16 Intervals, 17 Snapshot, 18 earliest_start_date, 19 merge_intervals, 20 missing_intervals, 21) 22from sqlmesh.core.snapshot.definition import ( 23 Interval, 24 SnapshotId, 25 SnapshotTableInfo, 26 format_intervals, 27) 28from sqlmesh.utils.date import TimeLike, now, to_datetime, to_timestamp 29from sqlmesh.utils.pydantic import PydanticModel 30 31SnapshotMapping = t.Dict[SnapshotId, t.Set[SnapshotId]] 32UserProvidedFlags = t.Union[TimeLike, str, bool, t.List[str]] 33 34 35class Plan(PydanticModel, frozen=True): 36 context_diff: ContextDiff 37 plan_id: str 38 provided_start: t.Optional[TimeLike] = None 39 provided_end: t.Optional[TimeLike] = None 40 41 is_dev: bool 42 skip_backfill: bool 43 empty_backfill: bool 44 no_gaps: bool 45 forward_only: bool 46 allow_destructive_models: t.Set[str] 47 allow_additive_models: t.Set[str] 48 include_unmodified: bool 49 end_bounded: bool 50 ensure_finalized_snapshots: bool 51 explain: bool 52 ignore_cron: bool = False 53 54 environment_ttl: t.Optional[str] = None 55 environment_naming_info: EnvironmentNamingInfo 56 57 directly_modified: t.Set[SnapshotId] 58 indirectly_modified: t.Dict[SnapshotId, t.Set[SnapshotId]] 59 60 deployability_index: DeployabilityIndex 61 selected_models_to_restate: t.Optional[t.Set[str]] = None 62 """Models that have been explicitly selected for restatement by a user""" 63 restatements: t.Dict[SnapshotId, Interval] 64 """ 65 All models being restated, which are typically the 
explicitly selected ones + their downstream dependencies. 66 67 Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty 68 while :restatements is still populated with dev previews 69 """ 70 restate_all_snapshots: bool 71 """Whether or not to clear intervals from state for other versions of the models listed in :restatements""" 72 73 start_override_per_model: t.Optional[t.Dict[str, datetime]] 74 end_override_per_model: t.Optional[t.Dict[str, datetime]] 75 76 selected_models_to_backfill: t.Optional[t.Set[str]] = None 77 """Models that have been explicitly selected for backfill by a user.""" 78 models_to_backfill: t.Optional[t.Set[str]] = None 79 """All models that should be backfilled as part of this plan.""" 80 effective_from: t.Optional[TimeLike] = None 81 execution_time_: t.Optional[TimeLike] = Field(default=None, alias="execution_time") 82 83 user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None 84 selected_models: t.Optional[t.Set[str]] = None 85 """Models that have been selected for this plan (used for dbt selected_resources)""" 86 87 @cached_property 88 def start(self) -> TimeLike: 89 if self.provided_start is not None: 90 return self.provided_start 91 92 missing_intervals = self.missing_intervals 93 if missing_intervals: 94 return min(si.intervals[0][0] for si in missing_intervals) 95 96 return self._earliest_interval_start 97 98 @cached_property 99 def end(self) -> TimeLike: 100 return self.provided_end or self.execution_time 101 102 @cached_property 103 def execution_time(self) -> TimeLike: 104 # note: property is cached so that it returns a consistent timestamp for now() 105 return self.execution_time_ or now() 106 107 @property 108 def previous_plan_id(self) -> t.Optional[str]: 109 return self.context_diff.previous_plan_id 110 111 @property 112 def requires_backfill(self) -> bool: 113 return ( 114 not self.skip_backfill 115 and not self.empty_backfill 116 and (bool(self.restatements) or 
bool(self.missing_intervals)) 117 ) 118 119 @property 120 def has_changes(self) -> bool: 121 return self.context_diff.has_changes 122 123 @property 124 def has_unmodified_unpromoted(self) -> bool: 125 """Is the plan for an existing dev environment, has the include unmodified flag, and contains unmodified nodes that have not been promoted.""" 126 return ( 127 self.is_dev 128 and not self.context_diff.is_new_environment 129 and self.include_unmodified 130 and bool(self.context_diff.unpromoted_models) 131 ) 132 133 @property 134 def categorized(self) -> t.List[Snapshot]: 135 """Returns the already categorized snapshots.""" 136 return [ 137 self.context_diff.snapshots[s_id] 138 for s_id in sorted({*self.directly_modified, *self.metadata_updated}) 139 if self.context_diff.snapshots[s_id].version 140 ] 141 142 @property 143 def uncategorized(self) -> t.List[Snapshot]: 144 """Returns the uncategorized snapshots.""" 145 return [ 146 self.context_diff.snapshots[s_id] 147 for s_id in sorted(self.directly_modified) 148 if not self.context_diff.snapshots[s_id].version 149 ] 150 151 @property 152 def snapshots(self) -> t.Dict[SnapshotId, Snapshot]: 153 return self.context_diff.snapshots 154 155 @cached_property 156 def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]: 157 """Returns the modified (either directly or indirectly) snapshots.""" 158 return { 159 **{s_id: self.context_diff.snapshots[s_id] for s_id in sorted(self.directly_modified)}, 160 **{ 161 s_id: self.context_diff.snapshots[s_id] 162 for downstream_s_ids in self.indirectly_modified.values() 163 for s_id in sorted(downstream_s_ids) 164 }, 165 **self.context_diff.removed_snapshots, 166 **{s_id: self.context_diff.snapshots[s_id] for s_id in sorted(self.metadata_updated)}, 167 } 168 169 @cached_property 170 def metadata_updated(self) -> t.Set[SnapshotId]: 171 return { 172 snapshot.snapshot_id 173 for snapshot, _ in self.context_diff.modified_snapshots.values() 174 if 
self.context_diff.metadata_updated(snapshot.name) 175 } 176 177 @property 178 def new_snapshots(self) -> t.List[Snapshot]: 179 """Gets only new snapshots in the plan/environment.""" 180 return list(self.context_diff.new_snapshots.values()) 181 182 @property 183 def missing_intervals(self) -> t.List[SnapshotIntervals]: 184 """Returns the missing intervals for this plan.""" 185 # NOTE: Even though the plan is immutable, snapshots that are part of it are not. Since snapshot intervals 186 # may change over time, we should avoid caching missing intervals within the plan instance. 187 intervals = [ 188 SnapshotIntervals(snapshot_id=snapshot.snapshot_id, intervals=missing) 189 for snapshot, missing in missing_intervals( 190 [s for s in self.snapshots.values() if self.is_selected_for_backfill(s.name)], 191 start=self.provided_start or self._earliest_interval_start, 192 end=self.provided_end, 193 execution_time=self.execution_time, 194 restatements=self.restatements, 195 deployability_index=self.deployability_index, 196 start_override_per_model=self.start_override_per_model, 197 end_override_per_model=self.end_override_per_model, 198 end_bounded=self.end_bounded, 199 ignore_cron=self.ignore_cron, 200 ).items() 201 if snapshot.is_model and missing 202 ] 203 return sorted(intervals, key=lambda i: i.snapshot_id) 204 205 @cached_property 206 def environment(self) -> Environment: 207 """The environment of this plan.""" 208 expiration_ts = ( 209 to_timestamp(self.environment_ttl, relative_base=now()) 210 if self.is_dev and self.environment_ttl is not None 211 else None 212 ) 213 214 snapshots_by_name = self.context_diff.snapshots_by_name 215 snapshots = [s.table_info for s in self.snapshots.values()] 216 promotable_snapshot_ids = None 217 if self.is_dev: 218 if self.selected_models_to_backfill is not None: 219 # Only promote models that have been explicitly selected for backfill. 
220 promotable_snapshot_ids = { 221 *self.context_diff.previously_promoted_snapshot_ids, 222 *[ 223 snapshots_by_name[m].snapshot_id 224 for m in self.selected_models_to_backfill 225 if m in snapshots_by_name 226 ], 227 } 228 elif not self.include_unmodified: 229 promotable_snapshot_ids = self.context_diff.promotable_snapshot_ids.copy() 230 231 promoted_snapshot_ids = ( 232 [s.snapshot_id for s in snapshots if s.snapshot_id in promotable_snapshot_ids] 233 if promotable_snapshot_ids is not None 234 else None 235 ) 236 237 previous_finalized_snapshots = ( 238 self.context_diff.environment_snapshots 239 if not self.context_diff.is_unfinalized_environment 240 else self.context_diff.previous_finalized_snapshots 241 ) 242 243 return Environment( 244 snapshots=snapshots, 245 start_at=self.provided_start or self._earliest_interval_start, 246 end_at=self.provided_end, 247 plan_id=self.plan_id, 248 previous_plan_id=self.previous_plan_id, 249 expiration_ts=expiration_ts, 250 promoted_snapshot_ids=promoted_snapshot_ids, 251 previous_finalized_snapshots=previous_finalized_snapshots, 252 requirements=self.context_diff.requirements, 253 **self.environment_naming_info.dict(), 254 ) 255 256 def is_new_snapshot(self, snapshot: Snapshot) -> bool: 257 """Returns True if the given snapshot is a new snapshot in this plan.""" 258 snapshot_id = snapshot.snapshot_id 259 return snapshot_id in self.context_diff.new_snapshots 260 261 def is_selected_for_backfill(self, model_fqn: str) -> bool: 262 """Returns True if a model with the given FQN should be backfilled as part of this plan.""" 263 return self.models_to_backfill is None or model_fqn in self.models_to_backfill 264 265 def to_evaluatable(self) -> EvaluatablePlan: 266 return EvaluatablePlan( 267 start=self.start, 268 end=self.end, 269 new_snapshots=self.new_snapshots, 270 environment=self.environment, 271 no_gaps=self.no_gaps, 272 skip_backfill=self.skip_backfill, 273 empty_backfill=self.empty_backfill, 274 restatements={s.name: i for 
s, i in self.restatements.items()}, 275 restate_all_snapshots=self.restate_all_snapshots, 276 is_dev=self.is_dev, 277 allow_destructive_models=self.allow_destructive_models, 278 allow_additive_models=self.allow_additive_models, 279 forward_only=self.forward_only, 280 end_bounded=self.end_bounded, 281 ensure_finalized_snapshots=self.ensure_finalized_snapshots, 282 ignore_cron=self.ignore_cron, 283 directly_modified_snapshots=sorted(self.directly_modified), 284 indirectly_modified_snapshots={ 285 s.name: sorted(snapshot_ids) for s, snapshot_ids in self.indirectly_modified.items() 286 }, 287 metadata_updated_snapshots=sorted(self.metadata_updated), 288 removed_snapshots=sorted(self.context_diff.removed_snapshots), 289 requires_backfill=self.requires_backfill, 290 models_to_backfill=self.models_to_backfill, 291 start_override_per_model=self.start_override_per_model, 292 end_override_per_model=self.end_override_per_model, 293 execution_time=self.execution_time, 294 disabled_restatement_models={ 295 s.name 296 for s in self.snapshots.values() 297 if s.is_model and s.model.disable_restatement 298 }, 299 environment_statements=self.context_diff.environment_statements, 300 user_provided_flags=self.user_provided_flags, 301 selected_models=self.selected_models, 302 ) 303 304 @cached_property 305 def _earliest_interval_start(self) -> datetime: 306 return earliest_interval_start(self.snapshots.values(), self.execution_time) 307 308 309class EvaluatablePlan(PydanticModel): 310 """A serializable version of a plan that can be evaluated.""" 311 312 start: TimeLike 313 end: TimeLike 314 new_snapshots: t.List[Snapshot] 315 environment: Environment 316 no_gaps: bool 317 skip_backfill: bool 318 empty_backfill: bool 319 restatements: t.Dict[str, Interval] 320 restate_all_snapshots: bool 321 is_dev: bool 322 allow_destructive_models: t.Set[str] 323 allow_additive_models: t.Set[str] 324 forward_only: bool 325 end_bounded: bool 326 ensure_finalized_snapshots: bool 327 ignore_cron: bool = 
False 328 directly_modified_snapshots: t.List[SnapshotId] 329 indirectly_modified_snapshots: t.Dict[str, t.List[SnapshotId]] 330 metadata_updated_snapshots: t.List[SnapshotId] 331 removed_snapshots: t.List[SnapshotId] 332 requires_backfill: bool 333 models_to_backfill: t.Optional[t.Set[str]] = None 334 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None 335 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None 336 execution_time: t.Optional[TimeLike] = None 337 disabled_restatement_models: t.Set[str] 338 environment_statements: t.Optional[t.List[EnvironmentStatements]] = None 339 user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None 340 selected_models: t.Optional[t.Set[str]] = None 341 342 def is_selected_for_backfill(self, model_fqn: str) -> bool: 343 return self.models_to_backfill is None or model_fqn in self.models_to_backfill 344 345 @property 346 def plan_id(self) -> str: 347 return self.environment.plan_id 348 349 @property 350 def is_prod(self) -> bool: 351 return not self.is_dev 352 353 354class PlanStatus(str, Enum): 355 STARTED = "started" 356 FINISHED = "finished" 357 FAILED = "failed" 358 359 @property 360 def is_started(self) -> bool: 361 return self == PlanStatus.STARTED 362 363 @property 364 def is_failed(self) -> bool: 365 return self == PlanStatus.FAILED 366 367 @property 368 def is_finished(self) -> bool: 369 return self == PlanStatus.FINISHED 370 371 372# millions of these can be created, pydantic has significant overhead 373@dataclass 374class SnapshotIntervals: 375 snapshot_id: SnapshotId 376 intervals: Intervals 377 378 @property 379 def merged_intervals(self) -> Intervals: 380 return merge_intervals(self.intervals) 381 382 def format_intervals(self, unit: t.Optional[IntervalUnit] = None) -> str: 383 return format_intervals(self.merged_intervals, unit) 384 385 386def earliest_interval_start( 387 snapshots: t.Collection[Snapshot], execution_time: t.Optional[TimeLike] = None 388) -> datetime: 389 
earliest_start = earliest_start_date(snapshots, relative_to=execution_time) 390 earliest_interval_starts = [s.intervals[0][0] for s in snapshots if s.intervals] 391 return ( 392 min(earliest_start, to_datetime(min(earliest_interval_starts))) 393 if earliest_interval_starts 394 else earliest_start 395 )
class Plan(PydanticModel, frozen=True):
    """An immutable plan built on top of a context diff."""

    context_diff: ContextDiff
    plan_id: str
    provided_start: t.Optional[TimeLike] = None
    provided_end: t.Optional[TimeLike] = None

    is_dev: bool
    skip_backfill: bool
    empty_backfill: bool
    no_gaps: bool
    forward_only: bool
    allow_destructive_models: t.Set[str]
    allow_additive_models: t.Set[str]
    include_unmodified: bool
    end_bounded: bool
    ensure_finalized_snapshots: bool
    explain: bool
    ignore_cron: bool = False

    environment_ttl: t.Optional[str] = None
    environment_naming_info: EnvironmentNamingInfo

    directly_modified: t.Set[SnapshotId]
    indirectly_modified: t.Dict[SnapshotId, t.Set[SnapshotId]]

    deployability_index: DeployabilityIndex
    selected_models_to_restate: t.Optional[t.Set[str]] = None
    """Models that have been explicitly selected for restatement by a user"""
    restatements: t.Dict[SnapshotId, Interval]
    """
    All models being restated, which are typically the explicitly selected ones + their downstream dependencies.

    Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty
    while :restatements is still populated with dev previews
    """
    restate_all_snapshots: bool
    """Whether or not to clear intervals from state for other versions of the models listed in :restatements"""

    start_override_per_model: t.Optional[t.Dict[str, datetime]]
    end_override_per_model: t.Optional[t.Dict[str, datetime]]

    selected_models_to_backfill: t.Optional[t.Set[str]] = None
    """Models that have been explicitly selected for backfill by a user."""
    models_to_backfill: t.Optional[t.Set[str]] = None
    """All models that should be backfilled as part of this plan."""
    effective_from: t.Optional[TimeLike] = None
    execution_time_: t.Optional[TimeLike] = Field(default=None, alias="execution_time")

    user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
    selected_models: t.Optional[t.Set[str]] = None
    """Models that have been selected for this plan (used for dbt selected_resources)"""

    @cached_property
    def start(self) -> TimeLike:
        """The plan start: the provided start, else the earliest missing interval start,
        else the earliest interval start across the plan's snapshots."""
        if self.provided_start is not None:
            return self.provided_start

        pending = self.missing_intervals
        if not pending:
            return self._earliest_interval_start

        return min(entry.intervals[0][0] for entry in pending)

    @cached_property
    def end(self) -> TimeLike:
        """The plan end: the provided end when truthy, otherwise the execution time."""
        explicit_end = self.provided_end
        return explicit_end if explicit_end else self.execution_time

    @cached_property
    def execution_time(self) -> TimeLike:
        """The execution time given to the plan, or the current time when none was given."""
        # note: property is cached so that it returns a consistent timestamp for now()
        configured = self.execution_time_
        return configured if configured else now()

    @property
    def previous_plan_id(self) -> t.Optional[str]:
        """The previous plan ID as reported by the context diff."""
        return self.context_diff.previous_plan_id

    @property
    def requires_backfill(self) -> bool:
        """True when backfill is neither skipped nor empty and there are restatements or missing intervals."""
        if self.skip_backfill or self.empty_backfill:
            return False
        return bool(self.restatements) or bool(self.missing_intervals)

    @property
    def has_changes(self) -> bool:
        """Whether the underlying context diff reports changes."""
        return self.context_diff.has_changes

    @property
    def has_unmodified_unpromoted(self) -> bool:
        """Is the plan for an existing dev environment, has the include unmodified flag, and contains unmodified nodes that have not been promoted."""
        if not self.is_dev:
            return False
        if self.context_diff.is_new_environment:
            return False
        if not self.include_unmodified:
            return False
        return bool(self.context_diff.unpromoted_models)

    @property
    def categorized(self) -> t.List[Snapshot]:
        """Returns the already categorized snapshots."""
        candidate_ids = set(self.directly_modified)
        candidate_ids.update(self.metadata_updated)

        result = []
        for snapshot_id in sorted(candidate_ids):
            # A snapshot counts as categorized once it has a truthy version.
            if self.context_diff.snapshots[snapshot_id].version:
                result.append(self.context_diff.snapshots[snapshot_id])
        return result

    @property
    def uncategorized(self) -> t.List[Snapshot]:
        """Returns the uncategorized snapshots."""
        result = []
        for snapshot_id in sorted(self.directly_modified):
            if self.context_diff.snapshots[snapshot_id].version:
                continue
            result.append(self.context_diff.snapshots[snapshot_id])
        return result

    @property
    def snapshots(self) -> t.Dict[SnapshotId, Snapshot]:
        """All snapshots of the underlying context diff, keyed by snapshot ID."""
        return self.context_diff.snapshots

    @cached_property
    def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]:
        """Returns the modified (either directly or indirectly) snapshots."""
        result: t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]] = {}

        for snapshot_id in sorted(self.directly_modified):
            result[snapshot_id] = self.context_diff.snapshots[snapshot_id]

        for downstream_ids in self.indirectly_modified.values():
            for snapshot_id in sorted(downstream_ids):
                result[snapshot_id] = self.context_diff.snapshots[snapshot_id]

        # Later groups override earlier values for the same key but keep the key's first position.
        result.update(self.context_diff.removed_snapshots)

        for snapshot_id in sorted(self.metadata_updated):
            result[snapshot_id] = self.context_diff.snapshots[snapshot_id]

        return result

    @cached_property
    def metadata_updated(self) -> t.Set[SnapshotId]:
        """IDs of modified snapshots whose name the context diff flags as metadata-updated."""
        updated: t.Set[SnapshotId] = set()
        for snapshot, _ in self.context_diff.modified_snapshots.values():
            if self.context_diff.metadata_updated(snapshot.name):
                updated.add(snapshot.snapshot_id)
        return updated

    @property
    def new_snapshots(self) -> t.List[Snapshot]:
        """Gets only new snapshots in the plan/environment."""
        return [*self.context_diff.new_snapshots.values()]

    @property
    def missing_intervals(self) -> t.List[SnapshotIntervals]:
        """Returns the missing intervals for this plan."""
        # NOTE: Even though the plan is immutable, snapshots that are part of it are not. Since snapshot intervals
        # may change over time, we should avoid caching missing intervals within the plan instance.
        selected = [s for s in self.snapshots.values() if self.is_selected_for_backfill(s.name)]
        missing_by_snapshot = missing_intervals(
            selected,
            start=self.provided_start or self._earliest_interval_start,
            end=self.provided_end,
            execution_time=self.execution_time,
            restatements=self.restatements,
            deployability_index=self.deployability_index,
            start_override_per_model=self.start_override_per_model,
            end_override_per_model=self.end_override_per_model,
            end_bounded=self.end_bounded,
            ignore_cron=self.ignore_cron,
        )

        result = []
        for snapshot, missing in missing_by_snapshot.items():
            # Only model snapshots that actually have missing intervals are reported.
            if not snapshot.is_model or not missing:
                continue
            result.append(SnapshotIntervals(snapshot_id=snapshot.snapshot_id, intervals=missing))

        result.sort(key=lambda entry: entry.snapshot_id)
        return result

    @cached_property
    def environment(self) -> Environment:
        """The environment of this plan."""
        # Only dev environments with a TTL get an expiration timestamp.
        expiration_ts = None
        if self.is_dev and self.environment_ttl is not None:
            expiration_ts = to_timestamp(self.environment_ttl, relative_base=now())

        snapshots_by_name = self.context_diff.snapshots_by_name
        snapshots = [s.table_info for s in self.snapshots.values()]

        promotable_snapshot_ids = None
        if self.is_dev:
            if self.selected_models_to_backfill is not None:
                # Only promote models that have been explicitly selected for backfill.
                promotable_snapshot_ids = set(self.context_diff.previously_promoted_snapshot_ids)
                promotable_snapshot_ids.update(
                    snapshots_by_name[name].snapshot_id
                    for name in self.selected_models_to_backfill
                    if name in snapshots_by_name
                )
            elif not self.include_unmodified:
                promotable_snapshot_ids = self.context_diff.promotable_snapshot_ids.copy()

        # None (rather than an empty list) is passed through when no promotable set was computed.
        promoted_snapshot_ids = None
        if promotable_snapshot_ids is not None:
            promoted_snapshot_ids = [
                s.snapshot_id for s in snapshots if s.snapshot_id in promotable_snapshot_ids
            ]

        if self.context_diff.is_unfinalized_environment:
            previous_finalized_snapshots = self.context_diff.previous_finalized_snapshots
        else:
            previous_finalized_snapshots = self.context_diff.environment_snapshots

        return Environment(
            snapshots=snapshots,
            start_at=self.provided_start or self._earliest_interval_start,
            end_at=self.provided_end,
            plan_id=self.plan_id,
            previous_plan_id=self.previous_plan_id,
            expiration_ts=expiration_ts,
            promoted_snapshot_ids=promoted_snapshot_ids,
            previous_finalized_snapshots=previous_finalized_snapshots,
            requirements=self.context_diff.requirements,
            **self.environment_naming_info.dict(),
        )

    def is_new_snapshot(self, snapshot: Snapshot) -> bool:
        """Returns True if the given snapshot is a new snapshot in this plan."""
        return snapshot.snapshot_id in self.context_diff.new_snapshots

    def is_selected_for_backfill(self, model_fqn: str) -> bool:
        """Returns True if a model with the given FQN should be backfilled as part of this plan."""
        # No explicit selection (None) means every model is selected.
        if self.models_to_backfill is None:
            return True
        return model_fqn in self.models_to_backfill

    def to_evaluatable(self) -> EvaluatablePlan:
        """Builds the serializable EvaluatablePlan counterpart of this plan."""
        # Derived values are computed in the same order as the keyword arguments below.
        start = self.start
        end = self.end
        new_snapshots = self.new_snapshots
        environment = self.environment
        restatements_by_name = {
            snapshot_id.name: interval for snapshot_id, interval in self.restatements.items()
        }
        directly_modified = sorted(self.directly_modified)
        indirectly_modified = {
            parent.name: sorted(children) for parent, children in self.indirectly_modified.items()
        }
        metadata_updated = sorted(self.metadata_updated)
        removed = sorted(self.context_diff.removed_snapshots)
        requires_backfill = self.requires_backfill
        execution_time = self.execution_time
        disabled_restatement_models = set()
        for snapshot in self.snapshots.values():
            if snapshot.is_model and snapshot.model.disable_restatement:
                disabled_restatement_models.add(snapshot.name)

        return EvaluatablePlan(
            start=start,
            end=end,
            new_snapshots=new_snapshots,
            environment=environment,
            no_gaps=self.no_gaps,
            skip_backfill=self.skip_backfill,
            empty_backfill=self.empty_backfill,
            restatements=restatements_by_name,
            restate_all_snapshots=self.restate_all_snapshots,
            is_dev=self.is_dev,
            allow_destructive_models=self.allow_destructive_models,
            allow_additive_models=self.allow_additive_models,
            forward_only=self.forward_only,
            end_bounded=self.end_bounded,
            ensure_finalized_snapshots=self.ensure_finalized_snapshots,
            ignore_cron=self.ignore_cron,
            directly_modified_snapshots=directly_modified,
            indirectly_modified_snapshots=indirectly_modified,
            metadata_updated_snapshots=metadata_updated,
            removed_snapshots=removed,
            requires_backfill=requires_backfill,
            models_to_backfill=self.models_to_backfill,
            start_override_per_model=self.start_override_per_model,
            end_override_per_model=self.end_override_per_model,
            execution_time=execution_time,
            disabled_restatement_models=disabled_restatement_models,
            environment_statements=self.context_diff.environment_statements,
            user_provided_flags=self.user_provided_flags,
            selected_models=self.selected_models,
        )

    @cached_property
    def _earliest_interval_start(self) -> datetime:
        """Earliest interval start across the plan's snapshots, relative to the execution time."""
        return earliest_interval_start(self.snapshots.values(), self.execution_time)
**Usage Documentation:** Models
A base class for creating Pydantic models.
Attributes:
- `__class_vars__`: The names of the class variables defined on the model.
- `__private_attributes__`: Metadata about the private attributes of the model.
- `__signature__`: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- `__pydantic_complete__`: Whether model building is completed, or if there are still undefined fields.
- `__pydantic_core_schema__`: The core schema of the model.
- `__pydantic_custom_init__`: Whether the model has a custom `__init__` function.
- `__pydantic_decorators__`: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- `__pydantic_generic_metadata__`: Metadata for generic models; contains data used for a similar purpose to `__args__`, `__origin__`, `__parameters__` in typing-module generics. May eventually be replaced by these.
- `__pydantic_parent_namespace__`: Parent namespace of the model, used for automatic rebuilding of models.
- `__pydantic_post_init__`: The name of the post-init method for the model, if defined.
- `__pydantic_root_model__`: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- `__pydantic_serializer__`: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- `__pydantic_validator__`: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- `__pydantic_fields__`: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- `__pydantic_computed_fields__`: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- `__pydantic_extra__`: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- `__pydantic_fields_set__`: The names of fields explicitly set during instantiation.
- `__pydantic_private__`: Values of private attributes set on the model instance.
Models that have been explicitly selected for restatement by a user
All models being restated, which are typically the explicitly selected ones + their downstream dependencies.
Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty while :restatements is still populated with dev previews
Whether or not to clear intervals from state for other versions of the models listed in :restatements
Models that have been explicitly selected for backfill by a user.
Models that have been selected for this plan (used for dbt selected_resources)
@property
def has_unmodified_unpromoted(self) -> bool:
    """Is the plan for an existing dev environment, has the include unmodified flag, and contains unmodified nodes that have not been promoted."""
    # Each guard mirrors one operand of the original conjunction, in the same order.
    if not self.is_dev:
        return False
    if self.context_diff.is_new_environment:
        return False
    if not self.include_unmodified:
        return False
    return bool(self.context_diff.unpromoted_models)
Is the plan for an existing dev environment, has the include unmodified flag, and contains unmodified nodes that have not been promoted.
@property
def categorized(self) -> t.List[Snapshot]:
    """Returns the already categorized snapshots."""
    # Candidates are the directly modified snapshots plus those with metadata updates.
    candidate_ids = set(self.directly_modified)
    candidate_ids.update(self.metadata_updated)

    result = []
    for snapshot_id in sorted(candidate_ids):
        # A snapshot counts as categorized once it has a truthy version.
        if self.context_diff.snapshots[snapshot_id].version:
            result.append(self.context_diff.snapshots[snapshot_id])
    return result
Returns the already categorized snapshots.
@property
def uncategorized(self) -> t.List[Snapshot]:
    """Returns the uncategorized snapshots."""
    result = []
    for snapshot_id in sorted(self.directly_modified):
        # Snapshots with a truthy version have already been categorized.
        if self.context_diff.snapshots[snapshot_id].version:
            continue
        result.append(self.context_diff.snapshots[snapshot_id])
    return result
Returns the uncategorized snapshots.
@cached_property
def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]:
    """Returns the modified (either directly or indirectly) snapshots."""
    result = {}

    for snapshot_id in sorted(self.directly_modified):
        result[snapshot_id] = self.context_diff.snapshots[snapshot_id]

    for downstream_ids in self.indirectly_modified.values():
        for snapshot_id in sorted(downstream_ids):
            result[snapshot_id] = self.context_diff.snapshots[snapshot_id]

    # Later groups override earlier values for the same key but keep the key's first position.
    result.update(self.context_diff.removed_snapshots)

    for snapshot_id in sorted(self.metadata_updated):
        result[snapshot_id] = self.context_diff.snapshots[snapshot_id]

    return result
Returns the modified (either directly or indirectly) snapshots.
@property
def new_snapshots(self) -> t.List[Snapshot]:
    """Gets only new snapshots in the plan/environment."""
    # Materialize the values view of the context diff's new snapshots into a fresh list.
    return [*self.context_diff.new_snapshots.values()]
Gets only new snapshots in the plan/environment.
@property
def missing_intervals(self) -> t.List[SnapshotIntervals]:
    """Returns the missing intervals for this plan."""
    # NOTE: Even though the plan is immutable, snapshots that are part of it are not. Since snapshot intervals
    # may change over time, we should avoid caching missing intervals within the plan instance.
    selected = [s for s in self.snapshots.values() if self.is_selected_for_backfill(s.name)]
    missing_by_snapshot = missing_intervals(
        selected,
        start=self.provided_start or self._earliest_interval_start,
        end=self.provided_end,
        execution_time=self.execution_time,
        restatements=self.restatements,
        deployability_index=self.deployability_index,
        start_override_per_model=self.start_override_per_model,
        end_override_per_model=self.end_override_per_model,
        end_bounded=self.end_bounded,
        ignore_cron=self.ignore_cron,
    )

    result = []
    for snapshot, missing in missing_by_snapshot.items():
        # Only model snapshots that actually have missing intervals are reported.
        if not snapshot.is_model or not missing:
            continue
        result.append(SnapshotIntervals(snapshot_id=snapshot.snapshot_id, intervals=missing))

    result.sort(key=lambda entry: entry.snapshot_id)
    return result
Returns the missing intervals for this plan.
@cached_property
def environment(self) -> Environment:
    """The environment of this plan."""
    # Only dev environments with a TTL get an expiration timestamp.
    expiration_ts = None
    if self.is_dev and self.environment_ttl is not None:
        expiration_ts = to_timestamp(self.environment_ttl, relative_base=now())

    snapshots_by_name = self.context_diff.snapshots_by_name
    snapshots = [s.table_info for s in self.snapshots.values()]

    promotable_snapshot_ids = None
    if self.is_dev:
        if self.selected_models_to_backfill is not None:
            # Only promote models that have been explicitly selected for backfill.
            promotable_snapshot_ids = set(self.context_diff.previously_promoted_snapshot_ids)
            promotable_snapshot_ids.update(
                snapshots_by_name[name].snapshot_id
                for name in self.selected_models_to_backfill
                if name in snapshots_by_name
            )
        elif not self.include_unmodified:
            promotable_snapshot_ids = self.context_diff.promotable_snapshot_ids.copy()

    # None (rather than an empty list) is passed through when no promotable set was computed.
    promoted_snapshot_ids = None
    if promotable_snapshot_ids is not None:
        promoted_snapshot_ids = [
            s.snapshot_id for s in snapshots if s.snapshot_id in promotable_snapshot_ids
        ]

    if self.context_diff.is_unfinalized_environment:
        previous_finalized_snapshots = self.context_diff.previous_finalized_snapshots
    else:
        previous_finalized_snapshots = self.context_diff.environment_snapshots

    return Environment(
        snapshots=snapshots,
        start_at=self.provided_start or self._earliest_interval_start,
        end_at=self.provided_end,
        plan_id=self.plan_id,
        previous_plan_id=self.previous_plan_id,
        expiration_ts=expiration_ts,
        promoted_snapshot_ids=promoted_snapshot_ids,
        previous_finalized_snapshots=previous_finalized_snapshots,
        requirements=self.context_diff.requirements,
        **self.environment_naming_info.dict(),
    )
The environment of this plan.
def is_new_snapshot(self, snapshot: Snapshot) -> bool:
    """Returns True if the given snapshot is a new snapshot in this plan.

    A snapshot counts as new when its ID is contained in the context diff's new snapshots.
    """
    return snapshot.snapshot_id in self.context_diff.new_snapshots
Returns True if the given snapshot is a new snapshot in this plan.
def is_selected_for_backfill(self, model_fqn: str) -> bool:
    """Returns True if a model with the given FQN should be backfilled as part of this plan.

    When no explicit set of models to backfill exists (None), every model is considered selected.
    """
    selected_models = self.models_to_backfill
    if selected_models is None:
        return True
    return model_fqn in selected_models
Returns True if a model with the given FQN should be backfilled as part of this plan.
def to_evaluatable(self) -> EvaluatablePlan:
    """Builds an EvaluatablePlan from the attributes of this plan.

    Restatements and indirectly modified snapshots are re-keyed by the `name` of their
    snapshot ID, and the snapshot ID collections are passed as sorted lists.
    """
    restatements_by_name = {
        snapshot_id.name: interval for snapshot_id, interval in self.restatements.items()
    }

    indirectly_modified_by_name = {}
    for parent_id, child_ids in self.indirectly_modified.items():
        indirectly_modified_by_name[parent_id.name] = sorted(child_ids)

    # Names of model snapshots whose model has restatement disabled.
    disabled_restatement_models = {
        snapshot.name
        for snapshot in self.snapshots.values()
        if snapshot.is_model and snapshot.model.disable_restatement
    }

    return EvaluatablePlan(
        start=self.start,
        end=self.end,
        new_snapshots=self.new_snapshots,
        environment=self.environment,
        no_gaps=self.no_gaps,
        skip_backfill=self.skip_backfill,
        empty_backfill=self.empty_backfill,
        restatements=restatements_by_name,
        restate_all_snapshots=self.restate_all_snapshots,
        is_dev=self.is_dev,
        allow_destructive_models=self.allow_destructive_models,
        allow_additive_models=self.allow_additive_models,
        forward_only=self.forward_only,
        end_bounded=self.end_bounded,
        ensure_finalized_snapshots=self.ensure_finalized_snapshots,
        ignore_cron=self.ignore_cron,
        directly_modified_snapshots=sorted(self.directly_modified),
        indirectly_modified_snapshots=indirectly_modified_by_name,
        metadata_updated_snapshots=sorted(self.metadata_updated),
        removed_snapshots=sorted(self.context_diff.removed_snapshots),
        requires_backfill=self.requires_backfill,
        models_to_backfill=self.models_to_backfill,
        start_override_per_model=self.start_override_per_model,
        end_override_per_model=self.end_override_per_model,
        execution_time=self.execution_time,
        disabled_restatement_models=disabled_restatement_models,
        environment_statements=self.context_diff.environment_statements,
        user_provided_flags=self.user_provided_flags,
        selected_models=self.selected_models,
    )
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class EvaluatablePlan(PydanticModel):
    """A serializable version of a plan that can be evaluated."""

    start: TimeLike
    end: TimeLike
    new_snapshots: t.List[Snapshot]
    environment: Environment
    no_gaps: bool
    skip_backfill: bool
    empty_backfill: bool
    restatements: t.Dict[str, Interval]
    restate_all_snapshots: bool
    is_dev: bool
    allow_destructive_models: t.Set[str]
    allow_additive_models: t.Set[str]
    forward_only: bool
    end_bounded: bool
    ensure_finalized_snapshots: bool
    ignore_cron: bool = False
    directly_modified_snapshots: t.List[SnapshotId]
    indirectly_modified_snapshots: t.Dict[str, t.List[SnapshotId]]
    metadata_updated_snapshots: t.List[SnapshotId]
    removed_snapshots: t.List[SnapshotId]
    requires_backfill: bool
    models_to_backfill: t.Optional[t.Set[str]] = None
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None
    execution_time: t.Optional[TimeLike] = None
    disabled_restatement_models: t.Set[str]
    environment_statements: t.Optional[t.List[EnvironmentStatements]] = None
    user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
    selected_models: t.Optional[t.Set[str]] = None

    def is_selected_for_backfill(self, model_fqn: str) -> bool:
        """Returns True if the model with the given FQN should be backfilled.

        When `models_to_backfill` is None, every model is considered selected.
        """
        selected_models = self.models_to_backfill
        if selected_models is None:
            return True
        return model_fqn in selected_models

    @property
    def plan_id(self) -> str:
        """The plan ID, read from the target environment."""
        target_environment = self.environment
        return target_environment.plan_id

    @property
    def is_prod(self) -> bool:
        """True when this plan is not a dev plan."""
        targets_dev = self.is_dev
        return not targets_dev
A serializable version of a plan that can be evaluated.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class PlanStatus(str, Enum):
    """Status values of a plan; each member is also a plain string."""

    STARTED = "started"
    FINISHED = "finished"
    FAILED = "failed"

    @property
    def is_started(self) -> bool:
        """True if this status is STARTED."""
        return self is PlanStatus.STARTED

    @property
    def is_failed(self) -> bool:
        """True if this status is FAILED."""
        return self is PlanStatus.FAILED

    @property
    def is_finished(self) -> bool:
        """True if this status is FINISHED."""
        return self is PlanStatus.FINISHED
An enumeration.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
@dataclass
class SnapshotIntervals:
    """Pairs a snapshot ID with a collection of intervals."""

    snapshot_id: SnapshotId
    intervals: Intervals

    @property
    def merged_intervals(self) -> Intervals:
        """The intervals after being passed through `merge_intervals`."""
        raw_intervals = self.intervals
        return merge_intervals(raw_intervals)

    def format_intervals(self, unit: t.Optional[IntervalUnit] = None) -> str:
        """Formats the merged intervals as a string.

        Args:
            unit: Optional interval unit forwarded to the module-level `format_intervals`.
        """
        merged = self.merged_intervals
        return format_intervals(merged, unit)
def earliest_interval_start(
    snapshots: t.Collection[Snapshot], execution_time: t.Optional[TimeLike] = None
) -> datetime:
    """Returns the earlier of the snapshots' earliest start date and their earliest interval start.

    Args:
        snapshots: The snapshots to inspect.
        execution_time: Passed to `earliest_start_date` as `relative_to`.

    Returns:
        The result of `earliest_start_date` when no snapshot has intervals; otherwise the
        minimum of that date and the smallest first-interval start across the snapshots.
    """
    earliest_start = earliest_start_date(snapshots, relative_to=execution_time)

    # Only the start of each snapshot's first interval is considered.
    first_interval_start = min(
        (snapshot.intervals[0][0] for snapshot in snapshots if snapshot.intervals),
        default=None,
    )
    if first_interval_start is None:
        return earliest_start

    return min(earliest_start, to_datetime(first_interval_start))