Edit on GitHub

sqlmesh.core.snapshot.definition

   1from __future__ import annotations
   2
   3import sys
   4import typing as t
   5from collections import defaultdict
   6from datetime import datetime, timedelta
   7from enum import IntEnum
   8import logging
   9from functools import cached_property, lru_cache
  10from pathlib import Path
  11
  12from pydantic import Field
  13from sqlglot import exp
  14from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
  15
  16from sqlmesh.core.config.common import (
  17    TableNamingConvention,
  18    VirtualEnvironmentMode,
  19    EnvironmentSuffixTarget,
  20)
  21from sqlmesh.core import constants as c
  22from sqlmesh.core.audit import StandaloneAudit
  23from sqlmesh.core.macros import call_macro
  24from sqlmesh.core.model import Model, ModelKindMixin, ModelKindName, ViewKind, CustomKind
  25from sqlmesh.core.model.definition import _Model
  26from sqlmesh.core.node import IntervalUnit, NodeType
  27from sqlmesh.utils import sanitize_name, unique
  28from sqlmesh.utils.dag import DAG
  29from sqlmesh.utils.date import (
  30    TimeLike,
  31    is_date,
  32    make_inclusive,
  33    make_exclusive,
  34    make_inclusive_end,
  35    now,
  36    now_timestamp,
  37    time_like_to_str,
  38    to_date,
  39    to_datetime,
  40    to_ds,
  41    to_timestamp,
  42    to_ts,
  43    validate_date_range,
  44    yesterday,
  45)
  46from sqlmesh.utils.errors import SQLMeshError, SignalEvalError
  47from sqlmesh.utils.metaprogramming import (
  48    format_evaluated_code_exception,
  49    Executable,
  50)
  51from sqlmesh.utils.hashing import hash_data, md5
  52from sqlmesh.utils.pydantic import PydanticModel, field_validator
  53
  54if t.TYPE_CHECKING:
  55    from sqlglot.dialects.dialect import DialectType
  56    from sqlmesh.core.environment import EnvironmentNamingInfo
  57    from sqlmesh.core.context import ExecutionContext
  58
# A single interval, expressed as a pair of integers.
# NOTE(review): presumably (start, end) timestamps with an exclusive end — confirm against callers.
Interval = t.Tuple[int, int]
# A list of `Interval` pairs.
Intervals = t.List[Interval]

# A snapshot node is either a Model or a StandaloneAudit; the "source_type" field is
# declared as the discriminator that selects between the two.
Node = t.Annotated[t.Union[Model, StandaloneAudit], Field(discriminator="source_type")]


# Module-level logger named after this module.
logger = logging.getLogger(__name__)
  66
  67
  68class SnapshotChangeCategory(IntEnum):
  69    """
  70    Values are ordered by decreasing severity and that ordering is required.
  71
  72    BREAKING: The change requires that snapshot modified and downstream dependencies be rebuilt
  73    NON_BREAKING: The change requires that only the snapshot modified be rebuilt
  74    FORWARD_ONLY: The change requires no rebuilding
  75    INDIRECT_BREAKING: The change was caused indirectly and is breaking.
  76    INDIRECT_NON_BREAKING: The change was caused indirectly by a non-breaking change.
  77    METADATA: The change was caused by a metadata update.
  78    """
  79
  80    BREAKING = 1
  81    NON_BREAKING = 2
  82    # FORWARD_ONLY category is deprecated and is kept for backwards compatibility.
  83    FORWARD_ONLY = 3
  84    INDIRECT_BREAKING = 4
  85    INDIRECT_NON_BREAKING = 5
  86    METADATA = 6
  87
  88    @property
  89    def is_breaking(self) -> bool:
  90        return self == self.BREAKING
  91
  92    @property
  93    def is_non_breaking(self) -> bool:
  94        return self == self.NON_BREAKING
  95
  96    @property
  97    def is_forward_only(self) -> bool:
  98        return self == self.FORWARD_ONLY
  99
 100    @property
 101    def is_metadata(self) -> bool:
 102        return self == self.METADATA
 103
 104    @property
 105    def is_indirect_breaking(self) -> bool:
 106        return self == self.INDIRECT_BREAKING
 107
 108    @property
 109    def is_indirect_non_breaking(self) -> bool:
 110        return self == self.INDIRECT_NON_BREAKING
 111
 112    def __repr__(self) -> str:
 113        return self.name
 114
 115
 116class SnapshotFingerprint(PydanticModel, frozen=True):
 117    data_hash: str
 118    metadata_hash: str
 119    parent_data_hash: str = "0"
 120    parent_metadata_hash: str = "0"
 121
 122    def to_version(self) -> str:
 123        return hash_data([self.data_hash, self.parent_data_hash])
 124
 125    def to_identifier(self) -> str:
 126        return hash_data(
 127            [
 128                self.data_hash,
 129                self.metadata_hash,
 130                self.parent_data_hash,
 131                self.parent_metadata_hash,
 132            ]
 133        )
 134
 135    def __str__(self) -> str:
 136        return f"SnapshotFingerprint<{self.to_identifier()}, data: {self.data_hash}, meta: {self.metadata_hash}, pdata: {self.parent_data_hash}, pmeta: {self.parent_metadata_hash}>"
 137
 138
 139class SnapshotId(PydanticModel, frozen=True):
 140    name: str
 141    identifier: str
 142
 143    @property
 144    def snapshot_id(self) -> SnapshotId:
 145        """Helper method to return self."""
 146        return self
 147
 148    def __eq__(self, other: t.Any) -> bool:
 149        return (
 150            isinstance(other, self.__class__)
 151            and self.name == other.name
 152            and self.identifier == other.identifier
 153        )
 154
 155    def __hash__(self) -> int:
 156        return hash((self.__class__, self.name, self.identifier))
 157
 158    def __lt__(self, other: SnapshotId) -> bool:
 159        return self.name < other.name
 160
 161    def __str__(self) -> str:
 162        return f"SnapshotId<{self.name}: {self.identifier}>"
 163
 164
class SnapshotIdBatch(PydanticModel, frozen=True):
    """Frozen model that pairs a snapshot ID with an integer batch ID."""

    snapshot_id: SnapshotId
    batch_id: int
 168
 169
class SnapshotNameVersion(PydanticModel, frozen=True):
    """Frozen model that identifies a snapshot by its name and version."""

    name: str
    version: str

    @property
    def name_version(self) -> SnapshotNameVersion:
        """Helper method to return self."""
        return self
 178
 179
 180class SnapshotIntervals(PydanticModel):
 181    name: str
 182    identifier: t.Optional[str]
 183    version: str
 184    dev_version: t.Optional[str]
 185    intervals: Intervals = []
 186    dev_intervals: Intervals = []
 187    pending_restatement_intervals: Intervals = []
 188    last_altered_ts: t.Optional[int] = None
 189    dev_last_altered_ts: t.Optional[int] = None
 190
 191    @property
 192    def snapshot_id(self) -> t.Optional[SnapshotId]:
 193        if not self.identifier:
 194            return None
 195        return SnapshotId(name=self.name, identifier=self.identifier)
 196
 197    @property
 198    def name_version(self) -> SnapshotNameVersion:
 199        return SnapshotNameVersion(name=self.name, version=self.version)
 200
 201    def add_interval(self, start: int, end: int) -> None:
 202        self._add_interval(start, end, "intervals")
 203
 204    def add_dev_interval(self, start: int, end: int) -> None:
 205        self._add_interval(start, end, "dev_intervals")
 206
 207    def add_pending_restatement_interval(self, start: int, end: int) -> None:
 208        self._add_interval(start, end, "pending_restatement_intervals")
 209
 210    def update_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
 211        self._update_last_altered_ts(last_altered_ts, "last_altered_ts")
 212
 213    def update_dev_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
 214        self._update_last_altered_ts(last_altered_ts, "dev_last_altered_ts")
 215
 216    def remove_interval(self, start: int, end: int) -> None:
 217        self._remove_interval(start, end, "intervals")
 218
 219    def remove_dev_interval(self, start: int, end: int) -> None:
 220        self._remove_interval(start, end, "dev_intervals")
 221
 222    def remove_pending_restatement_interval(self, start: int, end: int) -> None:
 223        self._remove_interval(start, end, "pending_restatement_intervals")
 224
 225    def is_empty(self) -> bool:
 226        return (
 227            not self.intervals and not self.dev_intervals and not self.pending_restatement_intervals
 228        )
 229
 230    def _add_interval(self, start: int, end: int, interval_attr: str) -> None:
 231        target_intervals = getattr(self, interval_attr)
 232        target_intervals = merge_intervals([*target_intervals, (start, end)])
 233        setattr(self, interval_attr, target_intervals)
 234
 235    def _update_last_altered_ts(
 236        self, last_altered_ts: t.Optional[int], last_altered_attr: str
 237    ) -> None:
 238        if last_altered_ts:
 239            existing_last_altered_ts = getattr(self, last_altered_attr)
 240            setattr(self, last_altered_attr, max(existing_last_altered_ts or 0, last_altered_ts))
 241
 242    def _remove_interval(self, start: int, end: int, interval_attr: str) -> None:
 243        target_intervals = getattr(self, interval_attr)
 244        target_intervals = remove_interval(target_intervals, start, end)
 245        setattr(self, interval_attr, target_intervals)
 246
 247
 248class SnapshotDataVersion(PydanticModel, frozen=True):
 249    fingerprint: SnapshotFingerprint
 250    version: str
 251    dev_version_: t.Optional[str] = Field(default=None, alias="dev_version")
 252    change_category: t.Optional[SnapshotChangeCategory] = None
 253    physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema")
 254    dev_table_suffix: str
 255    table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
 256    virtual_environment_mode: VirtualEnvironmentMode = Field(default=VirtualEnvironmentMode.default)
 257
 258    def snapshot_id(self, name: str) -> SnapshotId:
 259        return SnapshotId(name=name, identifier=self.fingerprint.to_identifier())
 260
 261    @property
 262    def dev_version(self) -> str:
 263        return self.dev_version_ or self.fingerprint.to_version()
 264
 265    @property
 266    def physical_schema(self) -> str:
 267        # The physical schema here is optional to maintain backwards compatibility with
 268        # records stored by previous versions of SQLMesh.
 269        return self.physical_schema_ or c.SQLMESH
 270
 271    @property
 272    def data_version(self) -> SnapshotDataVersion:
 273        return self
 274
 275    @property
 276    def is_new_version(self) -> bool:
 277        """Returns whether or not this version is new and requires a backfill."""
 278        return self.fingerprint.to_version() == self.version
 279
 280
 281class QualifiedViewName(PydanticModel, frozen=True):
 282    catalog: t.Optional[str] = None
 283    schema_name: t.Optional[str] = None
 284    table: str
 285
 286    def for_environment(
 287        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
 288    ) -> str:
 289        return exp.table_name(self.table_for_environment(environment_naming_info, dialect=dialect))
 290
 291    def table_for_environment(
 292        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
 293    ) -> exp.Table:
 294        return exp.table_(
 295            self.table_name_for_environment(environment_naming_info, dialect=dialect),
 296            db=self.schema_for_environment(environment_naming_info, dialect=dialect),
 297            catalog=self.catalog_for_environment(environment_naming_info, dialect=dialect),
 298        )
 299
 300    def catalog_for_environment(
 301        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
 302    ) -> t.Optional[str]:
 303        catalog_name: t.Optional[str] = None
 304        if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_catalog:
 305            catalog_name = f"{self.catalog}__{environment_naming_info.name}"
 306        elif environment_naming_info.catalog_name_override:
 307            catalog_name = environment_naming_info.catalog_name_override
 308
 309        if catalog_name:
 310            return (
 311                normalize_identifiers(catalog_name, dialect=dialect).name
 312                if environment_naming_info.normalize_name
 313                else catalog_name
 314            )
 315
 316        return self.catalog
 317
 318    def schema_for_environment(
 319        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
 320    ) -> str:
 321        normalize = environment_naming_info.normalize_name
 322
 323        if self.schema_name:
 324            schema = self.schema_name
 325        else:
 326            schema = c.DEFAULT_SCHEMA
 327            if normalize:
 328                schema = normalize_identifiers(schema, dialect=dialect).name
 329
 330        if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_schema:
 331            env_name = environment_naming_info.name
 332            if normalize:
 333                env_name = normalize_identifiers(env_name, dialect=dialect).name
 334
 335            schema = f"{schema}__{env_name}"
 336
 337        return schema
 338
 339    def table_name_for_environment(
 340        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
 341    ) -> str:
 342        table = self.table
 343        if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_table:
 344            env_name = environment_naming_info.name
 345            if environment_naming_info.normalize_name:
 346                env_name = normalize_identifiers(env_name, dialect=dialect).name
 347
 348            table = f"{table}__{env_name}"
 349
 350        return table
 351
 352
 353class SnapshotInfoMixin(ModelKindMixin):
 354    name: str
 355    dev_version_: t.Optional[str]
 356    change_category: t.Optional[SnapshotChangeCategory]
 357    fingerprint: SnapshotFingerprint
 358    previous_versions: t.Tuple[SnapshotDataVersion, ...]
 359    # Added to support Migration # 34 (default catalog)
 360    # This can be removed from this model once Pydantic 1 support is dropped (must remain in `Snapshot` though)
 361    base_table_name_override: t.Optional[str]
 362    dev_table_suffix: str
 363    table_naming_convention: TableNamingConvention
 364    forward_only: bool
 365
 366    @cached_property
 367    def identifier(self) -> str:
 368        return self.fingerprint.to_identifier()
 369
 370    @cached_property
 371    def snapshot_id(self) -> SnapshotId:
 372        return SnapshotId(name=self.name, identifier=self.identifier)
 373
 374    @property
 375    def qualified_view_name(self) -> QualifiedViewName:
 376        view_name = exp.to_table(self.fully_qualified_table or self.name)
 377        return QualifiedViewName(
 378            catalog=view_name.catalog or None,
 379            schema_name=view_name.db or None,
 380            table=view_name.name,
 381        )
 382
 383    @property
 384    def previous_version(self) -> t.Optional[SnapshotDataVersion]:
 385        """Helper method to get the previous data version."""
 386        if self.previous_versions:
 387            return self.previous_versions[-1]
 388        return None
 389
 390    @property
 391    def dev_version(self) -> str:
 392        return self.dev_version_ or self.fingerprint.to_version()
 393
 394    @property
 395    def physical_schema(self) -> str:
 396        raise NotImplementedError
 397
 398    @property
 399    def data_version(self) -> SnapshotDataVersion:
 400        raise NotImplementedError
 401
 402    @property
 403    def is_new_version(self) -> bool:
 404        raise NotImplementedError
 405
 406    @cached_property
 407    def fully_qualified_table(self) -> t.Optional[exp.Table]:
 408        raise NotImplementedError
 409
 410    @property
 411    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
 412        raise NotImplementedError
 413
 414    @property
 415    def is_forward_only(self) -> bool:
 416        return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY
 417
 418    @property
 419    def is_metadata(self) -> bool:
 420        return self.change_category == SnapshotChangeCategory.METADATA
 421
 422    @property
 423    def is_indirect_non_breaking(self) -> bool:
 424        return self.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING
 425
 426    @property
 427    def is_no_rebuild(self) -> bool:
 428        """Returns true if this snapshot doesn't require a rebuild in production."""
 429        return self.forward_only or self.change_category in (
 430            SnapshotChangeCategory.FORWARD_ONLY,  # Backwards compatibility
 431            SnapshotChangeCategory.METADATA,
 432            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
 433        )
 434
 435    @property
 436    def is_no_preview(self) -> bool:
 437        """Returns true if this snapshot doesn't require a preview in development."""
 438        return self.forward_only and self.change_category in (
 439            SnapshotChangeCategory.METADATA,
 440            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
 441        )
 442
 443    @property
 444    def all_versions(self) -> t.Tuple[SnapshotDataVersion, ...]:
 445        """Returns previous versions with the current version trimmed to DATA_VERSION_LIMIT."""
 446        return (*self.previous_versions, self.data_version)[-c.DATA_VERSION_LIMIT :]
 447
 448    def display_name(
 449        self,
 450        environment_naming_info: EnvironmentNamingInfo,
 451        default_catalog: t.Optional[str],
 452        dialect: DialectType = None,
 453    ) -> str:
 454        """
 455        Returns the model name as a qualified view name.
 456        This is just used for presenting information back to the user and `qualified_view_name` should be used
 457        when wanting a view name in all other cases.
 458        """
 459        return display_name(self, environment_naming_info, default_catalog, dialect=dialect)
 460
 461    def data_hash_matches(self, other: t.Optional[SnapshotInfoMixin | SnapshotDataVersion]) -> bool:
 462        return other is not None and self.fingerprint.data_hash == other.fingerprint.data_hash
 463
 464    def _table_name(self, version: str, is_deployable: bool) -> str:
 465        """Full table name pointing to the materialized location of the snapshot.
 466
 467        Args:
 468            version: The snapshot version.
 469            is_deployable: Indicates whether to return the table name for deployment to production.
 470        """
 471        if self.is_external:
 472            return self.name
 473
 474        if is_deployable and self.virtual_environment_mode.is_dev_only:
 475            # Use the model name as is if the target is deployable and the virtual environment mode is set to dev-only
 476            return self.name
 477
 478        is_dev_table = not is_deployable
 479        if is_dev_table:
 480            version = self.dev_version
 481
 482        if self.fully_qualified_table is None:
 483            raise SQLMeshError(
 484                f"Tried to get a table name for a snapshot that does not have a table. {self.name}"
 485            )
 486        # We want to exclude the catalog from the name but still include catalog when determining the fqn
 487        # for the table.
 488        if self.base_table_name_override:
 489            base_table_name = self.base_table_name_override
 490        else:
 491            fqt = self.fully_qualified_table.copy()
 492            fqt.set("catalog", None)
 493            base_table_name = fqt.sql()
 494
 495        return table_name(
 496            self.physical_schema,
 497            base_table_name,
 498            version,
 499            catalog=self.fully_qualified_table.catalog,
 500            suffix=self.dev_table_suffix if is_dev_table else None,
 501            naming_convention=self.table_naming_convention,
 502        )
 503
 504    @property
 505    def node_type(self) -> NodeType:
 506        raise NotImplementedError
 507
 508    @property
 509    def is_model(self) -> bool:
 510        return self.node_type == NodeType.MODEL
 511
 512    @property
 513    def is_audit(self) -> bool:
 514        return self.node_type == NodeType.AUDIT
 515
 516
 517class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
 518    name: str
 519    fingerprint: SnapshotFingerprint
 520    version: str
 521    dev_version_: t.Optional[str] = Field(default=None, alias="dev_version")
 522    physical_schema_: str = Field(alias="physical_schema")
 523    parents: t.Tuple[SnapshotId, ...]
 524    previous_versions: t.Tuple[SnapshotDataVersion, ...] = ()
 525    change_category: t.Optional[SnapshotChangeCategory] = None
 526    kind_name: t.Optional[ModelKindName] = None
 527    node_type_: NodeType = Field(default=NodeType.MODEL, alias="node_type")
 528    # Added to support Migration # 34 (default catalog)
 529    # This can be removed from this model once Pydantic 1 support is dropped (must remain in `Snapshot` though)
 530    base_table_name_override: t.Optional[str] = None
 531    custom_materialization: t.Optional[str] = None
 532    dev_table_suffix: str
 533    model_gateway: t.Optional[str] = None
 534    forward_only: bool = False
 535    table_naming_convention: TableNamingConvention = TableNamingConvention.default
 536    virtual_environment_mode_: VirtualEnvironmentMode = Field(
 537        default=VirtualEnvironmentMode.default, alias="virtual_environment_mode"
 538    )
 539
 540    def __lt__(self, other: SnapshotTableInfo) -> bool:
 541        return self.name < other.name
 542
 543    def __eq__(self, other: t.Any) -> bool:
 544        return isinstance(other, SnapshotTableInfo) and self.fingerprint == other.fingerprint
 545
 546    def __hash__(self) -> int:
 547        return hash((self.__class__, self.name, self.fingerprint))
 548
 549    def table_name(self, is_deployable: bool = True) -> str:
 550        """Full table name pointing to the materialized location of the snapshot.
 551
 552        Args:
 553            is_deployable: Indicates whether to return the table name for deployment to production.
 554        """
 555        return self._table_name(self.version, is_deployable)
 556
 557    @property
 558    def physical_schema(self) -> str:
 559        return self.physical_schema_
 560
 561    @cached_property
 562    def fully_qualified_table(self) -> exp.Table:
 563        return exp.to_table(self.name)
 564
 565    @property
 566    def table_info(self) -> SnapshotTableInfo:
 567        """Helper method to return self."""
 568        return self
 569
 570    @property
 571    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
 572        return self.virtual_environment_mode_
 573
 574    @property
 575    def data_version(self) -> SnapshotDataVersion:
 576        return SnapshotDataVersion(
 577            fingerprint=self.fingerprint,
 578            version=self.version,
 579            dev_version=self.dev_version,
 580            change_category=self.change_category,
 581            physical_schema=self.physical_schema,
 582            dev_table_suffix=self.dev_table_suffix,
 583            table_naming_convention=self.table_naming_convention,
 584            virtual_environment_mode=self.virtual_environment_mode,
 585        )
 586
 587    @property
 588    def is_new_version(self) -> bool:
 589        """Returns whether or not this version is new and requires a backfill."""
 590        return self.fingerprint.to_version() == self.version
 591
 592    @property
 593    def model_kind_name(self) -> t.Optional[ModelKindName]:
 594        return self.kind_name
 595
 596    @property
 597    def node_type(self) -> NodeType:
 598        return self.node_type_
 599
 600    @property
 601    def name_version(self) -> SnapshotNameVersion:
 602        """Returns the name and version of the snapshot."""
 603        return SnapshotNameVersion(name=self.name, version=self.version)
 604
 605    @property
 606    def id_and_version(self) -> SnapshotIdAndVersion:
 607        return SnapshotIdAndVersion(
 608            name=self.name,
 609            kind_name=self.kind_name,
 610            identifier=self.identifier,
 611            version=self.version,
 612            dev_version=self.dev_version,
 613            fingerprint=self.fingerprint,
 614        )
 615
 616
 617class SnapshotIdAndVersion(PydanticModel, ModelKindMixin):
 618    """A stripped down version of a snapshot that is used in situations where we want to fetch the main fields of the snapshots table
 619    without the overhead of parsing the full snapshot payload and fetching intervals.
 620    """
 621
 622    name: str
 623    version: str
 624    kind_name_: t.Optional[ModelKindName] = Field(default=None, alias="kind_name")
 625    dev_version_: t.Optional[str] = Field(alias="dev_version")
 626    identifier: str
 627    fingerprint_: t.Union[str, SnapshotFingerprint] = Field(alias="fingerprint")
 628
 629    @property
 630    def snapshot_id(self) -> SnapshotId:
 631        return SnapshotId(name=self.name, identifier=self.identifier)
 632
 633    @property
 634    def id_and_version(self) -> SnapshotIdAndVersion:
 635        return self
 636
 637    @property
 638    def name_version(self) -> SnapshotNameVersion:
 639        return SnapshotNameVersion(name=self.name, version=self.version)
 640
 641    @property
 642    def fingerprint(self) -> SnapshotFingerprint:
 643        value = self.fingerprint_
 644        if isinstance(value, str):
 645            self.fingerprint_ = value = SnapshotFingerprint.parse_raw(value)
 646        return value
 647
 648    @property
 649    def dev_version(self) -> str:
 650        return self.dev_version_ or self.fingerprint.to_version()
 651
 652    @property
 653    def model_kind_name(self) -> t.Optional[ModelKindName]:
 654        return self.kind_name_
 655
 656    def display_name(
 657        self,
 658        environment_naming_info: EnvironmentNamingInfo,
 659        default_catalog: t.Optional[str],
 660        dialect: DialectType = None,
 661    ) -> str:
 662        return model_display_name(
 663            self.name, environment_naming_info, default_catalog, dialect=dialect
 664        )
 665
 666
 667class Snapshot(PydanticModel, SnapshotInfoMixin):
 668    """A snapshot represents a node at a certain point in time.
 669
 670    Snapshots are used to encapsulate everything needed to evaluate a node.
 671    They are standalone objects that hold all state and dynamic content necessary
 672    to render a node's query including things like macros. Snapshots also store intervals
 673    (timestamp ranges for what data we've processed).
 674
 675    Nodes can be dynamically rendered due to macros. Rendering a node to its full extent
 676    requires storing variables and macro definitions. We store all of the macro definitions and
 677    global variable references in `python_env` in raw text to avoid pickling. The helper methods
 678    to achieve this are defined in utils.metaprogramming.
 679
 680    Args:
 681        name: The snapshot name which is the same as the node name and should be unique per node.
 682        fingerprint: A unique hash of the node definition so that nodes can be reused across environments.
 683        node: Node object that the snapshot encapsulates.
 684        parents: The list of parent snapshots (upstream dependencies).
 685        intervals: List of [start, end) intervals showing which time ranges a snapshot has data for.
 686        dev_intervals: List of [start, end) intervals showing development intervals (forward-only).
 687        created_ts: Epoch millis timestamp when a snapshot was first created.
 688        updated_ts: Epoch millis timestamp when a snapshot was last updated.
 689        ttl: The time-to-live of a snapshot determines when it should be deleted after it's no longer referenced
 690            in any environment.
 691        previous: The snapshot data version that this snapshot was based on. If this snapshot is new, then previous will be None.
 692        version: User specified version for a snapshot that is used for physical storage.
 693            By default, the version is the fingerprint, but not all changes to nodes require a backfill.
 694            If a user passes a previous version, that will be used instead and no backfill will be required.
 695        change_category: User specified change category indicating which nodes require backfill from node changes made in this snapshot.
 696        unpaused_ts: The timestamp which indicates when this snapshot was unpaused. Unpaused means that
 697            this snapshot is evaluated on a recurring basis. None indicates that this snapshot is paused.
 698        effective_from: The timestamp which indicates when this snapshot should be considered effective.
 699            Applicable for forward-only snapshots only.
 700        migrated: Whether or not this snapshot has been created as a result of migration.
 701        unrestorable: Whether or not this snapshot can be used to revert its model to a previous version.
 702        next_auto_restatement_ts: The timestamp which indicates when is the next time this snapshot should be restated.
 703        table_naming_convention: Convention to follow when generating the physical table name
 704    """
 705
 706    name: str
 707    fingerprint: SnapshotFingerprint
 708    physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema")
 709    node: Node
 710    parents: t.Tuple[SnapshotId, ...]
 711    intervals: Intervals = []
 712    dev_intervals: Intervals = []
 713    pending_restatement_intervals: Intervals = []
 714    created_ts: int
 715    updated_ts: int
 716    ttl: str
 717    previous_versions: t.Tuple[SnapshotDataVersion, ...] = ()
 718    version: t.Optional[str] = None
 719    dev_version_: t.Optional[str] = Field(default=None, alias="dev_version")
 720    change_category: t.Optional[SnapshotChangeCategory] = None
 721    unpaused_ts: t.Optional[int] = None
 722    effective_from: t.Optional[TimeLike] = None
 723    migrated: bool = False
 724    unrestorable: bool = False
 725    # Added to support Migration # 34 (default catalog)
 726    base_table_name_override: t.Optional[str] = None
 727    next_auto_restatement_ts: t.Optional[int] = None
 728    dev_table_suffix: str = "dev"
 729    table_naming_convention: TableNamingConvention = TableNamingConvention.default
 730    forward_only: bool = False
 731    # Physical table last modified timestamp, not to be confused with the "updated_ts" field
 732    # which is for the snapshot record itself
 733    last_altered_ts: t.Optional[int] = None
 734    dev_last_altered_ts: t.Optional[int] = None
 735
 736    @field_validator("ttl")
 737    @classmethod
 738    def _time_delta_must_be_positive(cls, v: str) -> str:
 739        current_time = now()
 740        if to_datetime(v, current_time) < current_time:
 741            raise ValueError(
 742                "Must be positive. Use the 'in' keyword to denote a positive time interval. For example, 'in 7 days'."
 743            )
 744        return v
 745
 746    @staticmethod
 747    def hydrate_with_intervals_by_version(
 748        snapshots: t.Iterable[Snapshot],
 749        intervals: t.Iterable[SnapshotIntervals],
 750    ) -> t.List[Snapshot]:
 751        """Hydrates target snapshots with given intervals.
 752
 753        This will match snapshots with intervals by name and version rather than identifiers.
 754
 755        Args:
 756            snapshots: Target snapshots.
 757            intervals: Target snapshot intervals.
 758
 759        Returns:
 760            List of target snapshots with hydrated intervals.
 761        """
 762        intervals_by_name_version = defaultdict(list)
 763        for interval in intervals:
 764            intervals_by_name_version[(interval.name, interval.version)].append(interval)
 765
 766        result = []
 767        for snapshot in snapshots:
 768            snapshot_intervals = intervals_by_name_version.get(
 769                (snapshot.name, snapshot.version_get_or_generate()), []
 770            )
 771            for interval in snapshot_intervals:
 772                snapshot.merge_intervals(interval)
 773
 774            result.append(snapshot)
 775
 776        return result
 777
 778    @classmethod
 779    def from_node(
 780        cls,
 781        node: Node,
 782        *,
 783        nodes: t.Dict[str, Node],
 784        ttl: str = c.DEFAULT_SNAPSHOT_TTL,
 785        version: t.Optional[str] = None,
 786        cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
 787        table_naming_convention: TableNamingConvention = TableNamingConvention.default,
 788    ) -> Snapshot:
 789        """Creates a new snapshot for a node.
 790
 791        Args:
 792            Node: Node to snapshot.
 793            nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes.
 794                If no dictionary is passed in the fingerprint will not be dependent on a node's parents.
 795            ttl: A TTL to determine how long orphaned (snapshots that are not promoted anywhere) should live.
 796            version: The version that a snapshot is associated with. Usually set during the planning phase.
 797            cache: Cache of node name to fingerprints.
 798            table_naming_convention: Convention to follow when generating the physical table name
 799
 800        Returns:
 801            The newly created snapshot.
 802        """
 803        created_ts = now_timestamp()
 804
 805        return cls(
 806            name=node.fqn,
 807            fingerprint=fingerprint_from_node(
 808                node,
 809                nodes=nodes,
 810                cache=cache,
 811            ),
 812            node=node,
 813            parents=tuple(
 814                SnapshotId(
 815                    name=parent_node.fqn,
 816                    identifier=fingerprint_from_node(
 817                        parent_node,
 818                        nodes=nodes,
 819                        cache=cache,
 820                    ).to_identifier(),
 821                )
 822                for parent_node in _parents_from_node(node, nodes).values()
 823            ),
 824            intervals=[],
 825            dev_intervals=[],
 826            created_ts=created_ts,
 827            updated_ts=created_ts,
 828            ttl=ttl,
 829            version=version,
 830            table_naming_convention=table_naming_convention,
 831        )
 832
 833    def __eq__(self, other: t.Any) -> bool:
 834        return isinstance(other, Snapshot) and self.fingerprint == other.fingerprint
 835
 836    def __hash__(self) -> int:
 837        return hash((self.__class__, self.name, self.fingerprint))
 838
 839    def __lt__(self, other: Snapshot) -> bool:
 840        return self.name < other.name
 841
 842    def add_interval(self, start: TimeLike, end: TimeLike, is_dev: bool = False) -> None:
 843        """Add a newly processed time interval to the snapshot.
 844
 845        The actual stored intervals are [start_ts, end_ts) or start epoch timestamp inclusive and end epoch
 846        timestamp exclusive. This allows merging of ranges to be easier.
 847
 848        Args:
 849            start: The start date/time of the interval (inclusive)
 850            end: The end date/time of the interval. If end is a date, then it is considered inclusive.
 851                If it is a datetime object, then it is exclusive.
 852            is_dev: Indicates whether the given interval is being added while in development mode.
 853        """
 854        if to_timestamp(start) > to_timestamp(end):
 855            raise ValueError(
 856                f"Attempted to add an Invalid interval ({start}, {end}) to snapshot {self.snapshot_id}"
 857            )
 858
 859        start_ts, end_ts = self.inclusive_exclusive(start, end, strict=False, expand=False)
 860
 861        if start_ts >= end_ts:
 862            # Skipping partial interval.
 863            return
 864
 865        intervals = self.dev_intervals if is_dev else self.intervals
 866        intervals.append((start_ts, end_ts))
 867
 868        if len(intervals) < 2:
 869            return
 870
 871        merged_intervals = merge_intervals(intervals)
 872        if is_dev:
 873            self.dev_intervals = merged_intervals
 874        else:
 875            self.intervals = merged_intervals
 876
 877    def remove_interval(self, interval: Interval) -> None:
 878        """Remove an interval from the snapshot.
 879
 880        Args:
 881            interval: The interval to remove.
 882        """
 883        self.intervals = remove_interval(self.intervals, *interval)
 884        self.dev_intervals = remove_interval(self.dev_intervals, *interval)
 885
 886    def get_removal_interval(
 887        self,
 888        start: TimeLike,
 889        end: TimeLike,
 890        execution_time: t.Optional[TimeLike] = None,
 891        *,
 892        strict: bool = True,
 893        is_preview: bool = False,
 894    ) -> Interval:
 895        """Get the interval that should be removed from the snapshot.
 896
 897        Args:
 898            start: The start date/time of the interval to remove.
 899            end: The end date/time of the interval to removed.
 900            execution_time: The time the interval is being removed.
 901            strict: Whether to fail when the inclusive start is the same as the exclusive end.
 902            is_preview: Whether the interval needs to be removed for a preview of forward-only changes.
 903                When previewing, we are not actually restating a model, but removing an interval to trigger
 904                a run.
 905        """
 906        end = execution_time or now_timestamp() if self.depends_on_past else end
 907        removal_interval = self.inclusive_exclusive(start, end, strict)
 908
 909        if not is_preview and self.full_history_restatement_only and self.intervals:
 910            expanded_removal_interval = self.inclusive_exclusive(self.intervals[0][0], end, strict)
 911            requested_start, requested_end = removal_interval
 912            expanded_start, expanded_end = expanded_removal_interval
 913
 914            # only warn if the requested removal interval was a subset of the actual model intervals and was automatically expanded
 915            # if the requested interval was the same or wider than the actual model intervals, no need to warn
 916            if (
 917                requested_start > expanded_start or requested_end < expanded_end
 918            ) and self.is_incremental:
 919                from sqlmesh.core.console import get_console
 920
 921                get_console().log_warning(
 922                    f"Model '{self.model.name}' is '{self.model_kind_name}' which does not support partial restatement.\n"
 923                    f"Expanding the requested restatement intervals from [{to_ts(requested_start)} - {to_ts(requested_end)}] "
 924                    f"to [{to_ts(expanded_start)} - {to_ts(expanded_end)}] in order to fully restate the model."
 925                )
 926
 927            removal_interval = expanded_removal_interval
 928
 929        return removal_interval
 930
 931    @property
 932    def allow_partials(self) -> bool:
 933        return self.is_model and self.model.allow_partials
 934
 935    def inclusive_exclusive(
 936        self,
 937        start: TimeLike,
 938        end: TimeLike,
 939        strict: bool = True,
 940        allow_partial: t.Optional[bool] = None,
 941        expand: bool = True,
 942    ) -> Interval:
 943        """Transform the inclusive start and end into a [start, end) pair.
 944
 945        Args:
 946            start: The start date/time of the interval (inclusive)
 947            end: The end date/time of the interval (inclusive)
 948            strict: Whether to fail when the inclusive start is the same as the exclusive end.
 949            allow_partial: Whether the interval can be partial or not.
 950            expand: Whether or not partial intervals are expanded outwards.
 951
 952        Returns:
 953            A [start, end) pair.
 954        """
 955        return inclusive_exclusive(
 956            start,
 957            end,
 958            self.node.interval_unit,
 959            strict=strict,
 960            allow_partial=self.allow_partials if allow_partial is None else allow_partial,
 961            expand=expand,
 962        )
 963
 964    def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None:
 965        """Inherits intervals from the target snapshot.
 966
 967        Args:
 968            other: The target snapshot to inherit intervals from.
 969        """
 970        effective_from_ts = self.normalized_effective_from_ts or 0
 971        apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier
 972        for start, end in other.intervals:
 973            # If the effective_from is set, then intervals that come after it must come from
 974            # the current snapshots.
 975            if apply_effective_from and start < effective_from_ts:
 976                end = min(end, effective_from_ts)
 977            if not apply_effective_from or end <= effective_from_ts:
 978                self.add_interval(start, end)
 979
 980        if other.last_altered_ts:
 981            self.last_altered_ts = max(self.last_altered_ts or 0, other.last_altered_ts)
 982
 983        if self.dev_version == other.dev_version:
 984            # Merge dev intervals if the dev versions match which would mean
 985            # that this and the other snapshot are pointing to the same dev table.
 986            for start, end in other.dev_intervals:
 987                self.add_interval(start, end, is_dev=True)
 988
 989            if other.dev_last_altered_ts:
 990                self.dev_last_altered_ts = max(
 991                    self.dev_last_altered_ts or 0, other.dev_last_altered_ts
 992                )
 993
 994        self.pending_restatement_intervals = merge_intervals(
 995            [*self.pending_restatement_intervals, *other.pending_restatement_intervals]
 996        )
 997
 998    @property
 999    def evaluatable(self) -> bool:
1000        """Whether or not a snapshot should be evaluated and have intervals."""
1001        return bool(not self.is_symbolic or self.model.audits)
1002
    def missing_intervals(
        self,
        start: TimeLike,
        end: TimeLike,
        execution_time: t.Optional[TimeLike] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        ignore_cron: bool = False,
        end_bounded: bool = False,
    ) -> Intervals:
        """Find all missing intervals between [start, end].

        Although the inputs are inclusive, the returned stored intervals are
        [start_ts, end_ts) or start epoch timestamp inclusive and end epoch
        timestamp exclusive.

        Args:
            start: The start date/time of the interval (inclusive)
            end: The end date/time of the interval (inclusive if the type is date, exclusive otherwise)
            execution_time: The date/time time reference to use for execution time. Defaults to now.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
                Defaults to an index in which all snapshots are deployable.
            ignore_cron: Whether to ignore the node's cron schedule.
            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
                allow_partials, and other attributes that could cause the intervals to exceed the target end date.

        Returns:
            A list of all the missing intervals as epoch timestamps.
        """
        # If the node declares an end and the requested range starts after it, nothing can be missing.
        # Likewise, if the node's start is after the end of the requested range, nothing can be missing.
        if (self.node.end and to_datetime(start) > to_datetime(self.node.end)) or (
            self.node.start and to_datetime(self.node.start) > to_datetime(end)
        ):
            return []
        # Never look for missing intervals before the node's own start.
        if self.node.start and to_datetime(start) < to_datetime(self.node.start):
            start = self.node.start
        validate_date_range(start, end)

        # If the amount of time being checked is less than the size of a single interval then we
        # know that there can't be any missing intervals within that range. This shortcut is only
        # taken when the end is not a date and partial intervals are not allowed.
        if (
            not is_date(end)
            and not self.allow_partials
            and to_timestamp(end) - to_timestamp(start) < self.node.interval_unit.milliseconds
        ):
            return []

        deployability_index = deployability_index or DeployabilityIndex.all_deployable()
        # Representative snapshots are checked against their regular intervals, the rest
        # against their dev intervals.
        intervals = (
            self.intervals if deployability_index.is_representative(self) else self.dev_intervals
        )

        # Snapshots that are not evaluatable, and seeds that already have any interval,
        # have nothing missing.
        if not self.evaluatable or (self.is_seed and intervals):
            return []

        start_ts, end_ts = (to_timestamp(ts) for ts in self.inclusive_exclusive(start, end))

        interval_unit = self.node.interval_unit
        execution_time_ts = to_timestamp(execution_time) if execution_time else now_timestamp()
        # The upper bound is the execution time, floored to the node's cron unless the cron is ignored.
        upper_bound_ts = (
            execution_time_ts
            if ignore_cron
            else to_timestamp(self.node.cron_floor(execution_time_ts))
        )
        if end_bounded:
            upper_bound_ts = min(upper_bound_ts, end_ts)
        if not self.allow_partials:
            # Without partials the upper bound is floored using the interval unit.
            upper_bound_ts = to_timestamp(interval_unit.cron_floor(upper_bound_ts))

        end_ts = min(end_ts, upper_bound_ts)

        # Lookback and the model's end only apply to model snapshots.
        lookback = 0
        model_end_ts: t.Optional[int] = None

        if self.is_model:
            lookback = self.model.lookback
            model_end_ts = to_timestamp(make_exclusive(self.model.end)) if self.model.end else None

        # The stored intervals are passed as a tuple.
        # NOTE(review): presumably so the call can be cached (lru_cache is imported) — confirm.
        return compute_missing_intervals(
            interval_unit,
            tuple(intervals),
            start_ts,
            end_ts,
            lookback,
            model_end_ts,
        )
1088
1089    def check_ready_intervals(
1090        self,
1091        intervals: Intervals,
1092        context: ExecutionContext,
1093    ) -> Intervals:
1094        """Returns a list of intervals that are considered ready by the provided signal.
1095
1096        Note that this will handle gaps in the provided intervals. The returned intervals
1097        may introduce new gaps.
1098        """
1099        signals = self.is_model and self.model.render_signal_calls()
1100        if not signals:
1101            return intervals
1102
1103        for signal_name, kwargs in signals.signals_to_kwargs.items():
1104            try:
1105                intervals = check_ready_intervals(
1106                    signals.prepared_python_env[signal_name],
1107                    intervals,
1108                    context,
1109                    python_env=signals.python_env,
1110                    dialect=self.model.dialect,
1111                    path=self.model._path,
1112                    snapshot=self,
1113                    kwargs=kwargs,
1114                )
1115            except SQLMeshError as e:
1116                raise SignalEvalError(
1117                    f"{e} '{signal_name}' for '{self.model.name}' at {self.model._path}"
1118                )
1119        return intervals
1120
    def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = False) -> None:
        """Assigns the given category to this snapshot and derives its versions from it.

        Besides recording the category and the forward-only flag, this sets `version` and
        `dev_version_`, and, when a previous version is reused, also `physical_schema_`,
        `table_naming_convention` and possibly `dev_table_suffix`.

        Args:
            category: The change category to assign to this snapshot. Must not be FORWARD_ONLY,
                which is deprecated.
            forward_only: Whether or not this snapshot is applied going forward in production.
        """
        assert category != SnapshotChangeCategory.FORWARD_ONLY, (
            "FORWARD_ONLY change category is deprecated"
        )

        # By default the dev version follows the fingerprint; it may be overridden below.
        self.dev_version_ = self.fingerprint.to_version()
        # Forward-only, indirect non-breaking and metadata changes do not need a new version
        # when a previous version exists.
        is_no_rebuild = forward_only or category in (
            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
            SnapshotChangeCategory.METADATA,
        )
        # The branches below are ordered by priority; the first one that matches decides the version.
        if self.is_model and not self.virtual_environment_mode.is_full:
            # Hardcode the version if the virtual environment is not fully enabled.
            self.version = "novde"
        elif self.is_model and self.model.physical_version:
            # If the model has a pinned version then use that.
            self.version = self.model.physical_version
        elif is_no_rebuild and self.previous_version:
            # Reuse the version of the previous snapshot.
            self.version = self.previous_version.data_version.version
        elif self.is_model and self.model.forward_only and not self.previous_version:
            # If this is a new model then use a deterministic version, independent of the fingerprint.
            self.version = hash_data([self.name, *self.model.kind.data_hash_values])
        else:
            self.version = self.fingerprint.to_version()

        if is_no_rebuild and self.previous_version:
            # Carry over the physical location settings of the previous version.
            previous_version = self.previous_version
            self.physical_schema_ = previous_version.physical_schema
            self.table_naming_convention = previous_version.table_naming_convention
            if self.is_materialized and (category.is_indirect_non_breaking or category.is_metadata):
                # Reuse the dev table for indirect non-breaking and metadata changes.
                self.dev_version_ = (
                    previous_version.data_version.dev_version
                    or previous_version.fingerprint.to_version()
                )
                self.dev_table_suffix = previous_version.data_version.dev_table_suffix

        self.change_category = category
        self.forward_only = forward_only
1165
1166    @property
1167    def categorized(self) -> bool:
1168        """Whether the snapshot has been categorized."""
1169        return self.change_category is not None and self.version is not None
1170
1171    def table_name(self, is_deployable: bool = True) -> str:
1172        """Full table name pointing to the materialized location of the snapshot.
1173
1174        Args:
1175            is_deployable: Indicates whether to return the table name for deployment to production.
1176        """
1177        self._ensure_categorized()
1178        assert self.version
1179        return self._table_name(self.version, is_deployable)
1180
1181    def version_get_or_generate(self) -> str:
1182        """Helper method to get the version or generate it from the fingerprint."""
1183        return self.version or self.fingerprint.to_version()
1184
1185    def is_valid_start(
1186        self,
1187        start: t.Optional[TimeLike],
1188        snapshot_start: TimeLike,
1189        execution_time: t.Optional[TimeLike] = None,
1190    ) -> bool:
1191        """Checks if the given start and end are valid for this snapshot.
1192        Args:
1193            start: The start date/time of the interval (inclusive)
1194            snapshot_start: The start date/time of the snapshot (inclusive)
1195        """
1196        # The snapshot may not have a start defined. If so we use the provided snapshot start.
1197        if self.depends_on_past and start:
1198            interval_unit = self.node.interval_unit
1199            start_ts = to_timestamp(interval_unit.cron_floor(start))
1200
1201            if not self.intervals:
1202                # The start date must be aligned by the interval unit.
1203                snapshot_start_ts = to_timestamp(interval_unit.cron_floor(snapshot_start))
1204                if snapshot_start_ts < to_timestamp(snapshot_start):
1205                    snapshot_start_ts = to_timestamp(interval_unit.cron_next(snapshot_start_ts))
1206                return snapshot_start_ts >= start_ts
1207            # Make sure that if there are missing intervals for this snapshot that they all occur at or after the
1208            # provided start_ts. Otherwise we know that we are doing a non-contiguous load and therefore this is not
1209            # a valid start.
1210            missing_intervals = self.missing_intervals(
1211                snapshot_start, now(), execution_time=execution_time
1212            )
1213            earliest_interval = missing_intervals[0][0] if missing_intervals else None
1214            if earliest_interval:
1215                return earliest_interval >= start_ts
1216        return True
1217
1218    def get_latest(self, default: t.Optional[TimeLike] = None) -> t.Optional[TimeLike]:
1219        """The latest interval loaded for the snapshot. Default is used if intervals are not defined"""
1220
1221        def to_end_date(end: int, unit: IntervalUnit) -> TimeLike:
1222            if unit.is_day:
1223                return to_date(make_inclusive_end(end))
1224            return end
1225
1226        return (
1227            to_end_date(to_timestamp(self.intervals[-1][1]), self.node.interval_unit)
1228            if self.intervals
1229            else default
1230        )
1231
1232    def needs_destructive_check(
1233        self,
1234        allow_destructive_snapshots: t.Set[str],
1235    ) -> bool:
1236        return (
1237            self.is_model
1238            and not self.model.on_destructive_change.is_allow
1239            and self.name not in allow_destructive_snapshots
1240        )
1241
1242    def needs_additive_check(
1243        self,
1244        allow_additive_snapshots: t.Set[str],
1245    ) -> bool:
1246        return (
1247            self.is_model
1248            and not self.model.on_additive_change.is_allow
1249            and self.name not in allow_additive_snapshots
1250        )
1251
1252    def get_next_auto_restatement_interval(self, execution_time: TimeLike) -> t.Optional[Interval]:
1253        """Returns the next auto restatement interval for the snapshot.
1254
1255        Args:
1256            execution_time: The execution time to use for the restatement.
1257
1258        Returns:
1259            The interval that needs to be restated or None if no restatement is needed.
1260        """
1261        if (
1262            not self.is_model
1263            or not self.intervals
1264            or not self.model.auto_restatement_cron
1265            or self.model.disable_restatement
1266        ):
1267            return None
1268
1269        execution_time_ts = to_timestamp(execution_time)
1270        next_auto_restatement_ts = self.next_auto_restatement_ts or to_timestamp(
1271            self.model.auto_restatement_croniter(self.created_ts).get_next(estimate=False)
1272        )
1273        if execution_time_ts < next_auto_restatement_ts:
1274            return None
1275
1276        num_intervals_to_restate = self.model.auto_restatement_intervals
1277        if num_intervals_to_restate is None:
1278            return (self.intervals[0][0], self.intervals[-1][1])
1279
1280        auto_restatement_end_ts = to_timestamp(
1281            self.node.interval_unit.cron_floor(execution_time_ts)
1282        )
1283        auto_restatement_start_ts = (
1284            auto_restatement_end_ts
1285            - num_intervals_to_restate * self.node.interval_unit.milliseconds
1286        )
1287        return (auto_restatement_start_ts, auto_restatement_end_ts)
1288
1289    def update_next_auto_restatement_ts(self, execution_time: TimeLike) -> t.Optional[int]:
1290        """Updates the next auto restatement timestamp.
1291
1292        Args:
1293            execution_time: The execution time to use for the restatement.
1294
1295        Returns:
1296            The next auto restatement timestamp or None if not applicable.
1297        """
1298        if (
1299            not self.is_model
1300            or not self.model.auto_restatement_cron
1301            or self.model.disable_restatement
1302        ):
1303            self.next_auto_restatement_ts = None
1304        else:
1305            self.next_auto_restatement_ts = to_timestamp(
1306                self.model.auto_restatement_croniter(execution_time).get_next(estimate=False)
1307            )
1308        return self.next_auto_restatement_ts
1309
1310    def apply_pending_restatement_intervals(self) -> None:
1311        """Applies the pending restatement intervals to the snapshot's intervals."""
1312        if not self.is_model or self.model.disable_restatement:
1313            return
1314        for pending_restatement_interval in self.pending_restatement_intervals:
1315            logger.info(
1316                "Applying the auto restated interval (%s, %s) to snapshot %s",
1317                time_like_to_str(pending_restatement_interval[0]),
1318                time_like_to_str(pending_restatement_interval[1]),
1319                self.snapshot_id,
1320            )
1321            self.intervals = remove_interval(self.intervals, *pending_restatement_interval)
1322
1323    def is_directly_modified(self, other: Snapshot) -> bool:
1324        """Returns whether or not this snapshot is directly modified in relation to the other snapshot."""
1325        return self.node.is_data_change(other.node)
1326
1327    def is_indirectly_modified(self, other: Snapshot) -> bool:
1328        """Returns whether or not this snapshot is indirectly modified in relation to the other snapshot."""
1329        return (
1330            self.fingerprint.parent_data_hash != other.fingerprint.parent_data_hash
1331            and not self.node.is_data_change(other.node)
1332        )
1333
1334    def is_metadata_updated(self, other: Snapshot) -> bool:
1335        """Returns whether or not this snapshot contains metadata changes in relation to the other snapshot."""
1336        return self.fingerprint.metadata_hash != other.fingerprint.metadata_hash
1337
1338    @property
1339    def physical_schema(self) -> str:
1340        if self.physical_schema_ is not None:
1341            return self.physical_schema_
1342        return self.model.physical_schema if self.is_model else ""
1343
1344    @property
1345    def table_info(self) -> SnapshotTableInfo:
1346        """Helper method to get the SnapshotTableInfo from the Snapshot."""
1347        self._ensure_categorized()
1348
1349        custom_materialization = (
1350            self.model.kind.materialization
1351            if self.is_model and isinstance(self.model.kind, CustomKind)
1352            else None
1353        )
1354
1355        return SnapshotTableInfo(
1356            physical_schema=self.physical_schema,
1357            name=self.name,
1358            fingerprint=self.fingerprint,
1359            version=self.version,
1360            dev_version=self.dev_version,
1361            parents=self.parents,
1362            previous_versions=self.previous_versions,
1363            change_category=self.change_category,
1364            kind_name=self.model_kind_name,
1365            node_type=self.node_type,
1366            custom_materialization=custom_materialization,
1367            dev_table_suffix=self.dev_table_suffix,
1368            model_gateway=self.model_gateway,
1369            table_naming_convention=self.table_naming_convention,  # type: ignore
1370            forward_only=self.forward_only,
1371            virtual_environment_mode=self.virtual_environment_mode,
1372        )
1373
1374    @property
1375    def data_version(self) -> SnapshotDataVersion:
1376        self._ensure_categorized()
1377        return SnapshotDataVersion(
1378            fingerprint=self.fingerprint,
1379            version=self.version,
1380            dev_version=self.dev_version,
1381            change_category=self.change_category,
1382            physical_schema=self.physical_schema,
1383            dev_table_suffix=self.dev_table_suffix,
1384            table_naming_convention=self.table_naming_convention,
1385            virtual_environment_mode=self.virtual_environment_mode,
1386        )
1387
1388    @property
1389    def snapshot_intervals(self) -> SnapshotIntervals:
1390        self._ensure_categorized()
1391        return SnapshotIntervals(
1392            name=self.name,
1393            identifier=self.identifier,
1394            version=self.version,
1395            dev_version=self.dev_version,
1396            intervals=self.intervals.copy(),
1397            dev_intervals=self.dev_intervals.copy(),
1398            pending_restatement_intervals=self.pending_restatement_intervals.copy(),
1399        )
1400
1401    @property
1402    def is_materialized_view(self) -> bool:
1403        """Returns whether or not this snapshot's model represents a materialized view."""
1404        return (
1405            self.is_model and isinstance(self.model.kind, ViewKind) and self.model.kind.materialized
1406        )
1407
1408    @property
1409    def is_new_version(self) -> bool:
1410        """Returns whether or not this version is new and requires a backfill."""
1411        self._ensure_categorized()
1412        return self.fingerprint.to_version() == self.version
1413
1414    @property
1415    def is_paused(self) -> bool:
1416        return self.unpaused_ts is None
1417
1418    @property
1419    def is_paused_forward_only(self) -> bool:
1420        return self.is_paused and self.is_forward_only
1421
1422    @property
1423    def normalized_effective_from_ts(self) -> t.Optional[int]:
1424        return (
1425            to_timestamp(self.node.interval_unit.cron_floor(self.effective_from))
1426            if self.effective_from
1427            else None
1428        )
1429
1430    @property
1431    def model_kind_name(self) -> t.Optional[ModelKindName]:
1432        return self.model.kind.name if self.is_model else None
1433
1434    @property
1435    def node_type(self) -> NodeType:
1436        if self.node.is_model:
1437            return NodeType.MODEL
1438        if self.node.is_audit:
1439            return NodeType.AUDIT
1440        raise SQLMeshError(f"Snapshot {self.snapshot_id} has an unknown node type.")
1441
1442    @property
1443    def model(self) -> Model:
1444        model = self.model_or_none
1445        if model:
1446            return model
1447        raise SQLMeshError(f"Snapshot {self.snapshot_id} is not a model snapshot.")
1448
1449    @property
1450    def model_or_none(self) -> t.Optional[Model]:
1451        if self.is_model:
1452            return t.cast(Model, self.node)
1453        return None
1454
1455    @property
1456    def model_gateway(self) -> t.Optional[str]:
1457        return self.model.gateway if self.is_model else None
1458
1459    @property
1460    def audit(self) -> StandaloneAudit:
1461        if self.is_audit:
1462            return t.cast(StandaloneAudit, self.node)
1463        raise SQLMeshError(f"Snapshot {self.snapshot_id} is not an audit snapshot.")
1464
1465    @property
1466    def depends_on_past(self) -> bool:
1467        """Whether or not this models depends on past intervals to be populated before loading following intervals.
1468
1469        This represents a superset of the following types of models:
1470        1. Models that depend on themselves but can be restated from an arbitrary point in time (any start date) as long as interval batches are processed sequentially.
1471           An example of this can be an INCREMENTAL_BY_TIME_RANGE model that references previous records from itself.
1472        2. Models that can only be restated from the beginning of history *and* their interval batches must be processed sequentially.
1473        """
1474        return self.depends_on_self or self.full_history_restatement_only
1475
1476    @property
1477    def depends_on_self(self) -> bool:
1478        """Whether or not this models depends on self."""
1479        return self.is_model and self.model.depends_on_self
1480
1481    @property
1482    def name_version(self) -> SnapshotNameVersion:
1483        """Returns the name and version of the snapshot."""
1484        return SnapshotNameVersion(name=self.name, version=self.version)
1485
1486    @property
1487    def id_and_version(self) -> SnapshotIdAndVersion:
1488        return self.table_info.id_and_version
1489
1490    @property
1491    def disable_restatement(self) -> bool:
1492        """Is restatement disabled for the node"""
1493        return self.is_model and self.model.disable_restatement
1494
1495    @cached_property
1496    def fully_qualified_table(self) -> t.Optional[exp.Table]:
1497        if not self.is_model:
1498            return None
1499        return t.cast(Model, self.node).fully_qualified_table
1500
1501    @property
1502    def expiration_ts(self) -> int:
1503        return to_timestamp(
1504            self.ttl,
1505            relative_base=to_datetime(self.updated_ts),
1506            check_categorical_relative_expression=False,
1507        )
1508
1509    @property
1510    def supports_schema_migration_in_prod(self) -> bool:
1511        """Returns whether or not this snapshot supports schema migration when deployed to production."""
1512        return self.is_paused and self.is_model and not self.is_symbolic and not self.is_seed
1513
1514    @property
1515    def requires_schema_migration_in_prod(self) -> bool:
1516        """Returns whether or not this snapshot requires a schema migration when deployed to production."""
1517        return self.supports_schema_migration_in_prod and (
1518            (self.previous_version and self.previous_version.version == self.version)
1519            or self.model.forward_only
1520            or bool(self.model.physical_version)
1521            or not self.virtual_environment_mode.is_full
1522        )
1523
1524    @property
1525    def ttl_ms(self) -> int:
1526        return self.expiration_ts - self.updated_ts
1527
1528    @property
1529    def custom_materialization(self) -> t.Optional[str]:
1530        if self.is_custom:
1531            return t.cast(CustomKind, self.model.kind).materialization
1532        return None
1533
1534    @property
1535    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
1536        return (
1537            self.model.virtual_environment_mode if self.is_model else VirtualEnvironmentMode.default
1538        )
1539
1540    def _ensure_categorized(self) -> None:
1541        if not self.change_category:
1542            raise SQLMeshError(f"Snapshot {self.snapshot_id} has not been categorized yet.")
1543        if not self.version:
1544            raise SQLMeshError(f"Snapshot {self.snapshot_id} has not been versioned yet.")
1545
1546    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
1547        state = super().__getstate__()
1548        state["__dict__"] = state["__dict__"].copy()
1549        # Don't store intervals.
1550        state["__dict__"]["intervals"] = []
1551        state["__dict__"]["dev_intervals"] = []
1552        return state
1553
1554
class SnapshotTableCleanupTask(PydanticModel):
    """Pairs a snapshot's table info with a flag that scopes a table cleanup.

    NOTE(review): the consumers of this task are not visible from here, so the field semantics
    described below are inferred from the names only -- confirm against the code that uses them.
    """

    # Table info of the snapshot this task refers to.
    snapshot: SnapshotTableInfo
    # Presumably limits the cleanup to the snapshot's dev table when True -- TODO confirm.
    dev_table_only: bool
1558
1559
# Union aliases used to annotate parameters that accept any of several snapshot representations.

# Any representation a snapshot ID can be obtained from (e.g. `DeployabilityIndex` reads `.snapshot_id`).
SnapshotIdLike = t.Union[SnapshotId, SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]
# Same as above, minus the bare `SnapshotId`.
SnapshotIdAndVersionLike = t.Union[SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]
# Either the table info of a snapshot or the full snapshot.
SnapshotInfoLike = t.Union[SnapshotTableInfo, Snapshot]
# Any representation that is presumably able to provide a name and a version -- confirm with callers.
SnapshotNameVersionLike = t.Union[
    SnapshotNameVersion, SnapshotTableInfo, SnapshotIdAndVersion, Snapshot
]
1566
1567
class DeployabilityIndex(PydanticModel, frozen=True):
    """Contains information about deployability of every snapshot.

    Deployability is defined as whether or not the output that a snapshot produces during the
    current evaluation can be reused in (deployed to) the production environment.

    Snapshot IDs are stored as strings produced by `_snapshot_id_key`. Instances are frozen;
    `with_deployable` / `with_non_deployable` return new instances instead of mutating.
    """

    # String keys of the snapshots tracked by this index; their meaning depends on `is_opposite_index`.
    indexed_ids: t.FrozenSet[str]
    # When False, `indexed_ids` lists the deployable snapshots. When True, it lists the
    # non-deployable ones and every snapshot that is absent from it is considered deployable.
    is_opposite_index: bool = False
    # String keys of snapshots that `is_representative` accepts even if they are not deployable.
    representative_shared_version_ids: t.FrozenSet[str] = frozenset()

    @field_validator("indexed_ids", "representative_shared_version_ids", mode="before")
    @classmethod
    def _snapshot_ids_set_validator(cls, v: t.Any) -> t.Optional[t.FrozenSet[str]]:
        """Normalizes the incoming IDs into a frozenset, passing None through unchanged.

        `SnapshotId` instances are replaced with their string key; any other element is kept as is.
        """
        if v is None:
            return v
        # Transforming into strings because the serialization of sets of objects / lists is broken in Pydantic.
        return frozenset(
            {
                (
                    cls._snapshot_id_key(snapshot_id)  # type: ignore
                    if isinstance(snapshot_id, SnapshotId)
                    else snapshot_id
                )
                for snapshot_id in v
            }
        )

    def is_deployable(self, snapshot: SnapshotIdLike) -> bool:
        """Returns true if the output produced by the given snapshot in a development environment can be reused
        in (deployed to) production

        Args:
            snapshot: The snapshot to check.

        Returns:
            True if the snapshot is deployable, False otherwise.
        """
        snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
        # An opposite index lists the non-deployable snapshots, so membership is inverted.
        return (self.is_opposite_index and snapshot_id not in self.indexed_ids) or (
            not self.is_opposite_index and snapshot_id in self.indexed_ids
        )

    def is_representative(self, snapshot: SnapshotIdLike) -> bool:
        """Returns true if the deployable (non-dev) table of the given snapshot should be used for reading, table mapping, and
        computing missing intervals.

        Note, that deployable snapshots are also representative, but the reverse is not always true.

        Unlike `is_deployable`, this variant also captures FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots that
        are not deployable by their nature but are currently promoted in production. Therefore, it's safe to consider
        them as such when constructing a plan, building a physical table mapping or computing missing intervals.

        Args:
            snapshot: The snapshot to check.

        Returns:
            True if the snapshot is representative, False otherwise.
        """
        snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
        representative = snapshot_id in self.representative_shared_version_ids
        return representative or self.is_deployable(snapshot)

    def with_non_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
        """Creates a new index with the given snapshot marked as non-deployable."""
        return self._add_snapshot(snapshot, False)

    def with_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
        """Creates a new index with the given snapshot marked as deployable."""
        return self._add_snapshot(snapshot, True)

    def _add_snapshot(self, snapshot: SnapshotIdLike, deployable: bool) -> DeployabilityIndex:
        """Returns a copy of this index in which the snapshot has the requested deployability.

        The representative IDs and the index orientation are carried over unchanged.
        """
        snapshot_id = {self._snapshot_id_key(snapshot.snapshot_id)}
        indexed_ids = self.indexed_ids
        if self.is_opposite_index:
            # The set holds non-deployable IDs: remove to mark deployable, add to mark non-deployable.
            indexed_ids = indexed_ids - snapshot_id if deployable else indexed_ids | snapshot_id
        else:
            # The set holds deployable IDs: add to mark deployable, remove to mark non-deployable.
            indexed_ids = indexed_ids | snapshot_id if deployable else indexed_ids - snapshot_id

        return DeployabilityIndex(
            indexed_ids=indexed_ids,
            is_opposite_index=self.is_opposite_index,
            representative_shared_version_ids=self.representative_shared_version_ids,
        )

    @classmethod
    def all_deployable(cls) -> DeployabilityIndex:
        """Returns an index that treats every snapshot as deployable (an empty opposite index)."""
        return cls(indexed_ids=frozenset(), is_opposite_index=True)

    @classmethod
    def none_deployable(cls) -> DeployabilityIndex:
        """Returns an index that treats no snapshot as deployable (an empty regular index)."""
        return cls(indexed_ids=frozenset())

    @classmethod
    def create(
        cls,
        snapshots: t.Dict[SnapshotId, Snapshot] | t.Collection[Snapshot],
        start: t.Optional[TimeLike] = None,  # plan start
        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ) -> DeployabilityIndex:
        """Builds an index by walking the DAG of the given snapshots.

        Args:
            snapshots: The snapshots to index, either as a collection or keyed by snapshot ID.
            start: The plan start. When provided, each snapshot's `is_valid_start` is consulted.
            start_override_per_model: Start dates keyed by node name that take precedence over
                the start date computed for the corresponding snapshot.

        Returns:
            An index storing whichever of the deployable / non-deployable ID sets is smaller.
        """
        if not isinstance(snapshots, dict):
            snapshots = {s.snapshot_id: s for s in snapshots}

        # Whether each visited snapshot is deployable itself.
        deployability_mapping: t.Dict[SnapshotId, bool] = {}
        # Whether each visited snapshot allows its children to be deployable.
        children_deployability_mapping: t.Dict[SnapshotId, bool] = {}
        representative_shared_version_ids: t.Set[SnapshotId] = set()
        start_override_per_model = start_override_per_model or {}

        start_date_cache: t.Optional[t.Dict[str, datetime]] = {}

        # NOTE(review): the parent lookups below only see parents that were already visited, so
        # this presumably relies on the DAG yielding parents before children -- confirm.
        dag = snapshots_to_dag(snapshots.values())
        for node in dag:
            if node not in snapshots:
                continue
            snapshot = snapshots[node]

            if not snapshot.virtual_environment_mode.is_full:
                # If the virtual environment is not fully enabled, then the snapshot can never be deployable
                this_deployable = False
            else:
                # Make sure that the node is deployable according to all its parents
                this_deployable = all(
                    children_deployability_mapping[p_id]
                    for p_id in snapshots[node].parents
                    if p_id in children_deployability_mapping
                )

            if this_deployable:
                is_forward_only_model = (
                    snapshot.is_model and snapshot.model.forward_only and not snapshot.is_metadata
                )
                has_auto_restatement = (
                    snapshot.is_model and snapshot.model.auto_restatement_cron is not None
                )

                # The computed start date is evaluated eagerly, even when an override exists.
                snapshot_start = start_override_per_model.get(
                    node.name, start_date(snapshot, snapshots.values(), cache=start_date_cache)
                )

                is_valid_start = (
                    snapshot.is_valid_start(start, snapshot_start) if start is not None else True
                )

                children_deployable = is_valid_start and not has_auto_restatement
                if (
                    snapshot.is_forward_only
                    or snapshot.is_indirect_non_breaking
                    or is_forward_only_model
                    or has_auto_restatement
                    or not is_valid_start
                ):
                    # FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots are not deployable by nature.
                    # Similarly, if the model depends on past and the start date is not aligned with the
                    # model's start, we should consider this snapshot non-deployable.
                    this_deployable = False
                    if not snapshot.is_paused or (
                        snapshot.is_indirect_non_breaking and snapshot.intervals
                    ):
                        # This snapshot represents what's currently deployed in prod.
                        representative_shared_version_ids.add(node)
                    else:
                        # If the parent is not representative then its children can't be deployable.
                        children_deployable = False
            else:
                # A non-deployable snapshot never allows deployable children, but it is still
                # representative as long as it is not paused.
                children_deployable = False
                if not snapshots[node].is_paused:
                    representative_shared_version_ids.add(node)

            deployability_mapping[node] = this_deployable
            children_deployability_mapping[node] = children_deployable

        deployable_ids = {
            snapshot_id for snapshot_id, deployable in deployability_mapping.items() if deployable
        }
        non_deployable_ids = set(snapshots) - deployable_ids

        # Pick the smaller set to reduce the size of the serialized object.
        if len(deployable_ids) <= len(non_deployable_ids):
            return cls(
                indexed_ids=deployable_ids,
                representative_shared_version_ids=representative_shared_version_ids,
            )
        return cls(
            indexed_ids=non_deployable_ids,
            is_opposite_index=True,
            representative_shared_version_ids=representative_shared_version_ids,
        )

    @staticmethod
    def _snapshot_id_key(snapshot_id: SnapshotId) -> str:
        """Returns the string key of a snapshot ID: its name and identifier joined by a double underscore."""
        return f"{snapshot_id.name}__{snapshot_id.identifier}"
1759
1760
def table_name(
    physical_schema: str,
    name: str,
    version: str,
    catalog: t.Optional[str] = None,
    suffix: t.Optional[str] = None,
    naming_convention: t.Optional[TableNamingConvention] = None,
) -> str:
    """Builds the name of the physical table for the given node name and version.

    Args:
        physical_schema: The schema the resulting table is placed in.
        name: The node name, which is parsed as a table reference.
        version: The version embedded in the table name.
        catalog: The catalog to use when the parsed name doesn't carry one.
        suffix: An optional suffix which is appended to the table name after a double underscore.
        naming_convention: The naming convention to follow. Falls back to the default convention.

    Returns:
        The resulting table name as a string.
    """
    table = exp.to_table(name)
    convention = naming_convention or TableNamingConvention.default

    if convention == TableNamingConvention.HASH_MD5:
        # The hashed value is the name that the SCHEMA_AND_TABLE convention would have produced.
        unhashed_name = table_name(
            physical_schema,
            name,
            version,
            catalog=catalog,
            suffix=suffix,
            naming_convention=TableNamingConvention.SCHEMA_AND_TABLE,
        )
        base_name = f"{c.SQLMESH}_md5__{md5(unhashed_name)}"
    else:
        # Snapshot._table_name() strips the catalog from the model name before calling this function,
        # so a 3-part name like "foo.bar.baz" arrives here as (name="bar.baz", catalog="foo").
        # This is why there is no TableNamingConvention.CATALOG_AND_SCHEMA_AND_TABLE.
        parts = table.parts
        wanted = 2 if convention == TableNamingConvention.SCHEMA_AND_TABLE else 1
        # The parsed name may have fewer parts than the convention asks for.
        wanted = min(len(parts), wanted)

        # BigQuery projects usually contain "-", which is illegal in a table name, hence the sanitizing.
        sanitized = [sanitize_name(part.name) for part in parts[-wanted:]]
        name = "__".join(sanitized)
        base_name = f"{name}__{version}"

    suffix = f"__{suffix}" if suffix else ""

    table.set("this", exp.to_identifier(f"{base_name}{suffix}"))
    table.set("db", exp.to_identifier(physical_schema))
    if catalog and not table.catalog:
        table.set("catalog", exp.to_identifier(catalog))
    return exp.table_name(table)
1806
1807
def display_name(
    snapshot_info_like: t.Union[SnapshotInfoLike, SnapshotInfoMixin],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    dialect: DialectType = None,
) -> str:
    """
    Returns the name under which a snapshot is presented to the user.

    Audits are displayed using their plain name. Models are displayed as a qualified view name
    for the given environment. This is meant for presentation purposes only; use
    `qualified_view_name` whenever a view name is needed for anything else.

    Args:
        snapshot_info_like: The snapshot info object to get the display name for
        environment_naming_info: Environment naming info to use for display name formatting
        default_catalog: Optional default catalog name to use. If None, the default catalog will always be included in the display name.
        dialect: Optional dialect type to use for name formatting

    Returns:
        The formatted display name as a string
    """
    if not snapshot_info_like.is_audit:
        return model_display_name(
            snapshot_info_like.name, environment_naming_info, default_catalog, dialect
        )
    return snapshot_info_like.name
1834
1835
def model_display_name(
    node_name: str,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    dialect: DialectType = None,
) -> str:
    """Returns the environment-specific qualified view name for the given model name.

    The catalog is left out when it equals the default catalog, unless the environment suffix
    targets the catalog.

    Args:
        node_name: The model name, which is parsed as a table reference.
        environment_naming_info: Environment naming info to use for formatting.
        default_catalog: The default catalog that the model's catalog is compared against.
        dialect: Optional dialect to use for name formatting.

    Returns:
        The formatted view name as a string.
    """
    parsed = exp.to_table(node_name)

    hide_catalog = (
        environment_naming_info.suffix_target != EnvironmentSuffixTarget.CATALOG
        and parsed.catalog == default_catalog
    )

    qualified_view_name = QualifiedViewName(
        catalog=None if hide_catalog else parsed.catalog,
        schema_name=parsed.db or None,
        table=parsed.name,
    )
    return qualified_view_name.for_environment(environment_naming_info, dialect=dialect)
1859
1860
def fingerprint_from_node(
    node: Node,
    *,
    nodes: t.Dict[str, Node],
    cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
) -> SnapshotFingerprint:
    """Helper function to generate a fingerprint from the hashes of the node and of its parents.

    The fingerprint consists of the node's own data and metadata hashes as well as of the hashes
    derived from the fingerprints of those parents that are present in `nodes`. Parent values
    are sorted before hashing so that the order of dependencies doesn't affect the result.

    Args:
        node: Node to fingerprint.
        nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes.
            Dependencies that are missing from this dictionary don't contribute to the fingerprint.
        cache: Cache of node name to fingerprints. It's populated as fingerprints get computed.

    Returns:
        The fingerprint.
    """
    if cache is None:
        cache = {}

    fqn = node.fqn
    if fqn in cache:
        return cache[fqn]

    parent_fingerprints = [
        fingerprint_from_node(nodes[dependency], nodes=nodes, cache=cache)
        for dependency in node.depends_on
        if dependency in nodes
    ]

    parent_versions = sorted(parent.to_version() for parent in parent_fingerprints)

    # Each parent contributes both its own metadata hash and the one inherited from its parents.
    parent_metadata = []
    for parent in parent_fingerprints:
        parent_metadata.append(parent.metadata_hash)
        parent_metadata.append(parent.parent_metadata_hash)

    fingerprint = SnapshotFingerprint(
        data_hash=node.data_hash,
        metadata_hash=node.metadata_hash,
        parent_data_hash=hash_data(parent_versions),
        parent_metadata_hash=hash_data(sorted(parent_metadata)),
    )
    cache[fqn] = fingerprint
    return fingerprint
1905
1906
def _parents_from_node(
    node: Node,
    nodes: t.Dict[str, Node],
) -> t.Dict[str, Node]:
    """Collects the parents of the node that are present in `nodes`, keyed by their fqn.

    Parents which are models of an embedded kind are expanded recursively, so that their
    own parents end up in the result as well.
    """
    collected: t.Dict[str, Node] = {}
    for dependency_fqn in node.depends_on:
        dependency = nodes.get(dependency_fqn)
        if not dependency:
            continue
        collected[dependency.fqn] = dependency
        if dependency.is_model and t.cast(_Model, dependency).kind.is_embedded:
            collected.update(_parents_from_node(dependency, nodes))

    return collected
1920
1921
def merge_intervals(intervals: t.Collection[Interval]) -> Intervals:
    """Sorts the given intervals and merges the ones that overlap or touch.

    Args:
        intervals: A collection of intervals to merge together.

    Returns:
        A new list of sorted and merged intervals.
    """
    if not intervals:
        return []

    head, *tail = sorted(intervals)
    result = [head]

    for candidate in tail:
        last_start, last_end = result[-1][0], result[-1][1]
        if candidate[0] > last_end:
            # There's a gap between the last merged interval and this one.
            result.append(candidate)
        else:
            result[-1] = (last_start, max(last_end, candidate[1]))

    return result
1946
1947
def _format_date_time(time_like: TimeLike, unit: t.Optional[IntervalUnit]) -> str:
    """Formats the value with `to_ds` if the unit is absent or has date granularity, and with `to_ts` otherwise."""
    use_date_format = unit is None or unit.is_date_granularity
    if use_date_format:
        return to_ds(time_like)
    # TODO: Remove `[0:19]` once `to_ts` always returns a timestamp without timezone
    return to_ts(time_like)[0:19]
1953
1954
def format_intervals(intervals: Intervals, unit: t.Optional[IntervalUnit]) -> str:
    """Renders intervals as a comma-separated list of "start - end" pairs.

    Each interval is passed through `make_inclusive` before its bounds are formatted.
    """
    rendered = []
    for start, end in intervals:
        inclusive_start, inclusive_end = make_inclusive(start, end)
        bounds = [_format_date_time(inclusive_start, unit), _format_date_time(inclusive_end, unit)]
        rendered.append(" - ".join(bounds))
    return ", ".join(rendered)
1961
1962
def remove_interval(intervals: Intervals, remove_start: int, remove_end: int) -> Intervals:
    """Cuts the [remove_start, remove_end) range out of every interval in the list.

    The start and end are used as is. Use the `get_remove_interval` method of `Snapshot` to get
    the correct start / end for a specific snapshot.

    Args:
        intervals: A list of exclusive intervals.
        remove_start: The inclusive start to remove.
        remove_end: The exclusive end to remove.

    Returns:
        A new list of intervals.
    """
    remaining: Intervals = []

    for low, high in intervals:
        keeps_head = low < remove_start
        keeps_tail = remove_end < high

        if keeps_head and keeps_tail:
            # The removed range is strictly inside of the interval, which splits it in two.
            remaining += [(low, remove_start), (remove_end, high)]
        elif keeps_head:
            remaining.append((low, min(remove_start, high)))
        elif keeps_tail:
            remaining.append((max(remove_end, low), high))
        # Otherwise the interval is fully covered by the removed range and is dropped.

    return remaining
1992
1993
def to_table_mapping(
    snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
) -> t.Dict[str, str]:
    """Maps snapshot names to their table names.

    Only versioned, non-embedded model snapshots are included. The table name is requested with
    the result of the index's `is_representative` check. When no index is provided, all snapshots
    are treated as deployable.
    """
    index = deployability_index or DeployabilityIndex.all_deployable()

    mapping: t.Dict[str, str] = {}
    for snapshot in snapshots:
        if not snapshot.version or snapshot.is_embedded or not snapshot.is_model:
            continue
        mapping[snapshot.name] = snapshot.table_name(index.is_representative(snapshot))
    return mapping
2003
2004
def to_view_mapping(
    snapshots: t.Iterable[Snapshot],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str] = None,
    dialect: t.Optional[str] = None,
) -> t.Dict[str, str]:
    """Maps the names of model snapshots to their display names in the given environment.

    Snapshots that are not models are left out.
    """
    mapping: t.Dict[str, str] = {}
    for snapshot in snapshots:
        if not snapshot.is_model:
            continue
        mapping[snapshot.name] = snapshot.display_name(
            environment_naming_info, default_catalog=default_catalog, dialect=dialect
        )
    return mapping
2018
2019
def has_paused_forward_only(
    targets: t.Iterable[SnapshotIdLike],
    snapshots: t.Union[t.List[Snapshot], t.Dict[SnapshotId, Snapshot]],
) -> bool:
    """Returns True if at least one of the targets resolves to a paused forward-only snapshot.

    Args:
        targets: The targets to check. Each one is looked up in `snapshots` by its snapshot ID.
        snapshots: The available snapshots, either as a list or keyed by snapshot ID.

    Raises:
        KeyError: If a target that gets checked is missing from `snapshots`.
    """
    by_id = snapshots if isinstance(snapshots, dict) else {s.snapshot_id: s for s in snapshots}
    # The evaluation is lazy, so the targets that follow the first match aren't looked up.
    return any(by_id[target.snapshot_id].is_paused_forward_only for target in targets)
2031
2032
def missing_intervals(
    snapshots: t.Union[t.Collection[Snapshot], t.Dict[SnapshotId, Snapshot]],
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> t.Dict[Snapshot, Intervals]:
    """Returns all missing intervals given a collection of snapshots.

    Args:
        snapshots: The snapshots to check, either as a collection or keyed by snapshot ID.
        start: The start of the range. Defaults to the earliest start date across all snapshots.
        end: The end of the range. Defaults to the current timestamp.
        execution_time: Passed through to `Snapshot.missing_intervals`.
        restatements: Intervals to restate keyed by snapshot ID. A restated interval replaces the
            range for its snapshot and is removed from a copy of that snapshot before the check.
        deployability_index: Passed through to `Snapshot.missing_intervals`. Defaults to an index
            in which all snapshots are deployable.
        start_override_per_model: Start dates keyed by snapshot name that replace the common start.
        end_override_per_model: End dates keyed by snapshot name that replace the end of the range.
        ignore_cron: Passed through to `Snapshot.missing_intervals`.
        end_bounded: Passed through to `Snapshot.missing_intervals`.

    Returns:
        A dictionary of snapshots to their missing intervals. Snapshots without missing intervals
        are left out. Restated snapshots are represented by their copies.
    """
    if not isinstance(snapshots, dict):
        # Make sure that the mapping is only constructed once
        snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots}
    missing: t.Dict[Snapshot, Intervals] = {}
    # Shared by all start date lookups performed below.
    cache: t.Dict[str, datetime] = {}
    end_date = end or now_timestamp()
    start_dt = (
        to_datetime(start)
        if start
        else earliest_start_date(snapshots, cache=cache, relative_to=end_date)
    )
    restatements = restatements or {}
    start_override_per_model = start_override_per_model or {}
    end_override_per_model = end_override_per_model or {}
    deployability_index = deployability_index or DeployabilityIndex.all_deployable()

    for snapshot in snapshots.values():
        if not snapshot.evaluatable:
            continue

        snapshot_start_date = start_override_per_model.get(snapshot.name, start_dt)
        snapshot_end_date: TimeLike = end_date

        restated_interval = restatements.get(snapshot.snapshot_id)
        if restated_interval:
            # The restated interval overrides the range computed so far.
            snapshot_start_date, snapshot_end_date = (to_datetime(i) for i in restated_interval)
            # Work on a copy with its own interval list so that the original snapshot stays untouched.
            snapshot = snapshot.copy()
            snapshot.intervals = snapshot.intervals.copy()
            snapshot.remove_interval(restated_interval)

        existing_interval_end = end_override_per_model.get(snapshot.name)
        if existing_interval_end:
            if snapshot_start_date >= existing_interval_end:
                # The start exceeds the provided interval end, so we can skip this snapshot
                # since it doesn't have missing intervals by definition
                continue
            snapshot_end_date = existing_interval_end

        # The range can't begin before the snapshot's own start date.
        snapshot_start_date = max(
            to_datetime(snapshot_start_date),
            to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)),
        )
        if snapshot_start_date > to_datetime(snapshot_end_date):
            continue

        # Cap the range at the node's end date if it comes before the end of the range.
        missing_interval_end_date = snapshot_end_date
        node_end_date = snapshot.node.end
        if node_end_date and (to_datetime(node_end_date) < to_datetime(snapshot_end_date)):
            missing_interval_end_date = node_end_date

        intervals = snapshot.missing_intervals(
            snapshot_start_date,
            missing_interval_end_date,
            execution_time=execution_time,
            deployability_index=deployability_index,
            ignore_cron=ignore_cron,
            end_bounded=end_bounded,
        )
        if intervals:
            missing[snapshot] = intervals

    return missing
2108
2109
@lru_cache(maxsize=16384)
def expand_range(start_ts: int, end_ts: int, interval_unit: IntervalUnit) -> t.List[int]:
    """Expands the range into a list of consecutive interval boundaries.

    The list begins with `start_ts`, continues with every timestamp that the unit's croniter
    yields up to and including `end_ts`, and is closed with `end_ts` itself if the last boundary
    doesn't land on it.

    The results are cached and the returned list is shared between calls with the same arguments.
    """
    cron = interval_unit.croniter(start_ts)
    boundaries = [start_ts]

    upcoming = to_timestamp(cron.get_next(estimate=True))
    while upcoming <= end_ts:
        boundaries.append(upcoming)
        upcoming = to_timestamp(cron.get_next(estimate=True))

    if boundaries[-1] != end_ts:
        boundaries.append(end_ts)
    return boundaries
2125
2126
@lru_cache(maxsize=16384)
def compute_missing_intervals(
    interval_unit: IntervalUnit,
    intervals: t.Tuple[Interval, ...],
    start_ts: int,
    end_ts: int,
    lookback: int,
    model_end_ts: t.Optional[int],
) -> Intervals:
    """Computes all missing intervals between start and end given intervals.

    The range is split into unit-sized intervals using `expand_range`, and every such interval
    that is not covered by any of the provided intervals is reported as missing. The results
    are cached, which is why `intervals` is expected to be a (hashable) tuple.

    Args:
        interval_unit: The interval unit.
        intervals: The intervals to check what's missing.
        start_ts: Inclusive timestamp start.
        end_ts: Exclusive timestamp end.
        lookback: A lookback window.
        model_end_ts: The exclusive end timestamp set on the model (if one is set)

    Returns:
        A sorted list of missing intervals within this range.
    """
    if start_ts == end_ts:
        return []

    timestamps = expand_range(start_ts, end_ts, interval_unit)
    missing: t.Set[Interval] = set()

    for current_ts, next_ts in zip(timestamps, timestamps[1:]):
        for low, high in intervals:
            if current_ts < low:
                # NOTE(review): stopping at the first interval that starts later presumably
                # relies on `intervals` being sorted -- confirm with callers.
                missing.add((current_ts, next_ts))
                break
            elif current_ts >= low and next_ts <= high:
                # Fully covered by an existing interval.
                break
        else:
            # No existing interval covers or follows this one.
            missing.add((current_ts, next_ts))

    if missing:
        if lookback:
            if model_end_ts:
                # Step back from the end of the range one interval at a time, reducing the
                # lookback by one for each further step taken while still past the model's end.
                croniter = interval_unit.croniter(end_ts)
                end_ts = to_timestamp(croniter.get_prev(estimate=True))

                while model_end_ts < end_ts:
                    end_ts = to_timestamp(croniter.get_prev(estimate=True))
                    lookback -= 1

                lookback = max(lookback, 0)

            for i, (current_ts, next_ts) in enumerate(zip(timestamps, timestamps[1:])):
                # The interval located `lookback` positions ahead of the current one.
                parent = timestamps[i + lookback : i + lookback + 2]

                # The current interval is also missing if that interval is missing or falls
                # outside of the expanded range.
                if len(parent) < 2 or tuple(parent) in missing:
                    missing.add((current_ts, next_ts))

        if model_end_ts:
            # Drop the intervals that start at or after the model's end.
            missing = {interval for interval in missing if interval[0] < model_end_ts}

    return sorted(missing)
2187
2188
@lru_cache(maxsize=16384)
def inclusive_exclusive(
    start: TimeLike,
    end: TimeLike,
    interval_unit: IntervalUnit,
    strict: bool = True,
    allow_partial: bool = False,
    expand: bool = True,
) -> Interval:
    """Converts the given start and end into a [start, end) pair of timestamps.

    The start is always floored to the interval unit. The end is aligned with the unit as well
    unless partial intervals are allowed. An end expressed as a date is treated as inclusive, so
    one day is added to it before anything else.

    Args:
        start: The start date/time of the interval (inclusive)
        end: The end date/time of the interval (inclusive)
        interval_unit: The interval unit.
        strict: Whether to fail when the inclusive start is the same as the exclusive end.
        allow_partial: Whether the interval can be partial or not.
        expand: Whether or not partial intervals are expanded outwards.

    Returns:
        A [start, end) pair.

    Raises:
        ValueError: If `strict` is set and the resulting start is not before the resulting end.
    """
    floored_start = interval_unit.cron_floor(start)
    shrink_inwards = not (expand or allow_partial)
    if shrink_inwards and floored_start < to_datetime(start):
        # The start falls in the middle of an interval, so move to the next boundary.
        floored_start = interval_unit.cron_next(floored_start)
    start_ts = to_timestamp(floored_start)

    if is_date(end):
        end = to_datetime(end) + timedelta(days=1)

    if allow_partial:
        end_ts = to_timestamp(end)
    else:
        floored_end = interval_unit.cron_floor(end)
        if expand and floored_end != to_datetime(end):
            # The end falls in the middle of an interval, so move to the next boundary.
            floored_end = interval_unit.cron_next(floored_end)
        end_ts = to_timestamp(floored_end)

    if strict and start_ts >= end_ts:
        raise ValueError(
            f"`end` ({to_datetime(end_ts)}) must be greater than `start` ({to_datetime(start_ts)})"
        )

    return (start_ts, end_ts)
2237
2238
def earliest_start_date(
    snapshots: t.Union[t.Collection[Snapshot], t.Dict[SnapshotId, Snapshot]],
    cache: t.Optional[t.Dict[str, datetime]] = None,
    relative_to: t.Optional[TimeLike] = None,
) -> datetime:
    """Finds the minimum start date across the given snapshots.

    Args:
        snapshots: Snapshots to find earliest start date.
        cache: optional cache which is shared between the start date lookups
        relative_to: the base date to compute start from if inferred from cron
    Returns:
        The earliest start date, or yesterday (relative to `relative_to` if provided) when there
        are no snapshots.
    """
    if not snapshots:
        base = to_datetime(relative_to) if relative_to is not None else None
        return yesterday(relative_base=base)

    if cache is None:
        cache = {}

    # Make sure that the mapping is only constructed once
    by_id = (
        snapshots
        if isinstance(snapshots, dict)
        else {snapshot.snapshot_id: snapshot for snapshot in snapshots}
    )
    return min(
        start_date(snapshot, by_id, cache=cache, relative_to=relative_to)
        for snapshot in by_id.values()
    )
2268
2269
def start_date(
    snapshot: Snapshot,
    snapshots: t.Dict[SnapshotId, Snapshot] | t.Iterable[Snapshot],
    cache: t.Optional[t.Dict[str, datetime]] = None,
    relative_to: t.Optional[TimeLike] = None,
) -> datetime:
    """Resolve the effective (explicit or inferred) start date of a snapshot.

    A node's explicit start wins. Otherwise the start is the earliest start among the
    snapshot's parents that are present in `snapshots`; when there are none, it is the
    cron tick preceding the floor of `relative_to` (or of the current time).

    Args:
        snapshot: The snapshot whose start date is wanted.
        snapshots: Catalog of available snapshots, as a mapping by snapshot ID or an iterable.
        cache: Optional memo of already resolved start dates; it is updated in place.
        relative_to: Base date used when the start has to be inferred from the cron.

    Returns:
        The start datetime.
    """
    if cache is None:
        cache = {}

    # The memo key includes the base date so results for different bases don't collide.
    if relative_to:
        memo_key = f"{snapshot.name}_{to_timestamp(relative_to)}"
    else:
        memo_key = snapshot.name

    if memo_key in cache:
        return cache[memo_key]

    if snapshot.node.start:
        resolved = to_datetime(snapshot.node.start)
        cache[memo_key] = resolved
        return resolved

    if isinstance(snapshots, dict):
        catalog = snapshots
    else:
        catalog = {member.snapshot_id: member for member in snapshots}

    # Parents missing from the catalog are ignored rather than treated as errors.
    parent_starts = []
    for parent_id in snapshot.parents:
        if parent_id in catalog:
            parent_starts.append(
                start_date(catalog[parent_id], catalog, cache=cache, relative_to=relative_to)
            )

    if parent_starts:
        resolved = min(parent_starts)
    else:
        resolved = snapshot.node.cron_prev(snapshot.node.cron_floor(relative_to or now()))

    cache[memo_key] = resolved
    return resolved
2315
2316
def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
    """Build a DAG of snapshot IDs in which each snapshot is added with its parents."""
    graph: DAG[SnapshotId] = DAG()
    for member in snapshots:
        node_id, parent_ids = member.snapshot_id, member.parents
        graph.add(node_id, parent_ids)
    return graph
2322
2323
def apply_auto_restatements(
    snapshots: t.Dict[SnapshotId, Snapshot], execution_time: TimeLike
) -> t.Tuple[t.List[SnapshotIntervals], t.Dict[SnapshotId, t.List[SnapshotId]]]:
    """Applies auto restatements to the snapshots.

    This operation results in the removal of intervals for snapshots that are ready to be restated based
    on the provided execution time and configured auto restatement settings. A snapshot is also restated
    when one of its parents was restated during this call. The snapshots are mutated in place: for every
    model snapshot that does not disable restatement, the pending restatement intervals are applied and
    the next auto restatement timestamp is updated.

    Args:
        snapshots: A dictionary of snapshots to apply auto restatements to.
        execution_time: The execution time.

    Returns:
        A tuple of:
        - A list of SnapshotIntervals, one per restated snapshot, each carrying the interval removed
          from that snapshot as its only pending restatement interval.
        - A mapping from snapshot ID to the IDs of the snapshots whose auto restatement triggered it
          (an auto-restated snapshot maps to itself).
    """
    dag = snapshots_to_dag(snapshots.values())
    auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
    auto_restated_intervals_per_snapshot: t.Dict[SnapshotId, Interval] = {}
    # NOTE(review): the parent lookups below only see entries recorded by earlier iterations, so this
    # presumably relies on the DAG yielding parents before their children - confirm against DAG.__iter__.
    for s_id in dag:
        # The DAG is built from snapshot IDs and their parents, so it may contain IDs without a
        # corresponding entry in `snapshots`.
        if s_id not in snapshots:
            continue
        snapshot = snapshots[s_id]
        if not snapshot.is_model or snapshot.model.disable_restatement:
            continue

        next_auto_restated_interval = snapshot.get_next_auto_restatement_interval(execution_time)
        # Start from the removal intervals already recorded for this snapshot's parents.
        auto_restated_intervals = [
            auto_restated_intervals_per_snapshot[parent_s_id]
            for parent_s_id in snapshot.parents
            if parent_s_id in auto_restated_intervals_per_snapshot
        ]
        upstream_triggers = []
        if next_auto_restated_interval:
            logger.info(
                "Calculated the next auto restated interval (%s, %s) for snapshot %s",
                time_like_to_str(next_auto_restated_interval[0]),
                time_like_to_str(next_auto_restated_interval[1]),
                snapshot.snapshot_id,
            )
            auto_restated_intervals.append(next_auto_restated_interval)

            # auto-restated snapshot is its own trigger
            upstream_triggers = [s_id]
        else:
            # inherit each parent's auto-restatement triggers (if any)
            for parent_s_id in snapshot.parents:
                if parent_s_id in auto_restatement_triggers:
                    upstream_triggers.extend(auto_restatement_triggers[parent_s_id])

        # remove duplicate triggers, retaining order and keeping first seen of duplicates
        if upstream_triggers:
            auto_restatement_triggers[s_id] = unique(upstream_triggers)

        if auto_restated_intervals:
            # Collapse the snapshot's own interval and its parents' intervals into one covering span:
            # the smallest start and the largest end.
            auto_restated_interval_start = sys.maxsize
            auto_restated_interval_end = -sys.maxsize
            for interval in auto_restated_intervals:
                auto_restated_interval_start = min(auto_restated_interval_start, interval[0])
                auto_restated_interval_end = max(auto_restated_interval_end, interval[1])

            # Align the span to this snapshot's own interval unit.
            interval_to_remove_start = snapshot.node.interval_unit.cron_floor(
                auto_restated_interval_start
            )
            interval_to_remove_end = snapshot.node.interval_unit.cron_floor(
                auto_restated_interval_end
            )
            # If flooring moved the end backwards, advance it to the next boundary so the removed
            # span still covers the original end.
            if auto_restated_interval_end > to_timestamp(interval_to_remove_end):
                interval_to_remove_end = snapshot.node.interval_unit.cron_next(
                    interval_to_remove_end
                )

            removal_interval = snapshot.get_removal_interval(
                interval_to_remove_start, interval_to_remove_end, execution_time=execution_time
            )

            # Recorded so that children processed later pick this interval up as a parent interval.
            auto_restated_intervals_per_snapshot[s_id] = removal_interval
            snapshot.pending_restatement_intervals = merge_intervals(
                [*snapshot.pending_restatement_intervals, removal_interval]
            )

        # These two calls run for every snapshot that reaches this point, not only for the ones
        # restated above.
        snapshot.apply_pending_restatement_intervals()
        snapshot.update_next_auto_restatement_ts(execution_time)
    return (
        [
            SnapshotIntervals(
                name=snapshots[s_id].name,
                identifier=None,
                version=snapshots[s_id].version,
                dev_version=None,
                intervals=[],
                dev_intervals=[],
                pending_restatement_intervals=[interval],
            )
            for s_id, interval in auto_restated_intervals_per_snapshot.items()
            if s_id in snapshots
        ],
        auto_restatement_triggers,
    )
2423
2424
def parent_snapshots_by_name(
    snapshot: Snapshot, snapshots: t.Dict[SnapshotId, Snapshot]
) -> t.Dict[str, Snapshot]:
    """Map the names of a snapshot's parents, and of the snapshot itself, to their snapshots.

    Args:
        snapshot: The snapshot whose parents are looked up.
        snapshots: Mapping from snapshot ID to snapshot; it must contain every parent of `snapshot`.

    Returns:
        A dictionary keyed by name holding each parent snapshot plus `snapshot` itself.

    Raises:
        KeyError: If a parent ID is missing from `snapshots`.
    """
    by_name: t.Dict[str, Snapshot] = {}
    for parent_id in snapshot.parents:
        parent = snapshots[parent_id]
        by_name[parent.name] = parent

    # The snapshot itself goes in last, so it wins over a parent with the same name.
    by_name[snapshot.name] = snapshot
    return by_name
2433
2434
2435def _contiguous_intervals(intervals: Intervals) -> t.List[Intervals]:
2436    """Given a list of intervals with gaps, returns a list of sequences of contiguous intervals."""
2437    contiguous_intervals = []
2438    current_batch: t.List[Interval] = []
2439    for interval in intervals:
2440        if len(current_batch) == 0 or interval[0] == current_batch[-1][-1]:
2441            current_batch.append(interval)
2442        else:
2443            contiguous_intervals.append(current_batch)
2444            current_batch = [interval]
2445
2446    if len(current_batch) > 0:
2447        contiguous_intervals.append(current_batch)
2448
2449    return contiguous_intervals
2450
2451
def check_ready_intervals(
    check: t.Callable,
    intervals: Intervals,
    context: ExecutionContext,
    python_env: t.Dict[str, Executable],
    dialect: DialectType = None,
    path: t.Optional[Path] = None,
    snapshot: t.Optional[Snapshot] = None,
    kwargs: t.Optional[t.Dict] = None,
) -> Intervals:
    """Run a signal check over the intervals and return the ones it reports as ready.

    The intervals are split into contiguous runs and `check` is invoked once per run,
    receiving that run as a list of (start, end) datetime pairs.

    Args:
        check: The signal callable; it must return a bool or a list of intervals.
        intervals: Candidate intervals as (start, end) timestamps.
        context: Execution context forwarded to the check.
        python_env: Python environment used to format errors raised by the check.
        dialect: SQL dialect forwarded to the macro call.
        path: Path forwarded to the macro call.
        snapshot: Snapshot forwarded to the check.
        kwargs: Extra keyword arguments for the check.

    Returns:
        The ready intervals as (start, end) timestamps.

    Raises:
        SignalEvalError: If the check raises, returns an interval that was not part of the
            run it was given, or returns something other than a bool or a list.
    """
    ready: Intervals = []

    for run in _contiguous_intervals(intervals):
        candidates = [(to_datetime(begin), to_datetime(finish)) for begin, finish in run]

        try:
            outcome = call_macro(
                check,
                dialect,
                path,
                provided_args=(candidates,),
                provided_kwargs=(kwargs or {}),
                context=context,
                snapshot=snapshot,
            )
        except Exception as ex:
            raise SignalEvalError(format_evaluated_code_exception(ex, python_env))

        if isinstance(outcome, bool):
            # True accepts the whole run, False rejects all of it.
            accepted = candidates if outcome else []
        elif isinstance(outcome, list):
            # A list must only contain intervals that were offered to the check.
            for i in outcome:
                if i not in candidates:
                    raise SignalEvalError(f"Unknown interval {i} for signal")
            accepted = outcome
        else:
            raise SignalEvalError(f"Expected bool | list, got {type(outcome)} for signal")

        for begin, finish in accepted:
            ready.append((to_timestamp(begin), to_timestamp(finish)))

    return ready
2494
2495
def get_next_model_interval_start(snapshots: t.Iterable[Snapshot]) -> t.Optional[datetime]:
    """Return the earliest upcoming cron tick among the model snapshots.

    Only snapshots that are models and are neither symbolic nor seeds are considered.

    Returns:
        The soonest `cron_next` relative to the current time, or None if no snapshot qualifies.
    """
    reference = now()

    upcoming = []
    for candidate in snapshots:
        if not candidate.is_model or candidate.is_symbolic or candidate.is_seed:
            continue
        upcoming.append(candidate.node.cron_next(reference))

    if not upcoming:
        return None
    return min(upcoming)
Interval = typing.Tuple[int, int]
Intervals = typing.List[typing.Tuple[int, int]]
logger = <Logger sqlmesh.core.snapshot.definition (WARNING)>
class SnapshotChangeCategory(enum.IntEnum):
 69class SnapshotChangeCategory(IntEnum):
 70    """
 71    Values are ordered by decreasing severity and that ordering is required.
 72
 73    BREAKING: The change requires that snapshot modified and downstream dependencies be rebuilt
 74    NON_BREAKING: The change requires that only the snapshot modified be rebuilt
 75    FORWARD_ONLY: The change requires no rebuilding
 76    INDIRECT_BREAKING: The change was caused indirectly and is breaking.
 77    INDIRECT_NON_BREAKING: The change was caused indirectly by a non-breaking change.
 78    METADATA: The change was caused by a metadata update.
 79    """
 80
 81    BREAKING = 1
 82    NON_BREAKING = 2
 83    # FORWARD_ONLY category is deprecated and is kept for backwards compatibility.
 84    FORWARD_ONLY = 3
 85    INDIRECT_BREAKING = 4
 86    INDIRECT_NON_BREAKING = 5
 87    METADATA = 6
 88
 89    @property
 90    def is_breaking(self) -> bool:
 91        return self == self.BREAKING
 92
 93    @property
 94    def is_non_breaking(self) -> bool:
 95        return self == self.NON_BREAKING
 96
 97    @property
 98    def is_forward_only(self) -> bool:
 99        return self == self.FORWARD_ONLY
100
101    @property
102    def is_metadata(self) -> bool:
103        return self == self.METADATA
104
105    @property
106    def is_indirect_breaking(self) -> bool:
107        return self == self.INDIRECT_BREAKING
108
109    @property
110    def is_indirect_non_breaking(self) -> bool:
111        return self == self.INDIRECT_NON_BREAKING
112
113    def __repr__(self) -> str:
114        return self.name

Values are ordered by decreasing severity and that ordering is required.

BREAKING: The change requires that the modified snapshot and its downstream dependencies be rebuilt. NON_BREAKING: The change requires that only the modified snapshot be rebuilt. FORWARD_ONLY: The change requires no rebuilding. INDIRECT_BREAKING: The change was caused indirectly and is breaking. INDIRECT_NON_BREAKING: The change was caused indirectly by a non-breaking change. METADATA: The change was caused by a metadata update.

BREAKING = BREAKING
NON_BREAKING = NON_BREAKING
FORWARD_ONLY = FORWARD_ONLY
INDIRECT_BREAKING = INDIRECT_BREAKING
INDIRECT_NON_BREAKING = INDIRECT_NON_BREAKING
METADATA = METADATA
is_breaking: bool
89    @property
90    def is_breaking(self) -> bool:
91        return self == self.BREAKING
is_non_breaking: bool
93    @property
94    def is_non_breaking(self) -> bool:
95        return self == self.NON_BREAKING
is_forward_only: bool
97    @property
98    def is_forward_only(self) -> bool:
99        return self == self.FORWARD_ONLY
is_metadata: bool
101    @property
102    def is_metadata(self) -> bool:
103        return self == self.METADATA
is_indirect_breaking: bool
105    @property
106    def is_indirect_breaking(self) -> bool:
107        return self == self.INDIRECT_BREAKING
is_indirect_non_breaking: bool
109    @property
110    def is_indirect_non_breaking(self) -> bool:
111        return self == self.INDIRECT_NON_BREAKING
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
class SnapshotFingerprint(sqlmesh.utils.pydantic.PydanticModel):
class SnapshotFingerprint(PydanticModel, frozen=True):
    """Immutable set of hashes from which a snapshot's version and identifier are derived."""

    data_hash: str
    metadata_hash: str
    parent_data_hash: str = "0"
    parent_metadata_hash: str = "0"

    def to_version(self) -> str:
        """Hash of the data hashes only (own and parents'); metadata hashes are not included."""
        data_components = [self.data_hash, self.parent_data_hash]
        return hash_data(data_components)

    def to_identifier(self) -> str:
        """Hash of all four hashes, so it covers both data and metadata."""
        all_components = [
            self.data_hash,
            self.metadata_hash,
            self.parent_data_hash,
            self.parent_metadata_hash,
        ]
        return hash_data(all_components)

    def __str__(self) -> str:
        return (
            f"SnapshotFingerprint<{self.to_identifier()}, data: {self.data_hash}, "
            f"meta: {self.metadata_hash}, pdata: {self.parent_data_hash}, "
            f"pmeta: {self.parent_metadata_hash}>"
        )

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
data_hash: str
metadata_hash: str
parent_data_hash: str
parent_metadata_hash: str
def to_version(self) -> str:
123    def to_version(self) -> str:
124        return hash_data([self.data_hash, self.parent_data_hash])
def to_identifier(self) -> str:
126    def to_identifier(self) -> str:
127        return hash_data(
128            [
129                self.data_hash,
130                self.metadata_hash,
131                self.parent_data_hash,
132                self.parent_metadata_hash,
133            ]
134        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SnapshotId(sqlmesh.utils.pydantic.PydanticModel):
class SnapshotId(PydanticModel, frozen=True):
    """Immutable snapshot key made of a name and an identifier."""

    name: str
    identifier: str

    @property
    def snapshot_id(self) -> SnapshotId:
        """Helper method to return self."""
        return self

    def __eq__(self, other: t.Any) -> bool:
        # Only instances of this class (or a subclass) can be equal to it.
        if not isinstance(other, self.__class__):
            return False
        return (self.name, self.identifier) == (other.name, other.identifier)

    def __hash__(self) -> int:
        # Built from the same fields `__eq__` compares, plus the class itself.
        hashed_fields = (self.__class__, self.name, self.identifier)
        return hash(hashed_fields)

    def __lt__(self, other: SnapshotId) -> bool:
        # Ordering looks at the name only; the identifier is not considered.
        return self.name < other.name

    def __str__(self) -> str:
        return f"SnapshotId<{self.name}: {self.identifier}>"

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
name: str
identifier: str
snapshot_id: SnapshotId
144    @property
145    def snapshot_id(self) -> SnapshotId:
146        """Helper method to return self."""
147        return self

Helper method to return self.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SnapshotIdBatch(sqlmesh.utils.pydantic.PydanticModel):
class SnapshotIdBatch(PydanticModel, frozen=True):
    """Immutable pairing of a snapshot ID with an integer batch ID."""

    # NOTE(review): presumably identifies one batch of work for the given snapshot - confirm against callers.
    snapshot_id: SnapshotId
    batch_id: int

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
snapshot_id: SnapshotId
batch_id: int
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SnapshotNameVersion(sqlmesh.utils.pydantic.PydanticModel):
class SnapshotNameVersion(PydanticModel, frozen=True):
    """Immutable pairing of a snapshot name with a version string."""

    name: str
    version: str

    @property
    def name_version(self) -> SnapshotNameVersion:
        """Helper method to return self."""
        # NOTE(review): presumably lets this object be used interchangeably with other objects
        # that expose a `name_version` property - confirm against callers.
        return self

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
name: str
version: str
name_version: SnapshotNameVersion
175    @property
176    def name_version(self) -> SnapshotNameVersion:
177        """Helper method to return self."""
178        return self

Helper method to return self.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SnapshotIntervals(sqlmesh.utils.pydantic.PydanticModel):
class SnapshotIntervals(PydanticModel):
    """Interval bookkeeping record for a snapshot name / version pair.

    Holds three independent interval lists (regular, dev and pending
    restatement) plus optional "last altered" timestamps for the regular and
    dev variants. All mutation goes through the three private helpers at the
    bottom of the class, which take the name of the attribute to update.
    """

    name: str
    identifier: t.Optional[str]
    version: str
    dev_version: t.Optional[str]
    intervals: Intervals = []
    dev_intervals: Intervals = []
    pending_restatement_intervals: Intervals = []
    last_altered_ts: t.Optional[int] = None
    dev_last_altered_ts: t.Optional[int] = None

    @property
    def snapshot_id(self) -> t.Optional[SnapshotId]:
        """The snapshot ID built from name and identifier, or None without an identifier."""
        if self.identifier:
            return SnapshotId(name=self.name, identifier=self.identifier)
        return None

    @property
    def name_version(self) -> SnapshotNameVersion:
        """The name / version pair of this record."""
        return SnapshotNameVersion(name=self.name, version=self.version)

    def add_interval(self, start: int, end: int) -> None:
        """Merge (start, end) into `intervals`."""
        self._add_interval(start, end, "intervals")

    def add_dev_interval(self, start: int, end: int) -> None:
        """Merge (start, end) into `dev_intervals`."""
        self._add_interval(start, end, "dev_intervals")

    def add_pending_restatement_interval(self, start: int, end: int) -> None:
        """Merge (start, end) into `pending_restatement_intervals`."""
        self._add_interval(start, end, "pending_restatement_intervals")

    def update_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
        """Raise `last_altered_ts` to the given value if it is larger."""
        self._update_last_altered_ts(last_altered_ts, "last_altered_ts")

    def update_dev_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
        """Raise `dev_last_altered_ts` to the given value if it is larger."""
        self._update_last_altered_ts(last_altered_ts, "dev_last_altered_ts")

    def remove_interval(self, start: int, end: int) -> None:
        """Remove (start, end) from `intervals`."""
        self._remove_interval(start, end, "intervals")

    def remove_dev_interval(self, start: int, end: int) -> None:
        """Remove (start, end) from `dev_intervals`."""
        self._remove_interval(start, end, "dev_intervals")

    def remove_pending_restatement_interval(self, start: int, end: int) -> None:
        """Remove (start, end) from `pending_restatement_intervals`."""
        self._remove_interval(start, end, "pending_restatement_intervals")

    def is_empty(self) -> bool:
        """Return True when none of the three interval lists has an entry."""
        return not (
            self.intervals or self.dev_intervals or self.pending_restatement_intervals
        )

    def _add_interval(self, start: int, end: int, interval_attr: str) -> None:
        """Append (start, end) to the named list and store the merged result."""
        current = getattr(self, interval_attr)
        setattr(self, interval_attr, merge_intervals([*current, (start, end)]))

    def _update_last_altered_ts(
        self, last_altered_ts: t.Optional[int], last_altered_attr: str
    ) -> None:
        """Store the max of the existing and the given timestamp on the named attribute.

        A falsy timestamp (None or 0) leaves the attribute untouched.
        """
        if not last_altered_ts:
            return
        current = getattr(self, last_altered_attr)
        setattr(self, last_altered_attr, max(current or 0, last_altered_ts))

    def _remove_interval(self, start: int, end: int, interval_attr: str) -> None:
        """Replace the named list with the result of removing (start, end) from it."""
        remaining = remove_interval(getattr(self, interval_attr), start, end)
        setattr(self, interval_attr, remaining)

Usage Documentation: see "Models" in the Pydantic documentation.

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
name: str
identifier: Optional[str]
version: str
dev_version: Optional[str]
intervals: List[Tuple[int, int]]
dev_intervals: List[Tuple[int, int]]
pending_restatement_intervals: List[Tuple[int, int]]
last_altered_ts: Optional[int]
dev_last_altered_ts: Optional[int]
snapshot_id: Optional[SnapshotId]
192    @property
193    def snapshot_id(self) -> t.Optional[SnapshotId]:
194        if not self.identifier:
195            return None
196        return SnapshotId(name=self.name, identifier=self.identifier)
name_version: SnapshotNameVersion
    @property
    def name_version(self) -> SnapshotNameVersion:
        """Build a SnapshotNameVersion from this record's name and version."""
        return SnapshotNameVersion(name=self.name, version=self.version)
def add_interval(self, start: int, end: int) -> None:
    def add_interval(self, start: int, end: int) -> None:
        """Add the (start, end) interval to `intervals` via `_add_interval`."""
        self._add_interval(start, end, "intervals")
def add_dev_interval(self, start: int, end: int) -> None:
    def add_dev_interval(self, start: int, end: int) -> None:
        """Add the (start, end) interval to `dev_intervals` via `_add_interval`."""
        self._add_interval(start, end, "dev_intervals")
def add_pending_restatement_interval(self, start: int, end: int) -> None:
    def add_pending_restatement_interval(self, start: int, end: int) -> None:
        """Add the (start, end) interval to `pending_restatement_intervals` via `_add_interval`."""
        self._add_interval(start, end, "pending_restatement_intervals")
def update_last_altered_ts(self, last_altered_ts: Optional[int]) -> None:
    def update_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
        """Forward the timestamp to `_update_last_altered_ts` for the `last_altered_ts` attribute."""
        self._update_last_altered_ts(last_altered_ts, "last_altered_ts")
def update_dev_last_altered_ts(self, last_altered_ts: Optional[int]) -> None:
    def update_dev_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
        """Forward the timestamp to `_update_last_altered_ts` for the `dev_last_altered_ts` attribute."""
        self._update_last_altered_ts(last_altered_ts, "dev_last_altered_ts")
def remove_interval(self, start: int, end: int) -> None:
    def remove_interval(self, start: int, end: int) -> None:
        """Remove the (start, end) interval from `intervals` via `_remove_interval`."""
        self._remove_interval(start, end, "intervals")
def remove_dev_interval(self, start: int, end: int) -> None:
    def remove_dev_interval(self, start: int, end: int) -> None:
        """Remove the (start, end) interval from `dev_intervals` via `_remove_interval`."""
        self._remove_interval(start, end, "dev_intervals")
def remove_pending_restatement_interval(self, start: int, end: int) -> None:
    def remove_pending_restatement_interval(self, start: int, end: int) -> None:
        """Remove the (start, end) interval from `pending_restatement_intervals` via `_remove_interval`."""
        self._remove_interval(start, end, "pending_restatement_intervals")
def is_empty(self) -> bool:
226    def is_empty(self) -> bool:
227        return (
228            not self.intervals and not self.dev_intervals and not self.pending_restatement_intervals
229        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SnapshotDataVersion(sqlmesh.utils.pydantic.PydanticModel):
class SnapshotDataVersion(PydanticModel, frozen=True):
    """Frozen record of a snapshot's data version.

    Bundles the fingerprint and version strings together with the physical
    schema, dev table suffix, table naming convention and virtual environment
    mode attributes.
    """

    fingerprint: SnapshotFingerprint
    version: str
    # Populated through the "dev_version" alias; read it via the `dev_version` property.
    dev_version_: t.Optional[str] = Field(default=None, alias="dev_version")
    change_category: t.Optional[SnapshotChangeCategory] = None
    # Populated through the "physical_schema" alias; read it via the `physical_schema` property.
    physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema")
    dev_table_suffix: str
    table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
    virtual_environment_mode: VirtualEnvironmentMode = Field(default=VirtualEnvironmentMode.default)

    def snapshot_id(self, name: str) -> SnapshotId:
        """Build a SnapshotId for `name` using the identifier derived from the fingerprint."""
        return SnapshotId(name=name, identifier=self.fingerprint.to_identifier())

    @property
    def dev_version(self) -> str:
        """The stored dev version, or the fingerprint-derived version when none is stored."""
        return self.dev_version_ or self.fingerprint.to_version()

    @property
    def physical_schema(self) -> str:
        """The stored physical schema, falling back to `c.SQLMESH` when none is stored."""
        # The physical schema here is optional to maintain backwards compatibility with
        # records stored by previous versions of SQLMesh.
        return self.physical_schema_ or c.SQLMESH

    @property
    def data_version(self) -> SnapshotDataVersion:
        """Return this instance itself."""
        return self

    @property
    def is_new_version(self) -> bool:
        """Returns whether or not this version is new and requires a backfill."""
        # "New" means the stored version equals the version derived from the fingerprint.
        return self.fingerprint.to_version() == self.version

Usage Documentation: see "Models" in the Pydantic documentation.

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
fingerprint: SnapshotFingerprint
version: str
dev_version_: Optional[str]
change_category: Optional[SnapshotChangeCategory]
physical_schema_: Optional[str]
dev_table_suffix: str
def snapshot_id(self, name: str) -> SnapshotId:
    def snapshot_id(self, name: str) -> SnapshotId:
        """Build a SnapshotId for `name` using the identifier derived from the fingerprint."""
        return SnapshotId(name=name, identifier=self.fingerprint.to_identifier())
dev_version: str
    @property
    def dev_version(self) -> str:
        """The stored dev version, or the fingerprint-derived version when none is stored."""
        return self.dev_version_ or self.fingerprint.to_version()
physical_schema: str
    @property
    def physical_schema(self) -> str:
        """The stored physical schema, falling back to `c.SQLMESH` when none is stored."""
        # The physical schema here is optional to maintain backwards compatibility with
        # records stored by previous versions of SQLMesh.
        return self.physical_schema_ or c.SQLMESH
data_version: SnapshotDataVersion
    @property
    def data_version(self) -> SnapshotDataVersion:
        """Return this instance itself."""
        return self
is_new_version: bool
    @property
    def is_new_version(self) -> bool:
        """Returns whether or not this version is new and requires a backfill."""
        # "New" means the stored version equals the version derived from the fingerprint.
        return self.fingerprint.to_version() == self.version

Returns whether or not this version is new and requires a backfill.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class QualifiedViewName(sqlmesh.utils.pydantic.PydanticModel):
class QualifiedViewName(PydanticModel, frozen=True):
    """Catalog / schema / table parts of a view name.

    The `*_for_environment` helpers derive the environment-specific variant
    of each part from an `EnvironmentNamingInfo`; dev environments get an
    `__<environment name>` suffix on whichever part the suffix target selects.
    """

    catalog: t.Optional[str] = None
    schema_name: t.Optional[str] = None
    table: str

    def for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> str:
        """Return the environment-specific view name as a string."""
        env_table = self.table_for_environment(environment_naming_info, dialect=dialect)
        return exp.table_name(env_table)

    def table_for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> exp.Table:
        """Return the environment-specific view name as a sqlglot table expression."""
        table_part = self.table_name_for_environment(environment_naming_info, dialect=dialect)
        schema_part = self.schema_for_environment(environment_naming_info, dialect=dialect)
        catalog_part = self.catalog_for_environment(environment_naming_info, dialect=dialect)
        return exp.table_(table_part, db=schema_part, catalog=catalog_part)

    def catalog_for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> t.Optional[str]:
        """Return the catalog to use for the given environment.

        A dev environment with a catalog suffix target gets
        `<catalog>__<environment name>`; otherwise a catalog name override is
        used when present. Without either, the plain catalog is returned.
        """
        naming = environment_naming_info
        if naming.is_dev and naming.suffix_target.is_catalog:
            # NOTE(review): a None catalog renders as "None__<env>" here; presumably
            # callers always have a catalog with this suffix target -- confirm.
            resolved: t.Optional[str] = f"{self.catalog}__{naming.name}"
        elif naming.catalog_name_override:
            resolved = naming.catalog_name_override
        else:
            resolved = None

        if not resolved:
            return self.catalog
        if naming.normalize_name:
            return normalize_identifiers(resolved, dialect=dialect).name
        return resolved

    def schema_for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> str:
        """Return the schema to use for the given environment.

        Falls back to `c.DEFAULT_SCHEMA` when no schema name is set, and
        appends `__<environment name>` for dev environments whose suffix
        target is the schema.
        """
        naming = environment_naming_info
        should_normalize = naming.normalize_name

        schema = self.schema_name
        if not schema:
            schema = c.DEFAULT_SCHEMA
            if should_normalize:
                schema = normalize_identifiers(schema, dialect=dialect).name

        if not (naming.is_dev and naming.suffix_target.is_schema):
            return schema

        suffix = naming.name
        if should_normalize:
            suffix = normalize_identifiers(suffix, dialect=dialect).name
        return f"{schema}__{suffix}"

    def table_name_for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> str:
        """Return the table name to use for the given environment.

        Appends `__<environment name>` for dev environments whose suffix
        target is the table; otherwise the table name is returned unchanged.
        """
        naming = environment_naming_info
        if not (naming.is_dev and naming.suffix_target.is_table):
            return self.table

        suffix = naming.name
        if naming.normalize_name:
            suffix = normalize_identifiers(suffix, dialect=dialect).name
        return f"{self.table}__{suffix}"

Usage Documentation: see "Models" in the Pydantic documentation.

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
catalog: Optional[str]
schema_name: Optional[str]
table: str
def for_environment( self, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> str:
    def for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> str:
        """Return the environment-specific view name as a string.

        Builds the table expression with `table_for_environment` and renders
        it through `exp.table_name`.
        """
        return exp.table_name(self.table_for_environment(environment_naming_info, dialect=dialect))
def table_for_environment( self, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> sqlglot.expressions.query.Table:
    def table_for_environment(
        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
    ) -> exp.Table:
        """Return the environment-specific view name as a sqlglot table expression.

        The table, schema and catalog parts come from the matching
        `*_for_environment` helpers of this class.
        """
        return exp.table_(
            self.table_name_for_environment(environment_naming_info, dialect=dialect),
            db=self.schema_for_environment(environment_naming_info, dialect=dialect),
            catalog=self.catalog_for_environment(environment_naming_info, dialect=dialect),
        )
def catalog_for_environment( self, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> Optional[str]:
301    def catalog_for_environment(
302        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
303    ) -> t.Optional[str]:
304        catalog_name: t.Optional[str] = None
305        if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_catalog:
306            catalog_name = f"{self.catalog}__{environment_naming_info.name}"
307        elif environment_naming_info.catalog_name_override:
308            catalog_name = environment_naming_info.catalog_name_override
309
310        if catalog_name:
311            return (
312                normalize_identifiers(catalog_name, dialect=dialect).name
313                if environment_naming_info.normalize_name
314                else catalog_name
315            )
316
317        return self.catalog
def schema_for_environment( self, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> str:
319    def schema_for_environment(
320        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
321    ) -> str:
322        normalize = environment_naming_info.normalize_name
323
324        if self.schema_name:
325            schema = self.schema_name
326        else:
327            schema = c.DEFAULT_SCHEMA
328            if normalize:
329                schema = normalize_identifiers(schema, dialect=dialect).name
330
331        if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_schema:
332            env_name = environment_naming_info.name
333            if normalize:
334                env_name = normalize_identifiers(env_name, dialect=dialect).name
335
336            schema = f"{schema}__{env_name}"
337
338        return schema
def table_name_for_environment( self, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> str:
340    def table_name_for_environment(
341        self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
342    ) -> str:
343        table = self.table
344        if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_table:
345            env_name = environment_naming_info.name
346            if environment_naming_info.normalize_name:
347                env_name = normalize_identifiers(env_name, dialect=dialect).name
348
349            table = f"{table}__{env_name}"
350
351        return table
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SnapshotInfoMixin(sqlmesh.core.model.kind.ModelKindMixin):
class SnapshotInfoMixin(ModelKindMixin):
    """Shared snapshot behaviour built on top of a handful of declared attributes.

    The attributes below are only annotated here. Several properties
    (`physical_schema`, `data_version`, `is_new_version`,
    `fully_qualified_table`, `virtual_environment_mode`, `node_type`) raise
    NotImplementedError in this mixin -- presumably concrete classes override
    them; verify against the classes that use this mixin.
    """

    name: str
    dev_version_: t.Optional[str]
    change_category: t.Optional[SnapshotChangeCategory]
    fingerprint: SnapshotFingerprint
    previous_versions: t.Tuple[SnapshotDataVersion, ...]
    # Added to support Migration # 34 (default catalog)
    # This can be removed from this model once Pydantic 1 support is dropped (must remain in `Snapshot` though)
    base_table_name_override: t.Optional[str]
    dev_table_suffix: str
    table_naming_convention: TableNamingConvention
    forward_only: bool

    @cached_property
    def identifier(self) -> str:
        """The identifier derived from the fingerprint (cached after first access)."""
        return self.fingerprint.to_identifier()

    @cached_property
    def snapshot_id(self) -> SnapshotId:
        """The SnapshotId built from name and identifier (cached after first access)."""
        return SnapshotId(name=self.name, identifier=self.identifier)

    @property
    def qualified_view_name(self) -> QualifiedViewName:
        """The view name split into catalog, schema and table parts.

        Parsed from `fully_qualified_table` when it is set, otherwise from
        `name`. Empty catalog / schema parts are stored as None.
        """
        view_name = exp.to_table(self.fully_qualified_table or self.name)
        return QualifiedViewName(
            catalog=view_name.catalog or None,
            schema_name=view_name.db or None,
            table=view_name.name,
        )

    @property
    def previous_version(self) -> t.Optional[SnapshotDataVersion]:
        """Helper method to get the previous data version."""
        # The last element of `previous_versions` is treated as the previous version.
        if self.previous_versions:
            return self.previous_versions[-1]
        return None

    @property
    def dev_version(self) -> str:
        """The stored dev version, or the fingerprint-derived version when none is stored."""
        return self.dev_version_ or self.fingerprint.to_version()

    @property
    def physical_schema(self) -> str:
        """Not implemented in this mixin; always raises NotImplementedError here."""
        raise NotImplementedError

    @property
    def data_version(self) -> SnapshotDataVersion:
        """Not implemented in this mixin; always raises NotImplementedError here."""
        raise NotImplementedError

    @property
    def is_new_version(self) -> bool:
        """Not implemented in this mixin; always raises NotImplementedError here."""
        raise NotImplementedError

    @cached_property
    def fully_qualified_table(self) -> t.Optional[exp.Table]:
        """Not implemented in this mixin; always raises NotImplementedError here."""
        raise NotImplementedError

    @property
    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
        """Not implemented in this mixin; always raises NotImplementedError here."""
        raise NotImplementedError

    @property
    def is_forward_only(self) -> bool:
        """True when `forward_only` is set or the change category is FORWARD_ONLY."""
        return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY

    @property
    def is_metadata(self) -> bool:
        """True when the change category is METADATA."""
        return self.change_category == SnapshotChangeCategory.METADATA

    @property
    def is_indirect_non_breaking(self) -> bool:
        """True when the change category is INDIRECT_NON_BREAKING."""
        return self.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING

    @property
    def is_no_rebuild(self) -> bool:
        """Returns true if this snapshot doesn't require a rebuild in production."""
        return self.forward_only or self.change_category in (
            SnapshotChangeCategory.FORWARD_ONLY,  # Backwards compatibility
            SnapshotChangeCategory.METADATA,
            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
        )

    @property
    def is_no_preview(self) -> bool:
        """Returns true if this snapshot doesn't require a preview in development."""
        # Unlike `is_no_rebuild`, both conditions must hold: the `forward_only` flag
        # AND one of the listed change categories.
        return self.forward_only and self.change_category in (
            SnapshotChangeCategory.METADATA,
            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
        )

    @property
    def all_versions(self) -> t.Tuple[SnapshotDataVersion, ...]:
        """Returns previous versions with the current version trimmed to DATA_VERSION_LIMIT."""
        # The current data version goes last, so the slice keeps the most recent entries.
        return (*self.previous_versions, self.data_version)[-c.DATA_VERSION_LIMIT :]

    def display_name(
        self,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
        dialect: DialectType = None,
    ) -> str:
        """
        Returns the model name as a qualified view name.
        This is just used for presenting information back to the user and `qualified_view_name` should be used
        when wanting a view name in all other cases.
        """
        # Delegates to the module-level `display_name` function of the same name.
        return display_name(self, environment_naming_info, default_catalog, dialect=dialect)

    def data_hash_matches(self, other: t.Optional[SnapshotInfoMixin | SnapshotDataVersion]) -> bool:
        """Return True when `other` is given and its fingerprint has the same data hash."""
        return other is not None and self.fingerprint.data_hash == other.fingerprint.data_hash

    def _table_name(self, version: str, is_deployable: bool) -> str:
        """Full table name pointing to the materialized location of the snapshot.

        Args:
            version: The snapshot version.
            is_deployable: Indicates whether to return the table name for deployment to production.

        Returns:
            The snapshot name itself for external snapshots and for deployable
            targets in dev-only virtual environment mode; otherwise the name
            produced by the module-level `table_name` helper.

        Raises:
            SQLMeshError: If `fully_qualified_table` is None.
        """
        # `is_external` is not defined in this class; presumably it comes from ModelKindMixin.
        if self.is_external:
            return self.name

        if is_deployable and self.virtual_environment_mode.is_dev_only:
            # Use the model name as is if the target is deployable and the virtual environment mode is set to dev-only
            return self.name

        is_dev_table = not is_deployable
        if is_dev_table:
            # Dev tables ignore the passed-in version and use the dev version instead.
            version = self.dev_version

        if self.fully_qualified_table is None:
            raise SQLMeshError(
                f"Tried to get a table name for a snapshot that does not have a table. {self.name}"
            )
        # We want to exclude the catalog from the name but still include catalog when determining the fqn
        # for the table.
        if self.base_table_name_override:
            base_table_name = self.base_table_name_override
        else:
            # Work on a copy so the cached `fully_qualified_table` keeps its catalog.
            fqt = self.fully_qualified_table.copy()
            fqt.set("catalog", None)
            base_table_name = fqt.sql()

        return table_name(
            self.physical_schema,
            base_table_name,
            version,
            catalog=self.fully_qualified_table.catalog,
            suffix=self.dev_table_suffix if is_dev_table else None,
            naming_convention=self.table_naming_convention,
        )

    @property
    def node_type(self) -> NodeType:
        """Not implemented in this mixin; always raises NotImplementedError here."""
        raise NotImplementedError

    @property
    def is_model(self) -> bool:
        """True when `node_type` is NodeType.MODEL."""
        return self.node_type == NodeType.MODEL

    @property
    def is_audit(self) -> bool:
        """True when `node_type` is NodeType.AUDIT."""
        return self.node_type == NodeType.AUDIT
name: str
dev_version_: Optional[str]
change_category: Optional[SnapshotChangeCategory]
fingerprint: SnapshotFingerprint
previous_versions: Tuple[SnapshotDataVersion, ...]
base_table_name_override: Optional[str]
dev_table_suffix: str
forward_only: bool
identifier: str
367    @cached_property
368    def identifier(self) -> str:
369        return self.fingerprint.to_identifier()
snapshot_id: SnapshotId
371    @cached_property
372    def snapshot_id(self) -> SnapshotId:
373        return SnapshotId(name=self.name, identifier=self.identifier)
qualified_view_name: QualifiedViewName
375    @property
376    def qualified_view_name(self) -> QualifiedViewName:
377        view_name = exp.to_table(self.fully_qualified_table or self.name)
378        return QualifiedViewName(
379            catalog=view_name.catalog or None,
380            schema_name=view_name.db or None,
381            table=view_name.name,
382        )
previous_version: Optional[SnapshotDataVersion]
384    @property
385    def previous_version(self) -> t.Optional[SnapshotDataVersion]:
386        """Helper method to get the previous data version."""
387        if self.previous_versions:
388            return self.previous_versions[-1]
389        return None

Helper method to get the previous data version.

dev_version: str
391    @property
392    def dev_version(self) -> str:
393        return self.dev_version_ or self.fingerprint.to_version()
physical_schema: str
395    @property
396    def physical_schema(self) -> str:
397        raise NotImplementedError
data_version: SnapshotDataVersion
399    @property
400    def data_version(self) -> SnapshotDataVersion:
401        raise NotImplementedError
is_new_version: bool
403    @property
404    def is_new_version(self) -> bool:
405        raise NotImplementedError
fully_qualified_table: Optional[sqlglot.expressions.query.Table]
407    @cached_property
408    def fully_qualified_table(self) -> t.Optional[exp.Table]:
409        raise NotImplementedError
virtual_environment_mode: sqlmesh.core.config.common.VirtualEnvironmentMode
411    @property
412    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
413        raise NotImplementedError
is_forward_only: bool
415    @property
416    def is_forward_only(self) -> bool:
417        return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY
is_metadata: bool
419    @property
420    def is_metadata(self) -> bool:
421        return self.change_category == SnapshotChangeCategory.METADATA
is_indirect_non_breaking: bool
423    @property
424    def is_indirect_non_breaking(self) -> bool:
425        return self.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING
is_no_rebuild: bool
427    @property
428    def is_no_rebuild(self) -> bool:
429        """Returns true if this snapshot doesn't require a rebuild in production."""
430        return self.forward_only or self.change_category in (
431            SnapshotChangeCategory.FORWARD_ONLY,  # Backwards compatibility
432            SnapshotChangeCategory.METADATA,
433            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
434        )

Returns true if this snapshot doesn't require a rebuild in production.

is_no_preview: bool
436    @property
437    def is_no_preview(self) -> bool:
438        """Returns true if this snapshot doesn't require a preview in development."""
439        return self.forward_only and self.change_category in (
440            SnapshotChangeCategory.METADATA,
441            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
442        )

Returns true if this snapshot doesn't require a preview in development.

all_versions: Tuple[SnapshotDataVersion, ...]
444    @property
445    def all_versions(self) -> t.Tuple[SnapshotDataVersion, ...]:
446        """Returns previous versions with the current version trimmed to DATA_VERSION_LIMIT."""
447        return (*self.previous_versions, self.data_version)[-c.DATA_VERSION_LIMIT :]

Returns previous versions with the current version trimmed to DATA_VERSION_LIMIT.

def display_name( self, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Optional[str], dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> str:
449    def display_name(
450        self,
451        environment_naming_info: EnvironmentNamingInfo,
452        default_catalog: t.Optional[str],
453        dialect: DialectType = None,
454    ) -> str:
455        """
456        Returns the model name as a qualified view name.
457        This is just used for presenting information back to the user and `qualified_view_name` should be used
458        when wanting a view name in all other cases.
459        """
460        return display_name(self, environment_naming_info, default_catalog, dialect=dialect)

Returns the model name as a qualified view name. This is only used for presenting information back to the user; use qualified_view_name whenever a view name is needed for any other purpose.

def data_hash_matches( self, other: Union[SnapshotInfoMixin, SnapshotDataVersion, NoneType]) -> bool:
462    def data_hash_matches(self, other: t.Optional[SnapshotInfoMixin | SnapshotDataVersion]) -> bool:
463        return other is not None and self.fingerprint.data_hash == other.fingerprint.data_hash
node_type: sqlmesh.core.node.NodeType
505    @property
506    def node_type(self) -> NodeType:
507        raise NotImplementedError
is_model: bool
509    @property
510    def is_model(self) -> bool:
511        return self.node_type == NodeType.MODEL
is_audit: bool
513    @property
514    def is_audit(self) -> bool:
515        return self.node_type == NodeType.AUDIT
class SnapshotTableInfo(sqlmesh.utils.pydantic.PydanticModel, SnapshotInfoMixin):
class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
    """A frozen model holding the snapshot fields needed to resolve table names and data versions.

    Fields declared with a trailing underscore are populated through their alias and are read
    back through the property that carries the alias name.
    """

    name: str
    fingerprint: SnapshotFingerprint
    version: str
    dev_version_: t.Optional[str] = Field(default=None, alias="dev_version")
    physical_schema_: str = Field(alias="physical_schema")
    parents: t.Tuple[SnapshotId, ...]
    previous_versions: t.Tuple[SnapshotDataVersion, ...] = ()
    change_category: t.Optional[SnapshotChangeCategory] = None
    kind_name: t.Optional[ModelKindName] = None
    node_type_: NodeType = Field(default=NodeType.MODEL, alias="node_type")
    # Added to support Migration # 34 (default catalog)
    # This can be removed from this model once Pydantic 1 support is dropped (must remain in `Snapshot` though)
    base_table_name_override: t.Optional[str] = None
    custom_materialization: t.Optional[str] = None
    dev_table_suffix: str
    model_gateway: t.Optional[str] = None
    forward_only: bool = False
    table_naming_convention: TableNamingConvention = TableNamingConvention.default
    virtual_environment_mode_: VirtualEnvironmentMode = Field(
        default=VirtualEnvironmentMode.default, alias="virtual_environment_mode"
    )

    def __lt__(self, other: SnapshotTableInfo) -> bool:
        # Ordering only looks at the name.
        return self.name < other.name

    def __eq__(self, other: t.Any) -> bool:
        # Equality is decided by the fingerprint alone.
        if not isinstance(other, SnapshotTableInfo):
            return False
        return self.fingerprint == other.fingerprint

    def __hash__(self) -> int:
        hash_key = (self.__class__, self.name, self.fingerprint)
        return hash(hash_key)

    def table_name(self, is_deployable: bool = True) -> str:
        """Full table name pointing to the materialized location of the snapshot.

        Args:
            is_deployable: Indicates whether to return the table name for deployment to production.
        """
        return self._table_name(self.version, is_deployable)

    @property
    def physical_schema(self) -> str:
        """The value of the `physical_schema_` field."""
        return self.physical_schema_

    @cached_property
    def fully_qualified_table(self) -> exp.Table:
        """The table expression parsed from the snapshot name (cached after the first access)."""
        return exp.to_table(self.name)

    @property
    def table_info(self) -> SnapshotTableInfo:
        """Helper method to return self."""
        return self

    @property
    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
        """The value of the `virtual_environment_mode_` field."""
        return self.virtual_environment_mode_

    @property
    def data_version(self) -> SnapshotDataVersion:
        """A data version assembled from the corresponding fields of this table info."""
        version_fields = dict(
            fingerprint=self.fingerprint,
            version=self.version,
            dev_version=self.dev_version,
            change_category=self.change_category,
            physical_schema=self.physical_schema,
            dev_table_suffix=self.dev_table_suffix,
            table_naming_convention=self.table_naming_convention,
            virtual_environment_mode=self.virtual_environment_mode,
        )
        return SnapshotDataVersion(**version_fields)

    @property
    def is_new_version(self) -> bool:
        """Returns whether or not this version is new and requires a backfill."""
        fingerprint_version = self.fingerprint.to_version()
        return fingerprint_version == self.version

    @property
    def model_kind_name(self) -> t.Optional[ModelKindName]:
        """The value of the `kind_name` field."""
        return self.kind_name

    @property
    def node_type(self) -> NodeType:
        """The value of the `node_type_` field."""
        return self.node_type_

    @property
    def name_version(self) -> SnapshotNameVersion:
        """Returns the name and version of the snapshot."""
        return SnapshotNameVersion(name=self.name, version=self.version)

    @property
    def id_and_version(self) -> SnapshotIdAndVersion:
        """The stripped down `SnapshotIdAndVersion` representation of this table info."""
        id_and_version_fields = dict(
            name=self.name,
            kind_name=self.kind_name,
            identifier=self.identifier,
            version=self.version,
            dev_version=self.dev_version,
            fingerprint=self.fingerprint,
        )
        return SnapshotIdAndVersion(**id_and_version_fields)

Usage documentation: see the "Models" section of the Pydantic documentation.

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ signature (an inspect.Signature) of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a RootModel (pydantic.root_model.RootModel).
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding FieldInfo (pydantic.fields.FieldInfo) objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding ComputedFieldInfo (pydantic.fields.ComputedFieldInfo) objects.
  • __pydantic_extra__: A dictionary containing extra values, if extra (pydantic.config.ConfigDict.extra) is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
name: str
fingerprint: SnapshotFingerprint
version: str
dev_version_: Optional[str]
physical_schema_: str
parents: Tuple[SnapshotId, ...]
previous_versions: Tuple[SnapshotDataVersion, ...]
change_category: Optional[SnapshotChangeCategory]
base_table_name_override: Optional[str]
custom_materialization: Optional[str]
dev_table_suffix: str
model_gateway: Optional[str]
forward_only: bool
def table_name(self, is_deployable: bool = True) -> str:
550    def table_name(self, is_deployable: bool = True) -> str:
551        """Full table name pointing to the materialized location of the snapshot.
552
553        Args:
554            is_deployable: Indicates whether to return the table name for deployment to production.
555        """
556        return self._table_name(self.version, is_deployable)

Full table name pointing to the materialized location of the snapshot.

Arguments:
  • is_deployable: Indicates whether to return the table name for deployment to production.
physical_schema: str
558    @property
559    def physical_schema(self) -> str:
560        return self.physical_schema_
fully_qualified_table: sqlglot.expressions.query.Table
562    @cached_property
563    def fully_qualified_table(self) -> exp.Table:
564        return exp.to_table(self.name)
table_info: SnapshotTableInfo
566    @property
567    def table_info(self) -> SnapshotTableInfo:
568        """Helper method to return self."""
569        return self

Helper method to return self.

virtual_environment_mode: sqlmesh.core.config.common.VirtualEnvironmentMode
571    @property
572    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
573        return self.virtual_environment_mode_
data_version: SnapshotDataVersion
575    @property
576    def data_version(self) -> SnapshotDataVersion:
577        return SnapshotDataVersion(
578            fingerprint=self.fingerprint,
579            version=self.version,
580            dev_version=self.dev_version,
581            change_category=self.change_category,
582            physical_schema=self.physical_schema,
583            dev_table_suffix=self.dev_table_suffix,
584            table_naming_convention=self.table_naming_convention,
585            virtual_environment_mode=self.virtual_environment_mode,
586        )
is_new_version: bool
588    @property
589    def is_new_version(self) -> bool:
590        """Returns whether or not this version is new and requires a backfill."""
591        return self.fingerprint.to_version() == self.version

Returns whether or not this version is new and requires a backfill.

model_kind_name: Optional[sqlmesh.core.model.kind.ModelKindName]
593    @property
594    def model_kind_name(self) -> t.Optional[ModelKindName]:
595        return self.kind_name

Returns the model kind name.

node_type: sqlmesh.core.node.NodeType
597    @property
598    def node_type(self) -> NodeType:
599        return self.node_type_
name_version: SnapshotNameVersion
601    @property
602    def name_version(self) -> SnapshotNameVersion:
603        """Returns the name and version of the snapshot."""
604        return SnapshotNameVersion(name=self.name, version=self.version)

Returns the name and version of the snapshot.

id_and_version: SnapshotIdAndVersion
606    @property
607    def id_and_version(self) -> SnapshotIdAndVersion:
608        return SnapshotIdAndVersion(
609            name=self.name,
610            kind_name=self.kind_name,
611            identifier=self.identifier,
612            version=self.version,
613            dev_version=self.dev_version,
614            fingerprint=self.fingerprint,
615        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model; it should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

class SnapshotIdAndVersion(PydanticModel, ModelKindMixin):
    """A stripped down version of a snapshot that is used in situations where we want to fetch the main fields of the snapshots table
    without the overhead of parsing the full snapshot payload and fetching intervals.

    The fingerprint may be supplied as a raw string; it is parsed on the first access of the
    `fingerprint` property and the parsed value is stored back on the instance.
    """

    name: str
    version: str
    kind_name_: t.Optional[ModelKindName] = Field(default=None, alias="kind_name")
    dev_version_: t.Optional[str] = Field(alias="dev_version")
    identifier: str
    fingerprint_: t.Union[str, SnapshotFingerprint] = Field(alias="fingerprint")

    @property
    def snapshot_id(self) -> SnapshotId:
        """The snapshot ID made of the name and identifier."""
        return SnapshotId(name=self.name, identifier=self.identifier)

    @property
    def id_and_version(self) -> SnapshotIdAndVersion:
        """Helper method to return self."""
        return self

    @property
    def name_version(self) -> SnapshotNameVersion:
        """The name and version of the snapshot."""
        return SnapshotNameVersion(name=self.name, version=self.version)

    @property
    def fingerprint(self) -> SnapshotFingerprint:
        """The fingerprint, parsed from its raw string form if it has not been parsed yet."""
        stored = self.fingerprint_
        if not isinstance(stored, str):
            return stored

        # Keep the parsed fingerprint on the instance so the raw string is parsed only once.
        parsed = SnapshotFingerprint.parse_raw(stored)
        self.fingerprint_ = parsed
        return parsed

    @property
    def dev_version(self) -> str:
        """The explicitly set dev version, falling back to the version derived from the fingerprint."""
        explicit_dev_version = self.dev_version_
        if explicit_dev_version:
            return explicit_dev_version
        return self.fingerprint.to_version()

    @property
    def model_kind_name(self) -> t.Optional[ModelKindName]:
        """The value of the `kind_name_` field."""
        return self.kind_name_

    def display_name(
        self,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
        dialect: DialectType = None,
    ) -> str:
        """Returns the display name produced by `model_display_name` for this snapshot's name.

        Args:
            environment_naming_info: Forwarded to `model_display_name`.
            default_catalog: Forwarded to `model_display_name`.
            dialect: Forwarded to `model_display_name`.
        """
        rendered_name = model_display_name(
            self.name, environment_naming_info, default_catalog, dialect=dialect
        )
        return rendered_name

A stripped down version of a snapshot that is used in situations where we want to fetch the main fields of the snapshots table without the overhead of parsing the full snapshot payload and fetching intervals.

name: str
version: str
dev_version_: Optional[str]
identifier: str
fingerprint_: Union[str, SnapshotFingerprint]
snapshot_id: SnapshotId
630    @property
631    def snapshot_id(self) -> SnapshotId:
632        return SnapshotId(name=self.name, identifier=self.identifier)
id_and_version: SnapshotIdAndVersion
634    @property
635    def id_and_version(self) -> SnapshotIdAndVersion:
636        return self
name_version: SnapshotNameVersion
638    @property
639    def name_version(self) -> SnapshotNameVersion:
640        return SnapshotNameVersion(name=self.name, version=self.version)
fingerprint: SnapshotFingerprint
642    @property
643    def fingerprint(self) -> SnapshotFingerprint:
644        value = self.fingerprint_
645        if isinstance(value, str):
646            self.fingerprint_ = value = SnapshotFingerprint.parse_raw(value)
647        return value
dev_version: str
649    @property
650    def dev_version(self) -> str:
651        return self.dev_version_ or self.fingerprint.to_version()
model_kind_name: Optional[sqlmesh.core.model.kind.ModelKindName]
653    @property
654    def model_kind_name(self) -> t.Optional[ModelKindName]:
655        return self.kind_name_

Returns the model kind name.

def display_name( self, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Optional[str], dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> str:
657    def display_name(
658        self,
659        environment_naming_info: EnvironmentNamingInfo,
660        default_catalog: t.Optional[str],
661        dialect: DialectType = None,
662    ) -> str:
663        return model_display_name(
664            self.name, environment_naming_info, default_catalog, dialect=dialect
665        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model; it should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
sqlmesh.core.model.kind.ModelKindMixin
is_incremental_by_time_range
is_incremental_by_unique_key
is_incremental_by_partition
is_incremental_unmanaged
is_incremental
is_full
is_view
is_embedded
is_seed
is_external
is_scd_type_2
is_scd_type_2_by_time
is_scd_type_2_by_column
is_custom
is_managed
is_dbt_custom
is_symbolic
is_materialized
only_execution_time
full_history_restatement_only
supports_python_models
supports_grants
 668class Snapshot(PydanticModel, SnapshotInfoMixin):
 669    """A snapshot represents a node at a certain point in time.
 670
 671    Snapshots are used to encapsulate everything needed to evaluate a node.
 672    They are standalone objects that hold all state and dynamic content necessary
 673    to render a node's query including things like macros. Snapshots also store intervals
 674    (timestamp ranges for what data we've processed).
 675
 676    Nodes can be dynamically rendered due to macros. Rendering a node to its full extent
 677    requires storing variables and macro definitions. We store all of the macro definitions and
 678    global variable references in `python_env` in raw text to avoid pickling. The helper methods
 679    to achieve this are defined in utils.metaprogramming.
 680
 681    Args:
 682        name: The snapshot name which is the same as the node name and should be unique per node.
 683        fingerprint: A unique hash of the node definition so that nodes can be reused across environments.
 684        node: Node object that the snapshot encapsulates.
 685        parents: The list of parent snapshots (upstream dependencies).
 686        intervals: List of [start, end) intervals showing which time ranges a snapshot has data for.
 687        dev_intervals: List of [start, end) intervals showing development intervals (forward-only).
 688        created_ts: Epoch millis timestamp when a snapshot was first created.
 689        updated_ts: Epoch millis timestamp when a snapshot was last updated.
 690        ttl: The time-to-live of a snapshot determines when it should be deleted after it's no longer referenced
 691            in any environment.
 692        previous: The snapshot data version that this snapshot was based on. If this snapshot is new, then previous will be None.
 693        version: User specified version for a snapshot that is used for physical storage.
 694            By default, the version is the fingerprint, but not all changes to nodes require a backfill.
 695            If a user passes a previous version, that will be used instead and no backfill will be required.
 696        change_category: User specified change category indicating which nodes require backfill from node changes made in this snapshot.
 697        unpaused_ts: The timestamp which indicates when this snapshot was unpaused. Unpaused means that
 698            this snapshot is evaluated on a recurring basis. None indicates that this snapshot is paused.
 699        effective_from: The timestamp which indicates when this snapshot should be considered effective.
 700            Applicable for forward-only snapshots only.
 701        migrated: Whether or not this snapshot has been created as a result of migration.
 702        unrestorable: Whether or not this snapshot can be used to revert its model to a previous version.
 703        next_auto_restatement_ts: The timestamp which indicates when is the next time this snapshot should be restated.
 704        table_naming_convention: Convention to follow when generating the physical table name
 705    """
 706
 707    name: str
 708    fingerprint: SnapshotFingerprint
 709    physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema")
 710    node: Node
 711    parents: t.Tuple[SnapshotId, ...]
 712    intervals: Intervals = []
 713    dev_intervals: Intervals = []
 714    pending_restatement_intervals: Intervals = []
 715    created_ts: int
 716    updated_ts: int
 717    ttl: str
 718    previous_versions: t.Tuple[SnapshotDataVersion, ...] = ()
 719    version: t.Optional[str] = None
 720    dev_version_: t.Optional[str] = Field(default=None, alias="dev_version")
 721    change_category: t.Optional[SnapshotChangeCategory] = None
 722    unpaused_ts: t.Optional[int] = None
 723    effective_from: t.Optional[TimeLike] = None
 724    migrated: bool = False
 725    unrestorable: bool = False
 726    # Added to support Migration # 34 (default catalog)
 727    base_table_name_override: t.Optional[str] = None
 728    next_auto_restatement_ts: t.Optional[int] = None
 729    dev_table_suffix: str = "dev"
 730    table_naming_convention: TableNamingConvention = TableNamingConvention.default
 731    forward_only: bool = False
 732    # Physical table last modified timestamp, not to be confused with the "updated_ts" field
 733    # which is for the snapshot record itself
 734    last_altered_ts: t.Optional[int] = None
 735    dev_last_altered_ts: t.Optional[int] = None
 736
 737    @field_validator("ttl")
 738    @classmethod
 739    def _time_delta_must_be_positive(cls, v: str) -> str:
 740        current_time = now()
 741        if to_datetime(v, current_time) < current_time:
 742            raise ValueError(
 743                "Must be positive. Use the 'in' keyword to denote a positive time interval. For example, 'in 7 days'."
 744            )
 745        return v
 746
 747    @staticmethod
 748    def hydrate_with_intervals_by_version(
 749        snapshots: t.Iterable[Snapshot],
 750        intervals: t.Iterable[SnapshotIntervals],
 751    ) -> t.List[Snapshot]:
 752        """Hydrates target snapshots with given intervals.
 753
 754        This will match snapshots with intervals by name and version rather than identifiers.
 755
 756        Args:
 757            snapshots: Target snapshots.
 758            intervals: Target snapshot intervals.
 759
 760        Returns:
 761            List of target snapshots with hydrated intervals.
 762        """
 763        intervals_by_name_version = defaultdict(list)
 764        for interval in intervals:
 765            intervals_by_name_version[(interval.name, interval.version)].append(interval)
 766
 767        result = []
 768        for snapshot in snapshots:
 769            snapshot_intervals = intervals_by_name_version.get(
 770                (snapshot.name, snapshot.version_get_or_generate()), []
 771            )
 772            for interval in snapshot_intervals:
 773                snapshot.merge_intervals(interval)
 774
 775            result.append(snapshot)
 776
 777        return result
 778
 779    @classmethod
 780    def from_node(
 781        cls,
 782        node: Node,
 783        *,
 784        nodes: t.Dict[str, Node],
 785        ttl: str = c.DEFAULT_SNAPSHOT_TTL,
 786        version: t.Optional[str] = None,
 787        cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
 788        table_naming_convention: TableNamingConvention = TableNamingConvention.default,
 789    ) -> Snapshot:
 790        """Creates a new snapshot for a node.
 791
 792        Args:
 793            Node: Node to snapshot.
 794            nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes.
 795                If no dictionary is passed in the fingerprint will not be dependent on a node's parents.
 796            ttl: A TTL to determine how long orphaned (snapshots that are not promoted anywhere) should live.
 797            version: The version that a snapshot is associated with. Usually set during the planning phase.
 798            cache: Cache of node name to fingerprints.
 799            table_naming_convention: Convention to follow when generating the physical table name
 800
 801        Returns:
 802            The newly created snapshot.
 803        """
 804        created_ts = now_timestamp()
 805
 806        return cls(
 807            name=node.fqn,
 808            fingerprint=fingerprint_from_node(
 809                node,
 810                nodes=nodes,
 811                cache=cache,
 812            ),
 813            node=node,
 814            parents=tuple(
 815                SnapshotId(
 816                    name=parent_node.fqn,
 817                    identifier=fingerprint_from_node(
 818                        parent_node,
 819                        nodes=nodes,
 820                        cache=cache,
 821                    ).to_identifier(),
 822                )
 823                for parent_node in _parents_from_node(node, nodes).values()
 824            ),
 825            intervals=[],
 826            dev_intervals=[],
 827            created_ts=created_ts,
 828            updated_ts=created_ts,
 829            ttl=ttl,
 830            version=version,
 831            table_naming_convention=table_naming_convention,
 832        )
 833
 834    def __eq__(self, other: t.Any) -> bool:
 835        return isinstance(other, Snapshot) and self.fingerprint == other.fingerprint
 836
 837    def __hash__(self) -> int:
 838        return hash((self.__class__, self.name, self.fingerprint))
 839
 840    def __lt__(self, other: Snapshot) -> bool:
 841        return self.name < other.name
 842
 843    def add_interval(self, start: TimeLike, end: TimeLike, is_dev: bool = False) -> None:
 844        """Add a newly processed time interval to the snapshot.
 845
 846        The actual stored intervals are [start_ts, end_ts) or start epoch timestamp inclusive and end epoch
 847        timestamp exclusive. This allows merging of ranges to be easier.
 848
 849        Args:
 850            start: The start date/time of the interval (inclusive)
 851            end: The end date/time of the interval. If end is a date, then it is considered inclusive.
 852                If it is a datetime object, then it is exclusive.
 853            is_dev: Indicates whether the given interval is being added while in development mode.
 854        """
 855        if to_timestamp(start) > to_timestamp(end):
 856            raise ValueError(
 857                f"Attempted to add an Invalid interval ({start}, {end}) to snapshot {self.snapshot_id}"
 858            )
 859
 860        start_ts, end_ts = self.inclusive_exclusive(start, end, strict=False, expand=False)
 861
 862        if start_ts >= end_ts:
 863            # Skipping partial interval.
 864            return
 865
 866        intervals = self.dev_intervals if is_dev else self.intervals
 867        intervals.append((start_ts, end_ts))
 868
 869        if len(intervals) < 2:
 870            return
 871
 872        merged_intervals = merge_intervals(intervals)
 873        if is_dev:
 874            self.dev_intervals = merged_intervals
 875        else:
 876            self.intervals = merged_intervals
 877
 878    def remove_interval(self, interval: Interval) -> None:
 879        """Remove an interval from the snapshot.
 880
 881        Args:
 882            interval: The interval to remove.
 883        """
 884        self.intervals = remove_interval(self.intervals, *interval)
 885        self.dev_intervals = remove_interval(self.dev_intervals, *interval)
 886
 887    def get_removal_interval(
 888        self,
 889        start: TimeLike,
 890        end: TimeLike,
 891        execution_time: t.Optional[TimeLike] = None,
 892        *,
 893        strict: bool = True,
 894        is_preview: bool = False,
 895    ) -> Interval:
 896        """Get the interval that should be removed from the snapshot.
 897
 898        Args:
 899            start: The start date/time of the interval to remove.
 900            end: The end date/time of the interval to removed.
 901            execution_time: The time the interval is being removed.
 902            strict: Whether to fail when the inclusive start is the same as the exclusive end.
 903            is_preview: Whether the interval needs to be removed for a preview of forward-only changes.
 904                When previewing, we are not actually restating a model, but removing an interval to trigger
 905                a run.
 906        """
 907        end = execution_time or now_timestamp() if self.depends_on_past else end
 908        removal_interval = self.inclusive_exclusive(start, end, strict)
 909
 910        if not is_preview and self.full_history_restatement_only and self.intervals:
 911            expanded_removal_interval = self.inclusive_exclusive(self.intervals[0][0], end, strict)
 912            requested_start, requested_end = removal_interval
 913            expanded_start, expanded_end = expanded_removal_interval
 914
 915            # only warn if the requested removal interval was a subset of the actual model intervals and was automatically expanded
 916            # if the requested interval was the same or wider than the actual model intervals, no need to warn
 917            if (
 918                requested_start > expanded_start or requested_end < expanded_end
 919            ) and self.is_incremental:
 920                from sqlmesh.core.console import get_console
 921
 922                get_console().log_warning(
 923                    f"Model '{self.model.name}' is '{self.model_kind_name}' which does not support partial restatement.\n"
 924                    f"Expanding the requested restatement intervals from [{to_ts(requested_start)} - {to_ts(requested_end)}] "
 925                    f"to [{to_ts(expanded_start)} - {to_ts(expanded_end)}] in order to fully restate the model."
 926                )
 927
 928            removal_interval = expanded_removal_interval
 929
 930        return removal_interval
 931
 932    @property
 933    def allow_partials(self) -> bool:
 934        return self.is_model and self.model.allow_partials
 935
 936    def inclusive_exclusive(
 937        self,
 938        start: TimeLike,
 939        end: TimeLike,
 940        strict: bool = True,
 941        allow_partial: t.Optional[bool] = None,
 942        expand: bool = True,
 943    ) -> Interval:
 944        """Transform the inclusive start and end into a [start, end) pair.
 945
 946        Args:
 947            start: The start date/time of the interval (inclusive)
 948            end: The end date/time of the interval (inclusive)
 949            strict: Whether to fail when the inclusive start is the same as the exclusive end.
 950            allow_partial: Whether the interval can be partial or not.
 951            expand: Whether or not partial intervals are expanded outwards.
 952
 953        Returns:
 954            A [start, end) pair.
 955        """
 956        return inclusive_exclusive(
 957            start,
 958            end,
 959            self.node.interval_unit,
 960            strict=strict,
 961            allow_partial=self.allow_partials if allow_partial is None else allow_partial,
 962            expand=expand,
 963        )
 964
 965    def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None:
 966        """Inherits intervals from the target snapshot.
 967
 968        Args:
 969            other: The target snapshot to inherit intervals from.
 970        """
 971        effective_from_ts = self.normalized_effective_from_ts or 0
 972        apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier
 973        for start, end in other.intervals:
 974            # If the effective_from is set, then intervals that come after it must come from
 975            # the current snapshots.
 976            if apply_effective_from and start < effective_from_ts:
 977                end = min(end, effective_from_ts)
 978            if not apply_effective_from or end <= effective_from_ts:
 979                self.add_interval(start, end)
 980
 981        if other.last_altered_ts:
 982            self.last_altered_ts = max(self.last_altered_ts or 0, other.last_altered_ts)
 983
 984        if self.dev_version == other.dev_version:
 985            # Merge dev intervals if the dev versions match which would mean
 986            # that this and the other snapshot are pointing to the same dev table.
 987            for start, end in other.dev_intervals:
 988                self.add_interval(start, end, is_dev=True)
 989
 990            if other.dev_last_altered_ts:
 991                self.dev_last_altered_ts = max(
 992                    self.dev_last_altered_ts or 0, other.dev_last_altered_ts
 993                )
 994
 995        self.pending_restatement_intervals = merge_intervals(
 996            [*self.pending_restatement_intervals, *other.pending_restatement_intervals]
 997        )
 998
 999    @property
1000    def evaluatable(self) -> bool:
1001        """Whether or not a snapshot should be evaluated and have intervals."""
1002        return bool(not self.is_symbolic or self.model.audits)
1003
1004    def missing_intervals(
1005        self,
1006        start: TimeLike,
1007        end: TimeLike,
1008        execution_time: t.Optional[TimeLike] = None,
1009        deployability_index: t.Optional[DeployabilityIndex] = None,
1010        ignore_cron: bool = False,
1011        end_bounded: bool = False,
1012    ) -> Intervals:
1013        """Find all missing intervals between [start, end].
1014
1015        Although the inputs are inclusive, the returned stored intervals are
1016        [start_ts, end_ts) or start epoch timestamp inclusive and end epoch
1017        timestamp exclusive.
1018
1019        Args:
1020            start: The start date/time of the interval (inclusive)
1021            end: The end date/time of the interval (inclusive if the type is date, exclusive otherwise)
1022            execution_time: The date/time time reference to use for execution time. Defaults to now.
1023            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
1024            ignore_cron: Whether to ignore the node's cron schedule.
1025            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
1026                allow_partials, and other attributes that could cause the intervals to exceed the target end date.
1027
1028        Returns:
1029            A list of all the missing intervals as epoch timestamps.
1030        """
1031        # If the node says that it has an end, and we are wanting to load past it, then we can return no empty intervals
1032        # Also if a node's start is after the end of the range we are checking then we can return no empty intervals
1033        if (self.node.end and to_datetime(start) > to_datetime(self.node.end)) or (
1034            self.node.start and to_datetime(self.node.start) > to_datetime(end)
1035        ):
1036            return []
1037        if self.node.start and to_datetime(start) < to_datetime(self.node.start):
1038            start = self.node.start
1039        # If the amount of time being checked is less than the size of a single interval then we
1040        # know that there can't being missing intervals within that range and return
1041        validate_date_range(start, end)
1042
1043        if (
1044            not is_date(end)
1045            and not self.allow_partials
1046            and to_timestamp(end) - to_timestamp(start) < self.node.interval_unit.milliseconds
1047        ):
1048            return []
1049
1050        deployability_index = deployability_index or DeployabilityIndex.all_deployable()
1051        intervals = (
1052            self.intervals if deployability_index.is_representative(self) else self.dev_intervals
1053        )
1054
1055        if not self.evaluatable or (self.is_seed and intervals):
1056            return []
1057
1058        start_ts, end_ts = (to_timestamp(ts) for ts in self.inclusive_exclusive(start, end))
1059
1060        interval_unit = self.node.interval_unit
1061        execution_time_ts = to_timestamp(execution_time) if execution_time else now_timestamp()
1062        upper_bound_ts = (
1063            execution_time_ts
1064            if ignore_cron
1065            else to_timestamp(self.node.cron_floor(execution_time_ts))
1066        )
1067        if end_bounded:
1068            upper_bound_ts = min(upper_bound_ts, end_ts)
1069        if not self.allow_partials:
1070            upper_bound_ts = to_timestamp(interval_unit.cron_floor(upper_bound_ts))
1071
1072        end_ts = min(end_ts, upper_bound_ts)
1073
1074        lookback = 0
1075        model_end_ts: t.Optional[int] = None
1076
1077        if self.is_model:
1078            lookback = self.model.lookback
1079            model_end_ts = to_timestamp(make_exclusive(self.model.end)) if self.model.end else None
1080
1081        return compute_missing_intervals(
1082            interval_unit,
1083            tuple(intervals),
1084            start_ts,
1085            end_ts,
1086            lookback,
1087            model_end_ts,
1088        )
1089
1090    def check_ready_intervals(
1091        self,
1092        intervals: Intervals,
1093        context: ExecutionContext,
1094    ) -> Intervals:
1095        """Returns a list of intervals that are considered ready by the provided signal.
1096
1097        Note that this will handle gaps in the provided intervals. The returned intervals
1098        may introduce new gaps.
1099        """
1100        signals = self.is_model and self.model.render_signal_calls()
1101        if not signals:
1102            return intervals
1103
1104        for signal_name, kwargs in signals.signals_to_kwargs.items():
1105            try:
1106                intervals = check_ready_intervals(
1107                    signals.prepared_python_env[signal_name],
1108                    intervals,
1109                    context,
1110                    python_env=signals.python_env,
1111                    dialect=self.model.dialect,
1112                    path=self.model._path,
1113                    snapshot=self,
1114                    kwargs=kwargs,
1115                )
1116            except SQLMeshError as e:
1117                raise SignalEvalError(
1118                    f"{e} '{signal_name}' for '{self.model.name}' at {self.model._path}"
1119                )
1120        return intervals
1121
1122    def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = False) -> None:
1123        """Assigns the given category to this snapshot.
1124
1125        Args:
1126            category: The change category to assign to this snapshot.
1127            forward_only: Whether or not this snapshot is applied going forward in production.
1128        """
1129        assert category != SnapshotChangeCategory.FORWARD_ONLY, (
1130            "FORWARD_ONLY change category is deprecated"
1131        )
1132
1133        self.dev_version_ = self.fingerprint.to_version()
1134        is_no_rebuild = forward_only or category in (
1135            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
1136            SnapshotChangeCategory.METADATA,
1137        )
1138        if self.is_model and not self.virtual_environment_mode.is_full:
1139            # Hardcode the version if the virtual environment is not fully enabled.
1140            self.version = "novde"
1141        elif self.is_model and self.model.physical_version:
1142            # If the model has a pinned version then use that.
1143            self.version = self.model.physical_version
1144        elif is_no_rebuild and self.previous_version:
1145            self.version = self.previous_version.data_version.version
1146        elif self.is_model and self.model.forward_only and not self.previous_version:
1147            # If this is a new model then use a deterministic version, independent of the fingerprint.
1148            self.version = hash_data([self.name, *self.model.kind.data_hash_values])
1149        else:
1150            self.version = self.fingerprint.to_version()
1151
1152        if is_no_rebuild and self.previous_version:
1153            previous_version = self.previous_version
1154            self.physical_schema_ = previous_version.physical_schema
1155            self.table_naming_convention = previous_version.table_naming_convention
1156            if self.is_materialized and (category.is_indirect_non_breaking or category.is_metadata):
1157                # Reuse the dev table for indirect non-breaking changes.
1158                self.dev_version_ = (
1159                    previous_version.data_version.dev_version
1160                    or previous_version.fingerprint.to_version()
1161                )
1162                self.dev_table_suffix = previous_version.data_version.dev_table_suffix
1163
1164        self.change_category = category
1165        self.forward_only = forward_only
1166
1167    @property
1168    def categorized(self) -> bool:
1169        """Whether the snapshot has been categorized."""
1170        return self.change_category is not None and self.version is not None
1171
1172    def table_name(self, is_deployable: bool = True) -> str:
1173        """Full table name pointing to the materialized location of the snapshot.
1174
1175        Args:
1176            is_deployable: Indicates whether to return the table name for deployment to production.
1177        """
1178        self._ensure_categorized()
1179        assert self.version
1180        return self._table_name(self.version, is_deployable)
1181
1182    def version_get_or_generate(self) -> str:
1183        """Helper method to get the version or generate it from the fingerprint."""
1184        return self.version or self.fingerprint.to_version()
1185
1186    def is_valid_start(
1187        self,
1188        start: t.Optional[TimeLike],
1189        snapshot_start: TimeLike,
1190        execution_time: t.Optional[TimeLike] = None,
1191    ) -> bool:
1192        """Checks if the given start and end are valid for this snapshot.
1193        Args:
1194            start: The start date/time of the interval (inclusive)
1195            snapshot_start: The start date/time of the snapshot (inclusive)
1196        """
1197        # The snapshot may not have a start defined. If so we use the provided snapshot start.
1198        if self.depends_on_past and start:
1199            interval_unit = self.node.interval_unit
1200            start_ts = to_timestamp(interval_unit.cron_floor(start))
1201
1202            if not self.intervals:
1203                # The start date must be aligned by the interval unit.
1204                snapshot_start_ts = to_timestamp(interval_unit.cron_floor(snapshot_start))
1205                if snapshot_start_ts < to_timestamp(snapshot_start):
1206                    snapshot_start_ts = to_timestamp(interval_unit.cron_next(snapshot_start_ts))
1207                return snapshot_start_ts >= start_ts
1208            # Make sure that if there are missing intervals for this snapshot that they all occur at or after the
1209            # provided start_ts. Otherwise we know that we are doing a non-contiguous load and therefore this is not
1210            # a valid start.
1211            missing_intervals = self.missing_intervals(
1212                snapshot_start, now(), execution_time=execution_time
1213            )
1214            earliest_interval = missing_intervals[0][0] if missing_intervals else None
1215            if earliest_interval:
1216                return earliest_interval >= start_ts
1217        return True
1218
1219    def get_latest(self, default: t.Optional[TimeLike] = None) -> t.Optional[TimeLike]:
1220        """The latest interval loaded for the snapshot. Default is used if intervals are not defined"""
1221
1222        def to_end_date(end: int, unit: IntervalUnit) -> TimeLike:
1223            if unit.is_day:
1224                return to_date(make_inclusive_end(end))
1225            return end
1226
1227        return (
1228            to_end_date(to_timestamp(self.intervals[-1][1]), self.node.interval_unit)
1229            if self.intervals
1230            else default
1231        )
1232
1233    def needs_destructive_check(
1234        self,
1235        allow_destructive_snapshots: t.Set[str],
1236    ) -> bool:
1237        return (
1238            self.is_model
1239            and not self.model.on_destructive_change.is_allow
1240            and self.name not in allow_destructive_snapshots
1241        )
1242
1243    def needs_additive_check(
1244        self,
1245        allow_additive_snapshots: t.Set[str],
1246    ) -> bool:
1247        return (
1248            self.is_model
1249            and not self.model.on_additive_change.is_allow
1250            and self.name not in allow_additive_snapshots
1251        )
1252
1253    def get_next_auto_restatement_interval(self, execution_time: TimeLike) -> t.Optional[Interval]:
1254        """Returns the next auto restatement interval for the snapshot.
1255
1256        Args:
1257            execution_time: The execution time to use for the restatement.
1258
1259        Returns:
1260            The interval that needs to be restated or None if no restatement is needed.
1261        """
1262        if (
1263            not self.is_model
1264            or not self.intervals
1265            or not self.model.auto_restatement_cron
1266            or self.model.disable_restatement
1267        ):
1268            return None
1269
1270        execution_time_ts = to_timestamp(execution_time)
1271        next_auto_restatement_ts = self.next_auto_restatement_ts or to_timestamp(
1272            self.model.auto_restatement_croniter(self.created_ts).get_next(estimate=False)
1273        )
1274        if execution_time_ts < next_auto_restatement_ts:
1275            return None
1276
1277        num_intervals_to_restate = self.model.auto_restatement_intervals
1278        if num_intervals_to_restate is None:
1279            return (self.intervals[0][0], self.intervals[-1][1])
1280
1281        auto_restatement_end_ts = to_timestamp(
1282            self.node.interval_unit.cron_floor(execution_time_ts)
1283        )
1284        auto_restatement_start_ts = (
1285            auto_restatement_end_ts
1286            - num_intervals_to_restate * self.node.interval_unit.milliseconds
1287        )
1288        return (auto_restatement_start_ts, auto_restatement_end_ts)
1289
1290    def update_next_auto_restatement_ts(self, execution_time: TimeLike) -> t.Optional[int]:
1291        """Updates the next auto restatement timestamp.
1292
1293        Args:
1294            execution_time: The execution time to use for the restatement.
1295
1296        Returns:
1297            The next auto restatement timestamp or None if not applicable.
1298        """
1299        if (
1300            not self.is_model
1301            or not self.model.auto_restatement_cron
1302            or self.model.disable_restatement
1303        ):
1304            self.next_auto_restatement_ts = None
1305        else:
1306            self.next_auto_restatement_ts = to_timestamp(
1307                self.model.auto_restatement_croniter(execution_time).get_next(estimate=False)
1308            )
1309        return self.next_auto_restatement_ts
1310
1311    def apply_pending_restatement_intervals(self) -> None:
1312        """Applies the pending restatement intervals to the snapshot's intervals."""
1313        if not self.is_model or self.model.disable_restatement:
1314            return
1315        for pending_restatement_interval in self.pending_restatement_intervals:
1316            logger.info(
1317                "Applying the auto restated interval (%s, %s) to snapshot %s",
1318                time_like_to_str(pending_restatement_interval[0]),
1319                time_like_to_str(pending_restatement_interval[1]),
1320                self.snapshot_id,
1321            )
1322            self.intervals = remove_interval(self.intervals, *pending_restatement_interval)
1323
1324    def is_directly_modified(self, other: Snapshot) -> bool:
1325        """Returns whether or not this snapshot is directly modified in relation to the other snapshot."""
1326        return self.node.is_data_change(other.node)
1327
1328    def is_indirectly_modified(self, other: Snapshot) -> bool:
1329        """Returns whether or not this snapshot is indirectly modified in relation to the other snapshot."""
1330        return (
1331            self.fingerprint.parent_data_hash != other.fingerprint.parent_data_hash
1332            and not self.node.is_data_change(other.node)
1333        )
1334
1335    def is_metadata_updated(self, other: Snapshot) -> bool:
1336        """Returns whether or not this snapshot contains metadata changes in relation to the other snapshot."""
1337        return self.fingerprint.metadata_hash != other.fingerprint.metadata_hash
1338
1339    @property
1340    def physical_schema(self) -> str:
1341        if self.physical_schema_ is not None:
1342            return self.physical_schema_
1343        return self.model.physical_schema if self.is_model else ""
1344
1345    @property
1346    def table_info(self) -> SnapshotTableInfo:
1347        """Helper method to get the SnapshotTableInfo from the Snapshot."""
1348        self._ensure_categorized()
1349
1350        custom_materialization = (
1351            self.model.kind.materialization
1352            if self.is_model and isinstance(self.model.kind, CustomKind)
1353            else None
1354        )
1355
1356        return SnapshotTableInfo(
1357            physical_schema=self.physical_schema,
1358            name=self.name,
1359            fingerprint=self.fingerprint,
1360            version=self.version,
1361            dev_version=self.dev_version,
1362            parents=self.parents,
1363            previous_versions=self.previous_versions,
1364            change_category=self.change_category,
1365            kind_name=self.model_kind_name,
1366            node_type=self.node_type,
1367            custom_materialization=custom_materialization,
1368            dev_table_suffix=self.dev_table_suffix,
1369            model_gateway=self.model_gateway,
1370            table_naming_convention=self.table_naming_convention,  # type: ignore
1371            forward_only=self.forward_only,
1372            virtual_environment_mode=self.virtual_environment_mode,
1373        )
1374
1375    @property
1376    def data_version(self) -> SnapshotDataVersion:
1377        self._ensure_categorized()
1378        return SnapshotDataVersion(
1379            fingerprint=self.fingerprint,
1380            version=self.version,
1381            dev_version=self.dev_version,
1382            change_category=self.change_category,
1383            physical_schema=self.physical_schema,
1384            dev_table_suffix=self.dev_table_suffix,
1385            table_naming_convention=self.table_naming_convention,
1386            virtual_environment_mode=self.virtual_environment_mode,
1387        )
1388
1389    @property
1390    def snapshot_intervals(self) -> SnapshotIntervals:
1391        self._ensure_categorized()
1392        return SnapshotIntervals(
1393            name=self.name,
1394            identifier=self.identifier,
1395            version=self.version,
1396            dev_version=self.dev_version,
1397            intervals=self.intervals.copy(),
1398            dev_intervals=self.dev_intervals.copy(),
1399            pending_restatement_intervals=self.pending_restatement_intervals.copy(),
1400        )
1401
1402    @property
1403    def is_materialized_view(self) -> bool:
1404        """Returns whether or not this snapshot's model represents a materialized view."""
1405        return (
1406            self.is_model and isinstance(self.model.kind, ViewKind) and self.model.kind.materialized
1407        )
1408
1409    @property
1410    def is_new_version(self) -> bool:
1411        """Returns whether or not this version is new and requires a backfill."""
1412        self._ensure_categorized()
1413        return self.fingerprint.to_version() == self.version
1414
1415    @property
1416    def is_paused(self) -> bool:
1417        return self.unpaused_ts is None
1418
1419    @property
1420    def is_paused_forward_only(self) -> bool:
1421        return self.is_paused and self.is_forward_only
1422
1423    @property
1424    def normalized_effective_from_ts(self) -> t.Optional[int]:
1425        return (
1426            to_timestamp(self.node.interval_unit.cron_floor(self.effective_from))
1427            if self.effective_from
1428            else None
1429        )
1430
1431    @property
1432    def model_kind_name(self) -> t.Optional[ModelKindName]:
1433        return self.model.kind.name if self.is_model else None
1434
1435    @property
1436    def node_type(self) -> NodeType:
1437        if self.node.is_model:
1438            return NodeType.MODEL
1439        if self.node.is_audit:
1440            return NodeType.AUDIT
1441        raise SQLMeshError(f"Snapshot {self.snapshot_id} has an unknown node type.")
1442
1443    @property
1444    def model(self) -> Model:
1445        model = self.model_or_none
1446        if model:
1447            return model
1448        raise SQLMeshError(f"Snapshot {self.snapshot_id} is not a model snapshot.")
1449
1450    @property
1451    def model_or_none(self) -> t.Optional[Model]:
1452        if self.is_model:
1453            return t.cast(Model, self.node)
1454        return None
1455
1456    @property
1457    def model_gateway(self) -> t.Optional[str]:
1458        return self.model.gateway if self.is_model else None
1459
1460    @property
1461    def audit(self) -> StandaloneAudit:
1462        if self.is_audit:
1463            return t.cast(StandaloneAudit, self.node)
1464        raise SQLMeshError(f"Snapshot {self.snapshot_id} is not an audit snapshot.")
1465
1466    @property
1467    def depends_on_past(self) -> bool:
1468        """Whether or not this models depends on past intervals to be populated before loading following intervals.
1469
1470        This represents a superset of the following types of models:
1471        1. Models that depend on themselves but can be restated from an arbitrary point in time (any start date) as long as interval batches are processed sequentially.
1472           An example of this can be an INCREMENTAL_BY_TIME_RANGE model that references previous records from itself.
1473        2. Models that can only be restated from the beginning of history *and* their interval batches must be processed sequentially.
1474        """
1475        return self.depends_on_self or self.full_history_restatement_only
1476
1477    @property
1478    def depends_on_self(self) -> bool:
1479        """Whether or not this models depends on self."""
1480        return self.is_model and self.model.depends_on_self
1481
1482    @property
1483    def name_version(self) -> SnapshotNameVersion:
1484        """Returns the name and version of the snapshot."""
1485        return SnapshotNameVersion(name=self.name, version=self.version)
1486
1487    @property
1488    def id_and_version(self) -> SnapshotIdAndVersion:
1489        return self.table_info.id_and_version
1490
1491    @property
1492    def disable_restatement(self) -> bool:
1493        """Is restatement disabled for the node"""
1494        return self.is_model and self.model.disable_restatement
1495
1496    @cached_property
1497    def fully_qualified_table(self) -> t.Optional[exp.Table]:
1498        if not self.is_model:
1499            return None
1500        return t.cast(Model, self.node).fully_qualified_table
1501
1502    @property
1503    def expiration_ts(self) -> int:
1504        return to_timestamp(
1505            self.ttl,
1506            relative_base=to_datetime(self.updated_ts),
1507            check_categorical_relative_expression=False,
1508        )
1509
1510    @property
1511    def supports_schema_migration_in_prod(self) -> bool:
1512        """Returns whether or not this snapshot supports schema migration when deployed to production."""
1513        return self.is_paused and self.is_model and not self.is_symbolic and not self.is_seed
1514
1515    @property
1516    def requires_schema_migration_in_prod(self) -> bool:
1517        """Returns whether or not this snapshot requires a schema migration when deployed to production."""
1518        return self.supports_schema_migration_in_prod and (
1519            (self.previous_version and self.previous_version.version == self.version)
1520            or self.model.forward_only
1521            or bool(self.model.physical_version)
1522            or not self.virtual_environment_mode.is_full
1523        )
1524
1525    @property
1526    def ttl_ms(self) -> int:
1527        return self.expiration_ts - self.updated_ts
1528
1529    @property
1530    def custom_materialization(self) -> t.Optional[str]:
1531        if self.is_custom:
1532            return t.cast(CustomKind, self.model.kind).materialization
1533        return None
1534
1535    @property
1536    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
1537        return (
1538            self.model.virtual_environment_mode if self.is_model else VirtualEnvironmentMode.default
1539        )
1540
1541    def _ensure_categorized(self) -> None:
1542        if not self.change_category:
1543            raise SQLMeshError(f"Snapshot {self.snapshot_id} has not been categorized yet.")
1544        if not self.version:
1545            raise SQLMeshError(f"Snapshot {self.snapshot_id} has not been versioned yet.")
1546
1547    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
1548        state = super().__getstate__()
1549        state["__dict__"] = state["__dict__"].copy()
1550        # Don't store intervals.
1551        state["__dict__"]["intervals"] = []
1552        state["__dict__"]["dev_intervals"] = []
1553        return state

A snapshot represents a node at a certain point in time.

Snapshots are used to encapsulate everything needed to evaluate a node. They are standalone objects that hold all state and dynamic content necessary to render a node's query including things like macros. Snapshots also store intervals (timestamp ranges for what data we've processed).

Nodes can be dynamically rendered due to macros. Rendering a node to its full extent requires storing variables and macro definitions. We store all of the macro definitions and global variable references in python_env in raw text to avoid pickling. The helper methods to achieve this are defined in utils.metaprogramming.

Arguments:
  • name: The snapshot name which is the same as the node name and should be unique per node.
  • fingerprint: A unique hash of the node definition so that nodes can be reused across environments.
  • node: Node object that the snapshot encapsulates.
  • parents: The list of parent snapshots (upstream dependencies).
  • intervals: List of [start, end) intervals showing which time ranges a snapshot has data for.
  • dev_intervals: List of [start, end) intervals showing development intervals (forward-only).
  • created_ts: Epoch millis timestamp when a snapshot was first created.
  • updated_ts: Epoch millis timestamp when a snapshot was last updated.
  • ttl: The time-to-live of a snapshot determines when it should be deleted after it's no longer referenced in any environment.
  • previous: The snapshot data version that this snapshot was based on. If this snapshot is new, then previous will be None.
  • version: User specified version for a snapshot that is used for physical storage. By default, the version is the fingerprint, but not all changes to nodes require a backfill. If a user passes a previous version, that will be used instead and no backfill will be required.
  • change_category: User specified change category indicating which nodes require backfill from node changes made in this snapshot.
  • unpaused_ts: The timestamp which indicates when this snapshot was unpaused. Unpaused means that this snapshot is evaluated on a recurring basis. None indicates that this snapshot is paused.
  • effective_from: The timestamp which indicates when this snapshot should be considered effective. Applicable for forward-only snapshots only.
  • migrated: Whether or not this snapshot has been created as a result of migration.
  • unrestorable: Whether or not this snapshot can be used to revert its model to a previous version.
  • next_auto_restatement_ts: The timestamp indicating the next time this snapshot should be restated.
  • table_naming_convention: The convention to follow when generating the physical table name.
name: str
fingerprint: SnapshotFingerprint
physical_schema_: Optional[str]
parents: Tuple[SnapshotId, ...]
intervals: List[Tuple[int, int]]
dev_intervals: List[Tuple[int, int]]
pending_restatement_intervals: List[Tuple[int, int]]
created_ts: int
updated_ts: int
ttl: str
previous_versions: Tuple[SnapshotDataVersion, ...]
version: Optional[str]
dev_version_: Optional[str]
change_category: Optional[SnapshotChangeCategory]
unpaused_ts: Optional[int]
effective_from: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
migrated: bool
unrestorable: bool
base_table_name_override: Optional[str]
next_auto_restatement_ts: Optional[int]
dev_table_suffix: str
forward_only: bool
last_altered_ts: Optional[int]
dev_last_altered_ts: Optional[int]
@staticmethod
def hydrate_with_intervals_by_version( snapshots: Iterable[Snapshot], intervals: Iterable[SnapshotIntervals]) -> List[Snapshot]:
747    @staticmethod
748    def hydrate_with_intervals_by_version(
749        snapshots: t.Iterable[Snapshot],
750        intervals: t.Iterable[SnapshotIntervals],
751    ) -> t.List[Snapshot]:
752        """Hydrates target snapshots with given intervals.
753
754        This will match snapshots with intervals by name and version rather than identifiers.
755
756        Args:
757            snapshots: Target snapshots.
758            intervals: Target snapshot intervals.
759
760        Returns:
761            List of target snapshots with hydrated intervals.
762        """
763        intervals_by_name_version = defaultdict(list)
764        for interval in intervals:
765            intervals_by_name_version[(interval.name, interval.version)].append(interval)
766
767        result = []
768        for snapshot in snapshots:
769            snapshot_intervals = intervals_by_name_version.get(
770                (snapshot.name, snapshot.version_get_or_generate()), []
771            )
772            for interval in snapshot_intervals:
773                snapshot.merge_intervals(interval)
774
775            result.append(snapshot)
776
777        return result

Hydrates target snapshots with given intervals.

This will match snapshots with intervals by name and version rather than identifiers.

Arguments:
  • snapshots: Target snapshots.
  • intervals: Target snapshot intervals.
Returns:

List of target snapshots with hydrated intervals.

@classmethod
def from_node( cls, node: Annotated[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel, sqlmesh.core.audit.definition.StandaloneAudit], FieldInfo(annotation=NoneType, required=True, discriminator='source_type')], *, nodes: Dict[str, Annotated[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel, sqlmesh.core.audit.definition.StandaloneAudit], FieldInfo(annotation=NoneType, required=True, discriminator='source_type')]], ttl: str = 'in 1 week', version: Optional[str] = None, cache: Optional[Dict[str, SnapshotFingerprint]] = None, table_naming_convention: sqlmesh.core.config.common.TableNamingConvention = SCHEMA_AND_TABLE) -> Snapshot:
779    @classmethod
780    def from_node(
781        cls,
782        node: Node,
783        *,
784        nodes: t.Dict[str, Node],
785        ttl: str = c.DEFAULT_SNAPSHOT_TTL,
786        version: t.Optional[str] = None,
787        cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
788        table_naming_convention: TableNamingConvention = TableNamingConvention.default,
789    ) -> Snapshot:
790        """Creates a new snapshot for a node.
791
792        Args:
793            Node: Node to snapshot.
794            nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes.
795                If no dictionary is passed in the fingerprint will not be dependent on a node's parents.
796            ttl: A TTL to determine how long orphaned (snapshots that are not promoted anywhere) should live.
797            version: The version that a snapshot is associated with. Usually set during the planning phase.
798            cache: Cache of node name to fingerprints.
799            table_naming_convention: Convention to follow when generating the physical table name
800
801        Returns:
802            The newly created snapshot.
803        """
804        created_ts = now_timestamp()
805
806        return cls(
807            name=node.fqn,
808            fingerprint=fingerprint_from_node(
809                node,
810                nodes=nodes,
811                cache=cache,
812            ),
813            node=node,
814            parents=tuple(
815                SnapshotId(
816                    name=parent_node.fqn,
817                    identifier=fingerprint_from_node(
818                        parent_node,
819                        nodes=nodes,
820                        cache=cache,
821                    ).to_identifier(),
822                )
823                for parent_node in _parents_from_node(node, nodes).values()
824            ),
825            intervals=[],
826            dev_intervals=[],
827            created_ts=created_ts,
828            updated_ts=created_ts,
829            ttl=ttl,
830            version=version,
831            table_naming_convention=table_naming_convention,
832        )

Creates a new snapshot for a node.

Arguments:
  • node: Node to snapshot.
  • nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes. If no dictionary is passed in the fingerprint will not be dependent on a node's parents.
  • ttl: A TTL to determine how long orphaned (snapshots that are not promoted anywhere) should live.
  • version: The version that a snapshot is associated with. Usually set during the planning phase.
  • cache: Cache of node name to fingerprints.
  • table_naming_convention: Convention to follow when generating the physical table name
Returns:

The newly created snapshot.

def add_interval( self, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], is_dev: bool = False) -> None:
843    def add_interval(self, start: TimeLike, end: TimeLike, is_dev: bool = False) -> None:
844        """Add a newly processed time interval to the snapshot.
845
846        The actual stored intervals are [start_ts, end_ts) or start epoch timestamp inclusive and end epoch
847        timestamp exclusive. This allows merging of ranges to be easier.
848
849        Args:
850            start: The start date/time of the interval (inclusive)
851            end: The end date/time of the interval. If end is a date, then it is considered inclusive.
852                If it is a datetime object, then it is exclusive.
853            is_dev: Indicates whether the given interval is being added while in development mode.
854        """
855        if to_timestamp(start) > to_timestamp(end):
856            raise ValueError(
857                f"Attempted to add an Invalid interval ({start}, {end}) to snapshot {self.snapshot_id}"
858            )
859
860        start_ts, end_ts = self.inclusive_exclusive(start, end, strict=False, expand=False)
861
862        if start_ts >= end_ts:
863            # Skipping partial interval.
864            return
865
866        intervals = self.dev_intervals if is_dev else self.intervals
867        intervals.append((start_ts, end_ts))
868
869        if len(intervals) < 2:
870            return
871
872        merged_intervals = merge_intervals(intervals)
873        if is_dev:
874            self.dev_intervals = merged_intervals
875        else:
876            self.intervals = merged_intervals

Add a newly processed time interval to the snapshot.

The actual stored intervals are [start_ts, end_ts) or start epoch timestamp inclusive and end epoch timestamp exclusive. This allows merging of ranges to be easier.

Arguments:
  • start: The start date/time of the interval (inclusive)
  • end: The end date/time of the interval. If end is a date, then it is considered inclusive. If it is a datetime object, then it is exclusive.
  • is_dev: Indicates whether the given interval is being added while in development mode.
def remove_interval(self, interval: Tuple[int, int]) -> None:
878    def remove_interval(self, interval: Interval) -> None:
879        """Remove an interval from the snapshot.
880
881        Args:
882            interval: The interval to remove.
883        """
884        self.intervals = remove_interval(self.intervals, *interval)
885        self.dev_intervals = remove_interval(self.dev_intervals, *interval)

Remove an interval from the snapshot.

Arguments:
  • interval: The interval to remove.
def get_removal_interval( self, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, *, strict: bool = True, is_preview: bool = False) -> Tuple[int, int]:
887    def get_removal_interval(
888        self,
889        start: TimeLike,
890        end: TimeLike,
891        execution_time: t.Optional[TimeLike] = None,
892        *,
893        strict: bool = True,
894        is_preview: bool = False,
895    ) -> Interval:
896        """Get the interval that should be removed from the snapshot.
897
898        Args:
899            start: The start date/time of the interval to remove.
900            end: The end date/time of the interval to removed.
901            execution_time: The time the interval is being removed.
902            strict: Whether to fail when the inclusive start is the same as the exclusive end.
903            is_preview: Whether the interval needs to be removed for a preview of forward-only changes.
904                When previewing, we are not actually restating a model, but removing an interval to trigger
905                a run.
906        """
907        end = execution_time or now_timestamp() if self.depends_on_past else end
908        removal_interval = self.inclusive_exclusive(start, end, strict)
909
910        if not is_preview and self.full_history_restatement_only and self.intervals:
911            expanded_removal_interval = self.inclusive_exclusive(self.intervals[0][0], end, strict)
912            requested_start, requested_end = removal_interval
913            expanded_start, expanded_end = expanded_removal_interval
914
915            # only warn if the requested removal interval was a subset of the actual model intervals and was automatically expanded
916            # if the requested interval was the same or wider than the actual model intervals, no need to warn
917            if (
918                requested_start > expanded_start or requested_end < expanded_end
919            ) and self.is_incremental:
920                from sqlmesh.core.console import get_console
921
922                get_console().log_warning(
923                    f"Model '{self.model.name}' is '{self.model_kind_name}' which does not support partial restatement.\n"
924                    f"Expanding the requested restatement intervals from [{to_ts(requested_start)} - {to_ts(requested_end)}] "
925                    f"to [{to_ts(expanded_start)} - {to_ts(expanded_end)}] in order to fully restate the model."
926                )
927
928            removal_interval = expanded_removal_interval
929
930        return removal_interval

Get the interval that should be removed from the snapshot.

Arguments:
  • start: The start date/time of the interval to remove.
  • end: The end date/time of the interval to remove.
  • execution_time: The time the interval is being removed.
  • strict: Whether to fail when the inclusive start is the same as the exclusive end.
  • is_preview: Whether the interval needs to be removed for a preview of forward-only changes. When previewing, we are not actually restating a model, but removing an interval to trigger a run.
allow_partials: bool
932    @property
933    def allow_partials(self) -> bool:
934        return self.is_model and self.model.allow_partials
def inclusive_exclusive( self, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], strict: bool = True, allow_partial: Optional[bool] = None, expand: bool = True) -> Tuple[int, int]:
936    def inclusive_exclusive(
937        self,
938        start: TimeLike,
939        end: TimeLike,
940        strict: bool = True,
941        allow_partial: t.Optional[bool] = None,
942        expand: bool = True,
943    ) -> Interval:
944        """Transform the inclusive start and end into a [start, end) pair.
945
946        Args:
947            start: The start date/time of the interval (inclusive)
948            end: The end date/time of the interval (inclusive)
949            strict: Whether to fail when the inclusive start is the same as the exclusive end.
950            allow_partial: Whether the interval can be partial or not.
951            expand: Whether or not partial intervals are expanded outwards.
952
953        Returns:
954            A [start, end) pair.
955        """
956        return inclusive_exclusive(
957            start,
958            end,
959            self.node.interval_unit,
960            strict=strict,
961            allow_partial=self.allow_partials if allow_partial is None else allow_partial,
962            expand=expand,
963        )

Transform the inclusive start and end into a [start, end) pair.

Arguments:
  • start: The start date/time of the interval (inclusive)
  • end: The end date/time of the interval (inclusive)
  • strict: Whether to fail when the inclusive start is the same as the exclusive end.
  • allow_partial: Whether the interval can be partial or not.
  • expand: Whether or not partial intervals are expanded outwards.
Returns:

A [start, end) pair.

def merge_intervals( self, other: Union[Snapshot, SnapshotIntervals]) -> None:
965    def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None:
966        """Inherits intervals from the target snapshot.
967
968        Args:
969            other: The target snapshot to inherit intervals from.
970        """
971        effective_from_ts = self.normalized_effective_from_ts or 0
972        apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier
973        for start, end in other.intervals:
974            # If the effective_from is set, then intervals that come after it must come from
975            # the current snapshots.
976            if apply_effective_from and start < effective_from_ts:
977                end = min(end, effective_from_ts)
978            if not apply_effective_from or end <= effective_from_ts:
979                self.add_interval(start, end)
980
981        if other.last_altered_ts:
982            self.last_altered_ts = max(self.last_altered_ts or 0, other.last_altered_ts)
983
984        if self.dev_version == other.dev_version:
985            # Merge dev intervals if the dev versions match which would mean
986            # that this and the other snapshot are pointing to the same dev table.
987            for start, end in other.dev_intervals:
988                self.add_interval(start, end, is_dev=True)
989
990            if other.dev_last_altered_ts:
991                self.dev_last_altered_ts = max(
992                    self.dev_last_altered_ts or 0, other.dev_last_altered_ts
993                )
994
995        self.pending_restatement_intervals = merge_intervals(
996            [*self.pending_restatement_intervals, *other.pending_restatement_intervals]
997        )

Inherits intervals from the target snapshot.

Arguments:
  • other: The target snapshot to inherit intervals from.
evaluatable: bool
 999    @property
1000    def evaluatable(self) -> bool:
1001        """Whether or not a snapshot should be evaluated and have intervals."""
1002        return bool(not self.is_symbolic or self.model.audits)

Whether or not a snapshot should be evaluated and have intervals.

def missing_intervals( self, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, deployability_index: Optional[DeployabilityIndex] = None, ignore_cron: bool = False, end_bounded: bool = False) -> List[Tuple[int, int]]:
    def missing_intervals(
        self,
        start: TimeLike,
        end: TimeLike,
        execution_time: t.Optional[TimeLike] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        ignore_cron: bool = False,
        end_bounded: bool = False,
    ) -> Intervals:
        """Find all missing intervals between [start, end].

        Although the inputs are inclusive, the returned stored intervals are
        [start_ts, end_ts) or start epoch timestamp inclusive and end epoch
        timestamp exclusive.

        Args:
            start: The start date/time of the interval (inclusive)
            end: The end date/time of the interval (inclusive if the type is date, exclusive otherwise)
            execution_time: The date/time time reference to use for execution time. Defaults to now.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
                Defaults to an index in which all snapshots are deployable.
            ignore_cron: Whether to ignore the node's cron schedule.
            end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
                allow_partials, and other attributes that could cause the intervals to exceed the target end date.

        Returns:
            A list of all the missing intervals as epoch timestamps.
        """
        # If the node has an end and the requested start is past it, there is nothing to load.
        # Likewise, if the node's start is after the end of the requested range, there is nothing to load.
        if (self.node.end and to_datetime(start) > to_datetime(self.node.end)) or (
            self.node.start and to_datetime(self.node.start) > to_datetime(end)
        ):
            return []
        # Never look for missing intervals before the node's own start.
        if self.node.start and to_datetime(start) < to_datetime(self.node.start):
            start = self.node.start
        validate_date_range(start, end)

        # If the amount of time being checked is less than the size of a single interval then we
        # know that there can't be missing intervals within that range and return. This shortcut
        # is skipped when the end is a date or when partials are allowed.
        if (
            not is_date(end)
            and not self.allow_partials
            and to_timestamp(end) - to_timestamp(start) < self.node.interval_unit.milliseconds
        ):
            return []

        # Compare against the regular intervals when this snapshot is representative according
        # to the deployability index, and against the dev intervals otherwise.
        deployability_index = deployability_index or DeployabilityIndex.all_deployable()
        intervals = (
            self.intervals if deployability_index.is_representative(self) else self.dev_intervals
        )

        # Non-evaluatable snapshots have no missing intervals, and neither does a seed
        # that already has any interval.
        if not self.evaluatable or (self.is_seed and intervals):
            return []

        start_ts, end_ts = (to_timestamp(ts) for ts in self.inclusive_exclusive(start, end))

        interval_unit = self.node.interval_unit
        execution_time_ts = to_timestamp(execution_time) if execution_time else now_timestamp()
        # The upper bound is the execution time, floored by the node's cron unless the cron is ignored.
        upper_bound_ts = (
            execution_time_ts
            if ignore_cron
            else to_timestamp(self.node.cron_floor(execution_time_ts))
        )
        if end_bounded:
            # Do not let the upper bound exceed the requested end.
            upper_bound_ts = min(upper_bound_ts, end_ts)
        if not self.allow_partials:
            # Without partials the upper bound is floored using the interval unit.
            upper_bound_ts = to_timestamp(interval_unit.cron_floor(upper_bound_ts))

        end_ts = min(end_ts, upper_bound_ts)

        # Lookback and the model end only apply to model snapshots.
        lookback = 0
        model_end_ts: t.Optional[int] = None

        if self.is_model:
            lookback = self.model.lookback
            model_end_ts = to_timestamp(make_exclusive(self.model.end)) if self.model.end else None

        return compute_missing_intervals(
            interval_unit,
            tuple(intervals),
            start_ts,
            end_ts,
            lookback,
            model_end_ts,
        )

Find all missing intervals between [start, end].

Although the inputs are inclusive, the returned stored intervals are [start_ts, end_ts) or start epoch timestamp inclusive and end epoch timestamp exclusive.

Arguments:
  • start: The start date/time of the interval (inclusive)
  • end: The end date/time of the interval (inclusive if the type is date, exclusive otherwise)
  • execution_time: The date/time time reference to use for execution time. Defaults to now.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • ignore_cron: Whether to ignore the node's cron schedule.
  • end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date.
Returns:

A list of all the missing intervals as epoch timestamps.

def check_ready_intervals( self, intervals: List[Tuple[int, int]], context: sqlmesh.core.context.ExecutionContext) -> List[Tuple[int, int]]:
1090    def check_ready_intervals(
1091        self,
1092        intervals: Intervals,
1093        context: ExecutionContext,
1094    ) -> Intervals:
1095        """Returns a list of intervals that are considered ready by the provided signal.
1096
1097        Note that this will handle gaps in the provided intervals. The returned intervals
1098        may introduce new gaps.
1099        """
1100        signals = self.is_model and self.model.render_signal_calls()
1101        if not signals:
1102            return intervals
1103
1104        for signal_name, kwargs in signals.signals_to_kwargs.items():
1105            try:
1106                intervals = check_ready_intervals(
1107                    signals.prepared_python_env[signal_name],
1108                    intervals,
1109                    context,
1110                    python_env=signals.python_env,
1111                    dialect=self.model.dialect,
1112                    path=self.model._path,
1113                    snapshot=self,
1114                    kwargs=kwargs,
1115                )
1116            except SQLMeshError as e:
1117                raise SignalEvalError(
1118                    f"{e} '{signal_name}' for '{self.model.name}' at {self.model._path}"
1119                )
1120        return intervals

Returns a list of intervals that are considered ready by the provided signal.

Note that this will handle gaps in the provided intervals. The returned intervals may introduce new gaps.

def categorize_as( self, category: SnapshotChangeCategory, forward_only: bool = False) -> None:
    def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = False) -> None:
        """Assigns the given category to this snapshot.

        Besides recording the category, this sets the snapshot's version and dev version and,
        when the previous version is reused, inherits its physical schema and table naming
        convention.

        Args:
            category: The change category to assign to this snapshot.
            forward_only: Whether or not this snapshot is applied going forward in production.

        Raises:
            AssertionError: If the deprecated FORWARD_ONLY category is passed.
        """
        assert category != SnapshotChangeCategory.FORWARD_ONLY, (
            "FORWARD_ONLY change category is deprecated"
        )

        # By default the dev version is derived from the fingerprint; it may be overridden below.
        self.dev_version_ = self.fingerprint.to_version()
        # Forward-only, indirect non-breaking and metadata changes are flagged as no-rebuild.
        is_no_rebuild = forward_only or category in (
            SnapshotChangeCategory.INDIRECT_NON_BREAKING,
            SnapshotChangeCategory.METADATA,
        )
        # The branches below are ordered by precedence; the first one that matches wins.
        if self.is_model and not self.virtual_environment_mode.is_full:
            # Hardcode the version if the virtual environment is not fully enabled.
            self.version = "novde"
        elif self.is_model and self.model.physical_version:
            # If the model has a pinned version then use that.
            self.version = self.model.physical_version
        elif is_no_rebuild and self.previous_version:
            # No rebuild is needed, so keep using the version of the previous snapshot.
            self.version = self.previous_version.data_version.version
        elif self.is_model and self.model.forward_only and not self.previous_version:
            # If this is a new model then use a deterministic version, independent of the fingerprint.
            self.version = hash_data([self.name, *self.model.kind.data_hash_values])
        else:
            # Otherwise the version is derived from the fingerprint.
            self.version = self.fingerprint.to_version()

        if is_no_rebuild and self.previous_version:
            # Inherit the physical schema and naming convention from the previous version.
            previous_version = self.previous_version
            self.physical_schema_ = previous_version.physical_schema
            self.table_naming_convention = previous_version.table_naming_convention
            if self.is_materialized and (category.is_indirect_non_breaking or category.is_metadata):
                # Reuse the previous dev version and dev table suffix for indirect non-breaking
                # and metadata changes.
                self.dev_version_ = (
                    previous_version.data_version.dev_version
                    or previous_version.fingerprint.to_version()
                )
                self.dev_table_suffix = previous_version.data_version.dev_table_suffix

        self.change_category = category
        self.forward_only = forward_only

Assigns the given category to this snapshot.

Arguments:
  • category: The change category to assign to this snapshot.
  • forward_only: Whether or not this snapshot is applied going forward in production.
categorized: bool
1167    @property
1168    def categorized(self) -> bool:
1169        """Whether the snapshot has been categorized."""
1170        return self.change_category is not None and self.version is not None

Whether the snapshot has been categorized.

def table_name(self, is_deployable: bool = True) -> str:
1172    def table_name(self, is_deployable: bool = True) -> str:
1173        """Full table name pointing to the materialized location of the snapshot.
1174
1175        Args:
1176            is_deployable: Indicates whether to return the table name for deployment to production.
1177        """
1178        self._ensure_categorized()
1179        assert self.version
1180        return self._table_name(self.version, is_deployable)

Full table name pointing to the materialized location of the snapshot.

Arguments:
  • is_deployable: Indicates whether to return the table name for deployment to production.
def version_get_or_generate(self) -> str:
1182    def version_get_or_generate(self) -> str:
1183        """Helper method to get the version or generate it from the fingerprint."""
1184        return self.version or self.fingerprint.to_version()

Helper method to get the version or generate it from the fingerprint.

def is_valid_start( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType], snapshot_start: Union[datetime.date, datetime.datetime, str, int, float], execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None) -> bool:
1186    def is_valid_start(
1187        self,
1188        start: t.Optional[TimeLike],
1189        snapshot_start: TimeLike,
1190        execution_time: t.Optional[TimeLike] = None,
1191    ) -> bool:
1192        """Checks if the given start and end are valid for this snapshot.
1193        Args:
1194            start: The start date/time of the interval (inclusive)
1195            snapshot_start: The start date/time of the snapshot (inclusive)
1196        """
1197        # The snapshot may not have a start defined. If so we use the provided snapshot start.
1198        if self.depends_on_past and start:
1199            interval_unit = self.node.interval_unit
1200            start_ts = to_timestamp(interval_unit.cron_floor(start))
1201
1202            if not self.intervals:
1203                # The start date must be aligned by the interval unit.
1204                snapshot_start_ts = to_timestamp(interval_unit.cron_floor(snapshot_start))
1205                if snapshot_start_ts < to_timestamp(snapshot_start):
1206                    snapshot_start_ts = to_timestamp(interval_unit.cron_next(snapshot_start_ts))
1207                return snapshot_start_ts >= start_ts
1208            # Make sure that if there are missing intervals for this snapshot that they all occur at or after the
1209            # provided start_ts. Otherwise we know that we are doing a non-contiguous load and therefore this is not
1210            # a valid start.
1211            missing_intervals = self.missing_intervals(
1212                snapshot_start, now(), execution_time=execution_time
1213            )
1214            earliest_interval = missing_intervals[0][0] if missing_intervals else None
1215            if earliest_interval:
1216                return earliest_interval >= start_ts
1217        return True

Checks if the given start is valid for this snapshot.

Arguments:
  • start: The start date/time of the interval (inclusive)
  • snapshot_start: The start date/time of the snapshot (inclusive)
def get_latest( self, default: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None) -> Union[datetime.date, datetime.datetime, str, int, float, NoneType]:
1219    def get_latest(self, default: t.Optional[TimeLike] = None) -> t.Optional[TimeLike]:
1220        """The latest interval loaded for the snapshot. Default is used if intervals are not defined"""
1221
1222        def to_end_date(end: int, unit: IntervalUnit) -> TimeLike:
1223            if unit.is_day:
1224                return to_date(make_inclusive_end(end))
1225            return end
1226
1227        return (
1228            to_end_date(to_timestamp(self.intervals[-1][1]), self.node.interval_unit)
1229            if self.intervals
1230            else default
1231        )

The latest interval loaded for the snapshot. Default is used if intervals are not defined.

def needs_destructive_check(self, allow_destructive_snapshots: Set[str]) -> bool:
1233    def needs_destructive_check(
1234        self,
1235        allow_destructive_snapshots: t.Set[str],
1236    ) -> bool:
1237        return (
1238            self.is_model
1239            and not self.model.on_destructive_change.is_allow
1240            and self.name not in allow_destructive_snapshots
1241        )
def needs_additive_check(self, allow_additive_snapshots: Set[str]) -> bool:
1243    def needs_additive_check(
1244        self,
1245        allow_additive_snapshots: t.Set[str],
1246    ) -> bool:
1247        return (
1248            self.is_model
1249            and not self.model.on_additive_change.is_allow
1250            and self.name not in allow_additive_snapshots
1251        )
def get_next_auto_restatement_interval( self, execution_time: Union[datetime.date, datetime.datetime, str, int, float]) -> Optional[Tuple[int, int]]:
1253    def get_next_auto_restatement_interval(self, execution_time: TimeLike) -> t.Optional[Interval]:
1254        """Returns the next auto restatement interval for the snapshot.
1255
1256        Args:
1257            execution_time: The execution time to use for the restatement.
1258
1259        Returns:
1260            The interval that needs to be restated or None if no restatement is needed.
1261        """
1262        if (
1263            not self.is_model
1264            or not self.intervals
1265            or not self.model.auto_restatement_cron
1266            or self.model.disable_restatement
1267        ):
1268            return None
1269
1270        execution_time_ts = to_timestamp(execution_time)
1271        next_auto_restatement_ts = self.next_auto_restatement_ts or to_timestamp(
1272            self.model.auto_restatement_croniter(self.created_ts).get_next(estimate=False)
1273        )
1274        if execution_time_ts < next_auto_restatement_ts:
1275            return None
1276
1277        num_intervals_to_restate = self.model.auto_restatement_intervals
1278        if num_intervals_to_restate is None:
1279            return (self.intervals[0][0], self.intervals[-1][1])
1280
1281        auto_restatement_end_ts = to_timestamp(
1282            self.node.interval_unit.cron_floor(execution_time_ts)
1283        )
1284        auto_restatement_start_ts = (
1285            auto_restatement_end_ts
1286            - num_intervals_to_restate * self.node.interval_unit.milliseconds
1287        )
1288        return (auto_restatement_start_ts, auto_restatement_end_ts)

Returns the next auto restatement interval for the snapshot.

Arguments:
  • execution_time: The execution time to use for the restatement.
Returns:

The interval that needs to be restated or None if no restatement is needed.

def update_next_auto_restatement_ts( self, execution_time: Union[datetime.date, datetime.datetime, str, int, float]) -> Optional[int]:
1290    def update_next_auto_restatement_ts(self, execution_time: TimeLike) -> t.Optional[int]:
1291        """Updates the next auto restatement timestamp.
1292
1293        Args:
1294            execution_time: The execution time to use for the restatement.
1295
1296        Returns:
1297            The next auto restatement timestamp or None if not applicable.
1298        """
1299        if (
1300            not self.is_model
1301            or not self.model.auto_restatement_cron
1302            or self.model.disable_restatement
1303        ):
1304            self.next_auto_restatement_ts = None
1305        else:
1306            self.next_auto_restatement_ts = to_timestamp(
1307                self.model.auto_restatement_croniter(execution_time).get_next(estimate=False)
1308            )
1309        return self.next_auto_restatement_ts

Updates the next auto restatement timestamp.

Arguments:
  • execution_time: The execution time to use for the restatement.
Returns:

The next auto restatement timestamp or None if not applicable.

def apply_pending_restatement_intervals(self) -> None:
1311    def apply_pending_restatement_intervals(self) -> None:
1312        """Applies the pending restatement intervals to the snapshot's intervals."""
1313        if not self.is_model or self.model.disable_restatement:
1314            return
1315        for pending_restatement_interval in self.pending_restatement_intervals:
1316            logger.info(
1317                "Applying the auto restated interval (%s, %s) to snapshot %s",
1318                time_like_to_str(pending_restatement_interval[0]),
1319                time_like_to_str(pending_restatement_interval[1]),
1320                self.snapshot_id,
1321            )
1322            self.intervals = remove_interval(self.intervals, *pending_restatement_interval)

Applies the pending restatement intervals to the snapshot's intervals.

def is_directly_modified(self, other: Snapshot) -> bool:
1324    def is_directly_modified(self, other: Snapshot) -> bool:
1325        """Returns whether or not this snapshot is directly modified in relation to the other snapshot."""
1326        return self.node.is_data_change(other.node)

Returns whether or not this snapshot is directly modified in relation to the other snapshot.

def is_indirectly_modified(self, other: Snapshot) -> bool:
1328    def is_indirectly_modified(self, other: Snapshot) -> bool:
1329        """Returns whether or not this snapshot is indirectly modified in relation to the other snapshot."""
1330        return (
1331            self.fingerprint.parent_data_hash != other.fingerprint.parent_data_hash
1332            and not self.node.is_data_change(other.node)
1333        )

Returns whether or not this snapshot is indirectly modified in relation to the other snapshot.

def is_metadata_updated(self, other: Snapshot) -> bool:
1335    def is_metadata_updated(self, other: Snapshot) -> bool:
1336        """Returns whether or not this snapshot contains metadata changes in relation to the other snapshot."""
1337        return self.fingerprint.metadata_hash != other.fingerprint.metadata_hash

Returns whether or not this snapshot contains metadata changes in relation to the other snapshot.

physical_schema: str
1339    @property
1340    def physical_schema(self) -> str:
1341        if self.physical_schema_ is not None:
1342            return self.physical_schema_
1343        return self.model.physical_schema if self.is_model else ""
table_info: SnapshotTableInfo
1345    @property
1346    def table_info(self) -> SnapshotTableInfo:
1347        """Helper method to get the SnapshotTableInfo from the Snapshot."""
1348        self._ensure_categorized()
1349
1350        custom_materialization = (
1351            self.model.kind.materialization
1352            if self.is_model and isinstance(self.model.kind, CustomKind)
1353            else None
1354        )
1355
1356        return SnapshotTableInfo(
1357            physical_schema=self.physical_schema,
1358            name=self.name,
1359            fingerprint=self.fingerprint,
1360            version=self.version,
1361            dev_version=self.dev_version,
1362            parents=self.parents,
1363            previous_versions=self.previous_versions,
1364            change_category=self.change_category,
1365            kind_name=self.model_kind_name,
1366            node_type=self.node_type,
1367            custom_materialization=custom_materialization,
1368            dev_table_suffix=self.dev_table_suffix,
1369            model_gateway=self.model_gateway,
1370            table_naming_convention=self.table_naming_convention,  # type: ignore
1371            forward_only=self.forward_only,
1372            virtual_environment_mode=self.virtual_environment_mode,
1373        )

Helper method to get the SnapshotTableInfo from the Snapshot.

data_version: SnapshotDataVersion
1375    @property
1376    def data_version(self) -> SnapshotDataVersion:
1377        self._ensure_categorized()
1378        return SnapshotDataVersion(
1379            fingerprint=self.fingerprint,
1380            version=self.version,
1381            dev_version=self.dev_version,
1382            change_category=self.change_category,
1383            physical_schema=self.physical_schema,
1384            dev_table_suffix=self.dev_table_suffix,
1385            table_naming_convention=self.table_naming_convention,
1386            virtual_environment_mode=self.virtual_environment_mode,
1387        )
snapshot_intervals: SnapshotIntervals
1389    @property
1390    def snapshot_intervals(self) -> SnapshotIntervals:
1391        self._ensure_categorized()
1392        return SnapshotIntervals(
1393            name=self.name,
1394            identifier=self.identifier,
1395            version=self.version,
1396            dev_version=self.dev_version,
1397            intervals=self.intervals.copy(),
1398            dev_intervals=self.dev_intervals.copy(),
1399            pending_restatement_intervals=self.pending_restatement_intervals.copy(),
1400        )
is_materialized_view: bool
1402    @property
1403    def is_materialized_view(self) -> bool:
1404        """Returns whether or not this snapshot's model represents a materialized view."""
1405        return (
1406            self.is_model and isinstance(self.model.kind, ViewKind) and self.model.kind.materialized
1407        )

Returns whether or not this snapshot's model represents a materialized view.

is_new_version: bool
1409    @property
1410    def is_new_version(self) -> bool:
1411        """Returns whether or not this version is new and requires a backfill."""
1412        self._ensure_categorized()
1413        return self.fingerprint.to_version() == self.version

Returns whether or not this version is new and requires a backfill.

is_paused: bool
1415    @property
1416    def is_paused(self) -> bool:
1417        return self.unpaused_ts is None
is_paused_forward_only: bool
1419    @property
1420    def is_paused_forward_only(self) -> bool:
1421        return self.is_paused and self.is_forward_only
normalized_effective_from_ts: Optional[int]
1423    @property
1424    def normalized_effective_from_ts(self) -> t.Optional[int]:
1425        return (
1426            to_timestamp(self.node.interval_unit.cron_floor(self.effective_from))
1427            if self.effective_from
1428            else None
1429        )
model_kind_name: Optional[sqlmesh.core.model.kind.ModelKindName]
1431    @property
1432    def model_kind_name(self) -> t.Optional[ModelKindName]:
1433        return self.model.kind.name if self.is_model else None

Returns the model kind name.

node_type: sqlmesh.core.node.NodeType
1435    @property
1436    def node_type(self) -> NodeType:
1437        if self.node.is_model:
1438            return NodeType.MODEL
1439        if self.node.is_audit:
1440            return NodeType.AUDIT
1441        raise SQLMeshError(f"Snapshot {self.snapshot_id} has an unknown node type.")
1443    @property
1444    def model(self) -> Model:
1445        model = self.model_or_none
1446        if model:
1447            return model
1448        raise SQLMeshError(f"Snapshot {self.snapshot_id} is not a model snapshot.")
1450    @property
1451    def model_or_none(self) -> t.Optional[Model]:
1452        if self.is_model:
1453            return t.cast(Model, self.node)
1454        return None
model_gateway: Optional[str]
1456    @property
1457    def model_gateway(self) -> t.Optional[str]:
1458        return self.model.gateway if self.is_model else None
1460    @property
1461    def audit(self) -> StandaloneAudit:
1462        if self.is_audit:
1463            return t.cast(StandaloneAudit, self.node)
1464        raise SQLMeshError(f"Snapshot {self.snapshot_id} is not an audit snapshot.")
depends_on_past: bool
1466    @property
1467    def depends_on_past(self) -> bool:
1468        """Whether or not this models depends on past intervals to be populated before loading following intervals.
1469
1470        This represents a superset of the following types of models:
1471        1. Models that depend on themselves but can be restated from an arbitrary point in time (any start date) as long as interval batches are processed sequentially.
1472           An example of this can be an INCREMENTAL_BY_TIME_RANGE model that references previous records from itself.
1473        2. Models that can only be restated from the beginning of history *and* their interval batches must be processed sequentially.
1474        """
1475        return self.depends_on_self or self.full_history_restatement_only

Whether or not this model depends on past intervals being populated before the following intervals are loaded.

This represents a superset of the following types of models:

  1. Models that depend on themselves but can be restated from an arbitrary point in time (any start date) as long as interval batches are processed sequentially. An example of this can be an INCREMENTAL_BY_TIME_RANGE model that references previous records from itself.
  2. Models that can only be restated from the beginning of history and their interval batches must be processed sequentially.
depends_on_self: bool
1477    @property
1478    def depends_on_self(self) -> bool:
1479        """Whether or not this models depends on self."""
1480        return self.is_model and self.model.depends_on_self

Whether or not this model depends on itself.

name_version: SnapshotNameVersion
1482    @property
1483    def name_version(self) -> SnapshotNameVersion:
1484        """Returns the name and version of the snapshot."""
1485        return SnapshotNameVersion(name=self.name, version=self.version)

Returns the name and version of the snapshot.

id_and_version: SnapshotIdAndVersion
1487    @property
1488    def id_and_version(self) -> SnapshotIdAndVersion:
1489        return self.table_info.id_and_version
disable_restatement: bool
1491    @property
1492    def disable_restatement(self) -> bool:
1493        """Is restatement disabled for the node"""
1494        return self.is_model and self.model.disable_restatement

Whether restatement is disabled for the node.

fully_qualified_table: Optional[sqlglot.expressions.query.Table]
1496    @cached_property
1497    def fully_qualified_table(self) -> t.Optional[exp.Table]:
1498        if not self.is_model:
1499            return None
1500        return t.cast(Model, self.node).fully_qualified_table
expiration_ts: int
1502    @property
1503    def expiration_ts(self) -> int:
1504        return to_timestamp(
1505            self.ttl,
1506            relative_base=to_datetime(self.updated_ts),
1507            check_categorical_relative_expression=False,
1508        )
supports_schema_migration_in_prod: bool
1510    @property
1511    def supports_schema_migration_in_prod(self) -> bool:
1512        """Returns whether or not this snapshot supports schema migration when deployed to production."""
1513        return self.is_paused and self.is_model and not self.is_symbolic and not self.is_seed

Returns whether or not this snapshot supports schema migration when deployed to production.

requires_schema_migration_in_prod: bool
1515    @property
1516    def requires_schema_migration_in_prod(self) -> bool:
1517        """Returns whether or not this snapshot requires a schema migration when deployed to production."""
1518        return self.supports_schema_migration_in_prod and (
1519            (self.previous_version and self.previous_version.version == self.version)
1520            or self.model.forward_only
1521            or bool(self.model.physical_version)
1522            or not self.virtual_environment_mode.is_full
1523        )

Returns whether or not this snapshot requires a schema migration when deployed to production.

ttl_ms: int
1525    @property
1526    def ttl_ms(self) -> int:
1527        return self.expiration_ts - self.updated_ts
custom_materialization: Optional[str]
1529    @property
1530    def custom_materialization(self) -> t.Optional[str]:
1531        if self.is_custom:
1532            return t.cast(CustomKind, self.model.kind).materialization
1533        return None
virtual_environment_mode: sqlmesh.core.config.common.VirtualEnvironmentMode
1535    @property
1536    def virtual_environment_mode(self) -> VirtualEnvironmentMode:
1537        return (
1538            self.model.virtual_environment_mode if self.is_model else VirtualEnvironmentMode.default
1539        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class SnapshotTableCleanupTask(sqlmesh.utils.pydantic.PydanticModel):
1556class SnapshotTableCleanupTask(PydanticModel):
1557    snapshot: SnapshotTableInfo
1558    dev_table_only: bool

Usage documentation: see the "Models" guide in the Pydantic documentation.

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ signature (an inspect.Signature) of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a RootModel (pydantic.root_model.RootModel).
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding FieldInfo (pydantic.fields.FieldInfo) objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding ComputedFieldInfo (pydantic.fields.ComputedFieldInfo) objects.
  • __pydantic_extra__: A dictionary containing extra values, if the extra setting (pydantic.config.ConfigDict.extra) is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
snapshot: SnapshotTableInfo
dev_table_only: bool
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
SnapshotIdLike = typing.Union[SnapshotId, SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]
SnapshotIdAndVersionLike = typing.Union[SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]
SnapshotInfoLike = typing.Union[SnapshotTableInfo, Snapshot]
SnapshotNameVersionLike = typing.Union[SnapshotNameVersion, SnapshotTableInfo, SnapshotIdAndVersion, Snapshot]
class DeployabilityIndex(sqlmesh.utils.pydantic.PydanticModel):
1569class DeployabilityIndex(PydanticModel, frozen=True):
1570    """Contains information about deployability of every snapshot.
1571
1572    Deployability is defined as whether or not the output that a snapshot produces during the
1573    current evaluation can be reused in (deployed to) the production environment.
1574    """
1575
1576    indexed_ids: t.FrozenSet[str]
1577    is_opposite_index: bool = False
1578    representative_shared_version_ids: t.FrozenSet[str] = frozenset()
1579
1580    @field_validator("indexed_ids", "representative_shared_version_ids", mode="before")
1581    @classmethod
1582    def _snapshot_ids_set_validator(cls, v: t.Any) -> t.Optional[t.FrozenSet[t.Tuple[str, str]]]:
1583        if v is None:
1584            return v
1585        # Transforming into strings because the serialization of sets of objects / lists is broken in Pydantic.
1586        return frozenset(
1587            {
1588                (
1589                    cls._snapshot_id_key(snapshot_id)  # type: ignore
1590                    if isinstance(snapshot_id, SnapshotId)
1591                    else snapshot_id
1592                )
1593                for snapshot_id in v
1594            }
1595        )
1596
1597    def is_deployable(self, snapshot: SnapshotIdLike) -> bool:
1598        """Returns true if the output produced by the given snapshot in a development environment can be reused
1599        in (deployed to) production
1600
1601        Args:
1602            snapshot: The snapshot to check.
1603
1604        Returns:
1605            True if the snapshot is deployable, False otherwise.
1606        """
1607        snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
1608        return (self.is_opposite_index and snapshot_id not in self.indexed_ids) or (
1609            not self.is_opposite_index and snapshot_id in self.indexed_ids
1610        )
1611
1612    def is_representative(self, snapshot: SnapshotIdLike) -> bool:
1613        """Returns true if the deployable (non-dev) table of the given snapshot should be used for reading, table mapping, and
1614        computing missing intervals.
1615
1616        Note, that deployable snapshots are also representative, but the reverse is not always true.
1617
1618        Unlike `is_deployable`, this variant also captures FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots that
1619        are not deployable by their nature but are currently promoted in production. Therefore, it's safe to consider
1620        them as such when constructing a plan, building a physical table mapping or computing missing intervals.
1621
1622        Args:
1623            snapshot: The snapshot to check.
1624
1625        Returns:
1626            True if the snapshot is representative, False otherwise.
1627        """
1628        snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
1629        representative = snapshot_id in self.representative_shared_version_ids
1630        return representative or self.is_deployable(snapshot)
1631
1632    def with_non_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
1633        """Creates a new index with the given snapshot marked as non-deployable."""
1634        return self._add_snapshot(snapshot, False)
1635
1636    def with_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
1637        """Creates a new index with the given snapshot marked as deployable."""
1638        return self._add_snapshot(snapshot, True)
1639
1640    def _add_snapshot(self, snapshot: SnapshotIdLike, deployable: bool) -> DeployabilityIndex:
1641        snapshot_id = {self._snapshot_id_key(snapshot.snapshot_id)}
1642        indexed_ids = self.indexed_ids
1643        if self.is_opposite_index:
1644            indexed_ids = indexed_ids - snapshot_id if deployable else indexed_ids | snapshot_id
1645        else:
1646            indexed_ids = indexed_ids | snapshot_id if deployable else indexed_ids - snapshot_id
1647
1648        return DeployabilityIndex(
1649            indexed_ids=indexed_ids,
1650            is_opposite_index=self.is_opposite_index,
1651            representative_shared_version_ids=self.representative_shared_version_ids,
1652        )
1653
1654    @classmethod
1655    def all_deployable(cls) -> DeployabilityIndex:
1656        return cls(indexed_ids=frozenset(), is_opposite_index=True)
1657
1658    @classmethod
1659    def none_deployable(cls) -> DeployabilityIndex:
1660        return cls(indexed_ids=frozenset())
1661
1662    @classmethod
1663    def create(
1664        cls,
1665        snapshots: t.Dict[SnapshotId, Snapshot] | t.Collection[Snapshot],
1666        start: t.Optional[TimeLike] = None,  # plan start
1667        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
1668    ) -> DeployabilityIndex:
1669        if not isinstance(snapshots, dict):
1670            snapshots = {s.snapshot_id: s for s in snapshots}
1671
1672        deployability_mapping: t.Dict[SnapshotId, bool] = {}
1673        children_deployability_mapping: t.Dict[SnapshotId, bool] = {}
1674        representative_shared_version_ids: t.Set[SnapshotId] = set()
1675        start_override_per_model = start_override_per_model or {}
1676
1677        start_date_cache: t.Optional[t.Dict[str, datetime]] = {}
1678
1679        dag = snapshots_to_dag(snapshots.values())
1680        for node in dag:
1681            if node not in snapshots:
1682                continue
1683            snapshot = snapshots[node]
1684
1685            if not snapshot.virtual_environment_mode.is_full:
1686                # If the virtual environment is not fully enabled, then the snapshot can never be deployable
1687                this_deployable = False
1688            else:
1689                # Make sure that the node is deployable according to all its parents
1690                this_deployable = all(
1691                    children_deployability_mapping[p_id]
1692                    for p_id in snapshots[node].parents
1693                    if p_id in children_deployability_mapping
1694                )
1695
1696            if this_deployable:
1697                is_forward_only_model = (
1698                    snapshot.is_model and snapshot.model.forward_only and not snapshot.is_metadata
1699                )
1700                has_auto_restatement = (
1701                    snapshot.is_model and snapshot.model.auto_restatement_cron is not None
1702                )
1703
1704                snapshot_start = start_override_per_model.get(
1705                    node.name, start_date(snapshot, snapshots.values(), cache=start_date_cache)
1706                )
1707
1708                is_valid_start = (
1709                    snapshot.is_valid_start(start, snapshot_start) if start is not None else True
1710                )
1711
1712                children_deployable = is_valid_start and not has_auto_restatement
1713                if (
1714                    snapshot.is_forward_only
1715                    or snapshot.is_indirect_non_breaking
1716                    or is_forward_only_model
1717                    or has_auto_restatement
1718                    or not is_valid_start
1719                ):
1720                    # FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots are not deployable by nature.
1721                    # Similarly, if the model depends on past and the start date is not aligned with the
1722                    # model's start, we should consider this snapshot non-deployable.
1723                    this_deployable = False
1724                    if not snapshot.is_paused or (
1725                        snapshot.is_indirect_non_breaking and snapshot.intervals
1726                    ):
1727                        # This snapshot represents what's currently deployed in prod.
1728                        representative_shared_version_ids.add(node)
1729                    else:
1730                        # If the parent is not representative then its children can't be deployable.
1731                        children_deployable = False
1732            else:
1733                children_deployable = False
1734                if not snapshots[node].is_paused:
1735                    representative_shared_version_ids.add(node)
1736
1737            deployability_mapping[node] = this_deployable
1738            children_deployability_mapping[node] = children_deployable
1739
1740        deployable_ids = {
1741            snapshot_id for snapshot_id, deployable in deployability_mapping.items() if deployable
1742        }
1743        non_deployable_ids = set(snapshots) - deployable_ids
1744
1745        # Pick the smaller set to reduce the size of the serialized object.
1746        if len(deployable_ids) <= len(non_deployable_ids):
1747            return cls(
1748                indexed_ids=deployable_ids,
1749                representative_shared_version_ids=representative_shared_version_ids,
1750            )
1751        return cls(
1752            indexed_ids=non_deployable_ids,
1753            is_opposite_index=True,
1754            representative_shared_version_ids=representative_shared_version_ids,
1755        )
1756
1757    @staticmethod
1758    def _snapshot_id_key(snapshot_id: SnapshotId) -> str:
1759        return f"{snapshot_id.name}__{snapshot_id.identifier}"

Contains information about deployability of every snapshot.

Deployability is defined as whether or not the output that a snapshot produces during the current evaluation can be reused in (deployed to) the production environment.

indexed_ids: FrozenSet[str]
is_opposite_index: bool
representative_shared_version_ids: FrozenSet[str]
def is_deployable( self, snapshot: Union[SnapshotId, SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]) -> bool:
1597    def is_deployable(self, snapshot: SnapshotIdLike) -> bool:
1598        """Returns true if the output produced by the given snapshot in a development environment can be reused
1599        in (deployed to) production
1600
1601        Args:
1602            snapshot: The snapshot to check.
1603
1604        Returns:
1605            True if the snapshot is deployable, False otherwise.
1606        """
1607        snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
1608        return (self.is_opposite_index and snapshot_id not in self.indexed_ids) or (
1609            not self.is_opposite_index and snapshot_id in self.indexed_ids
1610        )

Returns true if the output produced by the given snapshot in a development environment can be reused in (deployed to) production.

Arguments:
  • snapshot: The snapshot to check.
Returns:

True if the snapshot is deployable, False otherwise.

def is_representative( self, snapshot: Union[SnapshotId, SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]) -> bool:
1612    def is_representative(self, snapshot: SnapshotIdLike) -> bool:
1613        """Returns true if the deployable (non-dev) table of the given snapshot should be used for reading, table mapping, and
1614        computing missing intervals.
1615
1616        Note, that deployable snapshots are also representative, but the reverse is not always true.
1617
1618        Unlike `is_deployable`, this variant also captures FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots that
1619        are not deployable by their nature but are currently promoted in production. Therefore, it's safe to consider
1620        them as such when constructing a plan, building a physical table mapping or computing missing intervals.
1621
1622        Args:
1623            snapshot: The snapshot to check.
1624
1625        Returns:
1626            True if the snapshot is representative, False otherwise.
1627        """
1628        snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
1629        representative = snapshot_id in self.representative_shared_version_ids
1630        return representative or self.is_deployable(snapshot)

Returns true if the deployable (non-dev) table of the given snapshot should be used for reading, table mapping, and computing missing intervals.

Note that deployable snapshots are also representative, but the reverse is not always true.

Unlike is_deployable, this variant also captures FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots that are not deployable by their nature but are currently promoted in production. Therefore, it's safe to consider them as such when constructing a plan, building a physical table mapping or computing missing intervals.

Arguments:
  • snapshot: The snapshot to check.
Returns:

True if the snapshot is representative, False otherwise.

def with_non_deployable( self, snapshot: Union[SnapshotId, SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]) -> DeployabilityIndex:
1632    def with_non_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
1633        """Creates a new index with the given snapshot marked as non-deployable."""
1634        return self._add_snapshot(snapshot, False)

Creates a new index with the given snapshot marked as non-deployable.

def with_deployable( self, snapshot: Union[SnapshotId, SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]) -> DeployabilityIndex:
1636    def with_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:
1637        """Creates a new index with the given snapshot marked as deployable."""
1638        return self._add_snapshot(snapshot, True)

Creates a new index with the given snapshot marked as deployable.

@classmethod
def all_deployable(cls) -> DeployabilityIndex:
1654    @classmethod
1655    def all_deployable(cls) -> DeployabilityIndex:
1656        return cls(indexed_ids=frozenset(), is_opposite_index=True)
@classmethod
def none_deployable(cls) -> DeployabilityIndex:
1658    @classmethod
1659    def none_deployable(cls) -> DeployabilityIndex:
1660        return cls(indexed_ids=frozenset())
@classmethod
def create( cls, snapshots: Union[Dict[SnapshotId, Snapshot], Collection[Snapshot]], start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, start_override_per_model: Optional[Dict[str, datetime.datetime]] = None) -> DeployabilityIndex:
1662    @classmethod
1663    def create(
1664        cls,
1665        snapshots: t.Dict[SnapshotId, Snapshot] | t.Collection[Snapshot],
1666        start: t.Optional[TimeLike] = None,  # plan start
1667        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
1668    ) -> DeployabilityIndex:
1669        if not isinstance(snapshots, dict):
1670            snapshots = {s.snapshot_id: s for s in snapshots}
1671
1672        deployability_mapping: t.Dict[SnapshotId, bool] = {}
1673        children_deployability_mapping: t.Dict[SnapshotId, bool] = {}
1674        representative_shared_version_ids: t.Set[SnapshotId] = set()
1675        start_override_per_model = start_override_per_model or {}
1676
1677        start_date_cache: t.Optional[t.Dict[str, datetime]] = {}
1678
1679        dag = snapshots_to_dag(snapshots.values())
1680        for node in dag:
1681            if node not in snapshots:
1682                continue
1683            snapshot = snapshots[node]
1684
1685            if not snapshot.virtual_environment_mode.is_full:
1686                # If the virtual environment is not fully enabled, then the snapshot can never be deployable
1687                this_deployable = False
1688            else:
1689                # Make sure that the node is deployable according to all its parents
1690                this_deployable = all(
1691                    children_deployability_mapping[p_id]
1692                    for p_id in snapshots[node].parents
1693                    if p_id in children_deployability_mapping
1694                )
1695
1696            if this_deployable:
1697                is_forward_only_model = (
1698                    snapshot.is_model and snapshot.model.forward_only and not snapshot.is_metadata
1699                )
1700                has_auto_restatement = (
1701                    snapshot.is_model and snapshot.model.auto_restatement_cron is not None
1702                )
1703
1704                snapshot_start = start_override_per_model.get(
1705                    node.name, start_date(snapshot, snapshots.values(), cache=start_date_cache)
1706                )
1707
1708                is_valid_start = (
1709                    snapshot.is_valid_start(start, snapshot_start) if start is not None else True
1710                )
1711
1712                children_deployable = is_valid_start and not has_auto_restatement
1713                if (
1714                    snapshot.is_forward_only
1715                    or snapshot.is_indirect_non_breaking
1716                    or is_forward_only_model
1717                    or has_auto_restatement
1718                    or not is_valid_start
1719                ):
1720                    # FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots are not deployable by nature.
1721                    # Similarly, if the model depends on past and the start date is not aligned with the
1722                    # model's start, we should consider this snapshot non-deployable.
1723                    this_deployable = False
1724                    if not snapshot.is_paused or (
1725                        snapshot.is_indirect_non_breaking and snapshot.intervals
1726                    ):
1727                        # This snapshot represents what's currently deployed in prod.
1728                        representative_shared_version_ids.add(node)
1729                    else:
1730                        # If the parent is not representative then its children can't be deployable.
1731                        children_deployable = False
1732            else:
1733                children_deployable = False
1734                if not snapshots[node].is_paused:
1735                    representative_shared_version_ids.add(node)
1736
1737            deployability_mapping[node] = this_deployable
1738            children_deployability_mapping[node] = children_deployable
1739
1740        deployable_ids = {
1741            snapshot_id for snapshot_id, deployable in deployability_mapping.items() if deployable
1742        }
1743        non_deployable_ids = set(snapshots) - deployable_ids
1744
1745        # Pick the smaller set to reduce the size of the serialized object.
1746        if len(deployable_ids) <= len(non_deployable_ids):
1747            return cls(
1748                indexed_ids=deployable_ids,
1749                representative_shared_version_ids=representative_shared_version_ids,
1750            )
1751        return cls(
1752            indexed_ids=non_deployable_ids,
1753            is_opposite_index=True,
1754            representative_shared_version_ids=representative_shared_version_ids,
1755        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
def table_name( physical_schema: str, name: str, version: str, catalog: Optional[str] = None, suffix: Optional[str] = None, naming_convention: Optional[sqlmesh.core.config.common.TableNamingConvention] = None) -> str:
def table_name(
    physical_schema: str,
    name: str,
    version: str,
    catalog: t.Optional[str] = None,
    suffix: t.Optional[str] = None,
    naming_convention: t.Optional[TableNamingConvention] = None,
) -> str:
    """Builds the physical table name for a model version.

    Args:
        physical_schema: The schema that is set as the `db` part of the resulting table.
        name: The model name. It's parsed with `exp.to_table`.
        version: The version that is appended to the sanitized name.
        catalog: The catalog to use when the parsed name doesn't carry one.
        suffix: An optional suffix appended after the version, separated by a double underscore.
        naming_convention: The naming convention. Falls back to `TableNamingConvention.default`.

    Returns:
        The table name rendered by `exp.table_name`.
    """
    parsed = exp.to_table(name)
    convention = naming_convention or TableNamingConvention.default

    if convention == TableNamingConvention.HASH_MD5:
        # The hash is taken over the name that SCHEMA_AND_TABLE would have produced
        unhashed_name = table_name(
            physical_schema=physical_schema,
            name=name,
            version=version,
            catalog=catalog,
            suffix=suffix,
            naming_convention=TableNamingConvention.SCHEMA_AND_TABLE,
        )
        base_name = f"{c.SQLMESH}_md5__{md5(unhashed_name)}"
    else:
        # note: Snapshot._table_name() already strips the catalog from the model name before calling this function
        # Therefore, a model with 3-part naming like "foo.bar.baz" gets passed as (name="bar.baz", catalog="foo") to this function
        # This is why there is no TableNamingConvention.CATALOG_AND_SCHEMA_AND_TABLE
        parts = parsed.parts
        wanted_parts = 2 if convention == TableNamingConvention.SCHEMA_AND_TABLE else 1

        # the parsed name may have fewer parts than the naming convention asks for
        wanted_parts = min(len(parts), wanted_parts)

        # bigquery projects usually have "-" in them which is illegal in the table name, so we aggressively prune
        sanitized_parts = [sanitize_name(part.name) for part in parts[-wanted_parts:]]
        name = "__".join(sanitized_parts)

        base_name = f"{name}__{version}"

    suffix_part = f"__{suffix}" if suffix else ""

    parsed.set("this", exp.to_identifier(f"{base_name}{suffix_part}"))
    parsed.set("db", exp.to_identifier(physical_schema))
    if catalog and not parsed.catalog:
        parsed.set("catalog", exp.to_identifier(catalog))
    return exp.table_name(parsed)
def display_name( snapshot_info_like: Union[SnapshotTableInfo, Snapshot, SnapshotInfoMixin], environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Optional[str], dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> str:
def display_name(
    snapshot_info_like: t.Union[SnapshotInfoLike, SnapshotInfoMixin],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    dialect: DialectType = None,
) -> str:
    """
    Returns the name to show to the user for the given snapshot.

    Audits are displayed using their plain name, everything else is rendered as a qualified view name
    via `model_display_name`. This is only meant for presentation purposes; use `qualified_view_name`
    whenever an actual view name is needed.

    Args:
        snapshot_info_like: The snapshot info object to get the display name for
        environment_naming_info: Environment naming info to use for display name formatting
        default_catalog: Optional default catalog name to use. If None, the default catalog will always be included in the display name.
        dialect: Optional dialect type to use for name formatting

    Returns:
        The formatted display name as a string
    """
    if not snapshot_info_like.is_audit:
        return model_display_name(
            snapshot_info_like.name, environment_naming_info, default_catalog, dialect
        )

    return snapshot_info_like.name

Returns the model name as a qualified view name. This is just used for presenting information back to the user and qualified_view_name should be used when wanting a view name in all other cases.

Arguments:
  • snapshot_info_like: The snapshot info object to get the display name for
  • environment_naming_info: Environment naming info to use for display name formatting
  • default_catalog: Optional default catalog name to use. If None, the default catalog will always be included in the display name.
  • dialect: Optional dialect type to use for name formatting
Returns:

The formatted display name as a string

def model_display_name( node_name: str, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Optional[str], dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> str:
def model_display_name(
    node_name: str,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    dialect: DialectType = None,
) -> str:
    """Renders a node name as the qualified view name for the given environment.

    Args:
        node_name: The node name. It's parsed with `exp.to_table`.
        environment_naming_info: Environment naming info to use for display name formatting.
        default_catalog: The default catalog. A catalog equal to it is dropped from the name
            unless the environment suffix targets the catalog.
        dialect: Optional dialect type to use for name formatting.

    Returns:
        The result of `QualifiedViewName.for_environment` for the parsed name.
    """
    parsed = exp.to_table(node_name)

    omit_catalog = (
        environment_naming_info.suffix_target != EnvironmentSuffixTarget.CATALOG
        and parsed.catalog == default_catalog
    )
    if omit_catalog:
        catalog = None
    else:
        catalog = parsed.catalog

    qualified_view_name = QualifiedViewName(
        catalog=catalog,
        schema_name=parsed.db or None,
        table=parsed.name,
    )
    return qualified_view_name.for_environment(environment_naming_info, dialect=dialect)
def fingerprint_from_node( node: Annotated[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel, sqlmesh.core.audit.definition.StandaloneAudit], FieldInfo(annotation=NoneType, required=True, discriminator='source_type')], *, nodes: Dict[str, Annotated[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel, sqlmesh.core.audit.definition.StandaloneAudit], FieldInfo(annotation=NoneType, required=True, discriminator='source_type')]], cache: Optional[Dict[str, SnapshotFingerprint]] = None) -> SnapshotFingerprint:
def fingerprint_from_node(
    node: Node,
    *,
    nodes: t.Dict[str, Node],
    cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
) -> SnapshotFingerprint:
    """Computes (or fetches from the cache) the fingerprint of a node.

    The fingerprint combines the node's own data and metadata hashes with hashes derived from
    the fingerprints of those of its dependencies that are present in `nodes`.

    Args:
        node: Node to fingerprint.
        nodes: Dictionary of all nodes in the graph. Dependencies missing from this dictionary
            don't contribute to the fingerprint.
        cache: Cache of node FQN to fingerprints. It's updated in place when provided.

    Returns:
        The fingerprint.
    """
    if cache is None:
        cache = {}

    fqn = node.fqn
    if fqn in cache:
        return cache[fqn]

    parent_fingerprints = []
    for dependency in node.depends_on:
        if dependency in nodes:
            parent_fingerprints.append(
                fingerprint_from_node(nodes[dependency], nodes=nodes, cache=cache)
            )

    # Hash inputs are sorted so the result doesn't depend on the order of the dependencies
    parent_versions = sorted(parent.to_version() for parent in parent_fingerprints)
    parent_data_hash = hash_data(parent_versions)

    parent_metadata_hashes = []
    for parent in parent_fingerprints:
        parent_metadata_hashes.append(parent.metadata_hash)
        parent_metadata_hashes.append(parent.parent_metadata_hash)
    parent_metadata_hash = hash_data(sorted(parent_metadata_hashes))

    fingerprint = SnapshotFingerprint(
        data_hash=node.data_hash,
        metadata_hash=node.metadata_hash,
        parent_data_hash=parent_data_hash,
        parent_metadata_hash=parent_metadata_hash,
    )
    cache[fqn] = fingerprint
    return fingerprint

Helper function to generate a fingerprint based on the data and metadata of the node and its parents.

This method tries to remove non-meaningful differences to avoid ever-changing fingerprints. The fingerprint is made up of two parts split by an underscore -- query_metadata. The query hash is determined purely by the rendered query and the metadata by everything else.

Arguments:
  • node: Node to fingerprint.
  • nodes: Dictionary of all nodes in the graph to make the fingerprint dependent on parent changes. If no dictionary is passed in the fingerprint will not be dependent on a node's parents.
  • cache: Cache of node name to fingerprints.
Returns:

The fingerprint.

def merge_intervals(intervals: Collection[Tuple[int, int]]) -> List[Tuple[int, int]]:
def merge_intervals(intervals: t.Collection[Interval]) -> Intervals:
    """Merges overlapping and adjacent intervals.

    Args:
        intervals: The intervals to merge together, in any order.

    Returns:
        A new list of intervals sorted by start in which an interval that starts at or before
        the end of its predecessor has been folded into that predecessor.
    """
    if not intervals:
        return []

    ordered = sorted(intervals)
    result = [ordered[0]]

    for candidate in ordered[1:]:
        last = result[-1]

        if candidate[0] > last[1]:
            # There's a gap between the previous interval and this one
            result.append(candidate)
        else:
            result[-1] = (last[0], max(last[1], candidate[1]))

    return result

Merge a list of intervals.

Arguments:
  • intervals: A list of intervals to merge together.
Returns:

A new list of sorted and merged intervals.

def format_intervals( intervals: List[Tuple[int, int]], unit: Optional[sqlmesh.core.node.IntervalUnit]) -> str:
def format_intervals(intervals: Intervals, unit: t.Optional[IntervalUnit]) -> str:
    """Renders intervals as a comma-separated list of "start - end" ranges.

    Args:
        intervals: The intervals to format. Each one is converted with `make_inclusive` first.
        unit: The interval unit that is forwarded to `_format_date_time`.

    Returns:
        The formatted intervals or an empty string if there are no intervals.
    """
    inclusive_intervals = [make_inclusive(start, end) for start, end in intervals]

    rendered = []
    for start, end in inclusive_intervals:
        bounds = [_format_date_time(start, unit), _format_date_time(end, unit)]
        rendered.append(" - ".join(bounds))

    return ", ".join(rendered)
def remove_interval( intervals: List[Tuple[int, int]], remove_start: int, remove_end: int) -> List[Tuple[int, int]]:
def remove_interval(intervals: Intervals, remove_start: int, remove_end: int) -> Intervals:
    """Cuts the [remove_start, remove_end) range out of the given intervals.

    The range to remove is taken as is. Use the `get_remove_interval` method of `Snapshot` to
    compute the right start/end for a specific snapshot.

    Args:
        intervals: A list of exclusive intervals.
        remove_start: The inclusive start to remove.
        remove_end: The exclusive end to remove.

    Returns:
        A new list of intervals. An interval that is fully covered by the removed range is
        dropped, an interval that strictly contains it is split in two.
    """
    remaining: Intervals = []

    for start, end in intervals:
        keeps_head = remove_start > start
        keeps_tail = remove_end < end

        if keeps_head and keeps_tail:
            # The removed range is strictly inside of this interval, so both sides survive
            remaining.append((start, remove_start))
            remaining.append((remove_end, end))
        elif keeps_head:
            remaining.append((start, min(remove_start, end)))
        elif keeps_tail:
            remaining.append((max(remove_end, start), end))

    return remaining

Remove an interval from a list of intervals. Assumes that the correct start and end intervals have been passed in. Use get_remove_interval method of Snapshot to get the correct start/end given the snapshot's information.

Arguments:
  • intervals: A list of exclusive intervals.
  • remove_start: The inclusive start to remove.
  • remove_end: The exclusive end to remove.
Returns:

A new list of intervals.

def to_table_mapping( snapshots: Iterable[Snapshot], deployability_index: Optional[DeployabilityIndex]) -> Dict[str, str]:
def to_table_mapping(
    snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
) -> t.Dict[str, str]:
    """Maps snapshot names to the names of their physical tables.

    Only versioned, non-embedded model snapshots are included.

    Args:
        snapshots: The snapshots to build the mapping for.
        deployability_index: Decides (through `is_representative`) which table of a snapshot
            is used. Defaults to `DeployabilityIndex.all_deployable()`.

    Returns:
        A dictionary of snapshot name to table name.
    """
    index = deployability_index or DeployabilityIndex.all_deployable()

    mapping: t.Dict[str, str] = {}
    for snapshot in snapshots:
        if not (snapshot.version and not snapshot.is_embedded and snapshot.is_model):
            continue
        name = snapshot.name
        mapping[name] = snapshot.table_name(index.is_representative(snapshot))
    return mapping
def to_view_mapping( snapshots: Iterable[Snapshot], environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Optional[str] = None, dialect: Optional[str] = None) -> Dict[str, str]:
def to_view_mapping(
    snapshots: t.Iterable[Snapshot],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str] = None,
    dialect: t.Optional[str] = None,
) -> t.Dict[str, str]:
    """Maps the names of model snapshots to their display names in the given environment.

    Args:
        snapshots: The snapshots to build the mapping for. Non-model snapshots are skipped.
        environment_naming_info: Environment naming info that is forwarded to `Snapshot.display_name`.
        default_catalog: The default catalog that is forwarded to `Snapshot.display_name`.
        dialect: The dialect that is forwarded to `Snapshot.display_name`.

    Returns:
        A dictionary of snapshot name to display name.
    """
    mapping: t.Dict[str, str] = {}
    for snapshot in snapshots:
        if snapshot.is_model:
            name = snapshot.name
            mapping[name] = snapshot.display_name(
                environment_naming_info, default_catalog=default_catalog, dialect=dialect
            )
    return mapping
def has_paused_forward_only( targets: Iterable[Union[SnapshotId, SnapshotIdAndVersion, SnapshotTableInfo, Snapshot]], snapshots: Union[List[Snapshot], Dict[SnapshotId, Snapshot]]) -> bool:
def has_paused_forward_only(
    targets: t.Iterable[SnapshotIdLike],
    snapshots: t.Union[t.List[Snapshot], t.Dict[SnapshotId, Snapshot]],
) -> bool:
    """Checks whether any of the target snapshots is a paused forward-only snapshot.

    Args:
        targets: The snapshots to check. Each one is looked up by its `snapshot_id`.
        snapshots: The snapshots to look the targets up in, either as a list or keyed by snapshot ID.

    Returns:
        True if at least one target resolves to a snapshot with `is_paused_forward_only` set.

    Raises:
        KeyError: If a target is checked that is missing from `snapshots`.
    """
    if isinstance(snapshots, dict):
        snapshots_by_id = snapshots
    else:
        snapshots_by_id = {s.snapshot_id: s for s in snapshots}

    return any(
        snapshots_by_id[target.snapshot_id].is_paused_forward_only for target in targets
    )
def missing_intervals( snapshots: Union[Collection[Snapshot], Dict[SnapshotId, Snapshot]], start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, restatements: Optional[Dict[SnapshotId, Tuple[int, int]]] = None, deployability_index: Optional[DeployabilityIndex] = None, start_override_per_model: Optional[Dict[str, datetime.datetime]] = None, end_override_per_model: Optional[Dict[str, datetime.datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False) -> Dict[Snapshot, List[Tuple[int, int]]]:
def missing_intervals(
    snapshots: t.Union[t.Collection[Snapshot], t.Dict[SnapshotId, Snapshot]],
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
) -> t.Dict[Snapshot, Intervals]:
    """Returns all missing intervals given a collection of snapshots.

    Args:
        snapshots: The snapshots to check, either as a collection or keyed by snapshot ID.
            Snapshots that are not evaluatable are skipped.
        start: The start of the range. Defaults to the earliest start date of the snapshots.
        end: The end of the range. Defaults to the current timestamp.
        execution_time: Forwarded to `Snapshot.missing_intervals`.
        restatements: Restated intervals keyed by snapshot ID. A restated interval replaces the
            range for its snapshot and is removed from a copy of the snapshot before the check.
        deployability_index: Forwarded to `Snapshot.missing_intervals`. Defaults to
            `DeployabilityIndex.all_deployable()`.
        start_override_per_model: Range starts keyed by snapshot name that replace `start`.
        end_override_per_model: Range ends keyed by snapshot name that replace `end`.
        ignore_cron: Forwarded to `Snapshot.missing_intervals`.
        end_bounded: Forwarded to `Snapshot.missing_intervals`.

    Returns:
        A dictionary of snapshot to its missing intervals. Snapshots without missing intervals
        are omitted. Restated snapshots are represented by their modified copies.
    """
    if not isinstance(snapshots, dict):
        # Make sure that the mapping is only constructed once
        snapshots = {s.snapshot_id: s for s in snapshots}

    start_cache: t.Dict[str, datetime] = {}
    default_end = end or now_timestamp()
    if start:
        default_start = to_datetime(start)
    else:
        default_start = earliest_start_date(snapshots, cache=start_cache, relative_to=default_end)

    restatements = restatements or {}
    start_override_per_model = start_override_per_model or {}
    end_override_per_model = end_override_per_model or {}
    deployability_index = deployability_index or DeployabilityIndex.all_deployable()

    result = {}
    for snapshot in snapshots.values():
        if not snapshot.evaluatable:
            continue

        range_start = start_override_per_model.get(snapshot.name, default_start)
        range_end: TimeLike = default_end

        restated = restatements.get(snapshot.snapshot_id)
        if restated:
            # Work on a copy so that the restated interval is only dropped for this computation
            range_start, range_end = (to_datetime(bound) for bound in restated)
            snapshot = snapshot.copy()
            snapshot.intervals = snapshot.intervals.copy()
            snapshot.remove_interval(restated)

        end_override = end_override_per_model.get(snapshot.name)
        if end_override:
            if range_start >= end_override:
                # The start exceeds the provided interval end, so we can skip this snapshot
                # since it doesn't have missing intervals by definition
                continue
            range_end = end_override

        # The range can't begin before the effective start date of the snapshot itself
        requested_start = to_datetime(range_start)
        effective_start = to_datetime(
            start_date(snapshot, snapshots, start_cache, relative_to=range_end)
        )
        range_start = max(requested_start, effective_start)
        if range_start > to_datetime(range_end):
            continue

        # Cap the end with the end date of the node when the latter comes first
        interval_end = range_end
        node_end = snapshot.node.end
        if node_end and (to_datetime(node_end) < to_datetime(range_end)):
            interval_end = node_end

        found = snapshot.missing_intervals(
            range_start,
            interval_end,
            execution_time=execution_time,
            deployability_index=deployability_index,
            ignore_cron=ignore_cron,
            end_bounded=end_bounded,
        )
        if found:
            result[snapshot] = found

    return result

Returns all missing intervals given a collection of snapshots.

@lru_cache(maxsize=16384)
def expand_range( start_ts: int, end_ts: int, interval_unit: sqlmesh.core.node.IntervalUnit) -> List[int]:
@lru_cache(maxsize=16384)
def expand_range(start_ts: int, end_ts: int, interval_unit: IntervalUnit) -> t.List[int]:
    """Expands a range into the timestamps of its interval boundaries.

    Args:
        start_ts: The start timestamp. It's always the first element of the result.
        end_ts: The end timestamp. It's appended to the result unless the last boundary
            already equals it.
        interval_unit: The interval unit whose croniter produces the boundaries.

    Returns:
        The start timestamp followed by every subsequent boundary that doesn't exceed the end
        timestamp and, if needed, the end timestamp itself.
    """
    cron_iter = interval_unit.croniter(start_ts)
    boundaries = [start_ts]

    next_ts = to_timestamp(cron_iter.get_next(estimate=True))
    while next_ts <= end_ts:
        boundaries.append(next_ts)
        next_ts = to_timestamp(cron_iter.get_next(estimate=True))

    if boundaries[-1] != end_ts:
        boundaries.append(end_ts)
    return boundaries
@lru_cache(maxsize=16384)
def compute_missing_intervals( interval_unit: sqlmesh.core.node.IntervalUnit, intervals: Tuple[Tuple[int, int], ...], start_ts: int, end_ts: int, lookback: int, model_end_ts: Optional[int]) -> List[Tuple[int, int]]:
2128@lru_cache(maxsize=16384)
2129def compute_missing_intervals(
2130    interval_unit: IntervalUnit,
2131    intervals: t.Tuple[Interval, ...],
2132    start_ts: int,
2133    end_ts: int,
2134    lookback: int,
2135    model_end_ts: t.Optional[int],
2136) -> Intervals:
2137    """Computes all missing intervals between start and end given intervals.
2138
2139    Args:
2140        interval_unit: The interval unit.
2141        intervals: The intervals to check what's missing.
2142        start_ts: Inclusive timestamp start.
2143        end_ts: Exclusive timestamp end.
2144        lookback: A lookback window.
2145        model_end_ts: The exclusive end timestamp set on the model (if one is set)
2146
2147    Returns:
2148        A list of all timestamps in this range.
2149    """
2150    if start_ts == end_ts:
2151        return []
2152
2153    timestamps = expand_range(start_ts, end_ts, interval_unit)
2154    missing = set()
2155
2156    for current_ts, next_ts in zip(timestamps, timestamps[1:]):
2157        for low, high in intervals:
2158            if current_ts < low:
2159                missing.add((current_ts, next_ts))
2160                break
2161            elif current_ts >= low and next_ts <= high:
2162                break
2163        else:
2164            missing.add((current_ts, next_ts))
2165
2166    if missing:
2167        if lookback:
2168            if model_end_ts:
2169                croniter = interval_unit.croniter(end_ts)
2170                end_ts = to_timestamp(croniter.get_prev(estimate=True))
2171
2172                while model_end_ts < end_ts:
2173                    end_ts = to_timestamp(croniter.get_prev(estimate=True))
2174                    lookback -= 1
2175
2176                lookback = max(lookback, 0)
2177
2178            for i, (current_ts, next_ts) in enumerate(zip(timestamps, timestamps[1:])):
2179                parent = timestamps[i + lookback : i + lookback + 2]
2180
2181                if len(parent) < 2 or tuple(parent) in missing:
2182                    missing.add((current_ts, next_ts))
2183
2184        if model_end_ts:
2185            missing = {interval for interval in missing if interval[0] < model_end_ts}
2186
2187    return sorted(missing)

Computes all missing intervals between start and end given intervals.

Arguments:
  • interval_unit: The interval unit.
  • intervals: The intervals to check what's missing.
  • start_ts: Inclusive timestamp start.
  • end_ts: Exclusive timestamp end.
  • lookback: A lookback window.
  • model_end_ts: The exclusive end timestamp set on the model (if one is set)
Returns:

A sorted list of the missing [start, end) intervals in this range.

@lru_cache(maxsize=16384)
def inclusive_exclusive( start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], interval_unit: sqlmesh.core.node.IntervalUnit, strict: bool = True, allow_partial: bool = False, expand: bool = True) -> Tuple[int, int]:
2190@lru_cache(maxsize=16384)
2191def inclusive_exclusive(
2192    start: TimeLike,
2193    end: TimeLike,
2194    interval_unit: IntervalUnit,
2195    strict: bool = True,
2196    allow_partial: bool = False,
2197    expand: bool = True,
2198) -> Interval:
2199    """Transform the inclusive start and end into a [start, end) pair.
2200
2201    Args:
2202        start: The start date/time of the interval (inclusive)
2203        end: The end date/time of the interval (inclusive)
2204        interval_unit: The interval unit.
2205        strict: Whether to fail when the inclusive start is the same as the exclusive end.
2206        allow_partial: Whether the interval can be partial or not.
2207        expand: Whether or not partial intervals are expanded outwards.
2208
2209    Returns:
2210        A [start, end) pair.
2211    """
2212    start_dt = interval_unit.cron_floor(start)
2213
2214    if not expand and not allow_partial and start_dt < to_datetime(start):
2215        start_dt = interval_unit.cron_next(start_dt)
2216
2217    start_ts = to_timestamp(start_dt)
2218
2219    if is_date(end):
2220        end = to_datetime(end) + timedelta(days=1)
2221
2222    if allow_partial:
2223        end_dt = end
2224    else:
2225        end_dt = interval_unit.cron_floor(end)
2226
2227        if expand and end_dt != to_datetime(end):
2228            end_dt = interval_unit.cron_next(end_dt)
2229
2230    end_ts = to_timestamp(end_dt)
2231
2232    if strict and start_ts >= end_ts:
2233        raise ValueError(
2234            f"`end` ({to_datetime(end_ts)}) must be greater than `start` ({to_datetime(start_ts)})"
2235        )
2236
2237    return (start_ts, end_ts)

Transform the inclusive start and end into a [start, end) pair.

Arguments:
  • start: The start date/time of the interval (inclusive)
  • end: The end date/time of the interval (inclusive)
  • interval_unit: The interval unit.
  • strict: Whether to fail when the inclusive start is the same as the exclusive end.
  • allow_partial: Whether the interval can be partial or not.
  • expand: Whether or not partial intervals are expanded outwards.
Returns:

A [start, end) pair.

def earliest_start_date( snapshots: Union[Collection[Snapshot], Dict[SnapshotId, Snapshot]], cache: Optional[Dict[str, datetime.datetime]] = None, relative_to: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None) -> datetime.datetime:
def earliest_start_date(
    snapshots: t.Union[t.Collection[Snapshot], t.Dict[SnapshotId, Snapshot]],
    cache: t.Optional[t.Dict[str, datetime]] = None,
    relative_to: t.Optional[TimeLike] = None,
) -> datetime:
    """Get the earliest start date from a collection of snapshots.

    Args:
        snapshots: Snapshots to find earliest start date.
        cache: optional cache to make computing the start date more efficient
        relative_to: the base date to compute start from if inferred from cron
    Returns:
        The earliest start date or yesterday (relative to `relative_to` when it's provided)
        if there are no snapshots.
    """
    cache = {} if cache is None else cache

    if not snapshots:
        relative_base = to_datetime(relative_to) if relative_to is not None else None
        return yesterday(relative_base=relative_base)

    if not isinstance(snapshots, dict):
        # Make sure that the mapping is only constructed once
        snapshots = {s.snapshot_id: s for s in snapshots}

    return min(
        start_date(candidate, snapshots, cache=cache, relative_to=relative_to)
        for candidate in snapshots.values()
    )

Get the earliest start date from a collection of snapshots.

Arguments:
  • snapshots: Snapshots to find earliest start date.
  • cache: optional cache to make computing the start date more efficient
  • relative_to: the base date to compute start from if inferred from cron
Returns:

The earliest start date or yesterday if none is found.

def start_date( snapshot: Snapshot, snapshots: Union[Dict[SnapshotId, Snapshot], Iterable[Snapshot]], cache: Optional[Dict[str, datetime.datetime]] = None, relative_to: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None) -> datetime.datetime:
def start_date(
    snapshot: Snapshot,
    snapshots: t.Dict[SnapshotId, Snapshot] | t.Iterable[Snapshot],
    cache: t.Optional[t.Dict[str, datetime]] = None,
    relative_to: t.Optional[TimeLike] = None,
) -> datetime:
    """Get the effective/inferred start date for a snapshot.

    Not all snapshots define a start date. In those cases, the node's start date
    is inferred from the earliest start date of its parents that are present in
    `snapshots`, or, when there are none, from the node's cron.

    Args:
        snapshot: snapshot to infer start date.
        snapshots: a catalog of available snapshots, either as a mapping keyed by
            snapshot ID or as an iterable of snapshots.
        cache: optional cache to make computing the start date more efficient. It is
            keyed by snapshot name (suffixed with the `relative_to` timestamp when
            one is provided) and is updated in place.
        relative_to: the base date to compute start from if inferred from cron.
            Defaults to the current time when omitted.

    Returns:
        Start datetime object.
    """
    cache = {} if cache is None else cache

    # Compare against None explicitly: a falsy but valid TimeLike (e.g. the epoch
    # timestamp 0) must not be mistaken for "no base date provided".
    key = (
        f"{snapshot.name}_{to_timestamp(relative_to)}"
        if relative_to is not None
        else snapshot.name
    )
    if key in cache:
        return cache[key]

    # An explicitly configured start always wins over any inference.
    if snapshot.node.start:
        start = to_datetime(snapshot.node.start)
        cache[key] = start
        return start

    if not isinstance(snapshots, dict):
        snapshots = {s.snapshot_id: s for s in snapshots}

    # Recurse into the parents that are part of the catalog; parents that are
    # missing from it are ignored.
    parent_starts = [
        start_date(snapshots[parent], snapshots, cache=cache, relative_to=relative_to)
        for parent in snapshot.parents
        if parent in snapshots
    ]
    if parent_starts:
        earliest = min(parent_starts)
    else:
        # No known parents: fall back to the cron tick preceding the base date.
        base = now() if relative_to is None else relative_to
        earliest = snapshot.node.cron_prev(snapshot.node.cron_floor(base))

    cache[key] = earliest
    return earliest

Get the effective/inferred start date for a snapshot.

Not all snapshots define a start date. In those cases, the node's start date can be inferred from its parent's start date or from its cron.

Arguments:
  • snapshot: snapshot to infer start date.
  • snapshots: a catalog of available snapshots.
  • cache: optional cache to make computing the start date more efficient
  • relative_to: the base date to compute start from if inferred from cron
Returns:

Start datetime object.

def snapshots_to_dag( snapshots: Collection[Snapshot]) -> sqlmesh.utils.dag.DAG[SnapshotId]:
def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
    """Build a DAG of snapshot IDs from the given snapshots.

    Each snapshot is added as a node together with its parents as dependencies.

    Args:
        snapshots: The snapshots to build the graph from.

    Returns:
        A DAG whose nodes are snapshot IDs.
    """
    graph: DAG[SnapshotId] = DAG()
    for node in snapshots:
        graph.add(node.snapshot_id, node.parents)
    return graph
def apply_auto_restatements( snapshots: Dict[SnapshotId, Snapshot], execution_time: Union[datetime.date, datetime.datetime, str, int, float]) -> Tuple[List[SnapshotIntervals], Dict[SnapshotId, List[SnapshotId]]]:
def apply_auto_restatements(
    snapshots: t.Dict[SnapshotId, Snapshot], execution_time: TimeLike
) -> t.Tuple[t.List[SnapshotIntervals], t.Dict[SnapshotId, t.List[SnapshotId]]]:
    """Applies auto restatements to the snapshots.

    This operation results in the removal of intervals for snapshots that are ready to be restated based
    on the provided execution time and configured auto restatement settings. For each affected snapshot,
    it also updates the next auto restatement timestamp.

    The provided snapshots are mutated in place: their pending restatement intervals are updated and
    applied, and their next auto restatement timestamp is refreshed. Non-model snapshots and models with
    restatement disabled are left untouched.

    Args:
        snapshots: A dictionary of snapshots to apply auto restatements to.
        execution_time: The execution time.

    Returns:
        A tuple of:
            - A list of SnapshotIntervals with **new** intervals that need to be restated.
            - A mapping from the ID of each affected snapshot to the IDs of the snapshots whose
              auto restatement triggered it (a snapshot with its own auto restated interval is
              its own trigger).
    """
    dag = snapshots_to_dag(snapshots.values())
    auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
    auto_restated_intervals_per_snapshot: t.Dict[SnapshotId, Interval] = {}
    # NOTE(review): the lookups of parent results below presumably rely on the DAG
    # yielding parents before their children (topological order) - confirm.
    for s_id in dag:
        # The DAG also contains parent IDs that may not be among the provided snapshots.
        if s_id not in snapshots:
            continue
        snapshot = snapshots[s_id]
        if not snapshot.is_model or snapshot.model.disable_restatement:
            continue

        next_auto_restated_interval = snapshot.get_next_auto_restatement_interval(execution_time)
        # Start from the removal intervals already recorded for this snapshot's parents.
        auto_restated_intervals = [
            auto_restated_intervals_per_snapshot[parent_s_id]
            for parent_s_id in snapshot.parents
            if parent_s_id in auto_restated_intervals_per_snapshot
        ]
        upstream_triggers = []
        if next_auto_restated_interval:
            logger.info(
                "Calculated the next auto restated interval (%s, %s) for snapshot %s",
                time_like_to_str(next_auto_restated_interval[0]),
                time_like_to_str(next_auto_restated_interval[1]),
                snapshot.snapshot_id,
            )
            auto_restated_intervals.append(next_auto_restated_interval)

            # auto-restated snapshot is its own trigger
            upstream_triggers = [s_id]
        else:
            # inherit each parent's auto-restatement triggers (if any)
            for parent_s_id in snapshot.parents:
                if parent_s_id in auto_restatement_triggers:
                    upstream_triggers.extend(auto_restatement_triggers[parent_s_id])

        # remove duplicate triggers, retaining order and keeping first seen of duplicates
        if upstream_triggers:
            auto_restatement_triggers[s_id] = unique(upstream_triggers)

        if auto_restated_intervals:
            # Compute the single span covering this snapshot's own interval and its parents' intervals.
            auto_restated_interval_start = sys.maxsize
            auto_restated_interval_end = -sys.maxsize
            for interval in auto_restated_intervals:
                auto_restated_interval_start = min(auto_restated_interval_start, interval[0])
                auto_restated_interval_end = max(auto_restated_interval_end, interval[1])

            # Align the span to this snapshot's interval unit: floor both boundaries, then
            # push the end to the next boundary if flooring cut off part of the span.
            interval_to_remove_start = snapshot.node.interval_unit.cron_floor(
                auto_restated_interval_start
            )
            interval_to_remove_end = snapshot.node.interval_unit.cron_floor(
                auto_restated_interval_end
            )
            if auto_restated_interval_end > to_timestamp(interval_to_remove_end):
                interval_to_remove_end = snapshot.node.interval_unit.cron_next(
                    interval_to_remove_end
                )

            removal_interval = snapshot.get_removal_interval(
                interval_to_remove_start, interval_to_remove_end, execution_time=execution_time
            )

            # Record the interval so that children of this snapshot can pick it up.
            auto_restated_intervals_per_snapshot[s_id] = removal_interval
            snapshot.pending_restatement_intervals = merge_intervals(
                [*snapshot.pending_restatement_intervals, removal_interval]
            )

        # These run for every model snapshot that wasn't skipped above, even when no
        # new removal interval was computed in this pass.
        snapshot.apply_pending_restatement_intervals()
        snapshot.update_next_auto_restatement_ts(execution_time)
    return (
        [
            SnapshotIntervals(
                name=snapshots[s_id].name,
                identifier=None,
                version=snapshots[s_id].version,
                dev_version=None,
                intervals=[],
                dev_intervals=[],
                pending_restatement_intervals=[interval],
            )
            for s_id, interval in auto_restated_intervals_per_snapshot.items()
            if s_id in snapshots
        ],
        auto_restatement_triggers,
    )

Applies auto restatements to the snapshots.

This operation results in the removal of intervals for snapshots that are ready to be restated based on the provided execution time and configured auto restatement settings. For each affected snapshot, it also updates the next auto restatement timestamp.

Arguments:
  • snapshots: A dictionary of snapshots to apply auto restatements to.
  • execution_time: The execution time.
Returns:

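A tuple of (1) a list of SnapshotIntervals with new intervals that need to be restated and (2) a mapping from each affected snapshot ID to the IDs of the snapshots whose auto restatement triggered it.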

def parent_snapshots_by_name( snapshot: Snapshot, snapshots: Dict[SnapshotId, Snapshot]) -> Dict[str, Snapshot]:
def parent_snapshots_by_name(
    snapshot: Snapshot, snapshots: t.Dict[SnapshotId, Snapshot]
) -> t.Dict[str, Snapshot]:
    """Map the names of a snapshot's parents, and of the snapshot itself, to snapshots.

    Args:
        snapshot: The snapshot whose parents should be looked up.
        snapshots: Available snapshots keyed by snapshot ID. Every parent of
            `snapshot` must be present, otherwise a KeyError is raised.

    Returns:
        A dictionary from snapshot name to snapshot containing all parents
        plus the target snapshot itself.
    """
    by_name: t.Dict[str, Snapshot] = {}
    for parent_id in snapshot.parents:
        by_name[snapshots[parent_id].name] = snapshots[parent_id]

    # The target snapshot goes in last so that it wins over a same-named parent.
    by_name[snapshot.name] = snapshot
    return by_name
def check_ready_intervals( check: Callable, intervals: List[Tuple[int, int]], context: sqlmesh.core.context.ExecutionContext, python_env: Dict[str, sqlmesh.utils.metaprogramming.Executable], dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, path: Optional[pathlib.Path] = None, snapshot: Optional[Snapshot] = None, kwargs: Optional[Dict] = None) -> List[Tuple[int, int]]:
def check_ready_intervals(
    check: t.Callable,
    intervals: Intervals,
    context: ExecutionContext,
    python_env: t.Dict[str, Executable],
    dialect: DialectType = None,
    path: t.Optional[Path] = None,
    snapshot: t.Optional[Snapshot] = None,
    kwargs: t.Optional[t.Dict] = None,
) -> Intervals:
    """Evaluate a signal check against the given intervals and return the ready ones.

    The intervals are split into batches and the check is invoked once per batch
    with the batch converted to (start, end) datetime pairs.

    Args:
        check: The signal callable. It must return either a bool (True keeps the
            whole batch, False drops it) or a list containing a subset of the
            batch it was given.
        intervals: The candidate intervals as (start, end) timestamps.
        context: The execution context passed through to the check.
        python_env: The Python environment used to format errors raised by the check.
        dialect: The dialect passed through to the macro call.
        path: The path passed through to the macro call.
        snapshot: The snapshot passed through to the check, if any.
        kwargs: Additional keyword arguments for the check.

    Returns:
        The intervals that the check reported as ready, as (start, end) timestamps.

    Raises:
        SignalEvalError: If the check raises, returns an interval that was not part
            of its batch, or returns something other than a bool or a list.
    """
    checked_intervals: Intervals = []

    # NOTE(review): _contiguous_intervals presumably groups the input into runs of
    # adjacent intervals - confirm against its definition.
    for interval_batch in _contiguous_intervals(intervals):
        batch = [(to_datetime(start), to_datetime(end)) for start, end in interval_batch]

        try:
            ready_intervals = call_macro(
                check,
                dialect,
                path,
                provided_args=(batch,),
                provided_kwargs=(kwargs or {}),
                context=context,
                snapshot=snapshot,
            )
        except Exception as ex:
            # Chain explicitly so the original failure is preserved as the cause.
            raise SignalEvalError(format_evaluated_code_exception(ex, python_env)) from ex

        if isinstance(ready_intervals, bool):
            if not ready_intervals:
                batch = []
        elif isinstance(ready_intervals, list):
            # The check may only narrow the batch down, never introduce new intervals.
            for i in ready_intervals:
                if i not in batch:
                    raise SignalEvalError(f"Unknown interval {i} for signal")
            batch = ready_intervals
        else:
            raise SignalEvalError(f"Expected bool | list, got {type(ready_intervals)} for signal")

        checked_intervals.extend((to_timestamp(start), to_timestamp(end)) for start, end in batch)

    return checked_intervals
def get_next_model_interval_start( snapshots: Iterable[Snapshot]) -> Optional[datetime.datetime]:
def get_next_model_interval_start(snapshots: t.Iterable[Snapshot]) -> t.Optional[datetime]:
    """Return the soonest upcoming cron tick among the given model snapshots.

    Only snapshots that are models and are neither symbolic nor seeds are considered.

    Args:
        snapshots: The snapshots to inspect.

    Returns:
        The earliest next cron time relative to the current time, or None if no
        snapshot qualifies.
    """
    reference_time = now()
    upcoming = (
        candidate.node.cron_next(reference_time)
        for candidate in snapshots
        if candidate.is_model and not (candidate.is_symbolic or candidate.is_seed)
    )
    return min(upcoming, default=None)