sqlmesh.core.snapshot.definition
1from __future__ import annotations 2 3import sys 4import typing as t 5from collections import defaultdict 6from datetime import datetime, timedelta 7from enum import IntEnum 8import logging 9from functools import cached_property, lru_cache 10from pathlib import Path 11 12from pydantic import Field 13from sqlglot import exp 14from sqlglot.optimizer.normalize_identifiers import normalize_identifiers 15 16from sqlmesh.core.config.common import ( 17 TableNamingConvention, 18 VirtualEnvironmentMode, 19 EnvironmentSuffixTarget, 20) 21from sqlmesh.core import constants as c 22from sqlmesh.core.audit import StandaloneAudit 23from sqlmesh.core.macros import call_macro 24from sqlmesh.core.model import Model, ModelKindMixin, ModelKindName, ViewKind, CustomKind 25from sqlmesh.core.model.definition import _Model 26from sqlmesh.core.node import IntervalUnit, NodeType 27from sqlmesh.utils import sanitize_name, unique 28from sqlmesh.utils.dag import DAG 29from sqlmesh.utils.date import ( 30 TimeLike, 31 is_date, 32 make_inclusive, 33 make_exclusive, 34 make_inclusive_end, 35 now, 36 now_timestamp, 37 time_like_to_str, 38 to_date, 39 to_datetime, 40 to_ds, 41 to_timestamp, 42 to_ts, 43 validate_date_range, 44 yesterday, 45) 46from sqlmesh.utils.errors import SQLMeshError, SignalEvalError 47from sqlmesh.utils.metaprogramming import ( 48 format_evaluated_code_exception, 49 Executable, 50) 51from sqlmesh.utils.hashing import hash_data, md5 52from sqlmesh.utils.pydantic import PydanticModel, field_validator 53 54if t.TYPE_CHECKING: 55 from sqlglot.dialects.dialect import DialectType 56 from sqlmesh.core.environment import EnvironmentNamingInfo 57 from sqlmesh.core.context import ExecutionContext 58 59Interval = t.Tuple[int, int] 60Intervals = t.List[Interval] 61 62Node = t.Annotated[t.Union[Model, StandaloneAudit], Field(discriminator="source_type")] 63 64 65logger = logging.getLogger(__name__) 66 67 68class SnapshotChangeCategory(IntEnum): 69 """ 70 Values are ordered by decreasing 
severity and that ordering is required. 71 72 BREAKING: The change requires that snapshot modified and downstream dependencies be rebuilt 73 NON_BREAKING: The change requires that only the snapshot modified be rebuilt 74 FORWARD_ONLY: The change requires no rebuilding 75 INDIRECT_BREAKING: The change was caused indirectly and is breaking. 76 INDIRECT_NON_BREAKING: The change was caused indirectly by a non-breaking change. 77 METADATA: The change was caused by a metadata update. 78 """ 79 80 BREAKING = 1 81 NON_BREAKING = 2 82 # FORWARD_ONLY category is deprecated and is kept for backwards compatibility. 83 FORWARD_ONLY = 3 84 INDIRECT_BREAKING = 4 85 INDIRECT_NON_BREAKING = 5 86 METADATA = 6 87 88 @property 89 def is_breaking(self) -> bool: 90 return self == self.BREAKING 91 92 @property 93 def is_non_breaking(self) -> bool: 94 return self == self.NON_BREAKING 95 96 @property 97 def is_forward_only(self) -> bool: 98 return self == self.FORWARD_ONLY 99 100 @property 101 def is_metadata(self) -> bool: 102 return self == self.METADATA 103 104 @property 105 def is_indirect_breaking(self) -> bool: 106 return self == self.INDIRECT_BREAKING 107 108 @property 109 def is_indirect_non_breaking(self) -> bool: 110 return self == self.INDIRECT_NON_BREAKING 111 112 def __repr__(self) -> str: 113 return self.name 114 115 116class SnapshotFingerprint(PydanticModel, frozen=True): 117 data_hash: str 118 metadata_hash: str 119 parent_data_hash: str = "0" 120 parent_metadata_hash: str = "0" 121 122 def to_version(self) -> str: 123 return hash_data([self.data_hash, self.parent_data_hash]) 124 125 def to_identifier(self) -> str: 126 return hash_data( 127 [ 128 self.data_hash, 129 self.metadata_hash, 130 self.parent_data_hash, 131 self.parent_metadata_hash, 132 ] 133 ) 134 135 def __str__(self) -> str: 136 return f"SnapshotFingerprint<{self.to_identifier()}, data: {self.data_hash}, meta: {self.metadata_hash}, pdata: {self.parent_data_hash}, pmeta: {self.parent_metadata_hash}>" 137 138 
class SnapshotId(PydanticModel, frozen=True):
    """Immutable (name, identifier) pair identifying a snapshot."""

    name: str
    identifier: str

    @property
    def snapshot_id(self) -> SnapshotId:
        """Helper method to return self."""
        return self

    def __eq__(self, other: t.Any) -> bool:
        # Equal only to instances of the same class with matching name and identifier.
        return (
            isinstance(other, self.__class__)
            and self.name == other.name
            and self.identifier == other.identifier
        )

    def __hash__(self) -> int:
        return hash((self.__class__, self.name, self.identifier))

    def __lt__(self, other: SnapshotId) -> bool:
        # Ordering considers the name only; the identifier is ignored.
        return self.name < other.name

    def __str__(self) -> str:
        return f"SnapshotId<{self.name}: {self.identifier}>"


class SnapshotIdBatch(PydanticModel, frozen=True):
    """Immutable pairing of a snapshot ID with an integer batch ID."""

    snapshot_id: SnapshotId
    batch_id: int


class SnapshotNameVersion(PydanticModel, frozen=True):
    """Immutable (name, version) pair."""

    name: str
    version: str

    @property
    def name_version(self) -> SnapshotNameVersion:
        """Helper method to return self."""
        return self


class SnapshotIntervals(PydanticModel):
    """Mutable record of the interval lists and last-altered timestamps of a snapshot.

    Three interval lists are tracked (regular, dev and pending restatement), each with
    its own add / remove helper. All of them delegate to the private helpers below,
    which select the target list by attribute name.
    """

    name: str
    identifier: t.Optional[str]
    version: str
    dev_version: t.Optional[str]
    intervals: Intervals = []
    dev_intervals: Intervals = []
    pending_restatement_intervals: Intervals = []
    last_altered_ts: t.Optional[int] = None
    dev_last_altered_ts: t.Optional[int] = None

    @property
    def snapshot_id(self) -> t.Optional[SnapshotId]:
        """Returns the snapshot ID, or None when no identifier is set."""
        if not self.identifier:
            return None
        return SnapshotId(name=self.name, identifier=self.identifier)

    @property
    def name_version(self) -> SnapshotNameVersion:
        """Returns the name and version of this record."""
        return SnapshotNameVersion(name=self.name, version=self.version)

    def add_interval(self, start: int, end: int) -> None:
        """Merges (start, end) into `intervals`."""
        self._add_interval(start, end, "intervals")

    def add_dev_interval(self, start: int, end: int) -> None:
        """Merges (start, end) into `dev_intervals`."""
        self._add_interval(start, end, "dev_intervals")

    def add_pending_restatement_interval(self, start: int, end: int) -> None:
        """Merges (start, end) into `pending_restatement_intervals`."""
        self._add_interval(start, end, "pending_restatement_intervals")

    def update_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
        """Raises `last_altered_ts` to the given value if it is greater."""
        self._update_last_altered_ts(last_altered_ts, "last_altered_ts")

    def update_dev_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
        """Raises `dev_last_altered_ts` to the given value if it is greater."""
        self._update_last_altered_ts(last_altered_ts, "dev_last_altered_ts")

    def remove_interval(self, start: int, end: int) -> None:
        """Removes (start, end) from `intervals`."""
        self._remove_interval(start, end, "intervals")

    def remove_dev_interval(self, start: int, end: int) -> None:
        """Removes (start, end) from `dev_intervals`."""
        self._remove_interval(start, end, "dev_intervals")

    def remove_pending_restatement_interval(self, start: int, end: int) -> None:
        """Removes (start, end) from `pending_restatement_intervals`."""
        self._remove_interval(start, end, "pending_restatement_intervals")

    def is_empty(self) -> bool:
        """Returns True when all three interval lists are empty."""
        return (
            not self.intervals and not self.dev_intervals and not self.pending_restatement_intervals
        )

    def _add_interval(self, start: int, end: int, interval_attr: str) -> None:
        # Build a new list (existing intervals + the new one), merge it with the
        # module-level `merge_intervals` helper and store the result back.
        target_intervals = getattr(self, interval_attr)
        target_intervals = merge_intervals([*target_intervals, (start, end)])
        setattr(self, interval_attr, target_intervals)

    def _update_last_altered_ts(
        self, last_altered_ts: t.Optional[int], last_altered_attr: str
    ) -> None:
        # A falsy value (None or 0) is ignored; otherwise keep the larger of the stored
        # and the incoming timestamp, treating a missing stored value as 0.
        if last_altered_ts:
            existing_last_altered_ts = getattr(self, last_altered_attr)
            setattr(self, last_altered_attr, max(existing_last_altered_ts or 0, last_altered_ts))

    def _remove_interval(self, start: int, end: int, interval_attr: str) -> None:
        # `remove_interval` here resolves to the module-level function of that name,
        # not to the method defined on this class.
        target_intervals = getattr(self, interval_attr)
        target_intervals = remove_interval(target_intervals, start, end)
        setattr(self, interval_attr, target_intervals)


class SnapshotDataVersion(PydanticModel, frozen=True):
    """Immutable description of one data version of a snapshot."""

    fingerprint: SnapshotFingerprint
    version: str
    dev_version_: t.Optional[str] = Field(default=None, alias="dev_version")
    change_category: t.Optional[SnapshotChangeCategory] = None
    physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema")
    dev_table_suffix: str
    table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
    virtual_environment_mode: VirtualEnvironmentMode = Field(default=VirtualEnvironmentMode.default)

    def snapshot_id(self, name: str) -> SnapshotId:
        """Builds a snapshot ID from the given name and this version's fingerprint.

        Args:
            name: The snapshot name to use in the ID.
        """
        return SnapshotId(name=name, identifier=self.fingerprint.to_identifier())

    @property
    def dev_version(self) -> str:
        """Returns the stored dev version, or the fingerprint's version if none is stored."""
        return self.dev_version_ or self.fingerprint.to_version()

    @property
    def physical_schema(self) -> str:
        # The physical schema here is optional to maintain backwards compatibility with
        # records stored by previous versions of SQLMesh.
        return self.physical_schema_ or c.SQLMESH

    @property
    def data_version(self) -> SnapshotDataVersion:
        """Helper method to return self."""
        return self

    @property
    def is_new_version(self) -> bool:
        """Returns whether or not this version is new and requires a backfill."""
        return self.fingerprint.to_version() == self.version


class QualifiedViewName(PydanticModel, frozen=True):
    """Catalog / schema / table parts of a view name, resolvable per environment."""

    catalog: t.Optional[str] = None
    schema_name: t.Optional[str] = None
    table: str

    def for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> str:
        """Returns the view name for the given environment as a string.

        Args:
            environment_naming_info: Naming settings of the target environment.
            dialect: Dialect passed through to identifier normalization.
        """
        return exp.table_name(self.table_for_environment(environment_naming_info, dialect=dialect))

    def table_for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> exp.Table:
        """Returns the view name for the given environment as a table expression.

        The table, schema and catalog parts are each resolved by the dedicated
        `*_for_environment` method below.
        """
        return exp.table_(
            self.table_name_for_environment(environment_naming_info, dialect=dialect),
            db=self.schema_for_environment(environment_naming_info, dialect=dialect),
            catalog=self.catalog_for_environment(environment_naming_info, dialect=dialect),
        )

    def catalog_for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> t.Optional[str]:
        """Returns the catalog to use in the given environment.

        A dev environment with a catalog suffix target gets "<catalog>__<env name>";
        otherwise a configured catalog override wins. Either of those is normalized when
        the environment asks for it. With neither, the view's own catalog is returned as is.
        """
        catalog_name: t.Optional[str] = None
        if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_catalog:
            catalog_name = f"{self.catalog}__{environment_naming_info.name}"
        elif environment_naming_info.catalog_name_override:
            catalog_name = environment_naming_info.catalog_name_override

        if catalog_name:
            return (
                normalize_identifiers(catalog_name, dialect=dialect).name
                if environment_naming_info.normalize_name
                else catalog_name
            )

        return self.catalog

    def schema_for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> str:
        """Returns the schema to use in the given environment.

        In a dev environment with a schema suffix target the result is
        "<schema>__<env name>".
        """
        normalize = environment_naming_info.normalize_name

        if self.schema_name:
            schema = self.schema_name
        else:
            schema = c.DEFAULT_SCHEMA
            # NOTE(review): nesting reconstructed from a whitespace-collapsed listing;
            # normalization is assumed to apply to the default schema only — confirm.
            if normalize:
                schema = normalize_identifiers(schema, dialect=dialect).name

        if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_schema:
            env_name = environment_naming_info.name
            if normalize:
                env_name = normalize_identifiers(env_name, dialect=dialect).name

            schema = f"{schema}__{env_name}"

        return schema

    def table_name_for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> str:
        """Returns the table name to use in the given environment.

        In a dev environment with a table suffix target the result is
        "<table>__<env name>"; otherwise the table name is returned unchanged.
        """
        table = self.table
        if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_table:
            env_name = environment_naming_info.name
            if environment_naming_info.normalize_name:
                env_name = normalize_identifiers(env_name, dialect=dialect).name

            table = f"{table}__{env_name}"

        return table


class SnapshotInfoMixin(ModelKindMixin):
    """Attributes and derived properties shared by Snapshot and SnapshotTableInfo."""

    name: str
    dev_version_: t.Optional[str]
    change_category: t.Optional[SnapshotChangeCategory]
    fingerprint: SnapshotFingerprint
    previous_versions: t.Tuple[SnapshotDataVersion, ...]
359 # Added to support Migration # 34 (default catalog) 360 # This can be removed from this model once Pydantic 1 support is dropped (must remain in `Snapshot` though) 361 base_table_name_override: t.Optional[str] 362 dev_table_suffix: str 363 table_naming_convention: TableNamingConvention 364 forward_only: bool 365 366 @cached_property 367 def identifier(self) -> str: 368 return self.fingerprint.to_identifier() 369 370 @cached_property 371 def snapshot_id(self) -> SnapshotId: 372 return SnapshotId(name=self.name, identifier=self.identifier) 373 374 @property 375 def qualified_view_name(self) -> QualifiedViewName: 376 view_name = exp.to_table(self.fully_qualified_table or self.name) 377 return QualifiedViewName( 378 catalog=view_name.catalog or None, 379 schema_name=view_name.db or None, 380 table=view_name.name, 381 ) 382 383 @property 384 def previous_version(self) -> t.Optional[SnapshotDataVersion]: 385 """Helper method to get the previous data version.""" 386 if self.previous_versions: 387 return self.previous_versions[-1] 388 return None 389 390 @property 391 def dev_version(self) -> str: 392 return self.dev_version_ or self.fingerprint.to_version() 393 394 @property 395 def physical_schema(self) -> str: 396 raise NotImplementedError 397 398 @property 399 def data_version(self) -> SnapshotDataVersion: 400 raise NotImplementedError 401 402 @property 403 def is_new_version(self) -> bool: 404 raise NotImplementedError 405 406 @cached_property 407 def fully_qualified_table(self) -> t.Optional[exp.Table]: 408 raise NotImplementedError 409 410 @property 411 def virtual_environment_mode(self) -> VirtualEnvironmentMode: 412 raise NotImplementedError 413 414 @property 415 def is_forward_only(self) -> bool: 416 return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY 417 418 @property 419 def is_metadata(self) -> bool: 420 return self.change_category == SnapshotChangeCategory.METADATA 421 422 @property 423 def 
is_indirect_non_breaking(self) -> bool: 424 return self.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING 425 426 @property 427 def is_no_rebuild(self) -> bool: 428 """Returns true if this snapshot doesn't require a rebuild in production.""" 429 return self.forward_only or self.change_category in ( 430 SnapshotChangeCategory.FORWARD_ONLY, # Backwards compatibility 431 SnapshotChangeCategory.METADATA, 432 SnapshotChangeCategory.INDIRECT_NON_BREAKING, 433 ) 434 435 @property 436 def is_no_preview(self) -> bool: 437 """Returns true if this snapshot doesn't require a preview in development.""" 438 return self.forward_only and self.change_category in ( 439 SnapshotChangeCategory.METADATA, 440 SnapshotChangeCategory.INDIRECT_NON_BREAKING, 441 ) 442 443 @property 444 def all_versions(self) -> t.Tuple[SnapshotDataVersion, ...]: 445 """Returns previous versions with the current version trimmed to DATA_VERSION_LIMIT.""" 446 return (*self.previous_versions, self.data_version)[-c.DATA_VERSION_LIMIT :] 447 448 def display_name( 449 self, 450 environment_naming_info: EnvironmentNamingInfo, 451 default_catalog: t.Optional[str], 452 dialect: DialectType = None, 453 ) -> str: 454 """ 455 Returns the model name as a qualified view name. 456 This is just used for presenting information back to the user and `qualified_view_name` should be used 457 when wanting a view name in all other cases. 458 """ 459 return display_name(self, environment_naming_info, default_catalog, dialect=dialect) 460 461 def data_hash_matches(self, other: t.Optional[SnapshotInfoMixin | SnapshotDataVersion]) -> bool: 462 return other is not None and self.fingerprint.data_hash == other.fingerprint.data_hash 463 464 def _table_name(self, version: str, is_deployable: bool) -> str: 465 """Full table name pointing to the materialized location of the snapshot. 466 467 Args: 468 version: The snapshot version. 469 is_deployable: Indicates whether to return the table name for deployment to production. 
470 """ 471 if self.is_external: 472 return self.name 473 474 if is_deployable and self.virtual_environment_mode.is_dev_only: 475 # Use the model name as is if the target is deployable and the virtual environment mode is set to dev-only 476 return self.name 477 478 is_dev_table = not is_deployable 479 if is_dev_table: 480 version = self.dev_version 481 482 if self.fully_qualified_table is None: 483 raise SQLMeshError( 484 f"Tried to get a table name for a snapshot that does not have a table. {self.name}" 485 ) 486 # We want to exclude the catalog from the name but still include catalog when determining the fqn 487 # for the table. 488 if self.base_table_name_override: 489 base_table_name = self.base_table_name_override 490 else: 491 fqt = self.fully_qualified_table.copy() 492 fqt.set("catalog", None) 493 base_table_name = fqt.sql() 494 495 return table_name( 496 self.physical_schema, 497 base_table_name, 498 version, 499 catalog=self.fully_qualified_table.catalog, 500 suffix=self.dev_table_suffix if is_dev_table else None, 501 naming_convention=self.table_naming_convention, 502 ) 503 504 @property 505 def node_type(self) -> NodeType: 506 raise NotImplementedError 507 508 @property 509 def is_model(self) -> bool: 510 return self.node_type == NodeType.MODEL 511 512 @property 513 def is_audit(self) -> bool: 514 return self.node_type == NodeType.AUDIT 515 516 517class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True): 518 name: str 519 fingerprint: SnapshotFingerprint 520 version: str 521 dev_version_: t.Optional[str] = Field(default=None, alias="dev_version") 522 physical_schema_: str = Field(alias="physical_schema") 523 parents: t.Tuple[SnapshotId, ...] 524 previous_versions: t.Tuple[SnapshotDataVersion, ...] 
= () 525 change_category: t.Optional[SnapshotChangeCategory] = None 526 kind_name: t.Optional[ModelKindName] = None 527 node_type_: NodeType = Field(default=NodeType.MODEL, alias="node_type") 528 # Added to support Migration # 34 (default catalog) 529 # This can be removed from this model once Pydantic 1 support is dropped (must remain in `Snapshot` though) 530 base_table_name_override: t.Optional[str] = None 531 custom_materialization: t.Optional[str] = None 532 dev_table_suffix: str 533 model_gateway: t.Optional[str] = None 534 forward_only: bool = False 535 table_naming_convention: TableNamingConvention = TableNamingConvention.default 536 virtual_environment_mode_: VirtualEnvironmentMode = Field( 537 default=VirtualEnvironmentMode.default, alias="virtual_environment_mode" 538 ) 539 540 def __lt__(self, other: SnapshotTableInfo) -> bool: 541 return self.name < other.name 542 543 def __eq__(self, other: t.Any) -> bool: 544 return isinstance(other, SnapshotTableInfo) and self.fingerprint == other.fingerprint 545 546 def __hash__(self) -> int: 547 return hash((self.__class__, self.name, self.fingerprint)) 548 549 def table_name(self, is_deployable: bool = True) -> str: 550 """Full table name pointing to the materialized location of the snapshot. 551 552 Args: 553 is_deployable: Indicates whether to return the table name for deployment to production. 
554 """ 555 return self._table_name(self.version, is_deployable) 556 557 @property 558 def physical_schema(self) -> str: 559 return self.physical_schema_ 560 561 @cached_property 562 def fully_qualified_table(self) -> exp.Table: 563 return exp.to_table(self.name) 564 565 @property 566 def table_info(self) -> SnapshotTableInfo: 567 """Helper method to return self.""" 568 return self 569 570 @property 571 def virtual_environment_mode(self) -> VirtualEnvironmentMode: 572 return self.virtual_environment_mode_ 573 574 @property 575 def data_version(self) -> SnapshotDataVersion: 576 return SnapshotDataVersion( 577 fingerprint=self.fingerprint, 578 version=self.version, 579 dev_version=self.dev_version, 580 change_category=self.change_category, 581 physical_schema=self.physical_schema, 582 dev_table_suffix=self.dev_table_suffix, 583 table_naming_convention=self.table_naming_convention, 584 virtual_environment_mode=self.virtual_environment_mode, 585 ) 586 587 @property 588 def is_new_version(self) -> bool: 589 """Returns whether or not this version is new and requires a backfill.""" 590 return self.fingerprint.to_version() == self.version 591 592 @property 593 def model_kind_name(self) -> t.Optional[ModelKindName]: 594 return self.kind_name 595 596 @property 597 def node_type(self) -> NodeType: 598 return self.node_type_ 599 600 @property 601 def name_version(self) -> SnapshotNameVersion: 602 """Returns the name and version of the snapshot.""" 603 return SnapshotNameVersion(name=self.name, version=self.version) 604 605 @property 606 def id_and_version(self) -> SnapshotIdAndVersion: 607 return SnapshotIdAndVersion( 608 name=self.name, 609 kind_name=self.kind_name, 610 identifier=self.identifier, 611 version=self.version, 612 dev_version=self.dev_version, 613 fingerprint=self.fingerprint, 614 ) 615 616 617class SnapshotIdAndVersion(PydanticModel, ModelKindMixin): 618 """A stripped down version of a snapshot that is used in situations where we want to fetch the main fields of 
the snapshots table 619 without the overhead of parsing the full snapshot payload and fetching intervals. 620 """ 621 622 name: str 623 version: str 624 kind_name_: t.Optional[ModelKindName] = Field(default=None, alias="kind_name") 625 dev_version_: t.Optional[str] = Field(alias="dev_version") 626 identifier: str 627 fingerprint_: t.Union[str, SnapshotFingerprint] = Field(alias="fingerprint") 628 629 @property 630 def snapshot_id(self) -> SnapshotId: 631 return SnapshotId(name=self.name, identifier=self.identifier) 632 633 @property 634 def id_and_version(self) -> SnapshotIdAndVersion: 635 return self 636 637 @property 638 def name_version(self) -> SnapshotNameVersion: 639 return SnapshotNameVersion(name=self.name, version=self.version) 640 641 @property 642 def fingerprint(self) -> SnapshotFingerprint: 643 value = self.fingerprint_ 644 if isinstance(value, str): 645 self.fingerprint_ = value = SnapshotFingerprint.parse_raw(value) 646 return value 647 648 @property 649 def dev_version(self) -> str: 650 return self.dev_version_ or self.fingerprint.to_version() 651 652 @property 653 def model_kind_name(self) -> t.Optional[ModelKindName]: 654 return self.kind_name_ 655 656 def display_name( 657 self, 658 environment_naming_info: EnvironmentNamingInfo, 659 default_catalog: t.Optional[str], 660 dialect: DialectType = None, 661 ) -> str: 662 return model_display_name( 663 self.name, environment_naming_info, default_catalog, dialect=dialect 664 ) 665 666 667class Snapshot(PydanticModel, SnapshotInfoMixin): 668 """A snapshot represents a node at a certain point in time. 669 670 Snapshots are used to encapsulate everything needed to evaluate a node. 671 They are standalone objects that hold all state and dynamic content necessary 672 to render a node's query including things like macros. Snapshots also store intervals 673 (timestamp ranges for what data we've processed). 674 675 Nodes can be dynamically rendered due to macros. 
Rendering a node to its full extent 676 requires storing variables and macro definitions. We store all of the macro definitions and 677 global variable references in `python_env` in raw text to avoid pickling. The helper methods 678 to achieve this are defined in utils.metaprogramming. 679 680 Args: 681 name: The snapshot name which is the same as the node name and should be unique per node. 682 fingerprint: A unique hash of the node definition so that nodes can be reused across environments. 683 node: Node object that the snapshot encapsulates. 684 parents: The list of parent snapshots (upstream dependencies). 685 intervals: List of [start, end) intervals showing which time ranges a snapshot has data for. 686 dev_intervals: List of [start, end) intervals showing development intervals (forward-only). 687 created_ts: Epoch millis timestamp when a snapshot was first created. 688 updated_ts: Epoch millis timestamp when a snapshot was last updated. 689 ttl: The time-to-live of a snapshot determines when it should be deleted after it's no longer referenced 690 in any environment. 691 previous: The snapshot data version that this snapshot was based on. If this snapshot is new, then previous will be None. 692 version: User specified version for a snapshot that is used for physical storage. 693 By default, the version is the fingerprint, but not all changes to nodes require a backfill. 694 If a user passes a previous version, that will be used instead and no backfill will be required. 695 change_category: User specified change category indicating which nodes require backfill from node changes made in this snapshot. 696 unpaused_ts: The timestamp which indicates when this snapshot was unpaused. Unpaused means that 697 this snapshot is evaluated on a recurring basis. None indicates that this snapshot is paused. 698 effective_from: The timestamp which indicates when this snapshot should be considered effective. 699 Applicable for forward-only snapshots only. 
700 migrated: Whether or not this snapshot has been created as a result of migration. 701 unrestorable: Whether or not this snapshot can be used to revert its model to a previous version. 702 next_auto_restatement_ts: The timestamp which indicates when is the next time this snapshot should be restated. 703 table_naming_convention: Convention to follow when generating the physical table name 704 """ 705 706 name: str 707 fingerprint: SnapshotFingerprint 708 physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema") 709 node: Node 710 parents: t.Tuple[SnapshotId, ...] 711 intervals: Intervals = [] 712 dev_intervals: Intervals = [] 713 pending_restatement_intervals: Intervals = [] 714 created_ts: int 715 updated_ts: int 716 ttl: str 717 previous_versions: t.Tuple[SnapshotDataVersion, ...] = () 718 version: t.Optional[str] = None 719 dev_version_: t.Optional[str] = Field(default=None, alias="dev_version") 720 change_category: t.Optional[SnapshotChangeCategory] = None 721 unpaused_ts: t.Optional[int] = None 722 effective_from: t.Optional[TimeLike] = None 723 migrated: bool = False 724 unrestorable: bool = False 725 # Added to support Migration # 34 (default catalog) 726 base_table_name_override: t.Optional[str] = None 727 next_auto_restatement_ts: t.Optional[int] = None 728 dev_table_suffix: str = "dev" 729 table_naming_convention: TableNamingConvention = TableNamingConvention.default 730 forward_only: bool = False 731 # Physical table last modified timestamp, not to be confused with the "updated_ts" field 732 # which is for the snapshot record itself 733 last_altered_ts: t.Optional[int] = None 734 dev_last_altered_ts: t.Optional[int] = None 735 736 @field_validator("ttl") 737 @classmethod 738 def _time_delta_must_be_positive(cls, v: str) -> str: 739 current_time = now() 740 if to_datetime(v, current_time) < current_time: 741 raise ValueError( 742 "Must be positive. Use the 'in' keyword to denote a positive time interval. For example, 'in 7 days'." 
743 ) 744 return v 745 746 @staticmethod 747 def hydrate_with_intervals_by_version( 748 snapshots: t.Iterable[Snapshot], 749 intervals: t.Iterable[SnapshotIntervals], 750 ) -> t.List[Snapshot]: 751 """Hydrates target snapshots with given intervals. 752 753 This will match snapshots with intervals by name and version rather than identifiers. 754 755 Args: 756 snapshots: Target snapshots. 757 intervals: Target snapshot intervals. 758 759 Returns: 760 List of target snapshots with hydrated intervals. 761 """ 762 intervals_by_name_version = defaultdict(list) 763 for interval in intervals: 764 intervals_by_name_version[(interval.name, interval.version)].append(interval) 765 766 result = [] 767 for snapshot in snapshots: 768 snapshot_intervals = intervals_by_name_version.get( 769 (snapshot.name, snapshot.version_get_or_generate()), [] 770 ) 771 for interval in snapshot_intervals: 772 snapshot.merge_intervals(interval) 773 774 result.append(snapshot) 775 776 return result 777 778 @classmethod 779 def from_node( 780 cls, 781 node: Node, 782 *, 783 nodes: t.Dict[str, Node], 784 ttl: str = c.DEFAULT_SNAPSHOT_TTL, 785 version: t.Optional[str] = None, 786 cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None, 787 table_naming_convention: TableNamingConvention = TableNamingConvention.default, 788 ) -> Snapshot: 789 """Creates a new snapshot for a node. 790 791 Args: 792 Node: Node to snapshot. 793 nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes. 794 If no dictionary is passed in the fingerprint will not be dependent on a node's parents. 795 ttl: A TTL to determine how long orphaned (snapshots that are not promoted anywhere) should live. 796 version: The version that a snapshot is associated with. Usually set during the planning phase. 797 cache: Cache of node name to fingerprints. 798 table_naming_convention: Convention to follow when generating the physical table name 799 800 Returns: 801 The newly created snapshot. 
"""
        # NOTE: this is the tail of `from_node`; its signature and the start of its
        # docstring lie before this chunk.
        # One timestamp is captured so that created_ts and updated_ts start out identical.
        created_ts = now_timestamp()

        return cls(
            name=node.fqn,
            fingerprint=fingerprint_from_node(
                node,
                nodes=nodes,
                cache=cache,
            ),
            node=node,
            # Each parent is referenced by its name plus the identifier derived from its fingerprint.
            parents=tuple(
                SnapshotId(
                    name=parent_node.fqn,
                    identifier=fingerprint_from_node(
                        parent_node,
                        nodes=nodes,
                        cache=cache,
                    ).to_identifier(),
                )
                for parent_node in _parents_from_node(node, nodes).values()
            ),
            intervals=[],
            dev_intervals=[],
            created_ts=created_ts,
            updated_ts=created_ts,
            ttl=ttl,
            version=version,
            table_naming_convention=table_naming_convention,
        )

    def __eq__(self, other: t.Any) -> bool:
        """Two snapshots are equal when `other` is a Snapshot with the same fingerprint."""
        return isinstance(other, Snapshot) and self.fingerprint == other.fingerprint

    def __hash__(self) -> int:
        """Hash of the class, the name and the fingerprint."""
        # NOTE(review): __eq__ compares the fingerprint only while the hash also includes
        # the name; this is consistent only if equal fingerprints imply equal names - confirm.
        return hash((self.__class__, self.name, self.fingerprint))

    def __lt__(self, other: Snapshot) -> bool:
        """Orders snapshots by name."""
        return self.name < other.name

    def add_interval(self, start: TimeLike, end: TimeLike, is_dev: bool = False) -> None:
        """Add a newly processed time interval to the snapshot.

        The actual stored intervals are [start_ts, end_ts) or start epoch timestamp inclusive and end epoch
        timestamp exclusive. This allows merging of ranges to be easier.

        Args:
            start: The start date/time of the interval (inclusive)
            end: The end date/time of the interval. If end is a date, then it is considered inclusive.
                If it is a datetime object, then it is exclusive.
            is_dev: Indicates whether the given interval is being added while in development mode.

        Raises:
            ValueError: If the start timestamp is greater than the end timestamp.
        """
        if to_timestamp(start) > to_timestamp(end):
            raise ValueError(
                f"Attempted to add an Invalid interval ({start}, {end}) to snapshot {self.snapshot_id}"
            )

        start_ts, end_ts = self.inclusive_exclusive(start, end, strict=False, expand=False)

        if start_ts >= end_ts:
            # Skipping partial interval.
            return

        # The target list is appended to in place; it is only replaced when a merge is needed.
        intervals = self.dev_intervals if is_dev else self.intervals
        intervals.append((start_ts, end_ts))

        if len(intervals) < 2:
            return

        merged_intervals = merge_intervals(intervals)
        if is_dev:
            self.dev_intervals = merged_intervals
        else:
            self.intervals = merged_intervals

    def remove_interval(self, interval: Interval) -> None:
        """Remove an interval from the snapshot.

        The interval is removed from both the regular and the dev intervals.

        Args:
            interval: The interval to remove.
        """
        self.intervals = remove_interval(self.intervals, *interval)
        self.dev_intervals = remove_interval(self.dev_intervals, *interval)

    def get_removal_interval(
        self,
        start: TimeLike,
        end: TimeLike,
        execution_time: t.Optional[TimeLike] = None,
        *,
        strict: bool = True,
        is_preview: bool = False,
    ) -> Interval:
        """Get the interval that should be removed from the snapshot.

        Args:
            start: The start date/time of the interval to remove.
            end: The end date/time of the interval to remove.
            execution_time: The time the interval is being removed.
            strict: Whether to fail when the inclusive start is the same as the exclusive end.
            is_preview: Whether the interval needs to be removed for a preview of forward-only changes.
                When previewing, we are not actually restating a model, but removing an interval to trigger
                a run.

        Returns:
            The [start, end) interval to remove. It is expanded to begin at the first stored interval
            when the snapshot only supports full history restatement, has intervals and this is not a preview.
        """
        # Parsed as `(execution_time or now_timestamp()) if self.depends_on_past else end`:
        # when the snapshot depends on past, the requested end is replaced entirely.
        end = execution_time or now_timestamp() if self.depends_on_past else end
        removal_interval = self.inclusive_exclusive(start, end, strict)

        if not is_preview and self.full_history_restatement_only and self.intervals:
            expanded_removal_interval = self.inclusive_exclusive(self.intervals[0][0], end, strict)
            requested_start, requested_end = removal_interval
            expanded_start, expanded_end = expanded_removal_interval

            # only warn if the requested removal interval was a subset of the actual model intervals and was automatically expanded
            # if the requested interval was the same or wider than the actual model intervals, no need to warn
            if (
                requested_start > expanded_start or requested_end < expanded_end
            ) and self.is_incremental:
                # Imported locally rather than at module level.
                from sqlmesh.core.console import get_console

                get_console().log_warning(
                    f"Model '{self.model.name}' is '{self.model_kind_name}' which does not support partial restatement.\n"
                    f"Expanding the requested restatement intervals from [{to_ts(requested_start)} - {to_ts(requested_end)}] "
                    f"to [{to_ts(expanded_start)} - {to_ts(expanded_end)}] in order to fully restate the model."
                )

            removal_interval = expanded_removal_interval

        return removal_interval

    @property
    def allow_partials(self) -> bool:
        """Whether this is a model snapshot whose model allows partial intervals."""
        return self.is_model and self.model.allow_partials

    def inclusive_exclusive(
        self,
        start: TimeLike,
        end: TimeLike,
        strict: bool = True,
        allow_partial: t.Optional[bool] = None,
        expand: bool = True,
    ) -> Interval:
        """Transform the inclusive start and end into a [start, end) pair.

        Delegates to the module-level `inclusive_exclusive` using this node's interval unit.

        Args:
            start: The start date/time of the interval (inclusive)
            end: The end date/time of the interval (inclusive)
            strict: Whether to fail when the inclusive start is the same as the exclusive end.
            allow_partial: Whether the interval can be partial or not. Defaults to the snapshot's
                `allow_partials` when None.
            expand: Whether or not partial intervals are expanded outwards.

        Returns:
            A [start, end) pair.
        """
        return inclusive_exclusive(
            start,
            end,
            self.node.interval_unit,
            strict=strict,
            allow_partial=self.allow_partials if allow_partial is None else allow_partial,
            expand=expand,
        )

    def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None:
        """Inherits intervals from the target snapshot.

        Besides the regular intervals, this also merges `last_altered_ts`, the dev intervals
        (only when the dev versions match) and the pending restatement intervals.

        Args:
            other: The target snapshot to inherit intervals from.
        """
        effective_from_ts = self.normalized_effective_from_ts or 0
        # The effective_from cutoff never applies when merging intervals with the same identifier.
        apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier
        for start, end in other.intervals:
            # If the effective_from is set, then intervals that come after it must come from
            # the current snapshots.
            if apply_effective_from and start < effective_from_ts:
                end = min(end, effective_from_ts)
            if not apply_effective_from or end <= effective_from_ts:
                self.add_interval(start, end)

        if other.last_altered_ts:
            self.last_altered_ts = max(self.last_altered_ts or 0, other.last_altered_ts)

        if self.dev_version == other.dev_version:
            # Merge dev intervals if the dev versions match which would mean
            # that this and the other snapshot are pointing to the same dev table.
            for start, end in other.dev_intervals:
                self.add_interval(start, end, is_dev=True)

            if other.dev_last_altered_ts:
                self.dev_last_altered_ts = max(
                    self.dev_last_altered_ts or 0, other.dev_last_altered_ts
                )

        self.pending_restatement_intervals = merge_intervals(
            [*self.pending_restatement_intervals, *other.pending_restatement_intervals]
        )

    @property
    def evaluatable(self) -> bool:
        """Whether or not a snapshot should be evaluated and have intervals."""
        # Symbolic snapshots are still evaluatable when their model defines audits.
        return bool(not self.is_symbolic or self.model.audits)

    def missing_intervals(
        self,
        start: TimeLike,
        end: TimeLike,
        execution_time: t.Optional[TimeLike] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        ignore_cron: bool = False,
        end_bounded: bool = False,
    ) -> Intervals:
        """Find all missing intervals between [start, end].

        Although the inputs are inclusive, the returned stored intervals are
        [start_ts, end_ts) or start epoch timestamp inclusive and end epoch
        timestamp exclusive.

        Args:
            start: The start date/time of the interval (inclusive)
            end: The end date/time of the interval (inclusive if the type is date, exclusive otherwise)
            execution_time: The date/time reference to use for execution time. Defaults to now.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
                Defaults to an index in which all snapshots are deployable.
            ignore_cron: Whether to ignore the node's cron schedule.
            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
                allow_partials, and other attributes that could cause the intervals to exceed the target end date.

        Returns:
            A list of all the missing intervals as epoch timestamps.
        """
        # If the node says that it has an end, and we want to load past it, then there are no missing intervals.
        # Also if a node's start is after the end of the range we are checking then there are no missing intervals.
        if (self.node.end and to_datetime(start) > to_datetime(self.node.end)) or (
            self.node.start and to_datetime(self.node.start) > to_datetime(end)
        ):
            return []
        # Clamp the requested start to the node's own start.
        if self.node.start and to_datetime(start) < to_datetime(self.node.start):
            start = self.node.start
        validate_date_range(start, end)

        # If the amount of time being checked is less than the size of a single interval then we
        # know that there can't be missing intervals within that range and return.
        if (
            not is_date(end)
            and not self.allow_partials
            and to_timestamp(end) - to_timestamp(start) < self.node.interval_unit.milliseconds
        ):
            return []

        deployability_index = deployability_index or DeployabilityIndex.all_deployable()
        # Non-representative snapshots are checked against their dev intervals.
        intervals = (
            self.intervals if deployability_index.is_representative(self) else self.dev_intervals
        )

        # Seeds that already have any interval are never reported as missing.
        if not self.evaluatable or (self.is_seed and intervals):
            return []

        start_ts, end_ts = (to_timestamp(ts) for ts in self.inclusive_exclusive(start, end))

        interval_unit = self.node.interval_unit
        execution_time_ts = to_timestamp(execution_time) if execution_time else now_timestamp()
        # The upper bound is the execution time, floored to the node's cron unless ignore_cron is set.
        upper_bound_ts = (
            execution_time_ts
            if ignore_cron
            else to_timestamp(self.node.cron_floor(execution_time_ts))
        )
        if end_bounded:
            upper_bound_ts = min(upper_bound_ts, end_ts)
        if not self.allow_partials:
            upper_bound_ts = to_timestamp(interval_unit.cron_floor(upper_bound_ts))

        end_ts = min(end_ts, upper_bound_ts)

        # Lookback and the model end only exist for model snapshots.
        lookback = 0
        model_end_ts: t.Optional[int] = None

        if self.is_model:
            lookback = self.model.lookback
            model_end_ts = to_timestamp(make_exclusive(self.model.end)) if self.model.end else None

        return compute_missing_intervals(
            interval_unit,
            tuple(intervals),
            start_ts,
            end_ts,
            lookback,
            model_end_ts,
        )

    def check_ready_intervals(
        self,
        intervals: Intervals,
        context: ExecutionContext,
    ) -> Intervals:
        """Returns a list of intervals that are considered ready by the provided signal.

        Note that this will handle gaps in the provided intervals. The returned intervals
        may introduce new gaps.

        Args:
            intervals: The candidate intervals to check.
            context: The execution context passed to each signal.

        Returns:
            The intervals remaining after every signal has been applied in turn. The input is
            returned unchanged for non-model snapshots and for models without signals.

        Raises:
            SignalEvalError: If checking a signal raises a SQLMeshError.
        """
        signals = self.is_model and self.model.render_signal_calls()
        if not signals:
            return intervals

        # Each signal narrows the output of the previous one.
        for signal_name, kwargs in signals.signals_to_kwargs.items():
            try:
                intervals = check_ready_intervals(
                    signals.prepared_python_env[signal_name],
                    intervals,
                    context,
                    python_env=signals.python_env,
                    dialect=self.model.dialect,
                    path=self.model._path,
                    snapshot=self,
                    kwargs=kwargs,
                )
            except SQLMeshError as e:
                # NOTE(review): consider `raise ... from e` to make the cause explicit.
                raise SignalEvalError(
                    f"{e} '{signal_name}' for '{self.model.name}' at {self.model._path}"
                )
        return intervals

    def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = False) -> None:
        """Assigns the given category to this snapshot.

        Also sets the snapshot's version and dev version and, when the previous version's
        table is reused, its physical schema and table naming convention.

        Args:
            category: The change category to assign to this snapshot.
            forward_only: Whether or not this snapshot is applied going forward in production.
        """
        assert category != SnapshotChangeCategory.FORWARD_ONLY, (
            "FORWARD_ONLY change category is deprecated"
        )

        self.dev_version_ = self.fingerprint.to_version()
        # Changes that reuse the previous version rather than producing a new one.
        is_no_rebuild = forward_only or category in (
            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
            SnapshotChangeCategory.METADATA,
        )
        if self.is_model and not self.virtual_environment_mode.is_full:
            # Hardcode the version if the virtual environment is not fully enabled.
            self.version = "novde"
        elif self.is_model and self.model.physical_version:
            # If the model has a pinned version then use that.
            self.version = self.model.physical_version
        elif is_no_rebuild and self.previous_version:
            self.version = self.previous_version.data_version.version
        elif self.is_model and self.model.forward_only and not self.previous_version:
            # If this is a new model then use a deterministic version, independent of the fingerprint.
            self.version = hash_data([self.name, *self.model.kind.data_hash_values])
        else:
            self.version = self.fingerprint.to_version()

        if is_no_rebuild and self.previous_version:
            previous_version = self.previous_version
            self.physical_schema_ = previous_version.physical_schema
            self.table_naming_convention = previous_version.table_naming_convention
            if self.is_materialized and (category.is_indirect_non_breaking or category.is_metadata):
                # Reuse the dev table for indirect non-breaking changes.
                self.dev_version_ = (
                    previous_version.data_version.dev_version
                    or previous_version.fingerprint.to_version()
                )
                self.dev_table_suffix = previous_version.data_version.dev_table_suffix

        self.change_category = category
        self.forward_only = forward_only

    @property
    def categorized(self) -> bool:
        """Whether the snapshot has been categorized."""
        return self.change_category is not None and self.version is not None

    def table_name(self, is_deployable: bool = True) -> str:
        """Full table name pointing to the materialized location of the snapshot.

        Args:
            is_deployable: Indicates whether to return the table name for deployment to production.

        Raises:
            SQLMeshError: If the snapshot has not been categorized or versioned yet.
        """
        self._ensure_categorized()
        assert self.version
        return self._table_name(self.version, is_deployable)

    def version_get_or_generate(self) -> str:
        """Helper method to get the version or generate it from the fingerprint."""
        return self.version or self.fingerprint.to_version()

    def is_valid_start(
        self,
        start: t.Optional[TimeLike],
        snapshot_start: TimeLike,
        execution_time: t.Optional[TimeLike] = None,
    ) -> bool:
        """Checks if the given start is valid for this snapshot.

        The check only applies to snapshots that depend on past and only when a start is
        provided; in every other case the start is considered valid.

        Args:
            start: The start date/time of the interval (inclusive)
            snapshot_start: The start date/time of the snapshot (inclusive)
            execution_time: The execution time used when computing missing intervals.

        Returns:
            True if the start is valid, False otherwise.
        """
        # The snapshot may not have a start defined. If so we use the provided snapshot start.
        if self.depends_on_past and start:
            interval_unit = self.node.interval_unit
            start_ts = to_timestamp(interval_unit.cron_floor(start))

            if not self.intervals:
                # The start date must be aligned by the interval unit.
                snapshot_start_ts = to_timestamp(interval_unit.cron_floor(snapshot_start))
                # A snapshot start that is not on an interval boundary is rounded up to the next one.
                if snapshot_start_ts < to_timestamp(snapshot_start):
                    snapshot_start_ts = to_timestamp(interval_unit.cron_next(snapshot_start_ts))
                return snapshot_start_ts >= start_ts
            # Make sure that if there are missing intervals for this snapshot that they all occur at or after the
            # provided start_ts. Otherwise we know that we are doing a non-contiguous load and therefore this is not
            # a valid start.
            missing_intervals = self.missing_intervals(
                snapshot_start, now(), execution_time=execution_time
            )
            earliest_interval = missing_intervals[0][0] if missing_intervals else None
            if earliest_interval:
                return earliest_interval >= start_ts
        return True

    def get_latest(self, default: t.Optional[TimeLike] = None) -> t.Optional[TimeLike]:
        """The latest interval loaded for the snapshot. Default is used if intervals are not defined"""

        def to_end_date(end: int, unit: IntervalUnit) -> TimeLike:
            # For daily units the exclusive end is converted to an inclusive date.
            if unit.is_day:
                return to_date(make_inclusive_end(end))
            return end

        return (
            to_end_date(to_timestamp(self.intervals[-1][1]), self.node.interval_unit)
            if self.intervals
            else default
        )

    def needs_destructive_check(
        self,
        allow_destructive_snapshots: t.Set[str],
    ) -> bool:
        """Whether destructive changes to this snapshot need to be checked.

        True for model snapshots whose `on_destructive_change` is not "allow" and whose name
        is not in `allow_destructive_snapshots`.
        """
        return (
            self.is_model
            and not self.model.on_destructive_change.is_allow
            and self.name not in allow_destructive_snapshots
        )

    def needs_additive_check(
        self,
        allow_additive_snapshots: t.Set[str],
    ) -> bool:
        """Whether additive changes to this snapshot need to be checked.

        True for model snapshots whose `on_additive_change` is not "allow" and whose name
        is not in `allow_additive_snapshots`.
        """
        return (
            self.is_model
            and not self.model.on_additive_change.is_allow
            and self.name not in allow_additive_snapshots
        )

    def get_next_auto_restatement_interval(self, execution_time: TimeLike) -> t.Optional[Interval]:
        """Returns the next auto restatement interval for the snapshot.

        Args:
            execution_time: The execution time to use for the restatement.

        Returns:
            The interval that needs to be restated or None if no restatement is needed.
        """
        if (
            not self.is_model
            or not self.intervals
            or not self.model.auto_restatement_cron
            or self.model.disable_restatement
        ):
            return None

        execution_time_ts = to_timestamp(execution_time)
        # Without a recorded timestamp, the next restatement is derived from the creation time.
        next_auto_restatement_ts = self.next_auto_restatement_ts or to_timestamp(
            self.model.auto_restatement_croniter(self.created_ts).get_next(estimate=False)
        )
        if execution_time_ts < next_auto_restatement_ts:
            return None

        num_intervals_to_restate = self.model.auto_restatement_intervals
        if num_intervals_to_restate is None:
            # No interval count configured: restate everything from the first to the last interval.
            return (self.intervals[0][0], self.intervals[-1][1])

        # Otherwise restate the configured number of intervals ending at the floored execution time.
        auto_restatement_end_ts = to_timestamp(
            self.node.interval_unit.cron_floor(execution_time_ts)
        )
        auto_restatement_start_ts = (
            auto_restatement_end_ts
            - num_intervals_to_restate * self.node.interval_unit.milliseconds
        )
        return (auto_restatement_start_ts, auto_restatement_end_ts)

    def update_next_auto_restatement_ts(self, execution_time: TimeLike) -> t.Optional[int]:
        """Updates the next auto restatement timestamp.

        Args:
            execution_time: The execution time to use for the restatement.

        Returns:
            The next auto restatement timestamp or None if not applicable.
        """
        if (
            not self.is_model
            or not self.model.auto_restatement_cron
            or self.model.disable_restatement
        ):
            self.next_auto_restatement_ts = None
        else:
            self.next_auto_restatement_ts = to_timestamp(
                self.model.auto_restatement_croniter(execution_time).get_next(estimate=False)
            )
        return self.next_auto_restatement_ts

    def apply_pending_restatement_intervals(self) -> None:
        """Applies the pending restatement intervals to the snapshot's intervals."""
        if not self.is_model or self.model.disable_restatement:
            return
        # Only `intervals` are affected here; dev intervals are left untouched.
        for pending_restatement_interval in self.pending_restatement_intervals:
            logger.info(
                "Applying the auto restated interval (%s, %s) to snapshot %s",
                time_like_to_str(pending_restatement_interval[0]),
                time_like_to_str(pending_restatement_interval[1]),
                self.snapshot_id,
            )
            self.intervals = remove_interval(self.intervals, *pending_restatement_interval)

    def is_directly_modified(self, other: Snapshot) -> bool:
        """Returns whether or not this snapshot is directly modified in relation to the other snapshot."""
        return self.node.is_data_change(other.node)

    def is_indirectly_modified(self, other: Snapshot) -> bool:
        """Returns whether or not this snapshot is indirectly modified in relation to the other snapshot."""
        # The parents' data changed while the node itself has no data change.
        return (
            self.fingerprint.parent_data_hash != other.fingerprint.parent_data_hash
            and not self.node.is_data_change(other.node)
        )

    def is_metadata_updated(self, other: Snapshot) -> bool:
        """Returns whether or not this snapshot contains metadata changes in relation to the other snapshot."""
        return self.fingerprint.metadata_hash != other.fingerprint.metadata_hash

    @property
    def physical_schema(self) -> str:
        """The physical schema: the explicitly set one if present, else the model's, else an empty string."""
        if self.physical_schema_ is not None:
            return self.physical_schema_
        return self.model.physical_schema if self.is_model else ""

    @property
    def table_info(self) -> SnapshotTableInfo:
        """Helper method to get the SnapshotTableInfo from the Snapshot.

        Raises:
            SQLMeshError: If the snapshot has not been categorized or versioned yet.
        """
        self._ensure_categorized()

        custom_materialization = (
            self.model.kind.materialization
            if self.is_model and isinstance(self.model.kind, CustomKind)
            else None
        )

        return SnapshotTableInfo(
            physical_schema=self.physical_schema,
            name=self.name,
            fingerprint=self.fingerprint,
            version=self.version,
            dev_version=self.dev_version,
            parents=self.parents,
            previous_versions=self.previous_versions,
            change_category=self.change_category,
            kind_name=self.model_kind_name,
            node_type=self.node_type,
            custom_materialization=custom_materialization,
            dev_table_suffix=self.dev_table_suffix,
            model_gateway=self.model_gateway,
            table_naming_convention=self.table_naming_convention,  # type: ignore
            forward_only=self.forward_only,
            virtual_environment_mode=self.virtual_environment_mode,
        )

    @property
    def data_version(self) -> SnapshotDataVersion:
        """Helper method to get the SnapshotDataVersion from the Snapshot.

        Raises:
            SQLMeshError: If the snapshot has not been categorized or versioned yet.
        """
        self._ensure_categorized()
        return SnapshotDataVersion(
            fingerprint=self.fingerprint,
            version=self.version,
            dev_version=self.dev_version,
            change_category=self.change_category,
            physical_schema=self.physical_schema,
            dev_table_suffix=self.dev_table_suffix,
            table_naming_convention=self.table_naming_convention,
            virtual_environment_mode=self.virtual_environment_mode,
        )

    @property
    def snapshot_intervals(self) -> SnapshotIntervals:
        """Helper method to get the SnapshotIntervals from the Snapshot.

        The interval lists are copied so the result does not share them with this snapshot.

        Raises:
            SQLMeshError: If the snapshot has not been categorized or versioned yet.
        """
        self._ensure_categorized()
        return SnapshotIntervals(
            name=self.name,
            identifier=self.identifier,
            version=self.version,
            dev_version=self.dev_version,
            intervals=self.intervals.copy(),
            dev_intervals=self.dev_intervals.copy(),
            pending_restatement_intervals=self.pending_restatement_intervals.copy(),
        )

    @property
    def is_materialized_view(self) -> bool:
        """Returns whether or not this snapshot's model represents a materialized view."""
        return (
            self.is_model and isinstance(self.model.kind, ViewKind) and self.model.kind.materialized
        )

    @property
    def is_new_version(self) -> bool:
        """Returns whether or not this version is new and requires a backfill."""
        self._ensure_categorized()
        # The version is new when it was derived from this snapshot's own fingerprint.
        return self.fingerprint.to_version() == self.version

    @property
    def is_paused(self) -> bool:
        """Whether the snapshot is paused, i.e. it has no `unpaused_ts`."""
        return self.unpaused_ts is None

    @property
    def is_paused_forward_only(self) -> bool:
        """Whether the snapshot is both paused and forward-only."""
        return self.is_paused and self.is_forward_only

    @property
    def normalized_effective_from_ts(self) -> t.Optional[int]:
        """The `effective_from` floored to the node's interval unit as a timestamp, or None if unset."""
        return (
            to_timestamp(self.node.interval_unit.cron_floor(self.effective_from))
            if self.effective_from
            else None
        )

    @property
    def model_kind_name(self) -> t.Optional[ModelKindName]:
        """The name of the model's kind, or None for non-model snapshots."""
        return self.model.kind.name if self.is_model else None

    @property
    def node_type(self) -> NodeType:
        """The type of the underlying node.

        Raises:
            SQLMeshError: If the node is neither a model nor an audit.
        """
        if self.node.is_model:
            return NodeType.MODEL
        if self.node.is_audit:
            return NodeType.AUDIT
        raise SQLMeshError(f"Snapshot {self.snapshot_id} has an unknown node type.")

    @property
    def model(self) -> Model:
        """The underlying model.

        Raises:
            SQLMeshError: If this is not a model snapshot.
        """
        model = self.model_or_none
        if model:
            return model
        raise SQLMeshError(f"Snapshot {self.snapshot_id} is not a model snapshot.")

    @property
    def model_or_none(self) -> t.Optional[Model]:
        """The underlying model, or None if this is not a model snapshot."""
        if self.is_model:
            return t.cast(Model, self.node)
        return None

    @property
    def model_gateway(self) -> t.Optional[str]:
        """The model's gateway, or None for non-model snapshots."""
        return self.model.gateway if self.is_model else None

    @property
    def audit(self) -> StandaloneAudit:
        """The underlying standalone audit.

        Raises:
            SQLMeshError: If this is not an audit snapshot.
        """
        if self.is_audit:
            return t.cast(StandaloneAudit, self.node)
        raise SQLMeshError(f"Snapshot {self.snapshot_id} is not an audit snapshot.")

    @property
    def depends_on_past(self) -> bool:
        """Whether or not this model depends on past intervals to be populated before loading following intervals.

        This represents a superset of the following types of models:
        1. Models that depend on themselves but can be restated from an arbitrary point in time (any start date) as long as interval batches are processed sequentially.
        An example of this can be an INCREMENTAL_BY_TIME_RANGE model that references previous records from itself.
        2. Models that can only be restated from the beginning of history *and* their interval batches must be processed sequentially.
        """
        return self.depends_on_self or self.full_history_restatement_only

    @property
    def depends_on_self(self) -> bool:
        """Whether or not this model depends on self."""
        return self.is_model and self.model.depends_on_self

    @property
    def name_version(self) -> SnapshotNameVersion:
        """Returns the name and version of the snapshot."""
        return SnapshotNameVersion(name=self.name, version=self.version)

    @property
    def id_and_version(self) -> SnapshotIdAndVersion:
        """Returns the `id_and_version` of this snapshot's table info."""
        return self.table_info.id_and_version

    @property
    def disable_restatement(self) -> bool:
        """Is restatement disabled for the node"""
        return self.is_model and self.model.disable_restatement

    @cached_property
    def fully_qualified_table(self) -> t.Optional[exp.Table]:
        """The model's fully qualified table, or None for non-model snapshots. Cached after first access."""
        if not self.is_model:
            return None
        return t.cast(Model, self.node).fully_qualified_table

    @property
    def expiration_ts(self) -> int:
        """The expiration timestamp: the TTL resolved relative to `updated_ts`."""
        return to_timestamp(
            self.ttl,
            relative_base=to_datetime(self.updated_ts),
            check_categorical_relative_expression=False,
        )

    @property
    def supports_schema_migration_in_prod(self) -> bool:
        """Returns whether or not this snapshot supports schema migration when deployed to production."""
        return self.is_paused and self.is_model and not self.is_symbolic and not self.is_seed
# The members down to `__getstate__` still belong to the class whose header lies before this chunk.

    @property
    def requires_schema_migration_in_prod(self) -> bool:
        """Returns whether or not this snapshot requires a schema migration when deployed to production."""
        # A migration is required when migrations are supported and any of these holds:
        # the previous version shares this snapshot's version, the model is forward-only,
        # the model pins a physical version, or the virtual environment mode is not full.
        return self.supports_schema_migration_in_prod and (
            (self.previous_version and self.previous_version.version == self.version)
            or self.model.forward_only
            or bool(self.model.physical_version)
            or not self.virtual_environment_mode.is_full
        )

    @property
    def ttl_ms(self) -> int:
        """The difference between `expiration_ts` and `updated_ts`."""
        return self.expiration_ts - self.updated_ts

    @property
    def custom_materialization(self) -> t.Optional[str]:
        """The materialization of the model's custom kind, or None if the snapshot is not custom."""
        if self.is_custom:
            return t.cast(CustomKind, self.model.kind).materialization
        return None

    @property
    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
        """The model's virtual environment mode, or the default mode for non-model snapshots."""
        return (
            self.model.virtual_environment_mode if self.is_model else VirtualEnvironmentMode.default
        )

    def _ensure_categorized(self) -> None:
        """Raises SQLMeshError if the snapshot has no change category or no version."""
        if not self.change_category:
            raise SQLMeshError(f"Snapshot {self.snapshot_id} has not been categorized yet.")
        if not self.version:
            raise SQLMeshError(f"Snapshot {self.snapshot_id} has not been versioned yet.")

    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
        """Returns the parent's state with `intervals` and `dev_intervals` emptied."""
        state = super().__getstate__()
        # Copy the attribute dict so that blanking the intervals does not alter the copy
        # returned by the parent.
        state["__dict__"] = state["__dict__"].copy()
        # Don't store intervals.
        state["__dict__"]["intervals"] = []
        state["__dict__"]["dev_intervals"] = []
        return state


# Pairs a snapshot's table info with a flag named `dev_table_only`.
# NOTE(review): presumably the flag restricts cleanup to the dev table - confirm with callers.
class SnapshotTableCleanupTask(PydanticModel):
    snapshot: SnapshotTableInfo
    dev_table_only: bool


# Union aliases grouping the snapshot-like types that may be passed where the named
# piece of information is needed.
SnapshotIdLike = t.Union[SnapshotId, SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]
SnapshotIdAndVersionLike = t.Union[SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]
SnapshotInfoLike = t.Union[SnapshotTableInfo, Snapshot]
SnapshotNameVersionLike = t.Union[
    SnapshotNameVersion, SnapshotTableInfo, SnapshotIdAndVersion, Snapshot
]


class DeployabilityIndex(PydanticModel, frozen=True):
    """Contains information about deployability of every snapshot.

    Deployability is defined as whether or not the output that a snapshot produces during the
    current evaluation can be reused in (deployed to) the production environment.
    """

    # String keys (see `_snapshot_id_key`) of the indexed snapshots. They are the deployable
    # snapshots when `is_opposite_index` is False and the non-deployable ones when it is True.
    indexed_ids: t.FrozenSet[str]
    is_opposite_index: bool = False
    # String keys of the snapshots that are representative without necessarily being deployable.
    representative_shared_version_ids: t.FrozenSet[str] = frozenset()

    @field_validator("indexed_ids", "representative_shared_version_ids", mode="before")
    @classmethod
    def _snapshot_ids_set_validator(cls, v: t.Any) -> t.Optional[t.FrozenSet[str]]:
        """Converts the given collection into a frozenset of string keys.

        SnapshotId elements are converted with `_snapshot_id_key`; other elements are kept as is.
        None is passed through unchanged.
        """
        if v is None:
            return v
        # Transforming into strings because the serialization of sets of objects / lists is broken in Pydantic.
        return frozenset(
            {
                (
                    cls._snapshot_id_key(snapshot_id)  # type: ignore
                    if isinstance(snapshot_id, SnapshotId)
                    else snapshot_id
                )
                for snapshot_id in v
            }
        )

    def is_deployable(self, snapshot: SnapshotIdLike) -> bool:
        """Returns true if the output produced by the given snapshot in a development environment can be reused
        in (deployed to) production

        Args:
            snapshot: The snapshot to check.

        Returns:
            True if the snapshot is deployable, False otherwise.
        """
        snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
        # In an opposite index the indexed ids are the non-deployable ones.
        return (self.is_opposite_index and snapshot_id not in self.indexed_ids) or (
            not self.is_opposite_index and snapshot_id in self.indexed_ids
        )

    def is_representative(self, snapshot: SnapshotIdLike) -> bool:
        """Returns true if the deployable (non-dev) table of the given snapshot should be used for reading, table mapping, and
        computing missing intervals.

        Note, that deployable snapshots are also representative, but the reverse is not always true.

        Unlike `is_deployable`, this variant also captures FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots that
        are not deployable by their nature but are currently promoted in production. Therefore, it's safe to consider
        them as such when constructing a plan, building a physical table mapping or computing missing intervals.

        Args:
            snapshot: The snapshot to check.

        Returns:
            True if the snapshot is representative, False otherwise.
        """
        snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
        representative = snapshot_id in self.representative_shared_version_ids
        return representative or self.is_deployable(snapshot)

    def with_non_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
        """Creates a new index with the given snapshot marked as non-deployable."""
        return self._add_snapshot(snapshot, False)

    def with_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
        """Creates a new index with the given snapshot marked as deployable."""
        return self._add_snapshot(snapshot, True)

    def _add_snapshot(self, snapshot: SnapshotIdLike, deployable: bool) -> DeployabilityIndex:
        """Creates a new index with the deployability of the given snapshot set to `deployable`.

        The orientation of the index (`is_opposite_index`) and the representative ids are preserved.
        """
        snapshot_id = {self._snapshot_id_key(snapshot.snapshot_id)}
        indexed_ids = self.indexed_ids
        if self.is_opposite_index:
            # Indexed ids are the non-deployable ones: remove to mark deployable, add otherwise.
            indexed_ids = indexed_ids - snapshot_id if deployable else indexed_ids | snapshot_id
        else:
            indexed_ids = indexed_ids | snapshot_id if deployable else indexed_ids - snapshot_id

        return DeployabilityIndex(
            indexed_ids=indexed_ids,
            is_opposite_index=self.is_opposite_index,
            representative_shared_version_ids=self.representative_shared_version_ids,
        )

    @classmethod
    def all_deployable(cls) -> DeployabilityIndex:
        """Creates an index in which every snapshot is deployable (an empty opposite index)."""
        return cls(indexed_ids=frozenset(), is_opposite_index=True)

    @classmethod
    def none_deployable(cls) -> DeployabilityIndex:
        """Creates an index in which no snapshot is deployable (an empty regular index)."""
        return cls(indexed_ids=frozenset())

    @classmethod
    def create(
        cls,
        snapshots: t.Dict[SnapshotId, Snapshot] | t.Collection[Snapshot],
        start: t.Optional[TimeLike] = None,  # plan start
        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ) -> DeployabilityIndex:
        """Creates a deployability index for the given snapshots.

        Snapshots are visited in the order produced by iterating the snapshot DAG, and each
        snapshot's deployability takes into account what its already visited parents allow
        for their children.

        Args:
            snapshots: The snapshots to index, either keyed by snapshot ID or as a plain collection.
            start: The plan start. When provided, each snapshot's `is_valid_start` is checked against it.
            start_override_per_model: Start dates keyed by snapshot name that take precedence over
                the start date computed for the snapshot.

        Returns:
            The index, stored in whichever orientation has fewer indexed IDs.
        """
        if not isinstance(snapshots, dict):
            snapshots = {s.snapshot_id: s for s in snapshots}

        # Whether each snapshot itself is deployable.
        deployability_mapping: t.Dict[SnapshotId, bool] = {}
        # Whether each snapshot allows its children to be deployable.
        children_deployability_mapping: t.Dict[SnapshotId, bool] = {}
        representative_shared_version_ids: t.Set[SnapshotId] = set()
        start_override_per_model = start_override_per_model or {}

        start_date_cache: t.Optional[t.Dict[str, datetime]] = {}

        dag = snapshots_to_dag(snapshots.values())
        for node in dag:
            # The DAG may contain nodes that are not among the provided snapshots.
            if node not in snapshots:
                continue
            snapshot = snapshots[node]

            if not snapshot.virtual_environment_mode.is_full:
                # If the virtual environment is not fully enabled, then the snapshot can never be deployable
                this_deployable = False
            else:
                # Make sure that the node is deployable according to all its parents
                this_deployable = all(
                    children_deployability_mapping[p_id]
                    for p_id in snapshots[node].parents
                    if p_id in children_deployability_mapping
                )

            if this_deployable:
                is_forward_only_model = (
                    snapshot.is_model and snapshot.model.forward_only and not snapshot.is_metadata
                )
                has_auto_restatement = (
                    snapshot.is_model and snapshot.model.auto_restatement_cron is not None
                )

                snapshot_start = start_override_per_model.get(
                    node.name, start_date(snapshot, snapshots.values(), cache=start_date_cache)
                )

                is_valid_start = (
                    snapshot.is_valid_start(start, snapshot_start) if start is not None else True
                )

                children_deployable = is_valid_start and not has_auto_restatement
                if (
                    snapshot.is_forward_only
                    or snapshot.is_indirect_non_breaking
                    or is_forward_only_model
                    or has_auto_restatement
                    or not is_valid_start
                ):
                    # FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots are not deployable by nature.
                    # Similarly, if the model depends on past and the start date is not aligned with the
                    # model's start, we should consider this snapshot non-deployable.
                    this_deployable = False
                    if not snapshot.is_paused or (
                        snapshot.is_indirect_non_breaking and snapshot.intervals
                    ):
                        # This snapshot represents what's currently deployed in prod.
                        representative_shared_version_ids.add(node)
                    else:
                        # If the parent is not representative then its children can't be deployable.
                        children_deployable = False
            else:
                # A non-deployable snapshot never has deployable children, but it is still
                # representative when it is unpaused.
                children_deployable = False
                if not snapshots[node].is_paused:
                    representative_shared_version_ids.add(node)

            deployability_mapping[node] = this_deployable
            children_deployability_mapping[node] = children_deployable

        deployable_ids = {
            snapshot_id for snapshot_id, deployable in deployability_mapping.items() if deployable
        }
        non_deployable_ids = set(snapshots) - deployable_ids

        # Pick the smaller set to reduce the size of the serialized object.
        if len(deployable_ids) <= len(non_deployable_ids):
            return cls(
                indexed_ids=deployable_ids,
                representative_shared_version_ids=representative_shared_version_ids,
            )
        return cls(
            indexed_ids=non_deployable_ids,
            is_opposite_index=True,
            representative_shared_version_ids=representative_shared_version_ids,
        )

    @staticmethod
    def _snapshot_id_key(snapshot_id: SnapshotId) -> str:
        """Returns the string key for a snapshot ID: its name and identifier joined by a double underscore."""
        return f"{snapshot_id.name}__{snapshot_id.identifier}"


def table_name(
    physical_schema: str,
    name: str,
    version: str,
    catalog: t.Optional[str] = None,
    suffix: t.Optional[str] = None,
    naming_convention: t.Optional[TableNamingConvention] = None,
) -> str:
    """Builds the physical table name for the given model name and version.

    Args:
        physical_schema: The schema (db) to set on the resulting table.
        name: The model name to derive the table name from.
        version: The version appended to the table name.
        catalog: The catalog to set on the table if the parsed name does not already have one.
        suffix: An optional suffix appended to the table name after a double underscore.
        naming_convention: The naming convention to use. Defaults to `TableNamingConvention.default`.

    Returns:
        The table name as a string.
    """
    table = exp.to_table(name)

    naming_convention = naming_convention or TableNamingConvention.default

    if naming_convention == TableNamingConvention.HASH_MD5:
        # just take a MD5 hash of what we would have generated anyway using SCHEMA_AND_TABLE
        value_to_hash = table_name(
            physical_schema=physical_schema,
            name=name,
            version=version,
            catalog=catalog,
            suffix=suffix,
            naming_convention=TableNamingConvention.SCHEMA_AND_TABLE,
        )
        full_name = f"{c.SQLMESH}_md5__{md5(value_to_hash)}"
    else:
        # note: Snapshot._table_name() already strips the catalog from the model name before calling this function
        # Therefore, a model with 3-part naming like "foo.bar.baz" gets passed as (name="bar.baz", catalog="foo") to this function
        # This is why there is no TableNamingConvention.CATALOG_AND_SCHEMA_AND_TABLE
        table_parts = table.parts
        parts_to_consider = 2 if naming_convention == TableNamingConvention.SCHEMA_AND_TABLE else 1

        # in case the parsed table name has less parts than what the naming convention says we should be considering
        parts_to_consider = min(len(table_parts), parts_to_consider)

        # bigquery projects usually have "-" in them which is illegal in the table name, so we aggressively prune
        name = "__".join(sanitize_name(part.name) for part in table_parts[-parts_to_consider:])

        full_name = f"{name}__{version}"

    # The suffix is applied in both branches, on top of any suffix already hashed into the MD5 name.
    suffix = f"__{suffix}" if suffix else ""

    table.set("this", exp.to_identifier(f"{full_name}{suffix}"))
    table.set("db", exp.to_identifier(physical_schema))
    if not table.catalog and catalog:
        table.set("catalog", exp.to_identifier(catalog))
    return exp.table_name(table)


def display_name(
    snapshot_info_like: t.Union[SnapshotInfoLike, SnapshotInfoMixin],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    dialect: DialectType = None,
) -> str:
    """
    Returns the model name as a qualified view name.
    This is just used for presenting information back to the user and `qualified_view_name` should be used
    when wanting a view name in all other cases.

    For audit snapshots the snapshot name is returned unchanged.

    Args:
        snapshot_info_like: The snapshot info object to get the display name for
        environment_naming_info: Environment naming info to use for display name formatting
        default_catalog: Optional default catalog name to use. If None, the default catalog will always be included in the display name.
        dialect: Optional dialect type to use for name formatting

    Returns:
        The formatted display name as a string
    """
    if snapshot_info_like.is_audit:
        return snapshot_info_like.name

    return model_display_name(
        snapshot_info_like.name, environment_naming_info, default_catalog, dialect
    )


def model_display_name(
    node_name: str,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    dialect: DialectType = None,
) -> str:
    """Returns the display name of a model as a qualified view name for the given environment.

    Args:
        node_name: The name of the model node.
        environment_naming_info: Environment naming info to use for display name formatting
        default_catalog: The default catalog. A catalog equal to it is omitted unless the environment
            suffix target is the catalog.
        dialect: Optional dialect type to use for name formatting

    Returns:
        The formatted display name as a string
    """
    view_name = exp.to_table(node_name)

    # The catalog is dropped only when it equals the default catalog and the environment
    # suffix is not applied to the catalog.
    catalog = (
        None
        if (
            environment_naming_info.suffix_target != EnvironmentSuffixTarget.CATALOG
            and view_name.catalog == default_catalog
        )
        else view_name.catalog
    )

    qvn = QualifiedViewName(
        catalog=catalog,
        schema_name=view_name.db or None,
        table=view_name.name,
    )
    return qvn.for_environment(environment_naming_info, dialect=dialect)


def fingerprint_from_node(
    node: Node,
    *,
    nodes: t.Dict[str, Node],
    cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
) -> SnapshotFingerprint:
    """Helper function to generate a fingerprint based on the data and metadata of the node and its parents.

    This method tries to remove non-meaningful differences to avoid ever-changing fingerprints.
    The fingerprint is made up of two parts split by an underscore -- query_metadata. The query hash is
    determined purely by the rendered query and the metadata by everything else.

    Args:
        node: Node to fingerprint.
1875 nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes. 1876 If no dictionary is passed in the fingerprint will not be dependent on a node's parents. 1877 cache: Cache of node name to fingerprints. 1878 1879 Returns: 1880 The fingerprint. 1881 """ 1882 cache = {} if cache is None else cache 1883 1884 if node.fqn not in cache: 1885 parents = [ 1886 fingerprint_from_node(nodes[table], nodes=nodes, cache=cache) 1887 for table in node.depends_on 1888 if table in nodes 1889 ] 1890 1891 parent_data_hash = hash_data(sorted(p.to_version() for p in parents)) 1892 1893 parent_metadata_hash = hash_data( 1894 sorted(h for p in parents for h in (p.metadata_hash, p.parent_metadata_hash)) 1895 ) 1896 1897 cache[node.fqn] = SnapshotFingerprint( 1898 data_hash=node.data_hash, 1899 metadata_hash=node.metadata_hash, 1900 parent_data_hash=parent_data_hash, 1901 parent_metadata_hash=parent_metadata_hash, 1902 ) 1903 1904 return cache[node.fqn] 1905 1906 1907def _parents_from_node( 1908 node: Node, 1909 nodes: t.Dict[str, Node], 1910) -> t.Dict[str, Node]: 1911 parent_nodes = {} 1912 for parent_fqn in node.depends_on: 1913 parent = nodes.get(parent_fqn) 1914 if parent: 1915 parent_nodes[parent.fqn] = parent 1916 if parent.is_model and t.cast(_Model, parent).kind.is_embedded: 1917 parent_nodes.update(_parents_from_node(parent, nodes)) 1918 1919 return parent_nodes 1920 1921 1922def merge_intervals(intervals: t.Collection[Interval]) -> Intervals: 1923 """Merge a list of intervals. 1924 1925 Args: 1926 intervals: A list of intervals to merge together. 1927 1928 Returns: 1929 A new list of sorted and merged intervals. 
1930 """ 1931 if not intervals: 1932 return [] 1933 intervals = sorted(intervals) 1934 1935 merged = [intervals[0]] 1936 1937 for interval in intervals[1:]: 1938 current = merged[-1] 1939 1940 if interval[0] <= current[1]: 1941 merged[-1] = (current[0], max(current[1], interval[1])) 1942 else: 1943 merged.append(interval) 1944 1945 return merged 1946 1947 1948def _format_date_time(time_like: TimeLike, unit: t.Optional[IntervalUnit]) -> str: 1949 if unit is None or unit.is_date_granularity: 1950 return to_ds(time_like) 1951 # TODO: Remove `[0:19]` once `to_ts` always returns a timestamp without timezone 1952 return to_ts(time_like)[0:19] 1953 1954 1955def format_intervals(intervals: Intervals, unit: t.Optional[IntervalUnit]) -> str: 1956 inclusive_intervals = [make_inclusive(start, end) for start, end in intervals] 1957 return ", ".join( 1958 " - ".join([_format_date_time(start, unit), _format_date_time(end, unit)]) 1959 for start, end in inclusive_intervals 1960 ) 1961 1962 1963def remove_interval(intervals: Intervals, remove_start: int, remove_end: int) -> Intervals: 1964 """Remove an interval from a list of intervals. Assumes that the correct start and end intervals have been 1965 passed in. Use `get_remove_interval` method of `Snapshot` to get the correct start/end given the snapshot's 1966 information. 1967 1968 Args: 1969 intervals: A list of exclusive intervals. 1970 remove_start: The inclusive start to remove. 1971 remove_end: The exclusive end to remove. 1972 1973 Returns: 1974 A new list of intervals. 
1975 """ 1976 modified: Intervals = [] 1977 1978 for start, end in intervals: 1979 if remove_start > start and remove_end < end: 1980 modified.extend( 1981 ( 1982 (start, remove_start), 1983 (remove_end, end), 1984 ) 1985 ) 1986 elif remove_start > start: 1987 modified.append((start, min(remove_start, end))) 1988 elif remove_end < end: 1989 modified.append((max(remove_end, start), end)) 1990 1991 return modified 1992 1993 1994def to_table_mapping( 1995 snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex] 1996) -> t.Dict[str, str]: 1997 deployability_index = deployability_index or DeployabilityIndex.all_deployable() 1998 return { 1999 snapshot.name: snapshot.table_name(deployability_index.is_representative(snapshot)) 2000 for snapshot in snapshots 2001 if snapshot.version and not snapshot.is_embedded and snapshot.is_model 2002 } 2003 2004 2005def to_view_mapping( 2006 snapshots: t.Iterable[Snapshot], 2007 environment_naming_info: EnvironmentNamingInfo, 2008 default_catalog: t.Optional[str] = None, 2009 dialect: t.Optional[str] = None, 2010) -> t.Dict[str, str]: 2011 return { 2012 snapshot.name: snapshot.display_name( 2013 environment_naming_info, default_catalog=default_catalog, dialect=dialect 2014 ) 2015 for snapshot in snapshots 2016 if snapshot.is_model 2017 } 2018 2019 2020def has_paused_forward_only( 2021 targets: t.Iterable[SnapshotIdLike], 2022 snapshots: t.Union[t.List[Snapshot], t.Dict[SnapshotId, Snapshot]], 2023) -> bool: 2024 if not isinstance(snapshots, dict): 2025 snapshots = {s.snapshot_id: s for s in snapshots} 2026 for target in targets: 2027 target_snapshot = snapshots[target.snapshot_id] 2028 if target_snapshot.is_paused_forward_only: 2029 return True 2030 return False 2031 2032 2033def missing_intervals( 2034 snapshots: t.Union[t.Collection[Snapshot], t.Dict[SnapshotId, Snapshot]], 2035 start: t.Optional[TimeLike] = None, 2036 end: t.Optional[TimeLike] = None, 2037 execution_time: t.Optional[TimeLike] = None, 2038 
restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, 2039 deployability_index: t.Optional[DeployabilityIndex] = None, 2040 start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 2041 end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, 2042 ignore_cron: bool = False, 2043 end_bounded: bool = False, 2044) -> t.Dict[Snapshot, Intervals]: 2045 """Returns all missing intervals given a collection of snapshots.""" 2046 if not isinstance(snapshots, dict): 2047 # Make sure that the mapping is only constructed once 2048 snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots} 2049 missing = {} 2050 cache: t.Dict[str, datetime] = {} 2051 end_date = end or now_timestamp() 2052 start_dt = ( 2053 to_datetime(start) 2054 if start 2055 else earliest_start_date(snapshots, cache=cache, relative_to=end_date) 2056 ) 2057 restatements = restatements or {} 2058 start_override_per_model = start_override_per_model or {} 2059 end_override_per_model = end_override_per_model or {} 2060 deployability_index = deployability_index or DeployabilityIndex.all_deployable() 2061 2062 for snapshot in snapshots.values(): 2063 if not snapshot.evaluatable: 2064 continue 2065 2066 snapshot_start_date = start_override_per_model.get(snapshot.name, start_dt) 2067 snapshot_end_date: TimeLike = end_date 2068 2069 restated_interval = restatements.get(snapshot.snapshot_id) 2070 if restated_interval: 2071 snapshot_start_date, snapshot_end_date = (to_datetime(i) for i in restated_interval) 2072 snapshot = snapshot.copy() 2073 snapshot.intervals = snapshot.intervals.copy() 2074 snapshot.remove_interval(restated_interval) 2075 2076 existing_interval_end = end_override_per_model.get(snapshot.name) 2077 if existing_interval_end: 2078 if snapshot_start_date >= existing_interval_end: 2079 # The start exceeds the provided interval end, so we can skip this snapshot 2080 # since it doesn't have missing intervals by definition 2081 continue 2082 snapshot_end_date = 
existing_interval_end 2083 2084 snapshot_start_date = max( 2085 to_datetime(snapshot_start_date), 2086 to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)), 2087 ) 2088 if snapshot_start_date > to_datetime(snapshot_end_date): 2089 continue 2090 2091 missing_interval_end_date = snapshot_end_date 2092 node_end_date = snapshot.node.end 2093 if node_end_date and (to_datetime(node_end_date) < to_datetime(snapshot_end_date)): 2094 missing_interval_end_date = node_end_date 2095 2096 intervals = snapshot.missing_intervals( 2097 snapshot_start_date, 2098 missing_interval_end_date, 2099 execution_time=execution_time, 2100 deployability_index=deployability_index, 2101 ignore_cron=ignore_cron, 2102 end_bounded=end_bounded, 2103 ) 2104 if intervals: 2105 missing[snapshot] = intervals 2106 2107 return missing 2108 2109 2110@lru_cache(maxsize=16384) 2111def expand_range(start_ts: int, end_ts: int, interval_unit: IntervalUnit) -> t.List[int]: 2112 croniter = interval_unit.croniter(start_ts) 2113 timestamps = [start_ts] 2114 2115 while True: 2116 ts = to_timestamp(croniter.get_next(estimate=True)) 2117 2118 if ts > end_ts: 2119 if timestamps and timestamps[-1] != end_ts: 2120 timestamps.append(end_ts) 2121 break 2122 2123 timestamps.append(ts) 2124 return timestamps 2125 2126 2127@lru_cache(maxsize=16384) 2128def compute_missing_intervals( 2129 interval_unit: IntervalUnit, 2130 intervals: t.Tuple[Interval, ...], 2131 start_ts: int, 2132 end_ts: int, 2133 lookback: int, 2134 model_end_ts: t.Optional[int], 2135) -> Intervals: 2136 """Computes all missing intervals between start and end given intervals. 2137 2138 Args: 2139 interval_unit: The interval unit. 2140 intervals: The intervals to check what's missing. 2141 start_ts: Inclusive timestamp start. 2142 end_ts: Exclusive timestamp end. 2143 lookback: A lookback window. 
2144 model_end_ts: The exclusive end timestamp set on the model (if one is set) 2145 2146 Returns: 2147 A list of all timestamps in this range. 2148 """ 2149 if start_ts == end_ts: 2150 return [] 2151 2152 timestamps = expand_range(start_ts, end_ts, interval_unit) 2153 missing = set() 2154 2155 for current_ts, next_ts in zip(timestamps, timestamps[1:]): 2156 for low, high in intervals: 2157 if current_ts < low: 2158 missing.add((current_ts, next_ts)) 2159 break 2160 elif current_ts >= low and next_ts <= high: 2161 break 2162 else: 2163 missing.add((current_ts, next_ts)) 2164 2165 if missing: 2166 if lookback: 2167 if model_end_ts: 2168 croniter = interval_unit.croniter(end_ts) 2169 end_ts = to_timestamp(croniter.get_prev(estimate=True)) 2170 2171 while model_end_ts < end_ts: 2172 end_ts = to_timestamp(croniter.get_prev(estimate=True)) 2173 lookback -= 1 2174 2175 lookback = max(lookback, 0) 2176 2177 for i, (current_ts, next_ts) in enumerate(zip(timestamps, timestamps[1:])): 2178 parent = timestamps[i + lookback : i + lookback + 2] 2179 2180 if len(parent) < 2 or tuple(parent) in missing: 2181 missing.add((current_ts, next_ts)) 2182 2183 if model_end_ts: 2184 missing = {interval for interval in missing if interval[0] < model_end_ts} 2185 2186 return sorted(missing) 2187 2188 2189@lru_cache(maxsize=16384) 2190def inclusive_exclusive( 2191 start: TimeLike, 2192 end: TimeLike, 2193 interval_unit: IntervalUnit, 2194 strict: bool = True, 2195 allow_partial: bool = False, 2196 expand: bool = True, 2197) -> Interval: 2198 """Transform the inclusive start and end into a [start, end) pair. 2199 2200 Args: 2201 start: The start date/time of the interval (inclusive) 2202 end: The end date/time of the interval (inclusive) 2203 interval_unit: The interval unit. 2204 strict: Whether to fail when the inclusive start is the same as the exclusive end. 2205 allow_partial: Whether the interval can be partial or not. 2206 expand: Whether or not partial intervals are expanded outwards. 
2207 2208 Returns: 2209 A [start, end) pair. 2210 """ 2211 start_dt = interval_unit.cron_floor(start) 2212 2213 if not expand and not allow_partial and start_dt < to_datetime(start): 2214 start_dt = interval_unit.cron_next(start_dt) 2215 2216 start_ts = to_timestamp(start_dt) 2217 2218 if is_date(end): 2219 end = to_datetime(end) + timedelta(days=1) 2220 2221 if allow_partial: 2222 end_dt = end 2223 else: 2224 end_dt = interval_unit.cron_floor(end) 2225 2226 if expand and end_dt != to_datetime(end): 2227 end_dt = interval_unit.cron_next(end_dt) 2228 2229 end_ts = to_timestamp(end_dt) 2230 2231 if strict and start_ts >= end_ts: 2232 raise ValueError( 2233 f"`end` ({to_datetime(end_ts)}) must be greater than `start` ({to_datetime(start_ts)})" 2234 ) 2235 2236 return (start_ts, end_ts) 2237 2238 2239def earliest_start_date( 2240 snapshots: t.Union[t.Collection[Snapshot], t.Dict[SnapshotId, Snapshot]], 2241 cache: t.Optional[t.Dict[str, datetime]] = None, 2242 relative_to: t.Optional[TimeLike] = None, 2243) -> datetime: 2244 """Get the earliest start date from a collection of snapshots. 2245 2246 Args: 2247 snapshots: Snapshots to find earliest start date. 2248 cache: optional cache to make computing cache date more efficient 2249 relative_to: the base date to compute start from if inferred from cron 2250 Returns: 2251 The earliest start date or yesterday if none is found. 
2252 """ 2253 cache = {} if cache is None else cache 2254 if snapshots: 2255 if not isinstance(snapshots, dict): 2256 # Make sure that the mapping is only constructed once 2257 snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots} 2258 return min( 2259 start_date(snapshot, snapshots, cache=cache, relative_to=relative_to) 2260 for snapshot in snapshots.values() 2261 ) 2262 2263 relative_base = None 2264 if relative_to is not None: 2265 relative_base = to_datetime(relative_to) 2266 2267 return yesterday(relative_base=relative_base) 2268 2269 2270def start_date( 2271 snapshot: Snapshot, 2272 snapshots: t.Dict[SnapshotId, Snapshot] | t.Iterable[Snapshot], 2273 cache: t.Optional[t.Dict[str, datetime]] = None, 2274 relative_to: t.Optional[TimeLike] = None, 2275) -> datetime: 2276 """Get the effective/inferred start date for a snapshot. 2277 2278 Not all snapshots define a start date. In those cases, the node's start date 2279 can be inferred from its parent's start date or from its cron. 2280 2281 Args: 2282 snapshot: snapshot to infer start date. 2283 snapshots: a catalog of available snapshots. 2284 cache: optional cache to make computing cache date more efficient 2285 relative_to: the base date to compute start from if inferred from cron 2286 2287 Returns: 2288 Start datetime object. 
2289 """ 2290 cache = {} if cache is None else cache 2291 key = f"{snapshot.name}_{to_timestamp(relative_to)}" if relative_to else snapshot.name 2292 if key in cache: 2293 return cache[key] 2294 if snapshot.node.start: 2295 start = to_datetime(snapshot.node.start) 2296 cache[key] = start 2297 return start 2298 2299 if not isinstance(snapshots, dict): 2300 snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots} 2301 2302 parent_starts = [ 2303 start_date(snapshots[parent], snapshots, cache=cache, relative_to=relative_to) 2304 for parent in snapshot.parents 2305 if parent in snapshots 2306 ] 2307 earliest = ( 2308 min(parent_starts) 2309 if parent_starts 2310 else snapshot.node.cron_prev(snapshot.node.cron_floor(relative_to or now())) 2311 ) 2312 2313 cache[key] = earliest 2314 return earliest 2315 2316 2317def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]: 2318 dag: DAG[SnapshotId] = DAG() 2319 for snapshot in snapshots: 2320 dag.add(snapshot.snapshot_id, snapshot.parents) 2321 return dag 2322 2323 2324def apply_auto_restatements( 2325 snapshots: t.Dict[SnapshotId, Snapshot], execution_time: TimeLike 2326) -> t.Tuple[t.List[SnapshotIntervals], t.Dict[SnapshotId, t.List[SnapshotId]]]: 2327 """Applies auto restatements to the snapshots. 2328 2329 This operation results in the removal of intervals for snapshots that are ready to be restated based 2330 on the provided execution time and configured auto restatement settings. For each affected snapshot, 2331 it also updates the next auto restatement timestamp. 2332 2333 Args: 2334 snapshots: A dictionary of snapshots to apply auto restatements to. 2335 execution_time: The execution time. 2336 2337 Returns: 2338 A list of SnapshotIntervals with **new** intervals that need to be restated. 
2339 """ 2340 dag = snapshots_to_dag(snapshots.values()) 2341 auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {} 2342 auto_restated_intervals_per_snapshot: t.Dict[SnapshotId, Interval] = {} 2343 for s_id in dag: 2344 if s_id not in snapshots: 2345 continue 2346 snapshot = snapshots[s_id] 2347 if not snapshot.is_model or snapshot.model.disable_restatement: 2348 continue 2349 2350 next_auto_restated_interval = snapshot.get_next_auto_restatement_interval(execution_time) 2351 auto_restated_intervals = [ 2352 auto_restated_intervals_per_snapshot[parent_s_id] 2353 for parent_s_id in snapshot.parents 2354 if parent_s_id in auto_restated_intervals_per_snapshot 2355 ] 2356 upstream_triggers = [] 2357 if next_auto_restated_interval: 2358 logger.info( 2359 "Calculated the next auto restated interval (%s, %s) for snapshot %s", 2360 time_like_to_str(next_auto_restated_interval[0]), 2361 time_like_to_str(next_auto_restated_interval[1]), 2362 snapshot.snapshot_id, 2363 ) 2364 auto_restated_intervals.append(next_auto_restated_interval) 2365 2366 # auto-restated snapshot is its own trigger 2367 upstream_triggers = [s_id] 2368 else: 2369 # inherit each parent's auto-restatement triggers (if any) 2370 for parent_s_id in snapshot.parents: 2371 if parent_s_id in auto_restatement_triggers: 2372 upstream_triggers.extend(auto_restatement_triggers[parent_s_id]) 2373 2374 # remove duplicate triggers, retaining order and keeping first seen of duplicates 2375 if upstream_triggers: 2376 auto_restatement_triggers[s_id] = unique(upstream_triggers) 2377 2378 if auto_restated_intervals: 2379 auto_restated_interval_start = sys.maxsize 2380 auto_restated_interval_end = -sys.maxsize 2381 for interval in auto_restated_intervals: 2382 auto_restated_interval_start = min(auto_restated_interval_start, interval[0]) 2383 auto_restated_interval_end = max(auto_restated_interval_end, interval[1]) 2384 2385 interval_to_remove_start = snapshot.node.interval_unit.cron_floor( 2386 
auto_restated_interval_start 2387 ) 2388 interval_to_remove_end = snapshot.node.interval_unit.cron_floor( 2389 auto_restated_interval_end 2390 ) 2391 if auto_restated_interval_end > to_timestamp(interval_to_remove_end): 2392 interval_to_remove_end = snapshot.node.interval_unit.cron_next( 2393 interval_to_remove_end 2394 ) 2395 2396 removal_interval = snapshot.get_removal_interval( 2397 interval_to_remove_start, interval_to_remove_end, execution_time=execution_time 2398 ) 2399 2400 auto_restated_intervals_per_snapshot[s_id] = removal_interval 2401 snapshot.pending_restatement_intervals = merge_intervals( 2402 [*snapshot.pending_restatement_intervals, removal_interval] 2403 ) 2404 2405 snapshot.apply_pending_restatement_intervals() 2406 snapshot.update_next_auto_restatement_ts(execution_time) 2407 return ( 2408 [ 2409 SnapshotIntervals( 2410 name=snapshots[s_id].name, 2411 identifier=None, 2412 version=snapshots[s_id].version, 2413 dev_version=None, 2414 intervals=[], 2415 dev_intervals=[], 2416 pending_restatement_intervals=[interval], 2417 ) 2418 for s_id, interval in auto_restated_intervals_per_snapshot.items() 2419 if s_id in snapshots 2420 ], 2421 auto_restatement_triggers, 2422 ) 2423 2424 2425def parent_snapshots_by_name( 2426 snapshot: Snapshot, snapshots: t.Dict[SnapshotId, Snapshot] 2427) -> t.Dict[str, Snapshot]: 2428 parent_snapshots_by_name = { 2429 snapshots[p_sid].name: snapshots[p_sid] for p_sid in snapshot.parents 2430 } 2431 parent_snapshots_by_name[snapshot.name] = snapshot 2432 return parent_snapshots_by_name 2433 2434 2435def _contiguous_intervals(intervals: Intervals) -> t.List[Intervals]: 2436 """Given a list of intervals with gaps, returns a list of sequences of contiguous intervals.""" 2437 contiguous_intervals = [] 2438 current_batch: t.List[Interval] = [] 2439 for interval in intervals: 2440 if len(current_batch) == 0 or interval[0] == current_batch[-1][-1]: 2441 current_batch.append(interval) 2442 else: 2443 
contiguous_intervals.append(current_batch) 2444 current_batch = [interval] 2445 2446 if len(current_batch) > 0: 2447 contiguous_intervals.append(current_batch) 2448 2449 return contiguous_intervals 2450 2451 2452def check_ready_intervals( 2453 check: t.Callable, 2454 intervals: Intervals, 2455 context: ExecutionContext, 2456 python_env: t.Dict[str, Executable], 2457 dialect: DialectType = None, 2458 path: t.Optional[Path] = None, 2459 snapshot: t.Optional[Snapshot] = None, 2460 kwargs: t.Optional[t.Dict] = None, 2461) -> Intervals: 2462 checked_intervals: Intervals = [] 2463 2464 for interval_batch in _contiguous_intervals(intervals): 2465 batch = [(to_datetime(start), to_datetime(end)) for start, end in interval_batch] 2466 2467 try: 2468 ready_intervals = call_macro( 2469 check, 2470 dialect, 2471 path, 2472 provided_args=(batch,), 2473 provided_kwargs=(kwargs or {}), 2474 context=context, 2475 snapshot=snapshot, 2476 ) 2477 except Exception as ex: 2478 raise SignalEvalError(format_evaluated_code_exception(ex, python_env)) 2479 2480 if isinstance(ready_intervals, bool): 2481 if not ready_intervals: 2482 batch = [] 2483 elif isinstance(ready_intervals, list): 2484 for i in ready_intervals: 2485 if i not in batch: 2486 raise SignalEvalError(f"Unknown interval {i} for signal") 2487 batch = ready_intervals 2488 else: 2489 raise SignalEvalError(f"Expected bool | list, got {type(ready_intervals)} for signal") 2490 2491 checked_intervals.extend((to_timestamp(start), to_timestamp(end)) for start, end in batch) 2492 2493 return checked_intervals 2494 2495 2496def get_next_model_interval_start(snapshots: t.Iterable[Snapshot]) -> t.Optional[datetime]: 2497 now_dt = now() 2498 2499 starts = [ 2500 snap.node.cron_next(now_dt) 2501 for snap in snapshots 2502 if snap.is_model and not snap.is_symbolic and not snap.is_seed 2503 ] 2504 2505 return min(starts) if starts else None
class SnapshotChangeCategory(IntEnum):
    """Category assigned to a snapshot change.

    Values are ordered by decreasing severity and that ordering is required.

    BREAKING: The modified snapshot and its downstream dependencies must be rebuilt.
    NON_BREAKING: Only the modified snapshot must be rebuilt.
    FORWARD_ONLY: No rebuilding is required.
    INDIRECT_BREAKING: The change was caused indirectly and is breaking.
    INDIRECT_NON_BREAKING: The change was caused indirectly by a non-breaking change.
    METADATA: The change was caused by a metadata update.
    """

    BREAKING = 1
    NON_BREAKING = 2
    # FORWARD_ONLY category is deprecated and is kept for backwards compatibility.
    FORWARD_ONLY = 3
    INDIRECT_BREAKING = 4
    INDIRECT_NON_BREAKING = 5
    METADATA = 6

    # Each predicate below is true for exactly one member of the enum.

    @property
    def is_breaking(self) -> bool:
        return self is SnapshotChangeCategory.BREAKING

    @property
    def is_non_breaking(self) -> bool:
        return self is SnapshotChangeCategory.NON_BREAKING

    @property
    def is_forward_only(self) -> bool:
        return self is SnapshotChangeCategory.FORWARD_ONLY

    @property
    def is_metadata(self) -> bool:
        return self is SnapshotChangeCategory.METADATA

    @property
    def is_indirect_breaking(self) -> bool:
        return self is SnapshotChangeCategory.INDIRECT_BREAKING

    @property
    def is_indirect_non_breaking(self) -> bool:
        return self is SnapshotChangeCategory.INDIRECT_NON_BREAKING

    def __repr__(self) -> str:
        # Show only the member name instead of the default "<Class.NAME: value>" form.
        return self.name
Values are ordered by decreasing severity and that ordering is required.
- BREAKING: The change requires that the modified snapshot and its downstream dependencies be rebuilt.
- NON_BREAKING: The change requires that only the modified snapshot be rebuilt.
- FORWARD_ONLY: The change requires no rebuilding.
- INDIRECT_BREAKING: The change was caused indirectly and is breaking.
- INDIRECT_NON_BREAKING: The change was caused indirectly by a non-breaking change.
- METADATA: The change was caused by a metadata update.
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
class SnapshotFingerprint(PydanticModel, frozen=True):
    # Frozen model holding a node's own data/metadata hashes together with the
    # hashes derived from its parents. Both parent hashes default to "0".
    data_hash: str
    metadata_hash: str
    parent_data_hash: str = "0"
    parent_metadata_hash: str = "0"

    def to_version(self) -> str:
        """Return the hash of the data hash and the parent data hash only."""
        data_hashes = [self.data_hash, self.parent_data_hash]
        return hash_data(data_hashes)

    def to_identifier(self) -> str:
        """Return the hash of all four hashes stored on this fingerprint."""
        all_hashes = [
            self.data_hash,
            self.metadata_hash,
            self.parent_data_hash,
            self.parent_metadata_hash,
        ]
        return hash_data(all_hashes)

    def __str__(self) -> str:
        identifier = self.to_identifier()
        return (
            f"SnapshotFingerprint<{identifier}, "
            f"data: {self.data_hash}, meta: {self.metadata_hash}, "
            f"pdata: {self.parent_data_hash}, pmeta: {self.parent_metadata_hash}>"
        )
Usage documentation: Models (Pydantic concepts guide).
A base class for creating Pydantic models.
Attributes:
- `__class_vars__`: The names of the class variables defined on the model.
- `__private_attributes__`: Metadata about the private attributes of the model.
- `__signature__`: The synthesized `__init__` `Signature` (`inspect.Signature`) of the model.
- `__pydantic_complete__`: Whether model building is completed, or if there are still undefined fields.
- `__pydantic_core_schema__`: The core schema of the model.
- `__pydantic_custom_init__`: Whether the model has a custom `__init__` function.
- `__pydantic_decorators__`: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- `__pydantic_generic_metadata__`: Metadata for generic models; contains data used for a similar purpose to `__args__`, `__origin__`, `__parameters__` in typing-module generics. May eventually be replaced by these.
- `__pydantic_parent_namespace__`: Parent namespace of the model, used for automatic rebuilding of models.
- `__pydantic_post_init__`: The name of the post-init method for the model, if defined.
- `__pydantic_root_model__`: Whether the model is a `RootModel` (`pydantic.root_model.RootModel`).
- `__pydantic_serializer__`: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- `__pydantic_validator__`: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- `__pydantic_fields__`: A dictionary of field names and their corresponding `FieldInfo` (`pydantic.fields.FieldInfo`) objects.
- `__pydantic_computed_fields__`: A dictionary of computed field names and their corresponding `ComputedFieldInfo` (`pydantic.fields.ComputedFieldInfo`) objects.
- `__pydantic_extra__`: A dictionary containing extra values, if `extra` (`pydantic.config.ConfigDict.extra`) is set to `'allow'`.
- `__pydantic_fields_set__`: The names of fields explicitly set during instantiation.
- `__pydantic_private__`: Values of private attributes set on the model instance.
Configuration for the model; it should be a dictionary conforming to `ConfigDict` (`pydantic.config.ConfigDict`).
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class SnapshotId(PydanticModel, frozen=True):
    # Frozen model that identifies a snapshot by its name and identifier.
    name: str
    identifier: str

    @property
    def snapshot_id(self) -> SnapshotId:
        """Return this object itself."""
        return self

    def __eq__(self, other: t.Any) -> bool:
        # Objects that are not instances of this object's class never compare equal.
        if not isinstance(other, self.__class__):
            return False
        return (self.name, self.identifier) == (other.name, other.identifier)

    def __hash__(self) -> int:
        # The class is part of the hashed key, alongside the two fields used by __eq__.
        key = (self.__class__, self.name, self.identifier)
        return hash(key)

    def __lt__(self, other: SnapshotId) -> bool:
        # Ordering looks at the name only; the identifier is not compared.
        mine, theirs = self.name, other.name
        return mine < theirs

    def __str__(self) -> str:
        return f"SnapshotId<{self.name}: {self.identifier}>"
Usage documentation: Models (Pydantic concepts guide).
A base class for creating Pydantic models.
Attributes:
- `__class_vars__`: The names of the class variables defined on the model.
- `__private_attributes__`: Metadata about the private attributes of the model.
- `__signature__`: The synthesized `__init__` `Signature` (`inspect.Signature`) of the model.
- `__pydantic_complete__`: Whether model building is completed, or if there are still undefined fields.
- `__pydantic_core_schema__`: The core schema of the model.
- `__pydantic_custom_init__`: Whether the model has a custom `__init__` function.
- `__pydantic_decorators__`: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- `__pydantic_generic_metadata__`: Metadata for generic models; contains data used for a similar purpose to `__args__`, `__origin__`, `__parameters__` in typing-module generics. May eventually be replaced by these.
- `__pydantic_parent_namespace__`: Parent namespace of the model, used for automatic rebuilding of models.
- `__pydantic_post_init__`: The name of the post-init method for the model, if defined.
- `__pydantic_root_model__`: Whether the model is a `RootModel` (`pydantic.root_model.RootModel`).
- `__pydantic_serializer__`: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- `__pydantic_validator__`: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- `__pydantic_fields__`: A dictionary of field names and their corresponding `FieldInfo` (`pydantic.fields.FieldInfo`) objects.
- `__pydantic_computed_fields__`: A dictionary of computed field names and their corresponding `ComputedFieldInfo` (`pydantic.fields.ComputedFieldInfo`) objects.
- `__pydantic_extra__`: A dictionary containing extra values, if `extra` (`pydantic.config.ConfigDict.extra`) is set to `'allow'`.
- `__pydantic_fields_set__`: The names of fields explicitly set during instantiation.
- `__pydantic_private__`: Values of private attributes set on the model instance.
@property
def snapshot_id(self) -> SnapshotId:
    """Helper method to return self.

    Returns:
        This instance itself, unchanged; no new object is created.
    """
    return self
Helper method to return self.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class SnapshotNameVersion(PydanticModel, frozen=True):
    """Frozen pair of a snapshot name and a version string."""

    # Name half of the pair.
    name: str
    # Version half of the pair.
    version: str

    @property
    def name_version(self) -> SnapshotNameVersion:
        """Helper method to return self.

        Returns:
            This instance itself, unchanged; no new object is created.
        """
        return self
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
@property
def name_version(self) -> SnapshotNameVersion:
    """Helper method to return self.

    Returns:
        This instance itself, unchanged; no new object is created.
    """
    return self
Helper method to return self.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class SnapshotIntervals(PydanticModel):
    """Interval bookkeeping for one snapshot name/version.

    Three interval lists are tracked (`intervals`, `dev_intervals` and
    `pending_restatement_intervals`) together with two "last altered"
    timestamps. The public add/remove/update methods are thin wrappers that
    select which attribute the shared private helpers operate on.
    """

    name: str
    identifier: t.Optional[str]
    version: str
    dev_version: t.Optional[str]
    intervals: Intervals = []
    dev_intervals: Intervals = []
    pending_restatement_intervals: Intervals = []
    last_altered_ts: t.Optional[int] = None
    dev_last_altered_ts: t.Optional[int] = None

    @property
    def snapshot_id(self) -> t.Optional[SnapshotId]:
        """Snapshot ID built from name and identifier, or None when no identifier is set."""
        if self.identifier:
            return SnapshotId(name=self.name, identifier=self.identifier)
        return None

    @property
    def name_version(self) -> SnapshotNameVersion:
        """Name/version pair built from this record."""
        return SnapshotNameVersion(name=self.name, version=self.version)

    def add_interval(self, start: int, end: int) -> None:
        """Merge `(start, end)` into `intervals`."""
        self._add_interval(start, end, "intervals")

    def add_dev_interval(self, start: int, end: int) -> None:
        """Merge `(start, end)` into `dev_intervals`."""
        self._add_interval(start, end, "dev_intervals")

    def add_pending_restatement_interval(self, start: int, end: int) -> None:
        """Merge `(start, end)` into `pending_restatement_intervals`."""
        self._add_interval(start, end, "pending_restatement_intervals")

    def update_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
        """Raise `last_altered_ts` to the given value if it is larger."""
        self._update_last_altered_ts(last_altered_ts, "last_altered_ts")

    def update_dev_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
        """Raise `dev_last_altered_ts` to the given value if it is larger."""
        self._update_last_altered_ts(last_altered_ts, "dev_last_altered_ts")

    def remove_interval(self, start: int, end: int) -> None:
        """Remove the range `(start, end)` from `intervals`."""
        self._remove_interval(start, end, "intervals")

    def remove_dev_interval(self, start: int, end: int) -> None:
        """Remove the range `(start, end)` from `dev_intervals`."""
        self._remove_interval(start, end, "dev_intervals")

    def remove_pending_restatement_interval(self, start: int, end: int) -> None:
        """Remove the range `(start, end)` from `pending_restatement_intervals`."""
        self._remove_interval(start, end, "pending_restatement_intervals")

    def is_empty(self) -> bool:
        """True when all three interval lists are empty."""
        return not (
            self.intervals or self.dev_intervals or self.pending_restatement_intervals
        )

    def _add_interval(self, start: int, end: int, interval_attr: str) -> None:
        """Append `(start, end)` to the named list and store the merged result."""
        existing = getattr(self, interval_attr)
        setattr(self, interval_attr, merge_intervals([*existing, (start, end)]))

    def _update_last_altered_ts(
        self, last_altered_ts: t.Optional[int], last_altered_attr: str
    ) -> None:
        """Keep the maximum of the stored and the given timestamp.

        A falsy `last_altered_ts` (None or 0) leaves the stored value untouched.
        """
        if not last_altered_ts:
            return
        current = getattr(self, last_altered_attr) or 0
        setattr(self, last_altered_attr, max(current, last_altered_ts))

    def _remove_interval(self, start: int, end: int, interval_attr: str) -> None:
        """Store the named list with the range `(start, end)` removed."""
        remaining = remove_interval(getattr(self, interval_attr), start, end)
        setattr(self, interval_attr, remaining)
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class SnapshotDataVersion(PydanticModel, frozen=True):
    """Frozen record of a snapshot's fingerprint, version and physical table settings."""

    fingerprint: SnapshotFingerprint
    version: str
    dev_version_: t.Optional[str] = Field(default=None, alias="dev_version")
    change_category: t.Optional[SnapshotChangeCategory] = None
    physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema")
    dev_table_suffix: str
    table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
    virtual_environment_mode: VirtualEnvironmentMode = Field(default=VirtualEnvironmentMode.default)

    def snapshot_id(self, name: str) -> SnapshotId:
        """Build a snapshot ID for `name` using this fingerprint's identifier."""
        identifier = self.fingerprint.to_identifier()
        return SnapshotId(name=name, identifier=identifier)

    @property
    def dev_version(self) -> str:
        """Explicit dev version if set, otherwise the fingerprint's version."""
        if self.dev_version_:
            return self.dev_version_
        return self.fingerprint.to_version()

    @property
    def physical_schema(self) -> str:
        """Stored physical schema, falling back to `c.SQLMESH` when none is stored."""
        # The physical schema here is optional to maintain backwards compatibility with
        # records stored by previous versions of SQLMesh.
        if self.physical_schema_:
            return self.physical_schema_
        return c.SQLMESH

    @property
    def data_version(self) -> SnapshotDataVersion:
        """Return this very instance."""
        return self

    @property
    def is_new_version(self) -> bool:
        """Returns whether or not this version is new and requires a backfill.

        True exactly when the fingerprint's version equals `version`.
        """
        fingerprint_version = self.fingerprint.to_version()
        return fingerprint_version == self.version
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
@property
def is_new_version(self) -> bool:
    """Returns whether or not this version is new and requires a backfill.

    True exactly when the fingerprint's version equals `version`.
    """
    fingerprint_version = self.fingerprint.to_version()
    return fingerprint_version == self.version
Returns whether or not this version is new and requires a backfill.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
282class QualifiedViewName(PydanticModel, frozen=True): 283 catalog: t.Optional[str] = None 284 schema_name: t.Optional[str] = None 285 table: str 286 287 def for_environment( 288 self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None 289 ) -> str: 290 return exp.table_name(self.table_for_environment(environment_naming_info, dialect=dialect)) 291 292 def table_for_environment( 293 self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None 294 ) -> exp.Table: 295 return exp.table_( 296 self.table_name_for_environment(environment_naming_info, dialect=dialect), 297 db=self.schema_for_environment(environment_naming_info, dialect=dialect), 298 catalog=self.catalog_for_environment(environment_naming_info, dialect=dialect), 299 ) 300 301 def catalog_for_environment( 302 self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None 303 ) -> t.Optional[str]: 304 catalog_name: t.Optional[str] = None 305 if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_catalog: 306 catalog_name = f"{self.catalog}__{environment_naming_info.name}" 307 elif environment_naming_info.catalog_name_override: 308 catalog_name = environment_naming_info.catalog_name_override 309 310 if catalog_name: 311 return ( 312 normalize_identifiers(catalog_name, dialect=dialect).name 313 if environment_naming_info.normalize_name 314 else catalog_name 315 ) 316 317 return self.catalog 318 319 def schema_for_environment( 320 self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None 321 ) -> str: 322 normalize = environment_naming_info.normalize_name 323 324 if self.schema_name: 325 schema = self.schema_name 326 else: 327 schema = c.DEFAULT_SCHEMA 328 if normalize: 329 schema = normalize_identifiers(schema, dialect=dialect).name 330 331 if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_schema: 332 env_name = environment_naming_info.name 333 if normalize: 334 env_name 
= normalize_identifiers(env_name, dialect=dialect).name 335 336 schema = f"{schema}__{env_name}" 337 338 return schema 339 340 def table_name_for_environment( 341 self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None 342 ) -> str: 343 table = self.table 344 if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_table: 345 env_name = environment_naming_info.name 346 if environment_naming_info.normalize_name: 347 env_name = normalize_identifiers(env_name, dialect=dialect).name 348 349 table = f"{table}__{env_name}" 350 351 return table
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
def table_for_environment(
    self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
) -> exp.Table:
    """Return the environment-specific view name as a table expression.

    The table, schema and catalog parts are each resolved for the given
    environment (in that order) and combined with `exp.table_`.
    """
    table = self.table_name_for_environment(environment_naming_info, dialect=dialect)
    schema = self.schema_for_environment(environment_naming_info, dialect=dialect)
    catalog = self.catalog_for_environment(environment_naming_info, dialect=dialect)
    return exp.table_(table, db=schema, catalog=catalog)
def catalog_for_environment(
    self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
) -> t.Optional[str]:
    """Return the catalog to use in the given environment.

    A dev environment that suffixes the catalog gets `<catalog>__<env name>`;
    otherwise a configured catalog override wins; otherwise the view's own
    catalog is returned as is. Only a suffixed or overridden catalog is
    normalized, and only when the environment asks for normalized names.
    """
    # NOTE(review): when `self.catalog` is None the suffixed form renders as
    # "None__<env name>" -- confirm callers guarantee a catalog in that mode.
    if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_catalog:
        catalog_name = f"{self.catalog}__{environment_naming_info.name}"
    elif environment_naming_info.catalog_name_override:
        catalog_name = environment_naming_info.catalog_name_override
    else:
        catalog_name = None

    if not catalog_name:
        return self.catalog
    if environment_naming_info.normalize_name:
        return normalize_identifiers(catalog_name, dialect=dialect).name
    return catalog_name
def schema_for_environment(
    self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
) -> str:
    """Return the schema to use in the given environment.

    Falls back to `c.DEFAULT_SCHEMA` when the view has no schema, and appends
    `__<env name>` for dev environments that suffix the schema.
    """
    should_normalize = environment_naming_info.normalize_name

    schema = self.schema_name
    if not schema:
        # Only the default schema is normalized here; an explicit schema is used as is.
        schema = c.DEFAULT_SCHEMA
        if should_normalize:
            schema = normalize_identifiers(schema, dialect=dialect).name

    if not (environment_naming_info.is_dev and environment_naming_info.suffix_target.is_schema):
        return schema

    env_name = environment_naming_info.name
    if should_normalize:
        env_name = normalize_identifiers(env_name, dialect=dialect).name
    return f"{schema}__{env_name}"
def table_name_for_environment(
    self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
) -> str:
    """Return the table name, suffixed with `__<env name>` for dev environments that suffix the table."""
    if not (environment_naming_info.is_dev and environment_naming_info.suffix_target.is_table):
        return self.table

    env_name = environment_naming_info.name
    if environment_naming_info.normalize_name:
        env_name = normalize_identifiers(env_name, dialect=dialect).name
    table = self.table
    return f"{table}__{env_name}"
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class SnapshotInfoMixin(ModelKindMixin):
    """Shared snapshot behaviour layered on top of `ModelKindMixin`.

    The mixin declares the attributes it reads and leaves several properties
    (`physical_schema`, `data_version`, `is_new_version`,
    `fully_qualified_table`, `virtual_environment_mode`, `node_type`) for the
    concrete class to provide; here they raise `NotImplementedError`.
    """

    name: str
    dev_version_: t.Optional[str]
    change_category: t.Optional[SnapshotChangeCategory]
    fingerprint: SnapshotFingerprint
    previous_versions: t.Tuple[SnapshotDataVersion, ...]
    # Added to support Migration # 34 (default catalog)
    # This can be removed from this model once Pydantic 1 support is dropped (must remain in `Snapshot` though)
    base_table_name_override: t.Optional[str]
    dev_table_suffix: str
    table_naming_convention: TableNamingConvention
    forward_only: bool

    @cached_property
    def identifier(self) -> str:
        """Identifier string derived from the fingerprint (computed once, then cached)."""
        return self.fingerprint.to_identifier()

    @cached_property
    def snapshot_id(self) -> SnapshotId:
        """Snapshot ID built from the name and identifier (computed once, then cached)."""
        return SnapshotId(name=self.name, identifier=self.identifier)

    @property
    def qualified_view_name(self) -> QualifiedViewName:
        """View name parts parsed from the fully qualified table, or from the name if there is none."""
        parsed = exp.to_table(self.fully_qualified_table or self.name)
        # Empty parts are stored as None rather than as empty strings.
        catalog = parsed.catalog or None
        schema_name = parsed.db or None
        return QualifiedViewName(catalog=catalog, schema_name=schema_name, table=parsed.name)

    @property
    def previous_version(self) -> t.Optional[SnapshotDataVersion]:
        """Last entry of `previous_versions`, or None when there are none."""
        return self.previous_versions[-1] if self.previous_versions else None

    @property
    def dev_version(self) -> str:
        """Explicit dev version if set, otherwise the fingerprint's version."""
        if self.dev_version_:
            return self.dev_version_
        return self.fingerprint.to_version()

    @property
    def physical_schema(self) -> str:
        """Must be provided by the concrete class."""
        raise NotImplementedError

    @property
    def data_version(self) -> SnapshotDataVersion:
        """Must be provided by the concrete class."""
        raise NotImplementedError

    @property
    def is_new_version(self) -> bool:
        """Must be provided by the concrete class."""
        raise NotImplementedError

    @cached_property
    def fully_qualified_table(self) -> t.Optional[exp.Table]:
        """Must be provided by the concrete class."""
        raise NotImplementedError

    @property
    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
        """Must be provided by the concrete class."""
        raise NotImplementedError

    @property
    def is_forward_only(self) -> bool:
        """True when the flag is set or the change category is FORWARD_ONLY."""
        if self.forward_only:
            return True
        return self.change_category == SnapshotChangeCategory.FORWARD_ONLY

    @property
    def is_metadata(self) -> bool:
        """True when the change category is METADATA."""
        return self.change_category == SnapshotChangeCategory.METADATA

    @property
    def is_indirect_non_breaking(self) -> bool:
        """True when the change category is INDIRECT_NON_BREAKING."""
        return self.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING

    @property
    def is_no_rebuild(self) -> bool:
        """Returns true if this snapshot doesn't require a rebuild in production."""
        if self.forward_only:
            return True
        return self.change_category in (
            SnapshotChangeCategory.FORWARD_ONLY,  # Backwards compatibility
            SnapshotChangeCategory.METADATA,
            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
        )

    @property
    def is_no_preview(self) -> bool:
        """Returns true if this snapshot doesn't require a preview in development."""
        if not self.forward_only:
            return False
        return self.change_category in (
            SnapshotChangeCategory.METADATA,
            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
        )

    @property
    def all_versions(self) -> t.Tuple[SnapshotDataVersion, ...]:
        """Returns previous versions with the current version trimmed to DATA_VERSION_LIMIT."""
        versions = (*self.previous_versions, self.data_version)
        # Keep only the newest DATA_VERSION_LIMIT entries (the tail of the tuple).
        return versions[-c.DATA_VERSION_LIMIT :]

    def display_name(
        self,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
        dialect: DialectType = None,
    ) -> str:
        """
        Returns the model name as a qualified view name.
        This is just used for presenting information back to the user and `qualified_view_name` should be used
        when wanting a view name in all other cases.
        """
        # Delegates to the module-level function of the same name.
        return display_name(self, environment_naming_info, default_catalog, dialect=dialect)

    def data_hash_matches(self, other: t.Optional[SnapshotInfoMixin | SnapshotDataVersion]) -> bool:
        """True when `other` is given and its fingerprint has the same data hash as this one."""
        if other is None:
            return False
        return self.fingerprint.data_hash == other.fingerprint.data_hash

    def _table_name(self, version: str, is_deployable: bool) -> str:
        """Full table name pointing to the materialized location of the snapshot.

        Args:
            version: The snapshot version.
            is_deployable: Indicates whether to return the table name for deployment to production.

        Raises:
            SQLMeshError: If the snapshot has no fully qualified table.
        """
        # External snapshots, and deployable targets in dev-only virtual environment
        # mode, use the model name as is.
        if self.is_external or (is_deployable and self.virtual_environment_mode.is_dev_only):
            return self.name

        is_dev_table = not is_deployable
        if is_dev_table:
            version = self.dev_version

        fq_table = self.fully_qualified_table
        if fq_table is None:
            raise SQLMeshError(
                f"Tried to get a table name for a snapshot that does not have a table. {self.name}"
            )

        # We want to exclude the catalog from the name but still include catalog when determining the fqn
        # for the table.
        base_table_name = self.base_table_name_override
        if not base_table_name:
            without_catalog = fq_table.copy()
            without_catalog.set("catalog", None)
            base_table_name = without_catalog.sql()

        suffix = self.dev_table_suffix if is_dev_table else None
        return table_name(
            self.physical_schema,
            base_table_name,
            version,
            catalog=fq_table.catalog,
            suffix=suffix,
            naming_convention=self.table_naming_convention,
        )

    @property
    def node_type(self) -> NodeType:
        """Must be provided by the concrete class."""
        raise NotImplementedError

    @property
    def is_model(self) -> bool:
        """True when the node type is MODEL."""
        return self.node_type == NodeType.MODEL

    @property
    def is_audit(self) -> bool:
        """True when the node type is AUDIT."""
        return self.node_type == NodeType.AUDIT
@property
def previous_version(self) -> t.Optional[SnapshotDataVersion]:
    """Return the last entry of `previous_versions`, or None when there are none."""
    return self.previous_versions[-1] if self.previous_versions else None
Helper method to get the previous data version.
@property
def is_no_rebuild(self) -> bool:
    """Returns true if this snapshot doesn't require a rebuild in production.

    That is the case when the forward-only flag is set, or when the change
    category is FORWARD_ONLY, METADATA or INDIRECT_NON_BREAKING.
    """
    if self.forward_only:
        return True
    return self.change_category in (
        SnapshotChangeCategory.FORWARD_ONLY,  # Backwards compatibility
        SnapshotChangeCategory.METADATA,
        SnapshotChangeCategory.INDIRECT_NON_BREAKING,
    )
Returns true if this snapshot doesn't require a rebuild in production.
@property
def is_no_preview(self) -> bool:
    """Returns true if this snapshot doesn't require a preview in development.

    That requires the forward-only flag to be set and the change category to
    be METADATA or INDIRECT_NON_BREAKING.
    """
    if not self.forward_only:
        return False
    return self.change_category in (
        SnapshotChangeCategory.METADATA,
        SnapshotChangeCategory.INDIRECT_NON_BREAKING,
    )
Returns true if this snapshot doesn't require a preview in development.
@property
def all_versions(self) -> t.Tuple[SnapshotDataVersion, ...]:
    """Returns previous versions with the current version trimmed to DATA_VERSION_LIMIT."""
    versions = (*self.previous_versions, self.data_version)
    # Keep only the newest DATA_VERSION_LIMIT entries (the tail of the tuple).
    return versions[-c.DATA_VERSION_LIMIT :]
Returns previous versions with the current version trimmed to DATA_VERSION_LIMIT.
def display_name(
    self,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    dialect: DialectType = None,
) -> str:
    """
    Returns the model name as a qualified view name.
    This is just used for presenting information back to the user and `qualified_view_name` should be used
    when wanting a view name in all other cases.

    Args:
        environment_naming_info: Naming settings of the target environment, passed through unchanged.
        default_catalog: Default catalog, passed through unchanged; may be None.
        dialect: Optional SQL dialect, passed through unchanged.
    """
    # Pure delegation: the work is done by the `display_name` callable resolved
    # from the enclosing module scope, with this object as its first argument.
    return display_name(self, environment_naming_info, default_catalog, dialect=dialect)
Returns the model name as a qualified view name.
This is just used for presenting information back to the user and qualified_view_name should be used
when wanting a view name in all other cases.
Inherited Members
- sqlmesh.core.model.kind.ModelKindMixin
- model_kind_name
- is_incremental_by_time_range
- is_incremental_by_unique_key
- is_incremental_by_partition
- is_incremental_unmanaged
- is_incremental
- is_full
- is_view
- is_embedded
- is_seed
- is_external
- is_scd_type_2
- is_scd_type_2_by_time
- is_scd_type_2_by_column
- is_custom
- is_managed
- is_dbt_custom
- is_symbolic
- is_materialized
- only_execution_time
- full_history_restatement_only
- supports_python_models
- supports_grants
class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
    """Frozen, lightweight description of a snapshot and its physical table.

    Several fields are stored under a trailing-underscore attribute and
    populated through an alias (`dev_version`, `physical_schema`, `node_type`,
    `virtual_environment_mode`); the matching properties expose them under the
    un-suffixed name.
    """

    name: str
    fingerprint: SnapshotFingerprint
    version: str
    dev_version_: t.Optional[str] = Field(default=None, alias="dev_version")
    physical_schema_: str = Field(alias="physical_schema")
    parents: t.Tuple[SnapshotId, ...]
    previous_versions: t.Tuple[SnapshotDataVersion, ...] = ()
    change_category: t.Optional[SnapshotChangeCategory] = None
    kind_name: t.Optional[ModelKindName] = None
    node_type_: NodeType = Field(default=NodeType.MODEL, alias="node_type")
    # Added to support Migration # 34 (default catalog)
    # This can be removed from this model once Pydantic 1 support is dropped (must remain in `Snapshot` though)
    base_table_name_override: t.Optional[str] = None
    custom_materialization: t.Optional[str] = None
    dev_table_suffix: str
    model_gateway: t.Optional[str] = None
    forward_only: bool = False
    table_naming_convention: TableNamingConvention = TableNamingConvention.default
    virtual_environment_mode_: VirtualEnvironmentMode = Field(
        default=VirtualEnvironmentMode.default, alias="virtual_environment_mode"
    )

    def __lt__(self, other: SnapshotTableInfo) -> bool:
        """Order table infos by name only."""
        return self.name < other.name

    def __eq__(self, other: t.Any) -> bool:
        """Equal when `other` is a table info with the same name and fingerprint.

        The name is compared as well as the fingerprint so that equality agrees
        with `__hash__`, which mixes in the name: objects that compare equal must
        hash equal, otherwise sets and dicts keyed on table infos misbehave.
        """
        if not isinstance(other, SnapshotTableInfo):
            return False
        return self.name == other.name and self.fingerprint == other.fingerprint

    def __hash__(self) -> int:
        """Hash over the class, name and fingerprint."""
        return hash((self.__class__, self.name, self.fingerprint))

    def table_name(self, is_deployable: bool = True) -> str:
        """Full table name pointing to the materialized location of the snapshot.

        Args:
            is_deployable: Indicates whether to return the table name for deployment to production.
        """
        return self._table_name(self.version, is_deployable)

    @property
    def physical_schema(self) -> str:
        """Physical schema stored on this table info."""
        return self.physical_schema_

    @cached_property
    def fully_qualified_table(self) -> exp.Table:
        """Table expression parsed from the snapshot name (computed once, then cached)."""
        return exp.to_table(self.name)

    @property
    def table_info(self) -> SnapshotTableInfo:
        """Helper method to return self."""
        return self

    @property
    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
        """Virtual environment mode stored on this table info."""
        return self.virtual_environment_mode_

    @property
    def data_version(self) -> SnapshotDataVersion:
        """Build a `SnapshotDataVersion` carrying this table info's version-related fields."""
        return SnapshotDataVersion(
            fingerprint=self.fingerprint,
            version=self.version,
            dev_version=self.dev_version,
            change_category=self.change_category,
            physical_schema=self.physical_schema,
            dev_table_suffix=self.dev_table_suffix,
            table_naming_convention=self.table_naming_convention,
            virtual_environment_mode=self.virtual_environment_mode,
        )

    @property
    def is_new_version(self) -> bool:
        """Returns whether or not this version is new and requires a backfill.

        True exactly when the fingerprint's version equals `version`.
        """
        return self.fingerprint.to_version() == self.version

    @property
    def model_kind_name(self) -> t.Optional[ModelKindName]:
        """Model kind name stored in `kind_name`; may be None."""
        return self.kind_name

    @property
    def node_type(self) -> NodeType:
        """Node type stored on this table info."""
        return self.node_type_

    @property
    def name_version(self) -> SnapshotNameVersion:
        """Returns the name and version of the snapshot."""
        return SnapshotNameVersion(name=self.name, version=self.version)

    @property
    def id_and_version(self) -> SnapshotIdAndVersion:
        """Build a `SnapshotIdAndVersion` from this table info's identity and version fields."""
        return SnapshotIdAndVersion(
            name=self.name,
            kind_name=self.kind_name,
            identifier=self.identifier,
            version=self.version,
            dev_version=self.dev_version,
            fingerprint=self.fingerprint,
        )
Usage documentation: Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
def table_name(self, is_deployable: bool = True) -> str:
    """Return the full name of the table in which the snapshot is materialized.

    Args:
        is_deployable: Indicates whether to return the table name for deployment to production.
    """
    version = self.version
    return self._table_name(version, is_deployable)
Full table name pointing to the materialized location of the snapshot.
Arguments:
- is_deployable: Indicates whether to return the table name for deployment to production.
@property
def table_info(self) -> SnapshotTableInfo:
    """This object is already a table info, so it is returned as is."""
    return self
Helper method to return self.
@property
def data_version(self) -> SnapshotDataVersion:
    """Build a `SnapshotDataVersion` from the version-related fields of this object."""
    version_fields = dict(
        fingerprint=self.fingerprint,
        version=self.version,
        dev_version=self.dev_version,
        change_category=self.change_category,
        physical_schema=self.physical_schema,
        dev_table_suffix=self.dev_table_suffix,
        table_naming_convention=self.table_naming_convention,
        virtual_environment_mode=self.virtual_environment_mode,
    )
    return SnapshotDataVersion(**version_fields)
@property
def is_new_version(self) -> bool:
    """Whether this version is new and requires a backfill.

    True when the version derived from the fingerprint equals the stored version.
    """
    fingerprint_version = self.fingerprint.to_version()
    return fingerprint_version == self.version
Returns whether or not this version is new and requires a backfill.
@property
def name_version(self) -> SnapshotNameVersion:
    """The name and version of the snapshot as a `SnapshotNameVersion`."""
    snapshot_name = self.name
    snapshot_version = self.version
    return SnapshotNameVersion(name=snapshot_name, version=snapshot_version)
Returns the name and version of the snapshot.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- sqlmesh.utils.pydantic.PydanticModel
- dict
- json
- copy
- fields_set
- parse_obj
- parse_raw
- missing_required_fields
- extra_fields
- all_fields
- all_field_infos
- required_fields
- SnapshotInfoMixin
- identifier
- snapshot_id
- qualified_view_name
- previous_version
- dev_version
- is_forward_only
- is_metadata
- is_indirect_non_breaking
- is_no_rebuild
- is_no_preview
- all_versions
- display_name
- data_hash_matches
- is_model
- is_audit
- sqlmesh.core.model.kind.ModelKindMixin
- is_incremental_by_time_range
- is_incremental_by_unique_key
- is_incremental_by_partition
- is_incremental_unmanaged
- is_incremental
- is_full
- is_view
- is_embedded
- is_seed
- is_external
- is_scd_type_2
- is_scd_type_2_by_time
- is_scd_type_2_by_column
- is_custom
- is_managed
- is_dbt_custom
- is_symbolic
- is_materialized
- only_execution_time
- full_history_restatement_only
- supports_python_models
- supports_grants
class SnapshotIdAndVersion(PydanticModel, ModelKindMixin):
    """Lightweight view of a snapshot holding only the main fields of the snapshots table.

    Used where the main fields are needed without the overhead of parsing the full
    snapshot payload and fetching intervals.
    """

    name: str
    version: str
    kind_name_: t.Optional[ModelKindName] = Field(default=None, alias="kind_name")
    dev_version_: t.Optional[str] = Field(alias="dev_version")
    identifier: str
    fingerprint_: t.Union[str, SnapshotFingerprint] = Field(alias="fingerprint")

    @property
    def snapshot_id(self) -> SnapshotId:
        """The snapshot ID built from the name and identifier."""
        return SnapshotId(name=self.name, identifier=self.identifier)

    @property
    def id_and_version(self) -> SnapshotIdAndVersion:
        """This object already is the ID-and-version view, so it is returned as is."""
        return self

    @property
    def name_version(self) -> SnapshotNameVersion:
        """The name and version of the snapshot as a `SnapshotNameVersion`."""
        return SnapshotNameVersion(name=self.name, version=self.version)

    @property
    def fingerprint(self) -> SnapshotFingerprint:
        """The fingerprint, parsed on first access if it is still stored as a string.

        The parsed value is written back to `fingerprint_` so later accesses skip parsing.
        """
        stored = self.fingerprint_
        if not isinstance(stored, str):
            return stored
        parsed = SnapshotFingerprint.parse_raw(stored)
        self.fingerprint_ = parsed
        return parsed

    @property
    def dev_version(self) -> str:
        """The stored dev version, or the version derived from the fingerprint when it is unset."""
        explicit_dev_version = self.dev_version_
        if explicit_dev_version:
            return explicit_dev_version
        return self.fingerprint.to_version()

    @property
    def model_kind_name(self) -> t.Optional[ModelKindName]:
        """Returns the model kind name."""
        return self.kind_name_

    def display_name(
        self,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
        dialect: DialectType = None,
    ) -> str:
        """Return the display name produced by `model_display_name` for this snapshot's name.

        Args:
            environment_naming_info: Passed through to `model_display_name`.
            default_catalog: Passed through to `model_display_name`.
            dialect: Passed through to `model_display_name` as the `dialect` keyword.
        """
        return model_display_name(
            self.name,
            environment_naming_info,
            default_catalog,
            dialect=dialect,
        )
A stripped down version of a snapshot that is used in situations where we want to fetch the main fields of the snapshots table without the overhead of parsing the full snapshot payload and fetching intervals.
@property
def model_kind_name(self) -> t.Optional[ModelKindName]:
    """Returns the model kind name."""
    kind_name = self.kind_name_
    return kind_name
Returns the model kind name.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- sqlmesh.utils.pydantic.PydanticModel
- dict
- json
- copy
- fields_set
- parse_obj
- parse_raw
- missing_required_fields
- extra_fields
- all_fields
- all_field_infos
- required_fields
- sqlmesh.core.model.kind.ModelKindMixin
- is_incremental_by_time_range
- is_incremental_by_unique_key
- is_incremental_by_partition
- is_incremental_unmanaged
- is_incremental
- is_full
- is_view
- is_embedded
- is_seed
- is_external
- is_scd_type_2
- is_scd_type_2_by_time
- is_scd_type_2_by_column
- is_custom
- is_managed
- is_dbt_custom
- is_symbolic
- is_materialized
- only_execution_time
- full_history_restatement_only
- supports_python_models
- supports_grants
668class Snapshot(PydanticModel, SnapshotInfoMixin): 669 """A snapshot represents a node at a certain point in time. 670 671 Snapshots are used to encapsulate everything needed to evaluate a node. 672 They are standalone objects that hold all state and dynamic content necessary 673 to render a node's query including things like macros. Snapshots also store intervals 674 (timestamp ranges for what data we've processed). 675 676 Nodes can be dynamically rendered due to macros. Rendering a node to its full extent 677 requires storing variables and macro definitions. We store all of the macro definitions and 678 global variable references in `python_env` in raw text to avoid pickling. The helper methods 679 to achieve this are defined in utils.metaprogramming. 680 681 Args: 682 name: The snapshot name which is the same as the node name and should be unique per node. 683 fingerprint: A unique hash of the node definition so that nodes can be reused across environments. 684 node: Node object that the snapshot encapsulates. 685 parents: The list of parent snapshots (upstream dependencies). 686 intervals: List of [start, end) intervals showing which time ranges a snapshot has data for. 687 dev_intervals: List of [start, end) intervals showing development intervals (forward-only). 688 created_ts: Epoch millis timestamp when a snapshot was first created. 689 updated_ts: Epoch millis timestamp when a snapshot was last updated. 690 ttl: The time-to-live of a snapshot determines when it should be deleted after it's no longer referenced 691 in any environment. 692 previous: The snapshot data version that this snapshot was based on. If this snapshot is new, then previous will be None. 693 version: User specified version for a snapshot that is used for physical storage. 694 By default, the version is the fingerprint, but not all changes to nodes require a backfill. 695 If a user passes a previous version, that will be used instead and no backfill will be required. 
696 change_category: User specified change category indicating which nodes require backfill from node changes made in this snapshot. 697 unpaused_ts: The timestamp which indicates when this snapshot was unpaused. Unpaused means that 698 this snapshot is evaluated on a recurring basis. None indicates that this snapshot is paused. 699 effective_from: The timestamp which indicates when this snapshot should be considered effective. 700 Applicable for forward-only snapshots only. 701 migrated: Whether or not this snapshot has been created as a result of migration. 702 unrestorable: Whether or not this snapshot can be used to revert its model to a previous version. 703 next_auto_restatement_ts: The timestamp which indicates when is the next time this snapshot should be restated. 704 table_naming_convention: Convention to follow when generating the physical table name 705 """ 706 707 name: str 708 fingerprint: SnapshotFingerprint 709 physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema") 710 node: Node 711 parents: t.Tuple[SnapshotId, ...] 712 intervals: Intervals = [] 713 dev_intervals: Intervals = [] 714 pending_restatement_intervals: Intervals = [] 715 created_ts: int 716 updated_ts: int 717 ttl: str 718 previous_versions: t.Tuple[SnapshotDataVersion, ...] 
= () 719 version: t.Optional[str] = None 720 dev_version_: t.Optional[str] = Field(default=None, alias="dev_version") 721 change_category: t.Optional[SnapshotChangeCategory] = None 722 unpaused_ts: t.Optional[int] = None 723 effective_from: t.Optional[TimeLike] = None 724 migrated: bool = False 725 unrestorable: bool = False 726 # Added to support Migration # 34 (default catalog) 727 base_table_name_override: t.Optional[str] = None 728 next_auto_restatement_ts: t.Optional[int] = None 729 dev_table_suffix: str = "dev" 730 table_naming_convention: TableNamingConvention = TableNamingConvention.default 731 forward_only: bool = False 732 # Physical table last modified timestamp, not to be confused with the "updated_ts" field 733 # which is for the snapshot record itself 734 last_altered_ts: t.Optional[int] = None 735 dev_last_altered_ts: t.Optional[int] = None 736 737 @field_validator("ttl") 738 @classmethod 739 def _time_delta_must_be_positive(cls, v: str) -> str: 740 current_time = now() 741 if to_datetime(v, current_time) < current_time: 742 raise ValueError( 743 "Must be positive. Use the 'in' keyword to denote a positive time interval. For example, 'in 7 days'." 744 ) 745 return v 746 747 @staticmethod 748 def hydrate_with_intervals_by_version( 749 snapshots: t.Iterable[Snapshot], 750 intervals: t.Iterable[SnapshotIntervals], 751 ) -> t.List[Snapshot]: 752 """Hydrates target snapshots with given intervals. 753 754 This will match snapshots with intervals by name and version rather than identifiers. 755 756 Args: 757 snapshots: Target snapshots. 758 intervals: Target snapshot intervals. 759 760 Returns: 761 List of target snapshots with hydrated intervals. 
762 """ 763 intervals_by_name_version = defaultdict(list) 764 for interval in intervals: 765 intervals_by_name_version[(interval.name, interval.version)].append(interval) 766 767 result = [] 768 for snapshot in snapshots: 769 snapshot_intervals = intervals_by_name_version.get( 770 (snapshot.name, snapshot.version_get_or_generate()), [] 771 ) 772 for interval in snapshot_intervals: 773 snapshot.merge_intervals(interval) 774 775 result.append(snapshot) 776 777 return result 778 779 @classmethod 780 def from_node( 781 cls, 782 node: Node, 783 *, 784 nodes: t.Dict[str, Node], 785 ttl: str = c.DEFAULT_SNAPSHOT_TTL, 786 version: t.Optional[str] = None, 787 cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None, 788 table_naming_convention: TableNamingConvention = TableNamingConvention.default, 789 ) -> Snapshot: 790 """Creates a new snapshot for a node. 791 792 Args: 793 Node: Node to snapshot. 794 nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes. 795 If no dictionary is passed in the fingerprint will not be dependent on a node's parents. 796 ttl: A TTL to determine how long orphaned (snapshots that are not promoted anywhere) should live. 797 version: The version that a snapshot is associated with. Usually set during the planning phase. 798 cache: Cache of node name to fingerprints. 799 table_naming_convention: Convention to follow when generating the physical table name 800 801 Returns: 802 The newly created snapshot. 
803 """ 804 created_ts = now_timestamp() 805 806 return cls( 807 name=node.fqn, 808 fingerprint=fingerprint_from_node( 809 node, 810 nodes=nodes, 811 cache=cache, 812 ), 813 node=node, 814 parents=tuple( 815 SnapshotId( 816 name=parent_node.fqn, 817 identifier=fingerprint_from_node( 818 parent_node, 819 nodes=nodes, 820 cache=cache, 821 ).to_identifier(), 822 ) 823 for parent_node in _parents_from_node(node, nodes).values() 824 ), 825 intervals=[], 826 dev_intervals=[], 827 created_ts=created_ts, 828 updated_ts=created_ts, 829 ttl=ttl, 830 version=version, 831 table_naming_convention=table_naming_convention, 832 ) 833 834 def __eq__(self, other: t.Any) -> bool: 835 return isinstance(other, Snapshot) and self.fingerprint == other.fingerprint 836 837 def __hash__(self) -> int: 838 return hash((self.__class__, self.name, self.fingerprint)) 839 840 def __lt__(self, other: Snapshot) -> bool: 841 return self.name < other.name 842 843 def add_interval(self, start: TimeLike, end: TimeLike, is_dev: bool = False) -> None: 844 """Add a newly processed time interval to the snapshot. 845 846 The actual stored intervals are [start_ts, end_ts) or start epoch timestamp inclusive and end epoch 847 timestamp exclusive. This allows merging of ranges to be easier. 848 849 Args: 850 start: The start date/time of the interval (inclusive) 851 end: The end date/time of the interval. If end is a date, then it is considered inclusive. 852 If it is a datetime object, then it is exclusive. 853 is_dev: Indicates whether the given interval is being added while in development mode. 854 """ 855 if to_timestamp(start) > to_timestamp(end): 856 raise ValueError( 857 f"Attempted to add an Invalid interval ({start}, {end}) to snapshot {self.snapshot_id}" 858 ) 859 860 start_ts, end_ts = self.inclusive_exclusive(start, end, strict=False, expand=False) 861 862 if start_ts >= end_ts: 863 # Skipping partial interval. 
864 return 865 866 intervals = self.dev_intervals if is_dev else self.intervals 867 intervals.append((start_ts, end_ts)) 868 869 if len(intervals) < 2: 870 return 871 872 merged_intervals = merge_intervals(intervals) 873 if is_dev: 874 self.dev_intervals = merged_intervals 875 else: 876 self.intervals = merged_intervals 877 878 def remove_interval(self, interval: Interval) -> None: 879 """Remove an interval from the snapshot. 880 881 Args: 882 interval: The interval to remove. 883 """ 884 self.intervals = remove_interval(self.intervals, *interval) 885 self.dev_intervals = remove_interval(self.dev_intervals, *interval) 886 887 def get_removal_interval( 888 self, 889 start: TimeLike, 890 end: TimeLike, 891 execution_time: t.Optional[TimeLike] = None, 892 *, 893 strict: bool = True, 894 is_preview: bool = False, 895 ) -> Interval: 896 """Get the interval that should be removed from the snapshot. 897 898 Args: 899 start: The start date/time of the interval to remove. 900 end: The end date/time of the interval to removed. 901 execution_time: The time the interval is being removed. 902 strict: Whether to fail when the inclusive start is the same as the exclusive end. 903 is_preview: Whether the interval needs to be removed for a preview of forward-only changes. 904 When previewing, we are not actually restating a model, but removing an interval to trigger 905 a run. 
906 """ 907 end = execution_time or now_timestamp() if self.depends_on_past else end 908 removal_interval = self.inclusive_exclusive(start, end, strict) 909 910 if not is_preview and self.full_history_restatement_only and self.intervals: 911 expanded_removal_interval = self.inclusive_exclusive(self.intervals[0][0], end, strict) 912 requested_start, requested_end = removal_interval 913 expanded_start, expanded_end = expanded_removal_interval 914 915 # only warn if the requested removal interval was a subset of the actual model intervals and was automatically expanded 916 # if the requested interval was the same or wider than the actual model intervals, no need to warn 917 if ( 918 requested_start > expanded_start or requested_end < expanded_end 919 ) and self.is_incremental: 920 from sqlmesh.core.console import get_console 921 922 get_console().log_warning( 923 f"Model '{self.model.name}' is '{self.model_kind_name}' which does not support partial restatement.\n" 924 f"Expanding the requested restatement intervals from [{to_ts(requested_start)} - {to_ts(requested_end)}] " 925 f"to [{to_ts(expanded_start)} - {to_ts(expanded_end)}] in order to fully restate the model." 926 ) 927 928 removal_interval = expanded_removal_interval 929 930 return removal_interval 931 932 @property 933 def allow_partials(self) -> bool: 934 return self.is_model and self.model.allow_partials 935 936 def inclusive_exclusive( 937 self, 938 start: TimeLike, 939 end: TimeLike, 940 strict: bool = True, 941 allow_partial: t.Optional[bool] = None, 942 expand: bool = True, 943 ) -> Interval: 944 """Transform the inclusive start and end into a [start, end) pair. 945 946 Args: 947 start: The start date/time of the interval (inclusive) 948 end: The end date/time of the interval (inclusive) 949 strict: Whether to fail when the inclusive start is the same as the exclusive end. 950 allow_partial: Whether the interval can be partial or not. 951 expand: Whether or not partial intervals are expanded outwards. 
952 953 Returns: 954 A [start, end) pair. 955 """ 956 return inclusive_exclusive( 957 start, 958 end, 959 self.node.interval_unit, 960 strict=strict, 961 allow_partial=self.allow_partials if allow_partial is None else allow_partial, 962 expand=expand, 963 ) 964 965 def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None: 966 """Inherits intervals from the target snapshot. 967 968 Args: 969 other: The target snapshot to inherit intervals from. 970 """ 971 effective_from_ts = self.normalized_effective_from_ts or 0 972 apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier 973 for start, end in other.intervals: 974 # If the effective_from is set, then intervals that come after it must come from 975 # the current snapshots. 976 if apply_effective_from and start < effective_from_ts: 977 end = min(end, effective_from_ts) 978 if not apply_effective_from or end <= effective_from_ts: 979 self.add_interval(start, end) 980 981 if other.last_altered_ts: 982 self.last_altered_ts = max(self.last_altered_ts or 0, other.last_altered_ts) 983 984 if self.dev_version == other.dev_version: 985 # Merge dev intervals if the dev versions match which would mean 986 # that this and the other snapshot are pointing to the same dev table. 
987 for start, end in other.dev_intervals: 988 self.add_interval(start, end, is_dev=True) 989 990 if other.dev_last_altered_ts: 991 self.dev_last_altered_ts = max( 992 self.dev_last_altered_ts or 0, other.dev_last_altered_ts 993 ) 994 995 self.pending_restatement_intervals = merge_intervals( 996 [*self.pending_restatement_intervals, *other.pending_restatement_intervals] 997 ) 998 999 @property 1000 def evaluatable(self) -> bool: 1001 """Whether or not a snapshot should be evaluated and have intervals.""" 1002 return bool(not self.is_symbolic or self.model.audits) 1003 1004 def missing_intervals( 1005 self, 1006 start: TimeLike, 1007 end: TimeLike, 1008 execution_time: t.Optional[TimeLike] = None, 1009 deployability_index: t.Optional[DeployabilityIndex] = None, 1010 ignore_cron: bool = False, 1011 end_bounded: bool = False, 1012 ) -> Intervals: 1013 """Find all missing intervals between [start, end]. 1014 1015 Although the inputs are inclusive, the returned stored intervals are 1016 [start_ts, end_ts) or start epoch timestamp inclusive and end epoch 1017 timestamp exclusive. 1018 1019 Args: 1020 start: The start date/time of the interval (inclusive) 1021 end: The end date/time of the interval (inclusive if the type is date, exclusive otherwise) 1022 execution_time: The date/time time reference to use for execution time. Defaults to now. 1023 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 1024 ignore_cron: Whether to ignore the node's cron schedule. 1025 end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, 1026 allow_partials, and other attributes that could cause the intervals to exceed the target end date. 1027 1028 Returns: 1029 A list of all the missing intervals as epoch timestamps. 
1030 """ 1031 # If the node says that it has an end, and we are wanting to load past it, then we can return no empty intervals 1032 # Also if a node's start is after the end of the range we are checking then we can return no empty intervals 1033 if (self.node.end and to_datetime(start) > to_datetime(self.node.end)) or ( 1034 self.node.start and to_datetime(self.node.start) > to_datetime(end) 1035 ): 1036 return [] 1037 if self.node.start and to_datetime(start) < to_datetime(self.node.start): 1038 start = self.node.start 1039 # If the amount of time being checked is less than the size of a single interval then we 1040 # know that there can't being missing intervals within that range and return 1041 validate_date_range(start, end) 1042 1043 if ( 1044 not is_date(end) 1045 and not self.allow_partials 1046 and to_timestamp(end) - to_timestamp(start) < self.node.interval_unit.milliseconds 1047 ): 1048 return [] 1049 1050 deployability_index = deployability_index or DeployabilityIndex.all_deployable() 1051 intervals = ( 1052 self.intervals if deployability_index.is_representative(self) else self.dev_intervals 1053 ) 1054 1055 if not self.evaluatable or (self.is_seed and intervals): 1056 return [] 1057 1058 start_ts, end_ts = (to_timestamp(ts) for ts in self.inclusive_exclusive(start, end)) 1059 1060 interval_unit = self.node.interval_unit 1061 execution_time_ts = to_timestamp(execution_time) if execution_time else now_timestamp() 1062 upper_bound_ts = ( 1063 execution_time_ts 1064 if ignore_cron 1065 else to_timestamp(self.node.cron_floor(execution_time_ts)) 1066 ) 1067 if end_bounded: 1068 upper_bound_ts = min(upper_bound_ts, end_ts) 1069 if not self.allow_partials: 1070 upper_bound_ts = to_timestamp(interval_unit.cron_floor(upper_bound_ts)) 1071 1072 end_ts = min(end_ts, upper_bound_ts) 1073 1074 lookback = 0 1075 model_end_ts: t.Optional[int] = None 1076 1077 if self.is_model: 1078 lookback = self.model.lookback 1079 model_end_ts = 
to_timestamp(make_exclusive(self.model.end)) if self.model.end else None 1080 1081 return compute_missing_intervals( 1082 interval_unit, 1083 tuple(intervals), 1084 start_ts, 1085 end_ts, 1086 lookback, 1087 model_end_ts, 1088 ) 1089 1090 def check_ready_intervals( 1091 self, 1092 intervals: Intervals, 1093 context: ExecutionContext, 1094 ) -> Intervals: 1095 """Returns a list of intervals that are considered ready by the provided signal. 1096 1097 Note that this will handle gaps in the provided intervals. The returned intervals 1098 may introduce new gaps. 1099 """ 1100 signals = self.is_model and self.model.render_signal_calls() 1101 if not signals: 1102 return intervals 1103 1104 for signal_name, kwargs in signals.signals_to_kwargs.items(): 1105 try: 1106 intervals = check_ready_intervals( 1107 signals.prepared_python_env[signal_name], 1108 intervals, 1109 context, 1110 python_env=signals.python_env, 1111 dialect=self.model.dialect, 1112 path=self.model._path, 1113 snapshot=self, 1114 kwargs=kwargs, 1115 ) 1116 except SQLMeshError as e: 1117 raise SignalEvalError( 1118 f"{e} '{signal_name}' for '{self.model.name}' at {self.model._path}" 1119 ) 1120 return intervals 1121 1122 def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = False) -> None: 1123 """Assigns the given category to this snapshot. 1124 1125 Args: 1126 category: The change category to assign to this snapshot. 1127 forward_only: Whether or not this snapshot is applied going forward in production. 1128 """ 1129 assert category != SnapshotChangeCategory.FORWARD_ONLY, ( 1130 "FORWARD_ONLY change category is deprecated" 1131 ) 1132 1133 self.dev_version_ = self.fingerprint.to_version() 1134 is_no_rebuild = forward_only or category in ( 1135 SnapshotChangeCategory.INDIRECT_NON_BREAKING, 1136 SnapshotChangeCategory.METADATA, 1137 ) 1138 if self.is_model and not self.virtual_environment_mode.is_full: 1139 # Hardcode the version if the virtual environment is not fully enabled. 
1140 self.version = "novde" 1141 elif self.is_model and self.model.physical_version: 1142 # If the model has a pinned version then use that. 1143 self.version = self.model.physical_version 1144 elif is_no_rebuild and self.previous_version: 1145 self.version = self.previous_version.data_version.version 1146 elif self.is_model and self.model.forward_only and not self.previous_version: 1147 # If this is a new model then use a deterministic version, independent of the fingerprint. 1148 self.version = hash_data([self.name, *self.model.kind.data_hash_values]) 1149 else: 1150 self.version = self.fingerprint.to_version() 1151 1152 if is_no_rebuild and self.previous_version: 1153 previous_version = self.previous_version 1154 self.physical_schema_ = previous_version.physical_schema 1155 self.table_naming_convention = previous_version.table_naming_convention 1156 if self.is_materialized and (category.is_indirect_non_breaking or category.is_metadata): 1157 # Reuse the dev table for indirect non-breaking changes. 1158 self.dev_version_ = ( 1159 previous_version.data_version.dev_version 1160 or previous_version.fingerprint.to_version() 1161 ) 1162 self.dev_table_suffix = previous_version.data_version.dev_table_suffix 1163 1164 self.change_category = category 1165 self.forward_only = forward_only 1166 1167 @property 1168 def categorized(self) -> bool: 1169 """Whether the snapshot has been categorized.""" 1170 return self.change_category is not None and self.version is not None 1171 1172 def table_name(self, is_deployable: bool = True) -> str: 1173 """Full table name pointing to the materialized location of the snapshot. 1174 1175 Args: 1176 is_deployable: Indicates whether to return the table name for deployment to production. 
1177 """ 1178 self._ensure_categorized() 1179 assert self.version 1180 return self._table_name(self.version, is_deployable) 1181 1182 def version_get_or_generate(self) -> str: 1183 """Helper method to get the version or generate it from the fingerprint.""" 1184 return self.version or self.fingerprint.to_version() 1185 1186 def is_valid_start( 1187 self, 1188 start: t.Optional[TimeLike], 1189 snapshot_start: TimeLike, 1190 execution_time: t.Optional[TimeLike] = None, 1191 ) -> bool: 1192 """Checks if the given start and end are valid for this snapshot. 1193 Args: 1194 start: The start date/time of the interval (inclusive) 1195 snapshot_start: The start date/time of the snapshot (inclusive) 1196 """ 1197 # The snapshot may not have a start defined. If so we use the provided snapshot start. 1198 if self.depends_on_past and start: 1199 interval_unit = self.node.interval_unit 1200 start_ts = to_timestamp(interval_unit.cron_floor(start)) 1201 1202 if not self.intervals: 1203 # The start date must be aligned by the interval unit. 1204 snapshot_start_ts = to_timestamp(interval_unit.cron_floor(snapshot_start)) 1205 if snapshot_start_ts < to_timestamp(snapshot_start): 1206 snapshot_start_ts = to_timestamp(interval_unit.cron_next(snapshot_start_ts)) 1207 return snapshot_start_ts >= start_ts 1208 # Make sure that if there are missing intervals for this snapshot that they all occur at or after the 1209 # provided start_ts. Otherwise we know that we are doing a non-contiguous load and therefore this is not 1210 # a valid start. 1211 missing_intervals = self.missing_intervals( 1212 snapshot_start, now(), execution_time=execution_time 1213 ) 1214 earliest_interval = missing_intervals[0][0] if missing_intervals else None 1215 if earliest_interval: 1216 return earliest_interval >= start_ts 1217 return True 1218 1219 def get_latest(self, default: t.Optional[TimeLike] = None) -> t.Optional[TimeLike]: 1220 """The latest interval loaded for the snapshot. 
Default is used if intervals are not defined""" 1221 1222 def to_end_date(end: int, unit: IntervalUnit) -> TimeLike: 1223 if unit.is_day: 1224 return to_date(make_inclusive_end(end)) 1225 return end 1226 1227 return ( 1228 to_end_date(to_timestamp(self.intervals[-1][1]), self.node.interval_unit) 1229 if self.intervals 1230 else default 1231 ) 1232 1233 def needs_destructive_check( 1234 self, 1235 allow_destructive_snapshots: t.Set[str], 1236 ) -> bool: 1237 return ( 1238 self.is_model 1239 and not self.model.on_destructive_change.is_allow 1240 and self.name not in allow_destructive_snapshots 1241 ) 1242 1243 def needs_additive_check( 1244 self, 1245 allow_additive_snapshots: t.Set[str], 1246 ) -> bool: 1247 return ( 1248 self.is_model 1249 and not self.model.on_additive_change.is_allow 1250 and self.name not in allow_additive_snapshots 1251 ) 1252 1253 def get_next_auto_restatement_interval(self, execution_time: TimeLike) -> t.Optional[Interval]: 1254 """Returns the next auto restatement interval for the snapshot. 1255 1256 Args: 1257 execution_time: The execution time to use for the restatement. 1258 1259 Returns: 1260 The interval that needs to be restated or None if no restatement is needed. 
1261 """ 1262 if ( 1263 not self.is_model 1264 or not self.intervals 1265 or not self.model.auto_restatement_cron 1266 or self.model.disable_restatement 1267 ): 1268 return None 1269 1270 execution_time_ts = to_timestamp(execution_time) 1271 next_auto_restatement_ts = self.next_auto_restatement_ts or to_timestamp( 1272 self.model.auto_restatement_croniter(self.created_ts).get_next(estimate=False) 1273 ) 1274 if execution_time_ts < next_auto_restatement_ts: 1275 return None 1276 1277 num_intervals_to_restate = self.model.auto_restatement_intervals 1278 if num_intervals_to_restate is None: 1279 return (self.intervals[0][0], self.intervals[-1][1]) 1280 1281 auto_restatement_end_ts = to_timestamp( 1282 self.node.interval_unit.cron_floor(execution_time_ts) 1283 ) 1284 auto_restatement_start_ts = ( 1285 auto_restatement_end_ts 1286 - num_intervals_to_restate * self.node.interval_unit.milliseconds 1287 ) 1288 return (auto_restatement_start_ts, auto_restatement_end_ts) 1289 1290 def update_next_auto_restatement_ts(self, execution_time: TimeLike) -> t.Optional[int]: 1291 """Updates the next auto restatement timestamp. 1292 1293 Args: 1294 execution_time: The execution time to use for the restatement. 1295 1296 Returns: 1297 The next auto restatement timestamp or None if not applicable. 
1298 """ 1299 if ( 1300 not self.is_model 1301 or not self.model.auto_restatement_cron 1302 or self.model.disable_restatement 1303 ): 1304 self.next_auto_restatement_ts = None 1305 else: 1306 self.next_auto_restatement_ts = to_timestamp( 1307 self.model.auto_restatement_croniter(execution_time).get_next(estimate=False) 1308 ) 1309 return self.next_auto_restatement_ts 1310 1311 def apply_pending_restatement_intervals(self) -> None: 1312 """Applies the pending restatement intervals to the snapshot's intervals.""" 1313 if not self.is_model or self.model.disable_restatement: 1314 return 1315 for pending_restatement_interval in self.pending_restatement_intervals: 1316 logger.info( 1317 "Applying the auto restated interval (%s, %s) to snapshot %s", 1318 time_like_to_str(pending_restatement_interval[0]), 1319 time_like_to_str(pending_restatement_interval[1]), 1320 self.snapshot_id, 1321 ) 1322 self.intervals = remove_interval(self.intervals, *pending_restatement_interval) 1323 1324 def is_directly_modified(self, other: Snapshot) -> bool: 1325 """Returns whether or not this snapshot is directly modified in relation to the other snapshot.""" 1326 return self.node.is_data_change(other.node) 1327 1328 def is_indirectly_modified(self, other: Snapshot) -> bool: 1329 """Returns whether or not this snapshot is indirectly modified in relation to the other snapshot.""" 1330 return ( 1331 self.fingerprint.parent_data_hash != other.fingerprint.parent_data_hash 1332 and not self.node.is_data_change(other.node) 1333 ) 1334 1335 def is_metadata_updated(self, other: Snapshot) -> bool: 1336 """Returns whether or not this snapshot contains metadata changes in relation to the other snapshot.""" 1337 return self.fingerprint.metadata_hash != other.fingerprint.metadata_hash 1338 1339 @property 1340 def physical_schema(self) -> str: 1341 if self.physical_schema_ is not None: 1342 return self.physical_schema_ 1343 return self.model.physical_schema if self.is_model else "" 1344 1345 @property 
1346 def table_info(self) -> SnapshotTableInfo: 1347 """Helper method to get the SnapshotTableInfo from the Snapshot.""" 1348 self._ensure_categorized() 1349 1350 custom_materialization = ( 1351 self.model.kind.materialization 1352 if self.is_model and isinstance(self.model.kind, CustomKind) 1353 else None 1354 ) 1355 1356 return SnapshotTableInfo( 1357 physical_schema=self.physical_schema, 1358 name=self.name, 1359 fingerprint=self.fingerprint, 1360 version=self.version, 1361 dev_version=self.dev_version, 1362 parents=self.parents, 1363 previous_versions=self.previous_versions, 1364 change_category=self.change_category, 1365 kind_name=self.model_kind_name, 1366 node_type=self.node_type, 1367 custom_materialization=custom_materialization, 1368 dev_table_suffix=self.dev_table_suffix, 1369 model_gateway=self.model_gateway, 1370 table_naming_convention=self.table_naming_convention, # type: ignore 1371 forward_only=self.forward_only, 1372 virtual_environment_mode=self.virtual_environment_mode, 1373 ) 1374 1375 @property 1376 def data_version(self) -> SnapshotDataVersion: 1377 self._ensure_categorized() 1378 return SnapshotDataVersion( 1379 fingerprint=self.fingerprint, 1380 version=self.version, 1381 dev_version=self.dev_version, 1382 change_category=self.change_category, 1383 physical_schema=self.physical_schema, 1384 dev_table_suffix=self.dev_table_suffix, 1385 table_naming_convention=self.table_naming_convention, 1386 virtual_environment_mode=self.virtual_environment_mode, 1387 ) 1388 1389 @property 1390 def snapshot_intervals(self) -> SnapshotIntervals: 1391 self._ensure_categorized() 1392 return SnapshotIntervals( 1393 name=self.name, 1394 identifier=self.identifier, 1395 version=self.version, 1396 dev_version=self.dev_version, 1397 intervals=self.intervals.copy(), 1398 dev_intervals=self.dev_intervals.copy(), 1399 pending_restatement_intervals=self.pending_restatement_intervals.copy(), 1400 ) 1401 1402 @property 1403 def is_materialized_view(self) -> bool: 1404 
"""Returns whether or not this snapshot's model represents a materialized view.""" 1405 return ( 1406 self.is_model and isinstance(self.model.kind, ViewKind) and self.model.kind.materialized 1407 ) 1408 1409 @property 1410 def is_new_version(self) -> bool: 1411 """Returns whether or not this version is new and requires a backfill.""" 1412 self._ensure_categorized() 1413 return self.fingerprint.to_version() == self.version 1414 1415 @property 1416 def is_paused(self) -> bool: 1417 return self.unpaused_ts is None 1418 1419 @property 1420 def is_paused_forward_only(self) -> bool: 1421 return self.is_paused and self.is_forward_only 1422 1423 @property 1424 def normalized_effective_from_ts(self) -> t.Optional[int]: 1425 return ( 1426 to_timestamp(self.node.interval_unit.cron_floor(self.effective_from)) 1427 if self.effective_from 1428 else None 1429 ) 1430 1431 @property 1432 def model_kind_name(self) -> t.Optional[ModelKindName]: 1433 return self.model.kind.name if self.is_model else None 1434 1435 @property 1436 def node_type(self) -> NodeType: 1437 if self.node.is_model: 1438 return NodeType.MODEL 1439 if self.node.is_audit: 1440 return NodeType.AUDIT 1441 raise SQLMeshError(f"Snapshot {self.snapshot_id} has an unknown node type.") 1442 1443 @property 1444 def model(self) -> Model: 1445 model = self.model_or_none 1446 if model: 1447 return model 1448 raise SQLMeshError(f"Snapshot {self.snapshot_id} is not a model snapshot.") 1449 1450 @property 1451 def model_or_none(self) -> t.Optional[Model]: 1452 if self.is_model: 1453 return t.cast(Model, self.node) 1454 return None 1455 1456 @property 1457 def model_gateway(self) -> t.Optional[str]: 1458 return self.model.gateway if self.is_model else None 1459 1460 @property 1461 def audit(self) -> StandaloneAudit: 1462 if self.is_audit: 1463 return t.cast(StandaloneAudit, self.node) 1464 raise SQLMeshError(f"Snapshot {self.snapshot_id} is not an audit snapshot.") 1465 1466 @property 1467 def depends_on_past(self) -> bool: 1468 
"""Whether or not this models depends on past intervals to be populated before loading following intervals. 1469 1470 This represents a superset of the following types of models: 1471 1. Models that depend on themselves but can be restated from an arbitrary point in time (any start date) as long as interval batches are processed sequentially. 1472 An example of this can be an INCREMENTAL_BY_TIME_RANGE model that references previous records from itself. 1473 2. Models that can only be restated from the beginning of history *and* their interval batches must be processed sequentially. 1474 """ 1475 return self.depends_on_self or self.full_history_restatement_only 1476 1477 @property 1478 def depends_on_self(self) -> bool: 1479 """Whether or not this models depends on self.""" 1480 return self.is_model and self.model.depends_on_self 1481 1482 @property 1483 def name_version(self) -> SnapshotNameVersion: 1484 """Returns the name and version of the snapshot.""" 1485 return SnapshotNameVersion(name=self.name, version=self.version) 1486 1487 @property 1488 def id_and_version(self) -> SnapshotIdAndVersion: 1489 return self.table_info.id_and_version 1490 1491 @property 1492 def disable_restatement(self) -> bool: 1493 """Is restatement disabled for the node""" 1494 return self.is_model and self.model.disable_restatement 1495 1496 @cached_property 1497 def fully_qualified_table(self) -> t.Optional[exp.Table]: 1498 if not self.is_model: 1499 return None 1500 return t.cast(Model, self.node).fully_qualified_table 1501 1502 @property 1503 def expiration_ts(self) -> int: 1504 return to_timestamp( 1505 self.ttl, 1506 relative_base=to_datetime(self.updated_ts), 1507 check_categorical_relative_expression=False, 1508 ) 1509 1510 @property 1511 def supports_schema_migration_in_prod(self) -> bool: 1512 """Returns whether or not this snapshot supports schema migration when deployed to production.""" 1513 return self.is_paused and self.is_model and not self.is_symbolic and not self.is_seed 
@property
def requires_schema_migration_in_prod(self) -> bool:
    """Returns whether or not this snapshot requires a schema migration when deployed to production.

    The snapshot must support schema migration in production and, in addition, at least one of
    the following must hold: the previous version carries the same version as this snapshot,
    the model is forward-only, the model pins a physical version, or the virtual environment
    mode is not full.
    """
    supported = self.supports_schema_migration_in_prod
    if not supported:
        return supported
    return (
        (self.previous_version and self.previous_version.version == self.version)
        or self.model.forward_only
        or bool(self.model.physical_version)
        or not self.virtual_environment_mode.is_full
    )


@property
def ttl_ms(self) -> int:
    """The difference between the expiration timestamp and the last updated timestamp."""
    expires_at = self.expiration_ts
    return expires_at - self.updated_ts


@property
def custom_materialization(self) -> t.Optional[str]:
    """The materialization of the model's kind for custom snapshots, None otherwise."""
    if not self.is_custom:
        return None
    custom_kind = t.cast(CustomKind, self.model.kind)
    return custom_kind.materialization


@property
def virtual_environment_mode(self) -> VirtualEnvironmentMode:
    """The model's virtual environment mode, or the default mode for non-model snapshots."""
    if self.is_model:
        return self.model.virtual_environment_mode
    return VirtualEnvironmentMode.default


def _ensure_categorized(self) -> None:
    """Raises a SQLMeshError unless both the change category and the version are set."""
    category, version = self.change_category, self.version
    if not category:
        raise SQLMeshError(f"Snapshot {self.snapshot_id} has not been categorized yet.")
    if not version:
        raise SQLMeshError(f"Snapshot {self.snapshot_id} has not been versioned yet.")


def __getstate__(self) -> t.Dict[t.Any, t.Any]:
    """Returns the parent class state with the intervals and dev intervals emptied."""
    state = super().__getstate__()
    # Work on a copy of the attribute dictionary so that the live instance keeps its intervals.
    attributes = state["__dict__"].copy()
    # Don't store intervals.
    attributes["intervals"] = []
    attributes["dev_intervals"] = []
    state["__dict__"] = attributes
    return state
A snapshot represents a node at a certain point in time.
Snapshots are used to encapsulate everything needed to evaluate a node. They are standalone objects that hold all state and dynamic content necessary to render a node's query including things like macros. Snapshots also store intervals (timestamp ranges for what data we've processed).
Nodes can be dynamically rendered due to macros. Rendering a node to its full extent
requires storing variables and macro definitions. We store all of the macro definitions and
global variable references in python_env in raw text to avoid pickling. The helper methods
to achieve this are defined in utils.metaprogramming.
Arguments:
- name: The snapshot name which is the same as the node name and should be unique per node.
- fingerprint: A unique hash of the node definition so that nodes can be reused across environments.
- node: Node object that the snapshot encapsulates.
- parents: The list of parent snapshots (upstream dependencies).
- intervals: List of [start, end) intervals showing which time ranges a snapshot has data for.
- dev_intervals: List of [start, end) intervals showing development intervals (forward-only).
- created_ts: Epoch millis timestamp when a snapshot was first created.
- updated_ts: Epoch millis timestamp when a snapshot was last updated.
- ttl: The time-to-live of a snapshot determines when it should be deleted after it's no longer referenced in any environment.
- previous: The snapshot data version that this snapshot was based on. If this snapshot is new, then previous will be None.
- version: User specified version for a snapshot that is used for physical storage. By default, the version is the fingerprint, but not all changes to nodes require a backfill. If a user passes a previous version, that will be used instead and no backfill will be required.
- change_category: User specified change category indicating which nodes require backfill from node changes made in this snapshot.
- unpaused_ts: The timestamp which indicates when this snapshot was unpaused. Unpaused means that this snapshot is evaluated on a recurring basis. None indicates that this snapshot is paused.
- effective_from: The timestamp which indicates when this snapshot should be considered effective. Applicable for forward-only snapshots only.
- migrated: Whether or not this snapshot has been created as a result of migration.
- unrestorable: Whether or not this snapshot can be used to revert its model to a previous version.
- next_auto_restatement_ts: The timestamp which indicates the next time this snapshot should be restated.
- table_naming_convention: Convention to follow when generating the physical table name
@staticmethod
def hydrate_with_intervals_by_version(
    snapshots: t.Iterable[Snapshot],
    intervals: t.Iterable[SnapshotIntervals],
) -> t.List[Snapshot]:
    """Merges the given intervals into the target snapshots that share their name and version.

    Matching is done by (name, version) rather than by snapshot identifier. A snapshot without
    a version is matched using the version produced by `version_get_or_generate`.

    Args:
        snapshots: Target snapshots.
        intervals: Target snapshot intervals.

    Returns:
        List of target snapshots with hydrated intervals, in the order they were provided.
    """
    grouped = defaultdict(list)
    for entry in intervals:
        grouped[(entry.name, entry.version)].append(entry)

    hydrated = []
    for target in snapshots:
        lookup_key = (target.name, target.version_get_or_generate())
        # Use .get() so that a lookup miss doesn't insert an empty group into the defaultdict.
        for entry in grouped.get(lookup_key, []):
            target.merge_intervals(entry)
        hydrated.append(target)

    return hydrated
Hydrates target snapshots with given intervals.
This will match snapshots with intervals by name and version rather than identifiers.
Arguments:
- snapshots: Target snapshots.
- intervals: Target snapshot intervals.
Returns:
List of target snapshots with hydrated intervals.
@classmethod
def from_node(
    cls,
    node: Node,
    *,
    nodes: t.Dict[str, Node],
    ttl: str = c.DEFAULT_SNAPSHOT_TTL,
    version: t.Optional[str] = None,
    cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
    table_naming_convention: TableNamingConvention = TableNamingConvention.default,
) -> Snapshot:
    """Creates a new snapshot for a node.

    The snapshot starts with no intervals, and its created and updated timestamps are both set
    to the current timestamp.

    Args:
        node: Node to snapshot.
        nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes.
            If no dictionary is passed in the fingerprint will not be dependent on a node's parents.
        ttl: A TTL to determine how long orphaned (snapshots that are not promoted anywhere) should live.
        version: The version that a snapshot is associated with. Usually set during the planning phase.
        cache: Cache of node name to fingerprints.
        table_naming_convention: Convention to follow when generating the physical table name.

    Returns:
        The newly created snapshot.
    """
    timestamp = now_timestamp()
    snapshot_name = node.fqn
    node_fingerprint = fingerprint_from_node(node, nodes=nodes, cache=cache)

    # Each parent is referenced by its name and the identifier derived from its own fingerprint.
    parent_ids = tuple(
        SnapshotId(
            name=parent.fqn,
            identifier=fingerprint_from_node(parent, nodes=nodes, cache=cache).to_identifier(),
        )
        for parent in _parents_from_node(node, nodes).values()
    )

    return cls(
        name=snapshot_name,
        fingerprint=node_fingerprint,
        node=node,
        parents=parent_ids,
        intervals=[],
        dev_intervals=[],
        created_ts=timestamp,
        updated_ts=timestamp,
        ttl=ttl,
        version=version,
        table_naming_convention=table_naming_convention,
    )
Creates a new snapshot for a node.
Arguments:
- node: Node to snapshot.
- nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes. If no dictionary is passed in the fingerprint will not be dependent on a node's parents.
- ttl: A TTL to determine how long orphaned (snapshots that are not promoted anywhere) should live.
- version: The version that a snapshot is associated with. Usually set during the planning phase.
- cache: Cache of node name to fingerprints.
- table_naming_convention: Convention to follow when generating the physical table name
Returns:
The newly created snapshot.
def add_interval(self, start: TimeLike, end: TimeLike, is_dev: bool = False) -> None:
    """Records a newly processed time interval on the snapshot.

    Intervals are stored as [start_ts, end_ts): the start epoch timestamp is inclusive and the
    end epoch timestamp is exclusive, which makes merging ranges easier.

    Args:
        start: The start date/time of the interval (inclusive)
        end: The end date/time of the interval. If end is a date, then it is considered inclusive.
            If it is a datetime object, then it is exclusive.
        is_dev: Indicates whether the given interval is being added while in development mode.

    Raises:
        ValueError: If the start comes after the end.
    """
    if to_timestamp(start) > to_timestamp(end):
        raise ValueError(
            f"Attempted to add an Invalid interval ({start}, {end}) to snapshot {self.snapshot_id}"
        )

    lower_ts, upper_ts = self.inclusive_exclusive(start, end, strict=False, expand=False)
    if lower_ts >= upper_ts:
        # Skipping partial interval.
        return

    target = self.dev_intervals if is_dev else self.intervals
    target.append((lower_ts, upper_ts))

    # A single interval has nothing to be merged with.
    if len(target) >= 2:
        merged = merge_intervals(target)
        if is_dev:
            self.dev_intervals = merged
        else:
            self.intervals = merged
Add a newly processed time interval to the snapshot.
The actual stored intervals are [start_ts, end_ts), i.e. the start epoch timestamp is inclusive and the end epoch timestamp is exclusive. This makes merging ranges easier.
Arguments:
- start: The start date/time of the interval (inclusive)
- end: The end date/time of the interval. If end is a date, then it is considered inclusive. If it is a datetime object, then it is exclusive.
- is_dev: Indicates whether the given interval is being added while in development mode.
def remove_interval(self, interval: Interval) -> None:
    """Removes an interval from both the regular and the dev intervals of the snapshot.

    Args:
        interval: The interval to remove.
    """
    for attribute in ("intervals", "dev_intervals"):
        remaining = remove_interval(getattr(self, attribute), *interval)
        setattr(self, attribute, remaining)
Remove an interval from the snapshot.
Arguments:
- interval: The interval to remove.
def get_removal_interval(
    self,
    start: TimeLike,
    end: TimeLike,
    execution_time: t.Optional[TimeLike] = None,
    *,
    strict: bool = True,
    is_preview: bool = False,
) -> Interval:
    """Computes the interval that should be removed from the snapshot.

    For snapshots that depend on past intervals the requested end is replaced with the execution
    time (or the current timestamp when no execution time is given). Unless this is a preview,
    snapshots that only support full history restatement have the interval expanded to begin at
    their first stored interval.

    Args:
        start: The start date/time of the interval to remove.
        end: The end date/time of the interval to remove.
        execution_time: The time the interval is being removed.
        strict: Whether to fail when the inclusive start is the same as the exclusive end.
        is_preview: Whether the interval needs to be removed for a preview of forward-only changes.
            When previewing, we are not actually restating a model, but removing an interval to trigger
            a run.

    Returns:
        The [start, end) interval to remove.
    """
    if self.depends_on_past:
        end = execution_time or now_timestamp()
    removal_interval = self.inclusive_exclusive(start, end, strict)

    if is_preview or not self.full_history_restatement_only or not self.intervals:
        return removal_interval

    expanded_removal_interval = self.inclusive_exclusive(self.intervals[0][0], end, strict)
    requested_start, requested_end = removal_interval
    expanded_start, expanded_end = expanded_removal_interval

    # Only warn when the requested interval was narrower than the expanded one. If the request
    # already covered the same or a wider range, nothing was expanded on the user's behalf.
    was_expanded = requested_start > expanded_start or requested_end < expanded_end
    if was_expanded and self.is_incremental:
        from sqlmesh.core.console import get_console

        get_console().log_warning(
            f"Model '{self.model.name}' is '{self.model_kind_name}' which does not support partial restatement.\n"
            f"Expanding the requested restatement intervals from [{to_ts(requested_start)} - {to_ts(requested_end)}] "
            f"to [{to_ts(expanded_start)} - {to_ts(expanded_end)}] in order to fully restate the model."
        )

    return expanded_removal_interval
Get the interval that should be removed from the snapshot.
Arguments:
- start: The start date/time of the interval to remove.
- end: The end date/time of the interval to remove.
- execution_time: The time the interval is being removed.
- strict: Whether to fail when the inclusive start is the same as the exclusive end.
- is_preview: Whether the interval needs to be removed for a preview of forward-only changes. When previewing, we are not actually restating a model, but removing an interval to trigger a run.
def inclusive_exclusive(
    self,
    start: TimeLike,
    end: TimeLike,
    strict: bool = True,
    allow_partial: t.Optional[bool] = None,
    expand: bool = True,
) -> Interval:
    """Converts the inclusive start and end into a [start, end) pair using the node's interval unit.

    Args:
        start: The start date/time of the interval (inclusive)
        end: The end date/time of the interval (inclusive)
        strict: Whether to fail when the inclusive start is the same as the exclusive end.
        allow_partial: Whether the interval can be partial or not. When None, the snapshot's own
            `allow_partials` setting is used.
        expand: Whether or not partial intervals are expanded outwards.

    Returns:
        A [start, end) pair.
    """
    unit = self.node.interval_unit
    partial_allowed = self.allow_partials if allow_partial is None else allow_partial
    return inclusive_exclusive(
        start,
        end,
        unit,
        strict=strict,
        allow_partial=partial_allowed,
        expand=expand,
    )
Transform the inclusive start and end into a [start, end) pair.
Arguments:
- start: The start date/time of the interval (inclusive)
- end: The end date/time of the interval (inclusive)
- strict: Whether to fail when the inclusive start is the same as the exclusive end.
- allow_partial: Whether the interval can be partial or not.
- expand: Whether or not partial intervals are expanded outwards.
Returns:
A [start, end) pair.
def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None:
    """Inherits intervals from the target snapshot.

    Regular intervals are always merged, subject to the effective_from cutoff described below.
    Dev intervals are merged only when both sides share the same dev version. Pending restatement
    intervals of both sides are always combined.

    Args:
        other: The target snapshot to inherit intervals from.
    """
    cutoff_ts = self.normalized_effective_from_ts or 0
    clip_to_cutoff = cutoff_ts > 0 and self.identifier != other.identifier

    for start, end in other.intervals:
        if not clip_to_cutoff:
            self.add_interval(start, end)
            continue
        # If the effective_from is set, then intervals that come after it must come from
        # the current snapshot, so inherited intervals are trimmed at the cutoff.
        if start < cutoff_ts:
            end = min(end, cutoff_ts)
        if end <= cutoff_ts:
            self.add_interval(start, end)

    if other.last_altered_ts:
        self.last_altered_ts = max(self.last_altered_ts or 0, other.last_altered_ts)

    if self.dev_version == other.dev_version:
        # Matching dev versions mean that this and the other snapshot are pointing
        # to the same dev table, so the dev intervals can be shared.
        for start, end in other.dev_intervals:
            self.add_interval(start, end, is_dev=True)

        if other.dev_last_altered_ts:
            self.dev_last_altered_ts = max(
                self.dev_last_altered_ts or 0, other.dev_last_altered_ts
            )

    combined_pending = [*self.pending_restatement_intervals, *other.pending_restatement_intervals]
    self.pending_restatement_intervals = merge_intervals(combined_pending)
Inherits intervals from the target snapshot.
Arguments:
- other: The target snapshot to inherit intervals from.
@property
def evaluatable(self) -> bool:
    """Whether or not a snapshot should be evaluated and have intervals.

    Non-symbolic snapshots are always evaluatable. Symbolic ones are evaluatable only when
    their model has audits.
    """
    if not self.is_symbolic:
        return True
    return bool(self.model.audits)
Whether or not a snapshot should be evaluated and have intervals.
def missing_intervals(
    self,
    start: TimeLike,
    end: TimeLike,
    execution_time: t.Optional[TimeLike] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> Intervals:
    """Find all missing intervals between [start, end].

    Although the inputs are inclusive, the returned intervals are [start_ts, end_ts), i.e.
    the start epoch timestamp is inclusive and the end epoch timestamp is exclusive.

    Args:
        start: The start date/time of the interval (inclusive)
        end: The end date/time of the interval (inclusive if the type is date, exclusive otherwise)
        execution_time: The date/time reference to use for execution time. Defaults to now.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        ignore_cron: Whether to ignore the node's cron schedule.
        end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
            allow_partials, and other attributes that could cause the intervals to exceed the target end date.

    Returns:
        A list of all the missing intervals as epoch timestamps.
    """
    # Nothing can be missing when the requested range begins after the node's end...
    if self.node.end and to_datetime(start) > to_datetime(self.node.end):
        return []
    # ...or when the node only starts after the requested range has ended.
    if self.node.start and to_datetime(self.node.start) > to_datetime(end):
        return []

    # Never look for intervals that precede the node's own start.
    if self.node.start and to_datetime(start) < to_datetime(self.node.start):
        start = self.node.start

    validate_date_range(start, end)

    # If the amount of time being checked is less than the size of a single interval then
    # there can't be any missing intervals within that range.
    if (
        not is_date(end)
        and not self.allow_partials
        and to_timestamp(end) - to_timestamp(start) < self.node.interval_unit.milliseconds
    ):
        return []

    deployability_index = deployability_index or DeployabilityIndex.all_deployable()
    if deployability_index.is_representative(self):
        known_intervals = self.intervals
    else:
        known_intervals = self.dev_intervals

    if not self.evaluatable or (self.is_seed and known_intervals):
        return []

    start_ts, end_ts = (to_timestamp(ts) for ts in self.inclusive_exclusive(start, end))

    interval_unit = self.node.interval_unit
    if execution_time:
        execution_time_ts = to_timestamp(execution_time)
    else:
        execution_time_ts = now_timestamp()

    if ignore_cron:
        upper_bound_ts = execution_time_ts
    else:
        upper_bound_ts = to_timestamp(self.node.cron_floor(execution_time_ts))

    if end_bounded:
        upper_bound_ts = min(upper_bound_ts, end_ts)
        if not self.allow_partials:
            upper_bound_ts = to_timestamp(interval_unit.cron_floor(upper_bound_ts))

    end_ts = min(end_ts, upper_bound_ts)

    # Lookback and the model's end only apply to model snapshots.
    lookback = 0
    model_end_ts: t.Optional[int] = None
    if self.is_model:
        lookback = self.model.lookback
        if self.model.end:
            model_end_ts = to_timestamp(make_exclusive(self.model.end))

    return compute_missing_intervals(
        interval_unit,
        tuple(known_intervals),
        start_ts,
        end_ts,
        lookback,
        model_end_ts,
    )
Find all missing intervals between [start, end].
Although the inputs are inclusive, the returned intervals are [start_ts, end_ts), i.e. the start epoch timestamp is inclusive and the end epoch timestamp is exclusive.
Arguments:
- start: The start date/time of the interval (inclusive)
- end: The end date/time of the interval (inclusive if the type is date, exclusive otherwise)
- execution_time: The date/time reference to use for execution time. Defaults to now.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- ignore_cron: Whether to ignore the node's cron schedule.
- end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
Returns:
A list of all the missing intervals as epoch timestamps.
def check_ready_intervals(
    self,
    intervals: Intervals,
    context: ExecutionContext,
) -> Intervals:
    """Returns a list of intervals that are considered ready by the model's signals.

    Signals are applied one after another: each signal receives the intervals returned by the
    previous one. Snapshots that are not models, or whose model renders no signal calls, get
    the provided intervals back unchanged.

    Note that this will handle gaps in the provided intervals. The returned intervals
    may introduce new gaps.

    Args:
        intervals: The candidate intervals to check.
        context: The execution context passed to each signal.

    Returns:
        The intervals that all signals consider ready.

    Raises:
        SignalEvalError: If checking a signal fails with a SQLMeshError. The original error
            is attached as the cause.
    """
    signals = self.is_model and self.model.render_signal_calls()
    if not signals:
        return intervals

    for signal_name, kwargs in signals.signals_to_kwargs.items():
        try:
            intervals = check_ready_intervals(
                signals.prepared_python_env[signal_name],
                intervals,
                context,
                python_env=signals.python_env,
                dialect=self.model.dialect,
                path=self.model._path,
                snapshot=self,
                kwargs=kwargs,
            )
        except SQLMeshError as e:
            # Chain explicitly so the traceback reports the signal failure as the direct cause.
            raise SignalEvalError(
                f"{e} '{signal_name}' for '{self.model.name}' at {self.model._path}"
            ) from e
    return intervals
Returns a list of intervals that are considered ready by the provided signal.
Note that this will handle gaps in the provided intervals. The returned intervals may introduce new gaps.
def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = False) -> None:
    """Assigns the given category to this snapshot and derives its version from it.

    Args:
        category: The change category to assign to this snapshot.
        forward_only: Whether or not this snapshot is applied going forward in production.
    """
    assert category != SnapshotChangeCategory.FORWARD_ONLY, (
        "FORWARD_ONLY change category is deprecated"
    )

    self.dev_version_ = self.fingerprint.to_version()

    no_rebuild_categories = (
        SnapshotChangeCategory.INDIRECT_NON_BREAKING,
        SnapshotChangeCategory.METADATA,
    )
    reuses_previous_table = forward_only or category in no_rebuild_categories

    if self.is_model and not self.virtual_environment_mode.is_full:
        # Hardcode the version if the virtual environment is not fully enabled.
        version = "novde"
    elif self.is_model and self.model.physical_version:
        # If the model has a pinned version then use that.
        version = self.model.physical_version
    elif reuses_previous_table and self.previous_version:
        version = self.previous_version.data_version.version
    elif self.is_model and self.model.forward_only and not self.previous_version:
        # If this is a new model then use a deterministic version, independent of the fingerprint.
        version = hash_data([self.name, *self.model.kind.data_hash_values])
    else:
        version = self.fingerprint.to_version()
    self.version = version

    inherited_from = self.previous_version if reuses_previous_table else None
    if inherited_from:
        self.physical_schema_ = inherited_from.physical_schema
        self.table_naming_convention = inherited_from.table_naming_convention
        if self.is_materialized and (category.is_indirect_non_breaking or category.is_metadata):
            # Reuse the dev table for indirect non-breaking and metadata changes.
            self.dev_version_ = (
                inherited_from.data_version.dev_version
                or inherited_from.fingerprint.to_version()
            )
            self.dev_table_suffix = inherited_from.data_version.dev_table_suffix

    self.change_category = category
    self.forward_only = forward_only
Assigns the given category to this snapshot.
Arguments:
- category: The change category to assign to this snapshot.
- forward_only: Whether or not this snapshot is applied going forward in production.
@property
def categorized(self) -> bool:
    """Whether the snapshot has both a change category and a version assigned."""
    if self.change_category is None:
        return False
    return self.version is not None
Whether the snapshot has been categorized.
def table_name(self, is_deployable: bool = True) -> str:
    """Full table name pointing to the materialized location of the snapshot.

    The snapshot must already be categorized and have a version.

    Args:
        is_deployable: Indicates whether to return the table name for deployment to production.
    """
    self._ensure_categorized()
    version = self.version
    assert version
    return self._table_name(version, is_deployable)
Full table name pointing to the materialized location of the snapshot.
Arguments:
- is_deployable: Indicates whether to return the table name for deployment to production.
def version_get_or_generate(self) -> str:
    """Returns the assigned version, falling back to one generated from the fingerprint."""
    assigned = self.version
    if assigned:
        return assigned
    return self.fingerprint.to_version()
Helper method to get the version or generate it from the fingerprint.
def is_valid_start(
    self,
    start: t.Optional[TimeLike],
    snapshot_start: TimeLike,
    execution_time: t.Optional[TimeLike] = None,
) -> bool:
    """Checks if the given start is valid for this snapshot.

    The start is only constrained when the snapshot depends on past intervals and a
    start is provided; in every other case it is considered valid.

    Args:
        start: The start date/time of the interval (inclusive).
        snapshot_start: The start date/time of the snapshot (inclusive). Used because the
            snapshot itself may not have a start defined.
        execution_time: Passed through to `missing_intervals` when looking for gaps.

    Returns:
        False if loading from `start` would leave earlier intervals unpopulated,
        True otherwise.
    """
    if not (self.depends_on_past and start):
        return True

    interval_unit = self.node.interval_unit
    start_ts = to_timestamp(interval_unit.cron_floor(start))

    if not self.intervals:
        # Nothing has been loaded yet: align the snapshot start to the interval unit
        # (rounding up when it falls inside an interval) and compare it to the start.
        snapshot_start_ts = to_timestamp(interval_unit.cron_floor(snapshot_start))
        if snapshot_start_ts < to_timestamp(snapshot_start):
            snapshot_start_ts = to_timestamp(interval_unit.cron_next(snapshot_start_ts))
        return snapshot_start_ts >= start_ts

    # Make sure that any missing intervals for this snapshot all occur at or after the
    # provided start. Otherwise this is a non-contiguous load and therefore not a valid start.
    missing_intervals = self.missing_intervals(
        snapshot_start, now(), execution_time=execution_time
    )
    if not missing_intervals:
        return True
    # Compare on presence of intervals rather than truthiness of the timestamp, so an
    # earliest missing interval at epoch 0 is still checked.
    return missing_intervals[0][0] >= start_ts
Checks if the given start is valid for this snapshot.
Arguments:
- start: The start date/time of the interval (inclusive)
- snapshot_start: The start date/time of the snapshot (inclusive)
def get_latest(self, default: t.Optional[TimeLike] = None) -> t.Optional[TimeLike]:
    """The end of the latest interval loaded for the snapshot.

    Args:
        default: The value returned when the snapshot has no intervals.

    Returns:
        The end of the last interval (converted to an inclusive date for daily interval
        units, a timestamp otherwise), or `default` if there are no intervals.
    """
    if not self.intervals:
        return default

    latest_end = to_timestamp(self.intervals[-1][1])
    if self.node.interval_unit.is_day:
        return to_date(make_inclusive_end(latest_end))
    return latest_end
The latest interval loaded for the snapshot. Default is used if intervals are not defined
def get_next_auto_restatement_interval(self, execution_time: TimeLike) -> t.Optional[Interval]:
    """Returns the next auto restatement interval for the snapshot.

    Args:
        execution_time: The execution time to use for the restatement.

    Returns:
        The interval that needs to be restated or None if no restatement is needed.
    """
    restatement_applicable = (
        self.is_model
        and self.intervals
        and self.model.auto_restatement_cron
        and not self.model.disable_restatement
    )
    if not restatement_applicable:
        return None

    now_ts = to_timestamp(execution_time)
    # Without a recorded next restatement time, derive it from the snapshot creation time.
    due_ts = self.next_auto_restatement_ts or to_timestamp(
        self.model.auto_restatement_croniter(self.created_ts).get_next(estimate=False)
    )
    if now_ts < due_ts:
        return None

    lookback = self.model.auto_restatement_intervals
    if lookback is None:
        # No limit configured: restate everything that has been loaded.
        return (self.intervals[0][0], self.intervals[-1][1])

    end_ts = to_timestamp(self.node.interval_unit.cron_floor(now_ts))
    start_ts = end_ts - lookback * self.node.interval_unit.milliseconds
    return (start_ts, end_ts)
Returns the next auto restatement interval for the snapshot.
Arguments:
- execution_time: The execution time to use for the restatement.
Returns:
The interval that needs to be restated or None if no restatement is needed.
def update_next_auto_restatement_ts(self, execution_time: TimeLike) -> t.Optional[int]:
    """Updates the next auto restatement timestamp.

    Args:
        execution_time: The execution time to use for the restatement.

    Returns:
        The next auto restatement timestamp or None if not applicable.
    """
    restatement_applicable = (
        self.is_model
        and self.model.auto_restatement_cron
        and not self.model.disable_restatement
    )
    if restatement_applicable:
        upcoming = self.model.auto_restatement_croniter(execution_time).get_next(estimate=False)
        self.next_auto_restatement_ts = to_timestamp(upcoming)
    else:
        self.next_auto_restatement_ts = None
    return self.next_auto_restatement_ts
Updates the next auto restatement timestamp.
Arguments:
- execution_time: The execution time to use for the restatement.
Returns:
The next auto restatement timestamp or None if not applicable.
def apply_pending_restatement_intervals(self) -> None:
    """Removes every pending restatement interval from the snapshot's intervals.

    Does nothing for non-model snapshots or models with restatement disabled.
    """
    if not (self.is_model and not self.model.disable_restatement):
        return

    for restated in self.pending_restatement_intervals:
        restated_start, restated_end = restated[0], restated[1]
        logger.info(
            "Applying the auto restated interval (%s, %s) to snapshot %s",
            time_like_to_str(restated_start),
            time_like_to_str(restated_end),
            self.snapshot_id,
        )
        self.intervals = remove_interval(self.intervals, *restated)
Applies the pending restatement intervals to the snapshot's intervals.
def is_directly_modified(self, other: Snapshot) -> bool:
    """Returns whether this snapshot's node has a data change relative to the other snapshot's node."""
    other_node = other.node
    return self.node.is_data_change(other_node)
Returns whether or not this snapshot is directly modified in relation to the other snapshot.
def is_indirectly_modified(self, other: Snapshot) -> bool:
    """Returns whether or not this snapshot is indirectly modified in relation to the other snapshot.

    That is the case when the parent data hashes differ while the node itself has no data change.
    """
    if self.fingerprint.parent_data_hash == other.fingerprint.parent_data_hash:
        return False
    return not self.node.is_data_change(other.node)
Returns whether or not this snapshot is indirectly modified in relation to the other snapshot.
def is_metadata_updated(self, other: Snapshot) -> bool:
    """Returns whether the metadata hash of this snapshot differs from the other snapshot's."""
    own_hash = self.fingerprint.metadata_hash
    other_hash = other.fingerprint.metadata_hash
    return own_hash != other_hash
Returns whether or not this snapshot contains metadata changes in relation to the other snapshot.
@property
def table_info(self) -> SnapshotTableInfo:
    """Builds the SnapshotTableInfo for this snapshot; the snapshot must be categorized."""
    self._ensure_categorized()

    # Only models with a custom kind carry a custom materialization.
    custom_materialization = None
    if self.is_model and isinstance(self.model.kind, CustomKind):
        custom_materialization = self.model.kind.materialization

    return SnapshotTableInfo(
        physical_schema=self.physical_schema,
        name=self.name,
        fingerprint=self.fingerprint,
        version=self.version,
        dev_version=self.dev_version,
        parents=self.parents,
        previous_versions=self.previous_versions,
        change_category=self.change_category,
        kind_name=self.model_kind_name,
        node_type=self.node_type,
        custom_materialization=custom_materialization,
        dev_table_suffix=self.dev_table_suffix,
        model_gateway=self.model_gateway,
        table_naming_convention=self.table_naming_convention,  # type: ignore
        forward_only=self.forward_only,
        virtual_environment_mode=self.virtual_environment_mode,
    )
Helper method to get the SnapshotTableInfo from the Snapshot.
@property
def data_version(self) -> SnapshotDataVersion:
    """Builds the SnapshotDataVersion for this snapshot; the snapshot must be categorized."""
    self._ensure_categorized()
    version_fields = dict(
        fingerprint=self.fingerprint,
        version=self.version,
        dev_version=self.dev_version,
        change_category=self.change_category,
        physical_schema=self.physical_schema,
        dev_table_suffix=self.dev_table_suffix,
        table_naming_convention=self.table_naming_convention,
        virtual_environment_mode=self.virtual_environment_mode,
    )
    return SnapshotDataVersion(**version_fields)
@property
def snapshot_intervals(self) -> SnapshotIntervals:
    """Builds a SnapshotIntervals record from copies of this snapshot's interval lists.

    The snapshot must be categorized.
    """
    self._ensure_categorized()
    interval_fields = dict(
        name=self.name,
        identifier=self.identifier,
        version=self.version,
        dev_version=self.dev_version,
        # Copies keep the returned record independent of later changes to this snapshot.
        intervals=self.intervals.copy(),
        dev_intervals=self.dev_intervals.copy(),
        pending_restatement_intervals=self.pending_restatement_intervals.copy(),
    )
    return SnapshotIntervals(**interval_fields)
@property
def is_materialized_view(self) -> bool:
    """Returns whether or not this snapshot's model is a view kind flagged as materialized."""
    is_view_model = self.is_model and isinstance(self.model.kind, ViewKind)
    return is_view_model and self.model.kind.materialized
Returns whether or not this snapshot's model represents a materialized view.
@property
def is_new_version(self) -> bool:
    """Returns whether the snapshot's version equals the version derived from its own fingerprint.

    The snapshot must be categorized.
    """
    self._ensure_categorized()
    fingerprint_version = self.fingerprint.to_version()
    return fingerprint_version == self.version
Returns whether or not this version is new and requires a backfill.
@property
def model_kind_name(self) -> t.Optional[ModelKindName]:
    """Returns the name of the model's kind, or None when the snapshot is not a model."""
    if self.is_model:
        return self.model.kind.name
    return None
Returns the model kind name.
@property
def depends_on_past(self) -> bool:
    """Whether or not this model depends on past intervals to be populated before loading following intervals.

    This represents a superset of the following types of models:
    1. Models that depend on themselves but can be restated from an arbitrary point in time (any start date)
       as long as interval batches are processed sequentially. An example of this can be an
       INCREMENTAL_BY_TIME_RANGE model that references previous records from itself.
    2. Models that can only be restated from the beginning of history *and* their interval batches must be
       processed sequentially.
    """
    self_referencing = self.depends_on_self
    return self_referencing if self_referencing else self.full_history_restatement_only
Whether or not this model depends on past intervals to be populated before loading following intervals.
This represents a superset of the following types of models:
- Models that depend on themselves but can be restated from an arbitrary point in time (any start date) as long as interval batches are processed sequentially. An example of this can be an INCREMENTAL_BY_TIME_RANGE model that references previous records from itself.
- Models that can only be restated from the beginning of history and their interval batches must be processed sequentially.
@property
def depends_on_self(self) -> bool:
    """Whether or not this snapshot is a model that depends on itself."""
    is_model = self.is_model
    return self.model.depends_on_self if is_model else is_model
Whether or not this model depends on itself.
@property
def name_version(self) -> SnapshotNameVersion:
    """Returns the snapshot's name and version bundled as a SnapshotNameVersion."""
    name, version = self.name, self.version
    return SnapshotNameVersion(name=name, version=version)
Returns the name and version of the snapshot.
@property
def disable_restatement(self) -> bool:
    """Whether this snapshot is a model that has restatement disabled."""
    is_model = self.is_model
    return self.model.disable_restatement if is_model else is_model
Is restatement disabled for the node
@property
def supports_schema_migration_in_prod(self) -> bool:
    """Returns whether or not this snapshot supports schema migration when deployed to production.

    Only paused model snapshots that are neither symbolic nor seeds qualify.
    """
    return self.is_paused and self.is_model and not (self.is_symbolic or self.is_seed)
Returns whether or not this snapshot supports schema migration when deployed to production.
@property
def requires_schema_migration_in_prod(self) -> bool:
    """Returns whether or not this snapshot requires a schema migration when deployed to production.

    Migration must be supported, and at least one of these must hold: the version is shared
    with the previous version, the model is forward-only, the model pins a physical version,
    or the virtual environment mode is not full.
    """
    supported = self.supports_schema_migration_in_prod
    if not supported:
        return supported

    previous = self.previous_version
    shares_previous_version = previous and previous.version == self.version
    return (
        shares_previous_version
        or self.model.forward_only
        or bool(self.model.physical_version)
        or not self.virtual_environment_mode.is_full
    )
Returns whether or not this snapshot requires a schema migration when deployed to production.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- sqlmesh.utils.pydantic.PydanticModel
- dict
- json
- copy
- fields_set
- parse_obj
- parse_raw
- missing_required_fields
- extra_fields
- all_fields
- all_field_infos
- required_fields
- SnapshotInfoMixin
- identifier
- snapshot_id
- qualified_view_name
- previous_version
- dev_version
- is_forward_only
- is_metadata
- is_indirect_non_breaking
- is_no_rebuild
- is_no_preview
- all_versions
- display_name
- data_hash_matches
- is_model
- is_audit
- sqlmesh.core.model.kind.ModelKindMixin
- is_incremental_by_time_range
- is_incremental_by_unique_key
- is_incremental_by_partition
- is_incremental_unmanaged
- is_incremental
- is_full
- is_view
- is_embedded
- is_seed
- is_external
- is_scd_type_2
- is_scd_type_2_by_time
- is_scd_type_2_by_column
- is_custom
- is_managed
- is_dbt_custom
- is_symbolic
- is_materialized
- only_execution_time
- full_history_restatement_only
- supports_python_models
- supports_grants
class SnapshotTableCleanupTask(PydanticModel):
    """Pairs a snapshot's table info with a flag limiting cleanup to its dev table."""

    # Table info of the snapshot the task refers to.
    snapshot: SnapshotTableInfo
    # NOTE(review): presumably True restricts the cleanup to the dev table only; confirm against the consumer.
    dev_table_only: bool
Usage documentation: Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [Signature][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class DeployabilityIndex(PydanticModel, frozen=True):
    """Contains information about deployability of every snapshot.

    Deployability is defined as whether or not the output that a snapshot produces during the
    current evaluation can be reused in (deployed to) the production environment.

    The index stores string keys of either the deployable snapshots or, when
    `is_opposite_index` is set, the non-deployable ones.
    """

    indexed_ids: t.FrozenSet[str]
    is_opposite_index: bool = False
    representative_shared_version_ids: t.FrozenSet[str] = frozenset()

    @field_validator("indexed_ids", "representative_shared_version_ids", mode="before")
    @classmethod
    def _snapshot_ids_set_validator(cls, v: t.Any) -> t.Optional[t.FrozenSet[str]]:
        """Converts a collection of snapshot IDs (or ready-made keys) into a frozenset of string keys."""
        if v is None:
            return v
        # Transforming into strings because the serialization of sets of objects / lists is broken in Pydantic.
        return frozenset(
            {
                (
                    cls._snapshot_id_key(snapshot_id)  # type: ignore
                    if isinstance(snapshot_id, SnapshotId)
                    else snapshot_id
                )
                for snapshot_id in v
            }
        )

    def is_deployable(self, snapshot: SnapshotIdLike) -> bool:
        """Returns true if the output produced by the given snapshot in a development environment can be reused
        in (deployed to) production

        Args:
            snapshot: The snapshot to check.

        Returns:
            True if the snapshot is deployable, False otherwise.
        """
        snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
        return (self.is_opposite_index and snapshot_id not in self.indexed_ids) or (
            not self.is_opposite_index and snapshot_id in self.indexed_ids
        )

    def is_representative(self, snapshot: SnapshotIdLike) -> bool:
        """Returns true if the deployable (non-dev) table of the given snapshot should be used for reading, table mapping, and
        computing missing intervals.

        Note, that deployable snapshots are also representative, but the reverse is not always true.

        Unlike `is_deployable`, this variant also captures FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots that
        are not deployable by their nature but are currently promoted in production. Therefore, it's safe to consider
        them as such when constructing a plan, building a physical table mapping or computing missing intervals.

        Args:
            snapshot: The snapshot to check.

        Returns:
            True if the snapshot is representative, False otherwise.
        """
        snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
        representative = snapshot_id in self.representative_shared_version_ids
        return representative or self.is_deployable(snapshot)

    def with_non_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
        """Creates a new index with the given snapshot marked as non-deployable."""
        return self._add_snapshot(snapshot, False)

    def with_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
        """Creates a new index with the given snapshot marked as deployable."""
        return self._add_snapshot(snapshot, True)

    def _add_snapshot(self, snapshot: SnapshotIdLike, deployable: bool) -> DeployabilityIndex:
        """Returns a copy of the index with the snapshot's deployability set as requested."""
        snapshot_id = {self._snapshot_id_key(snapshot.snapshot_id)}
        indexed_ids = self.indexed_ids
        if self.is_opposite_index:
            # The index lists non-deployable snapshots, so deployable ones are removed from it.
            indexed_ids = indexed_ids - snapshot_id if deployable else indexed_ids | snapshot_id
        else:
            indexed_ids = indexed_ids | snapshot_id if deployable else indexed_ids - snapshot_id

        return DeployabilityIndex(
            indexed_ids=indexed_ids,
            is_opposite_index=self.is_opposite_index,
            representative_shared_version_ids=self.representative_shared_version_ids,
        )

    @classmethod
    def all_deployable(cls) -> DeployabilityIndex:
        """Creates an index in which every snapshot is deployable (an empty opposite index)."""
        return cls(indexed_ids=frozenset(), is_opposite_index=True)

    @classmethod
    def none_deployable(cls) -> DeployabilityIndex:
        """Creates an index in which no snapshot is deployable (an empty regular index)."""
        return cls(indexed_ids=frozenset())

    @classmethod
    def create(
        cls,
        snapshots: t.Dict[SnapshotId, Snapshot] | t.Collection[Snapshot],
        start: t.Optional[TimeLike] = None,  # plan start
        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ) -> DeployabilityIndex:
        """Builds the index by walking the snapshot DAG from parents to children.

        Args:
            snapshots: The snapshots to index, either keyed by snapshot ID or as a plain collection.
            start: The plan start; when provided it is validated against each snapshot.
            start_override_per_model: Optional per-model start dates, keyed by node name, that
                take precedence over the computed snapshot start.

        Returns:
            An index storing whichever of the deployable / non-deployable ID sets is smaller.
        """
        if not isinstance(snapshots, dict):
            snapshots = {s.snapshot_id: s for s in snapshots}

        deployability_mapping: t.Dict[SnapshotId, bool] = {}
        children_deployability_mapping: t.Dict[SnapshotId, bool] = {}
        representative_shared_version_ids: t.Set[SnapshotId] = set()
        start_override_per_model = start_override_per_model or {}

        start_date_cache: t.Dict[str, datetime] = {}

        dag = snapshots_to_dag(snapshots.values())
        for node in dag:
            if node not in snapshots:
                continue
            snapshot = snapshots[node]

            if not snapshot.virtual_environment_mode.is_full:
                # If the virtual environment is not fully enabled, then the snapshot can never be deployable
                this_deployable = False
            else:
                # Make sure that the node is deployable according to all its parents
                this_deployable = all(
                    children_deployability_mapping[p_id]
                    for p_id in snapshot.parents
                    if p_id in children_deployability_mapping
                )

            if this_deployable:
                is_forward_only_model = (
                    snapshot.is_model and snapshot.model.forward_only and not snapshot.is_metadata
                )
                has_auto_restatement = (
                    snapshot.is_model and snapshot.model.auto_restatement_cron is not None
                )

                snapshot_start = start_override_per_model.get(
                    node.name, start_date(snapshot, snapshots.values(), cache=start_date_cache)
                )

                is_valid_start = (
                    snapshot.is_valid_start(start, snapshot_start) if start is not None else True
                )

                children_deployable = is_valid_start and not has_auto_restatement
                if (
                    snapshot.is_forward_only
                    or snapshot.is_indirect_non_breaking
                    or is_forward_only_model
                    or has_auto_restatement
                    or not is_valid_start
                ):
                    # FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots are not deployable by nature.
                    # Similarly, if the model depends on past and the start date is not aligned with the
                    # model's start, we should consider this snapshot non-deployable.
                    this_deployable = False
                    if not snapshot.is_paused or (
                        snapshot.is_indirect_non_breaking and snapshot.intervals
                    ):
                        # This snapshot represents what's currently deployed in prod.
                        representative_shared_version_ids.add(node)
                    else:
                        # If the parent is not representative then its children can't be deployable.
                        children_deployable = False
            else:
                children_deployable = False
                if not snapshot.is_paused:
                    representative_shared_version_ids.add(node)

            deployability_mapping[node] = this_deployable
            children_deployability_mapping[node] = children_deployable

        deployable_ids = {
            snapshot_id for snapshot_id, deployable in deployability_mapping.items() if deployable
        }
        non_deployable_ids = set(snapshots) - deployable_ids

        # Pick the smaller set to reduce the size of the serialized object.
        if len(deployable_ids) <= len(non_deployable_ids):
            return cls(
                indexed_ids=deployable_ids,
                representative_shared_version_ids=representative_shared_version_ids,
            )
        return cls(
            indexed_ids=non_deployable_ids,
            is_opposite_index=True,
            representative_shared_version_ids=representative_shared_version_ids,
        )

    @staticmethod
    def _snapshot_id_key(snapshot_id: SnapshotId) -> str:
        """Builds the string key `<name>__<identifier>` under which a snapshot ID is stored."""
        return f"{snapshot_id.name}__{snapshot_id.identifier}"
Contains information about deployability of every snapshot.
Deployability is defined as whether or not the output that a snapshot produces during the current evaluation can be reused in (deployed to) the production environment.
def is_deployable(self, snapshot: SnapshotIdLike) -> bool:
    """Returns true if the output produced by the given snapshot in a development environment can be reused
    in (deployed to) production

    Args:
        snapshot: The snapshot to check.

    Returns:
        True if the snapshot is deployable, False otherwise.
    """
    key = self._snapshot_id_key(snapshot.snapshot_id)
    if self.is_opposite_index:
        # An opposite index lists the non-deployable snapshots.
        return key not in self.indexed_ids
    return key in self.indexed_ids
Returns true if the output produced by the given snapshot in a development environment can be reused in (deployed to) production
Arguments:
- snapshot: The snapshot to check.
Returns:
True if the snapshot is deployable, False otherwise.
def is_representative(self, snapshot: SnapshotIdLike) -> bool:
    """Returns true if the deployable (non-dev) table of the given snapshot should be used for reading, table mapping, and
    computing missing intervals.

    Note, that deployable snapshots are also representative, but the reverse is not always true.

    Unlike `is_deployable`, this variant also captures FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots that
    are not deployable by their nature but are currently promoted in production. Therefore, it's safe to consider
    them as such when constructing a plan, building a physical table mapping or computing missing intervals.

    Args:
        snapshot: The snapshot to check.

    Returns:
        True if the snapshot is representative, False otherwise.
    """
    key = self._snapshot_id_key(snapshot.snapshot_id)
    if key in self.representative_shared_version_ids:
        return True
    return self.is_deployable(snapshot)
Returns true if the deployable (non-dev) table of the given snapshot should be used for reading, table mapping, and computing missing intervals.
Note, that deployable snapshots are also representative, but the reverse is not always true.
Unlike is_deployable, this variant also captures FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots that
are not deployable by their nature but are currently promoted in production. Therefore, it's safe to consider
them as such when constructing a plan, building a physical table mapping or computing missing intervals.
Arguments:
- snapshot: The snapshot to check.
Returns:
True if the snapshot is representative, False otherwise.
def with_non_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
    """Returns a new index in which the given snapshot is marked as non-deployable."""
    updated_index = self._add_snapshot(snapshot, False)
    return updated_index
Creates a new index with the given snapshot marked as non-deployable.
def with_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
    """Returns a new index in which the given snapshot is marked as deployable."""
    updated_index = self._add_snapshot(snapshot, True)
    return updated_index
Creates a new index with the given snapshot marked as deployable.
@classmethod
def create(
    cls,
    snapshots: t.Dict[SnapshotId, Snapshot] | t.Collection[Snapshot],
    start: t.Optional[TimeLike] = None,  # plan start
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
) -> DeployabilityIndex:
    """Builds the index by walking the snapshot DAG from parents to children.

    Args:
        snapshots: The snapshots to index, either keyed by snapshot ID or as a plain collection.
        start: The plan start; when provided it is validated against each snapshot.
        start_override_per_model: Optional per-model start dates, keyed by node name, that
            take precedence over the computed snapshot start.

    Returns:
        An index storing whichever of the deployable / non-deployable ID sets is smaller.
    """
    if not isinstance(snapshots, dict):
        snapshots = {s.snapshot_id: s for s in snapshots}

    deployability_mapping: t.Dict[SnapshotId, bool] = {}
    children_deployability_mapping: t.Dict[SnapshotId, bool] = {}
    representative_shared_version_ids: t.Set[SnapshotId] = set()
    start_override_per_model = start_override_per_model or {}

    start_date_cache: t.Optional[t.Dict[str, datetime]] = {}

    for node in snapshots_to_dag(snapshots.values()):
        if node not in snapshots:
            continue
        snapshot = snapshots[node]

        # A snapshot is a deployability candidate only if the virtual environment is fully
        # enabled and all of its already-visited parents allow deployable children.
        if snapshot.virtual_environment_mode.is_full:
            this_deployable = all(
                children_deployability_mapping[p_id]
                for p_id in snapshot.parents
                if p_id in children_deployability_mapping
            )
        else:
            this_deployable = False

        if not this_deployable:
            children_deployable = False
            if not snapshot.is_paused:
                representative_shared_version_ids.add(node)
        else:
            is_forward_only_model = (
                snapshot.is_model and snapshot.model.forward_only and not snapshot.is_metadata
            )
            has_auto_restatement = (
                snapshot.is_model and snapshot.model.auto_restatement_cron is not None
            )

            snapshot_start = start_override_per_model.get(
                node.name, start_date(snapshot, snapshots.values(), cache=start_date_cache)
            )

            if start is not None:
                is_valid_start = snapshot.is_valid_start(start, snapshot_start)
            else:
                is_valid_start = True

            children_deployable = is_valid_start and not has_auto_restatement
            non_deployable_by_nature = (
                snapshot.is_forward_only
                or snapshot.is_indirect_non_breaking
                or is_forward_only_model
                or has_auto_restatement
                or not is_valid_start
            )
            if non_deployable_by_nature:
                # FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots are not deployable by nature.
                # Similarly, if the model depends on past and the start date is not aligned with the
                # model's start, we should consider this snapshot non-deployable.
                this_deployable = False
                if not snapshot.is_paused or (
                    snapshot.is_indirect_non_breaking and snapshot.intervals
                ):
                    # This snapshot represents what's currently deployed in prod.
                    representative_shared_version_ids.add(node)
                else:
                    # If the parent is not representative then its children can't be deployable.
                    children_deployable = False

        deployability_mapping[node] = this_deployable
        children_deployability_mapping[node] = children_deployable

    deployable_ids = {sid for sid, is_ok in deployability_mapping.items() if is_ok}
    non_deployable_ids = set(snapshots) - deployable_ids

    # Pick the smaller set to reduce the size of the serialized object.
    if len(deployable_ids) <= len(non_deployable_ids):
        return cls(
            indexed_ids=deployable_ids,
            representative_shared_version_ids=representative_shared_version_ids,
        )
    return cls(
        indexed_ids=non_deployable_ids,
        is_opposite_index=True,
        representative_shared_version_ids=representative_shared_version_ids,
    )
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def table_name(
    physical_schema: str,
    name: str,
    version: str,
    catalog: t.Optional[str] = None,
    suffix: t.Optional[str] = None,
    naming_convention: t.Optional[TableNamingConvention] = None,
) -> str:
    """Build the physical table name for a given model name and version.

    Args:
        physical_schema: Schema the resulting table is placed in.
        name: Model name; parsed as a table reference.
        version: Version string appended to the generated table name.
        catalog: Catalog to use when the parsed name does not carry one.
        suffix: Optional suffix, appended as `__<suffix>`.
        naming_convention: How the table part is derived. Falls back to
            `TableNamingConvention.default` when not provided.

    Returns:
        The rendered table name.
    """
    parsed = exp.to_table(name)
    convention = naming_convention or TableNamingConvention.default

    if convention != TableNamingConvention.HASH_MD5:
        # note: Snapshot._table_name() already strips the catalog from the model name before calling this function
        # Therefore, a model with 3-part naming like "foo.bar.baz" gets passed as (name="bar.baz", catalog="foo") to this function
        # This is why there is no TableNamingConvention.CATALOG_AND_SCHEMA_AND_TABLE
        parts = parsed.parts
        wanted = 2 if convention == TableNamingConvention.SCHEMA_AND_TABLE else 1

        # the parsed name may have fewer parts than the naming convention asks for
        wanted = min(len(parts), wanted)

        # bigquery projects usually have "-" in them which is illegal in the table name, so we aggressively prune
        name = "__".join(sanitize_name(part.name) for part in parts[-wanted:])
        base_name = f"{name}__{version}"
    else:
        # just take a MD5 hash of what we would have generated anyway using SCHEMA_AND_TABLE
        hash_input = table_name(
            physical_schema=physical_schema,
            name=name,
            version=version,
            catalog=catalog,
            suffix=suffix,
            naming_convention=TableNamingConvention.SCHEMA_AND_TABLE,
        )
        base_name = f"{c.SQLMESH}_md5__{md5(hash_input)}"

    rendered_suffix = f"__{suffix}" if suffix else ""

    parsed.set("this", exp.to_identifier(f"{base_name}{rendered_suffix}"))
    parsed.set("db", exp.to_identifier(physical_schema))
    if catalog and not parsed.catalog:
        parsed.set("catalog", exp.to_identifier(catalog))
    return exp.table_name(parsed)
def display_name(
    snapshot_info_like: t.Union[SnapshotInfoLike, SnapshotInfoMixin],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    dialect: DialectType = None,
) -> str:
    """
    Returns the name to show to the user for a snapshot.

    Audits are shown under their plain name; everything else is rendered through
    `model_display_name`. This is presentation-only: use `qualified_view_name`
    whenever an actual view name is needed.

    Args:
        snapshot_info_like: The snapshot info object to get the display name for
        environment_naming_info: Environment naming info to use for display name formatting
        default_catalog: Optional default catalog name, forwarded to `model_display_name`
        dialect: Optional dialect type to use for name formatting

    Returns:
        The formatted display name as a string
    """
    if not snapshot_info_like.is_audit:
        return model_display_name(
            snapshot_info_like.name, environment_naming_info, default_catalog, dialect
        )

    return snapshot_info_like.name
Returns the model name as a qualified view name.
This is just used for presenting information back to the user and qualified_view_name should be used
when wanting a view name in all other cases.
Arguments:
- snapshot_info_like: The snapshot info object to get the display name for
- environment_naming_info: Environment naming info to use for display name formatting
- default_catalog: Optional default catalog name to use. If None, the default catalog will always be included in the display name.
- dialect: Optional dialect type to use for name formatting
Returns:
The formatted display name as a string
def model_display_name(
    node_name: str,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    dialect: DialectType = None,
) -> str:
    """Render a node name as an environment-qualified view name for display.

    The catalog is left out when it equals `default_catalog`, unless the
    environment suffix targets the catalog.

    Args:
        node_name: The node name, parsed as a table reference.
        environment_naming_info: Environment naming info used to render the name.
        default_catalog: Catalog treated as the default.
        dialect: Optional dialect used when rendering.

    Returns:
        The rendered display name.
    """
    parsed = exp.to_table(node_name)

    omit_catalog = (
        environment_naming_info.suffix_target != EnvironmentSuffixTarget.CATALOG
        and parsed.catalog == default_catalog
    )
    if omit_catalog:
        shown_catalog = None
    else:
        shown_catalog = parsed.catalog

    return QualifiedViewName(
        catalog=shown_catalog,
        schema_name=parsed.db or None,
        table=parsed.name,
    ).for_environment(environment_naming_info, dialect=dialect)
def fingerprint_from_node(
    node: Node,
    *,
    nodes: t.Dict[str, Node],
    cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
) -> SnapshotFingerprint:
    """Compute (or fetch from cache) the fingerprint of a node.

    The fingerprint combines the node's own data and metadata hashes with hashes
    derived from the fingerprints of its parents, so a change in a parent is
    reflected in the child.

    Args:
        node: Node to fingerprint.
        nodes: All known nodes by name. Dependencies that are not present in this
            dictionary do not contribute to the fingerprint.
        cache: Cache of fingerprints keyed by node fqn. Populated as a side effect.

    Returns:
        The fingerprint.
    """
    if cache is None:
        cache = {}

    if node.fqn in cache:
        return cache[node.fqn]

    parent_fingerprints = []
    for dependency in node.depends_on:
        if dependency in nodes:
            parent_fingerprints.append(
                fingerprint_from_node(nodes[dependency], nodes=nodes, cache=cache)
            )

    # Sorting keeps the hashes independent of the dependency iteration order.
    parent_versions = sorted(fp.to_version() for fp in parent_fingerprints)
    parent_data_hash = hash_data(parent_versions)

    parent_metadata_parts = []
    for fp in parent_fingerprints:
        parent_metadata_parts.append(fp.metadata_hash)
        parent_metadata_parts.append(fp.parent_metadata_hash)
    parent_metadata_hash = hash_data(sorted(parent_metadata_parts))

    cache[node.fqn] = SnapshotFingerprint(
        data_hash=node.data_hash,
        metadata_hash=node.metadata_hash,
        parent_data_hash=parent_data_hash,
        parent_metadata_hash=parent_metadata_hash,
    )
    return cache[node.fqn]
Helper function to generate a fingerprint based on the data and metadata of the node and its parents.
This method tries to remove non-meaningful differences to avoid ever-changing fingerprints. The fingerprint is made up of two parts split by an underscore -- query_metadata. The query hash is determined purely by the rendered query and the metadata by everything else.
Arguments:
- node: Node to fingerprint.
- nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes. Dependencies that are not present in this dictionary do not contribute to the fingerprint.
- cache: Cache of node name to fingerprints.
Returns:
The fingerprint.
def merge_intervals(intervals: t.Collection[Interval]) -> Intervals:
    """Sort intervals and coalesce the ones that overlap or touch.

    Args:
        intervals: The intervals to merge together.

    Returns:
        A new list of sorted and merged intervals.
    """
    if not intervals:
        return []

    first, *rest = sorted(intervals)
    result = [first]

    for candidate in rest:
        previous = result[-1]

        if candidate[0] > previous[1]:
            # Gap between the two: start a new merged interval.
            result.append(candidate)
            continue

        result[-1] = (previous[0], max(previous[1], candidate[1]))

    return result
Merge a list of intervals.
Arguments:
- intervals: A list of intervals to merge together.
Returns:
A new list of sorted and merged intervals.
def format_intervals(intervals: Intervals, unit: t.Optional[IntervalUnit]) -> str:
    """Render intervals as a comma-separated list of `start - end` pairs.

    Args:
        intervals: The intervals to render; each is passed through `make_inclusive` first.
        unit: The interval unit forwarded to the date/time formatter.

    Returns:
        The rendered string.
    """
    inclusive_pairs = [make_inclusive(begin, finish) for begin, finish in intervals]

    rendered = []
    for begin, finish in inclusive_pairs:
        rendered.append(
            " - ".join((_format_date_time(begin, unit), _format_date_time(finish, unit)))
        )
    return ", ".join(rendered)
def remove_interval(intervals: Intervals, remove_start: int, remove_end: int) -> Intervals:
    """Cut the range [remove_start, remove_end) out of a list of intervals.

    The start and end are taken as given. Use the `get_remove_interval` method of
    `Snapshot` to compute the correct values for a particular snapshot.

    Args:
        intervals: A list of exclusive intervals.
        remove_start: The inclusive start to remove.
        remove_end: The exclusive end to remove.

    Returns:
        A new list of intervals.
    """
    remaining: Intervals = []

    for lo, hi in intervals:
        keeps_head = remove_start > lo
        keeps_tail = remove_end < hi

        if keeps_head and keeps_tail:
            # The removed range sits strictly inside: split into two pieces.
            remaining.append((lo, remove_start))
            remaining.append((remove_end, hi))
        elif keeps_head:
            remaining.append((lo, min(remove_start, hi)))
        elif keeps_tail:
            remaining.append((max(remove_end, lo), hi))
        # Otherwise the interval is fully covered by the removed range and is dropped.

    return remaining
Remove an interval from a list of intervals. Assumes that the correct start and end intervals have been
passed in. Use get_remove_interval method of Snapshot to get the correct start/end given the snapshot's
information.
Arguments:
- intervals: A list of exclusive intervals.
- remove_start: The inclusive start to remove.
- remove_end: The exclusive end to remove.
Returns:
A new list of intervals.
def to_table_mapping(
    snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
) -> t.Dict[str, str]:
    """Map snapshot names to their table names.

    Only versioned, non-embedded model snapshots are included.

    Args:
        snapshots: The snapshots to map.
        deployability_index: Used to decide, per snapshot, which flag is passed to
            `Snapshot.table_name`. Defaults to an index where everything is deployable.

    Returns:
        A dictionary of snapshot name to table name.
    """
    index = deployability_index or DeployabilityIndex.all_deployable()

    mapping: t.Dict[str, str] = {}
    for snapshot in snapshots:
        if not snapshot.version or snapshot.is_embedded or not snapshot.is_model:
            continue
        key = snapshot.name
        mapping[key] = snapshot.table_name(index.is_representative(snapshot))
    return mapping
def to_view_mapping(
    snapshots: t.Iterable[Snapshot],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str] = None,
    dialect: t.Optional[str] = None,
) -> t.Dict[str, str]:
    """Map model snapshot names to their display names in an environment.

    Args:
        snapshots: The snapshots to map; non-model snapshots are skipped.
        environment_naming_info: Environment naming info passed to `Snapshot.display_name`.
        default_catalog: Optional default catalog passed to `Snapshot.display_name`.
        dialect: Optional dialect passed to `Snapshot.display_name`.

    Returns:
        A dictionary of snapshot name to display name.
    """
    mapping: t.Dict[str, str] = {}
    for snapshot in snapshots:
        if not snapshot.is_model:
            continue
        key = snapshot.name
        mapping[key] = snapshot.display_name(
            environment_naming_info, default_catalog=default_catalog, dialect=dialect
        )
    return mapping
def has_paused_forward_only(
    targets: t.Iterable[SnapshotIdLike],
    snapshots: t.Union[t.List[Snapshot], t.Dict[SnapshotId, Snapshot]],
) -> bool:
    """Check whether any target snapshot is a paused forward-only snapshot.

    Args:
        targets: The snapshots (or snapshot-ID-like objects) to check.
        snapshots: All known snapshots, as a list or as a mapping keyed by snapshot ID.
            Every target must be present in it.

    Returns:
        True if at least one target resolves to a snapshot whose
        `is_paused_forward_only` flag is set, False otherwise.
    """
    by_id = (
        snapshots
        if isinstance(snapshots, dict)
        else {known.snapshot_id: known for known in snapshots}
    )
    return any(by_id[target.snapshot_id].is_paused_forward_only for target in targets)
def missing_intervals(
    snapshots: t.Union[t.Collection[Snapshot], t.Dict[SnapshotId, Snapshot]],
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> t.Dict[Snapshot, Intervals]:
    """Returns all missing intervals given a collection of snapshots.

    Args:
        snapshots: Snapshots to inspect, as a collection or a mapping keyed by snapshot ID.
        start: Start of the range. When not provided, the earliest start date across
            the snapshots is used.
        end: End of the range. When not provided, the current timestamp is used.
        execution_time: Forwarded to `Snapshot.missing_intervals`.
        restatements: Intervals to restate, keyed by snapshot ID. A restated snapshot is
            evaluated over that interval, on a copy with the interval removed.
        deployability_index: Forwarded to `Snapshot.missing_intervals`. Defaults to an
            index where everything is deployable.
        start_override_per_model: Per-snapshot start, keyed by snapshot name.
        end_override_per_model: Per-snapshot end, keyed by snapshot name.
        ignore_cron: Forwarded to `Snapshot.missing_intervals`.
        end_bounded: Forwarded to `Snapshot.missing_intervals`.

    Returns:
        A dictionary of snapshot to its missing intervals. Snapshots without missing
        intervals are omitted.
    """
    if not isinstance(snapshots, dict):
        # Make sure that the mapping is only constructed once
        snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots}
    missing = {}
    # Shared memo for the start_date() / earliest_start_date() calls below.
    cache: t.Dict[str, datetime] = {}
    end_date = end or now_timestamp()
    start_dt = (
        to_datetime(start)
        if start
        else earliest_start_date(snapshots, cache=cache, relative_to=end_date)
    )
    restatements = restatements or {}
    start_override_per_model = start_override_per_model or {}
    end_override_per_model = end_override_per_model or {}
    deployability_index = deployability_index or DeployabilityIndex.all_deployable()

    for snapshot in snapshots.values():
        if not snapshot.evaluatable:
            continue

        snapshot_start_date = start_override_per_model.get(snapshot.name, start_dt)
        snapshot_end_date: TimeLike = end_date

        restated_interval = restatements.get(snapshot.snapshot_id)
        if restated_interval:
            # A restatement replaces the range and is applied to a copy, so the
            # caller's snapshot keeps its recorded intervals.
            snapshot_start_date, snapshot_end_date = (to_datetime(i) for i in restated_interval)
            snapshot = snapshot.copy()
            snapshot.intervals = snapshot.intervals.copy()
            snapshot.remove_interval(restated_interval)

        existing_interval_end = end_override_per_model.get(snapshot.name)
        if existing_interval_end:
            if snapshot_start_date >= existing_interval_end:
                # The start exceeds the provided interval end, so we can skip this snapshot
                # since it doesn't have missing intervals by definition
                continue
            snapshot_end_date = existing_interval_end

        # Never start before the snapshot's own effective/inferred start.
        snapshot_start_date = max(
            to_datetime(snapshot_start_date),
            to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)),
        )
        if snapshot_start_date > to_datetime(snapshot_end_date):
            continue

        # Cap the range at the node's own end when that comes first.
        missing_interval_end_date = snapshot_end_date
        node_end_date = snapshot.node.end
        if node_end_date and (to_datetime(node_end_date) < to_datetime(snapshot_end_date)):
            missing_interval_end_date = node_end_date

        intervals = snapshot.missing_intervals(
            snapshot_start_date,
            missing_interval_end_date,
            execution_time=execution_time,
            deployability_index=deployability_index,
            ignore_cron=ignore_cron,
            end_bounded=end_bounded,
        )
        if intervals:
            missing[snapshot] = intervals

    return missing
Returns all missing intervals given a collection of snapshots.
@lru_cache(maxsize=16384)
def expand_range(start_ts: int, end_ts: int, interval_unit: IntervalUnit) -> t.List[int]:
    """List the interval boundaries from start_ts up to end_ts.

    Args:
        start_ts: First timestamp of the range; always the first element of the result.
        end_ts: Last timestamp of the range.
        interval_unit: The interval unit whose croniter produces the boundaries in between.

    Returns:
        The boundary timestamps. `end_ts` is appended when the last generated
        boundary differs from it.
    """
    cron_iter = interval_unit.croniter(start_ts)
    boundaries = [start_ts]

    upcoming = to_timestamp(cron_iter.get_next(estimate=True))
    while upcoming <= end_ts:
        boundaries.append(upcoming)
        upcoming = to_timestamp(cron_iter.get_next(estimate=True))

    if boundaries[-1] != end_ts:
        boundaries.append(end_ts)
    return boundaries
@lru_cache(maxsize=16384)
def compute_missing_intervals(
    interval_unit: IntervalUnit,
    intervals: t.Tuple[Interval, ...],
    start_ts: int,
    end_ts: int,
    lookback: int,
    model_end_ts: t.Optional[int],
) -> Intervals:
    """Computes all missing intervals between start and end given intervals.

    Args:
        interval_unit: The interval unit.
        intervals: The intervals to check what's missing.
        start_ts: Inclusive timestamp start.
        end_ts: Exclusive timestamp end.
        lookback: A lookback window.
        model_end_ts: The exclusive end timestamp set on the model (if one is set)

    Returns:
        A sorted list of the missing (start, end) intervals in this range.
    """
    if start_ts == end_ts:
        return []

    timestamps = expand_range(start_ts, end_ts, interval_unit)
    missing = set()

    # Each pair of consecutive timestamps is a candidate interval.
    for current_ts, next_ts in zip(timestamps, timestamps[1:]):
        # NOTE(review): the early break presumably relies on `intervals` being sorted
        # in ascending order -- confirm against callers.
        for low, high in intervals:
            if current_ts < low:
                missing.add((current_ts, next_ts))
                break
            elif current_ts >= low and next_ts <= high:
                # Fully covered by an existing interval.
                break
        else:
            # No existing interval covered or followed the candidate.
            missing.add((current_ts, next_ts))

    if missing:
        if lookback:
            if model_end_ts:
                # Shrink the lookback by one for every step end_ts has to be moved
                # back to reach the model's end.
                croniter = interval_unit.croniter(end_ts)
                end_ts = to_timestamp(croniter.get_prev(estimate=True))

                while model_end_ts < end_ts:
                    end_ts = to_timestamp(croniter.get_prev(estimate=True))
                    lookback -= 1

                lookback = max(lookback, 0)

            for i, (current_ts, next_ts) in enumerate(zip(timestamps, timestamps[1:])):
                # The candidate `lookback` positions further along the range.
                parent = timestamps[i + lookback : i + lookback + 2]

                # Also missing when that later candidate is missing, or when it would
                # fall past the end of the expanded range.
                if len(parent) < 2 or tuple(parent) in missing:
                    missing.add((current_ts, next_ts))

        if model_end_ts:
            # Drop anything that starts at or after the model's end.
            missing = {interval for interval in missing if interval[0] < model_end_ts}

    return sorted(missing)
Computes all missing intervals between start and end given intervals.
Arguments:
- interval_unit: The interval unit.
- intervals: The intervals to check what's missing.
- start_ts: Inclusive timestamp start.
- end_ts: Exclusive timestamp end.
- lookback: A lookback window.
- model_end_ts: The exclusive end timestamp set on the model (if one is set)
Returns:
A sorted list of the missing (start, end) intervals in this range.
@lru_cache(maxsize=16384)
def inclusive_exclusive(
    start: TimeLike,
    end: TimeLike,
    interval_unit: IntervalUnit,
    strict: bool = True,
    allow_partial: bool = False,
    expand: bool = True,
) -> Interval:
    """Convert an inclusive start and end into a [start, end) timestamp pair.

    Args:
        start: The start date/time of the interval (inclusive)
        end: The end date/time of the interval (inclusive). A date-only value is
            advanced by one day before any further processing.
        interval_unit: The interval unit.
        strict: Whether to fail when the computed start is not before the computed end.
        allow_partial: Whether the interval can be partial or not.
        expand: Whether or not partial intervals are expanded outwards.

    Returns:
        A [start, end) pair.

    Raises:
        ValueError: If `strict` is set and the computed start is at or after the computed end.
    """
    floored_start = interval_unit.cron_floor(start)

    shrink_start = not (expand or allow_partial)
    if shrink_start and floored_start < to_datetime(start):
        # The start falls inside an interval and we are not expanding: move inwards.
        floored_start = interval_unit.cron_next(floored_start)

    start_ts = to_timestamp(floored_start)

    if is_date(end):
        end = to_datetime(end) + timedelta(days=1)

    if not allow_partial:
        boundary = interval_unit.cron_floor(end)

        if expand and boundary != to_datetime(end):
            boundary = interval_unit.cron_next(boundary)
    else:
        boundary = end

    end_ts = to_timestamp(boundary)

    if strict and start_ts >= end_ts:
        raise ValueError(
            f"`end` ({to_datetime(end_ts)}) must be greater than `start` ({to_datetime(start_ts)})"
        )

    return (start_ts, end_ts)
Transform the inclusive start and end into a [start, end) pair.
Arguments:
- start: The start date/time of the interval (inclusive)
- end: The end date/time of the interval (inclusive)
- interval_unit: The interval unit.
- strict: Whether to fail when the inclusive start is the same as the exclusive end.
- allow_partial: Whether the interval can be partial or not.
- expand: Whether or not partial intervals are expanded outwards.
Returns:
A [start, end) pair.
def earliest_start_date(
    snapshots: t.Union[t.Collection[Snapshot], t.Dict[SnapshotId, Snapshot]],
    cache: t.Optional[t.Dict[str, datetime]] = None,
    relative_to: t.Optional[TimeLike] = None,
) -> datetime:
    """Find the earliest start date across a collection of snapshots.

    Args:
        snapshots: Snapshots to find earliest start date.
        cache: optional cache of already computed start dates
        relative_to: the base date to compute start from if inferred from cron
    Returns:
        The earliest start date or yesterday if there are no snapshots.
    """
    if cache is None:
        cache = {}

    if not snapshots:
        base = to_datetime(relative_to) if relative_to is not None else None
        return yesterday(relative_base=base)

    if isinstance(snapshots, dict):
        by_id = snapshots
    else:
        # Make sure that the mapping is only constructed once
        by_id = {known.snapshot_id: known for known in snapshots}

    return min(
        start_date(known, by_id, cache=cache, relative_to=relative_to)
        for known in by_id.values()
    )
Get the earliest start date from a collection of snapshots.
Arguments:
- snapshots: Snapshots to find earliest start date.
- cache: optional cache to make computing start dates more efficient
- relative_to: the base date to compute start from if inferred from cron
Returns:
The earliest start date or yesterday if none is found.
def start_date(
    snapshot: Snapshot,
    snapshots: t.Dict[SnapshotId, Snapshot] | t.Iterable[Snapshot],
    cache: t.Optional[t.Dict[str, datetime]] = None,
    relative_to: t.Optional[TimeLike] = None,
) -> datetime:
    """Resolve the effective/inferred start date of a snapshot.

    A start set on the snapshot's node wins. Otherwise the earliest start among the
    known parents is used, and when there are none the start is derived from the
    node's cron.

    Args:
        snapshot: snapshot to infer start date.
        snapshots: a catalog of available snapshots.
        cache: optional cache of already computed start dates; updated in place
        relative_to: the base date to compute start from if inferred from cron

    Returns:
        Start datetime object.
    """
    if cache is None:
        cache = {}

    if relative_to:
        key = f"{snapshot.name}_{to_timestamp(relative_to)}"
    else:
        key = snapshot.name

    if key in cache:
        return cache[key]

    node = snapshot.node
    if node.start:
        resolved = to_datetime(node.start)
    else:
        if isinstance(snapshots, dict):
            by_id = snapshots
        else:
            by_id = {known.snapshot_id: known for known in snapshots}

        candidates = [
            start_date(by_id[parent_id], by_id, cache=cache, relative_to=relative_to)
            for parent_id in snapshot.parents
            if parent_id in by_id
        ]
        if candidates:
            resolved = min(candidates)
        else:
            resolved = node.cron_prev(node.cron_floor(relative_to or now()))

    cache[key] = resolved
    return resolved
Get the effective/inferred start date for a snapshot.
Not all snapshots define a start date. In those cases, the node's start date can be inferred from its parent's start date or from its cron.
Arguments:
- snapshot: snapshot to infer start date.
- snapshots: a catalog of available snapshots.
- cache: optional cache to make computing cache date more efficient
- relative_to: the base date to compute start from if inferred from cron
Returns:
Start datetime object.
def apply_auto_restatements(
    snapshots: t.Dict[SnapshotId, Snapshot], execution_time: TimeLike
) -> t.Tuple[t.List[SnapshotIntervals], t.Dict[SnapshotId, t.List[SnapshotId]]]:
    """Applies auto restatements to the snapshots.

    This operation results in the removal of intervals for snapshots that are ready to be restated based
    on the provided execution time and configured auto restatement settings. For each affected snapshot,
    it also updates the next auto restatement timestamp.

    Args:
        snapshots: A dictionary of snapshots to apply auto restatements to.
        execution_time: The execution time.

    Returns:
        A tuple of:
        - a list of SnapshotIntervals with **new** intervals that need to be restated;
        - a dictionary mapping a snapshot ID to the IDs of the auto-restated snapshots
          that triggered its restatement.
    """
    dag = snapshots_to_dag(snapshots.values())
    auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
    auto_restated_intervals_per_snapshot: t.Dict[SnapshotId, Interval] = {}
    for s_id in dag:
        if s_id not in snapshots:
            continue
        snapshot = snapshots[s_id]
        if not snapshot.is_model or snapshot.model.disable_restatement:
            continue

        next_auto_restated_interval = snapshot.get_next_auto_restatement_interval(execution_time)
        # Intervals already scheduled for removal on this snapshot's parents.
        auto_restated_intervals = [
            auto_restated_intervals_per_snapshot[parent_s_id]
            for parent_s_id in snapshot.parents
            if parent_s_id in auto_restated_intervals_per_snapshot
        ]
        upstream_triggers = []
        if next_auto_restated_interval:
            logger.info(
                "Calculated the next auto restated interval (%s, %s) for snapshot %s",
                time_like_to_str(next_auto_restated_interval[0]),
                time_like_to_str(next_auto_restated_interval[1]),
                snapshot.snapshot_id,
            )
            auto_restated_intervals.append(next_auto_restated_interval)

            # auto-restated snapshot is its own trigger
            upstream_triggers = [s_id]
        else:
            # inherit each parent's auto-restatement triggers (if any)
            for parent_s_id in snapshot.parents:
                if parent_s_id in auto_restatement_triggers:
                    upstream_triggers.extend(auto_restatement_triggers[parent_s_id])

        # remove duplicate triggers, retaining order and keeping first seen of duplicates
        if upstream_triggers:
            auto_restatement_triggers[s_id] = unique(upstream_triggers)

        if auto_restated_intervals:
            # Take the envelope of the snapshot's own and its parents' intervals.
            auto_restated_interval_start = sys.maxsize
            auto_restated_interval_end = -sys.maxsize
            for interval in auto_restated_intervals:
                auto_restated_interval_start = min(auto_restated_interval_start, interval[0])
                auto_restated_interval_end = max(auto_restated_interval_end, interval[1])

            # Align the envelope to this snapshot's interval unit: floor the start,
            # and round the end up when flooring would cut part of it off.
            interval_to_remove_start = snapshot.node.interval_unit.cron_floor(
                auto_restated_interval_start
            )
            interval_to_remove_end = snapshot.node.interval_unit.cron_floor(
                auto_restated_interval_end
            )
            if auto_restated_interval_end > to_timestamp(interval_to_remove_end):
                interval_to_remove_end = snapshot.node.interval_unit.cron_next(
                    interval_to_remove_end
                )

            removal_interval = snapshot.get_removal_interval(
                interval_to_remove_start, interval_to_remove_end, execution_time=execution_time
            )

            # Recorded so that downstream snapshots pick this interval up as well.
            auto_restated_intervals_per_snapshot[s_id] = removal_interval
            snapshot.pending_restatement_intervals = merge_intervals(
                [*snapshot.pending_restatement_intervals, removal_interval]
            )

        snapshot.apply_pending_restatement_intervals()
        snapshot.update_next_auto_restatement_ts(execution_time)
    return (
        [
            SnapshotIntervals(
                name=snapshots[s_id].name,
                identifier=None,
                version=snapshots[s_id].version,
                dev_version=None,
                intervals=[],
                dev_intervals=[],
                pending_restatement_intervals=[interval],
            )
            for s_id, interval in auto_restated_intervals_per_snapshot.items()
            if s_id in snapshots
        ],
        auto_restatement_triggers,
    )
Applies auto restatements to the snapshots.
This operation results in the removal of intervals for snapshots that are ready to be restated based on the provided execution time and configured auto restatement settings. For each affected snapshot, it also updates the next auto restatement timestamp.
Arguments:
- snapshots: A dictionary of snapshots to apply auto restatements to.
- execution_time: The execution time.
Returns:
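A tuple of (a list of SnapshotIntervals with new intervals that need to be restated, a dictionary mapping a snapshot ID to the IDs of the auto-restated snapshots that triggered its restatement).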
def parent_snapshots_by_name(
    snapshot: Snapshot, snapshots: t.Dict[SnapshotId, Snapshot]
) -> t.Dict[str, Snapshot]:
    """Index a snapshot and its parents by name.

    Args:
        snapshot: The snapshot whose parents are looked up.
        snapshots: All known snapshots by ID; must contain every parent of `snapshot`.

    Returns:
        A dictionary of name to snapshot holding the parents followed by the
        snapshot itself.
    """
    by_name: t.Dict[str, Snapshot] = {}
    for parent_id in snapshot.parents:
        parent = snapshots[parent_id]
        by_name[parent.name] = parent

    by_name[snapshot.name] = snapshot
    return by_name
def check_ready_intervals(
    check: t.Callable,
    intervals: Intervals,
    context: ExecutionContext,
    python_env: t.Dict[str, Executable],
    dialect: DialectType = None,
    path: t.Optional[Path] = None,
    snapshot: t.Optional[Snapshot] = None,
    kwargs: t.Optional[t.Dict] = None,
) -> Intervals:
    """Run a signal check over batches of contiguous intervals and keep the ready ones.

    Args:
        check: The signal callable, invoked through `call_macro` with one batch at a time.
        intervals: The candidate intervals.
        context: The execution context passed to the check.
        python_env: The python environment used to format errors raised by the check.
        dialect: Optional dialect passed to `call_macro`.
        path: Optional path passed to `call_macro`.
        snapshot: Optional snapshot passed to the check.
        kwargs: Optional keyword arguments passed to the check.

    Returns:
        The intervals the check reported as ready.

    Raises:
        SignalEvalError: If the check raises, returns an interval that was not part of
            the batch, or returns something other than a bool or a list.
    """
    ready: Intervals = []

    for group in _contiguous_intervals(intervals):
        candidates = [(to_datetime(begin), to_datetime(finish)) for begin, finish in group]

        try:
            verdict = call_macro(
                check,
                dialect,
                path,
                provided_args=(candidates,),
                provided_kwargs=(kwargs or {}),
                context=context,
                snapshot=snapshot,
            )
        except Exception as ex:
            raise SignalEvalError(format_evaluated_code_exception(ex, python_env))

        if isinstance(verdict, bool):
            # True keeps the whole batch, False drops it.
            accepted = candidates if verdict else []
        elif isinstance(verdict, list):
            for i in verdict:
                if i not in candidates:
                    raise SignalEvalError(f"Unknown interval {i} for signal")
            accepted = verdict
        else:
            raise SignalEvalError(f"Expected bool | list, got {type(verdict)} for signal")

        for begin, finish in accepted:
            ready.append((to_timestamp(begin), to_timestamp(finish)))

    return ready
def get_next_model_interval_start(snapshots: t.Iterable[Snapshot]) -> t.Optional[datetime]:
    """Find the earliest upcoming cron boundary among model snapshots.

    Symbolic and seed snapshots, as well as non-model snapshots, are ignored.

    Args:
        snapshots: The snapshots to inspect.

    Returns:
        The earliest next cron time relative to now, or None if no snapshot qualifies.
    """
    reference = now()

    upcoming = (
        snap.node.cron_next(reference)
        for snap in snapshots
        if snap.is_model and not (snap.is_symbolic or snap.is_seed)
    )
    return min(upcoming, default=None)