Edit on GitHub

sqlmesh.core.state_sync.base

  1from __future__ import annotations
  2
  3import abc
  4import importlib
  5import logging
  6import pkgutil
  7import typing as t
  8
  9from sqlglot import __version__ as SQLGLOT_VERSION
 10
 11from sqlmesh import migrations
 12from sqlmesh.core.environment import (
 13    Environment,
 14    EnvironmentStatements,
 15    EnvironmentSummary,
 16)
 17from sqlmesh.core.snapshot import (
 18    Snapshot,
 19    SnapshotId,
 20    SnapshotIdLike,
 21    SnapshotIdAndVersionLike,
 22    SnapshotInfoLike,
 23    SnapshotNameVersion,
 24    SnapshotIdAndVersion,
 25)
 26from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
 27from sqlmesh.utils import major_minor
 28from sqlmesh.utils.date import TimeLike
 29from sqlmesh.utils.errors import SQLMeshError
 30from sqlmesh.utils.pydantic import PydanticModel, field_validator
 31from sqlmesh.core.state_sync.common import (
 32    StateStream,
 33    ExpiredSnapshotBatch,
 34    PromotionResult,
 35    ExpiredBatchRange,
 36)
 37
 38logger = logging.getLogger(__name__)
 39
 40
 41class Versions(PydanticModel):
 42    """Represents the various versions of dependencies in the state sync."""
 43
 44    schema_version: int = 0
 45    sqlglot_version: str = "0.0.0"
 46    sqlmesh_version: str = "0.0.0"
 47
 48    @property
 49    def minor_sqlglot_version(self) -> t.Tuple[int, int]:
 50        return major_minor(self.sqlglot_version)
 51
 52    @property
 53    def minor_sqlmesh_version(self) -> t.Tuple[int, int]:
 54        return major_minor(self.sqlmesh_version)
 55
 56    @field_validator("sqlglot_version", "sqlmesh_version", mode="before")
 57    @classmethod
 58    def _package_version_validator(cls, v: t.Any) -> str:
 59        return "0.0.0" if v is None else str(v)
 60
 61    @field_validator("schema_version", mode="before")
 62    @classmethod
 63    def _schema_version_validator(cls, v: t.Any) -> int:
 64        return 0 if v is None else int(v)
 65
 66
 67MIN_SCHEMA_VERSION = 60
 68MIN_SQLMESH_VERSION = "0.134.0"
 69MIGRATIONS = [
 70    importlib.import_module(f"sqlmesh.migrations.{migration}")
 71    for migration in sorted(info.name for info in pkgutil.iter_modules(migrations.__path__))
 72]
 73# -1 to account for the baseline script
 74SCHEMA_VERSION: int = MIN_SCHEMA_VERSION + len(MIGRATIONS) - 1
 75
 76
 77class StateReader(abc.ABC):
 78    """Abstract base class for read-only operations on snapshot and environment state."""
 79
 80    @abc.abstractmethod
 81    def get_snapshots(
 82        self, snapshot_ids: t.Iterable[SnapshotIdLike]
 83    ) -> t.Dict[SnapshotId, Snapshot]:
 84        """Bulk fetch snapshots given the corresponding snapshot ids.
 85
 86        Args:
 87            snapshot_ids: Iterable of snapshot ids to get.
 88
 89        Returns:
 90            A dictionary of snapshot ids to snapshots for ones that could be found.
 91        """
 92
 93    @abc.abstractmethod
 94    def get_snapshots_by_names(
 95        self,
 96        snapshot_names: t.Iterable[str],
 97        current_ts: t.Optional[int] = None,
 98        exclude_expired: bool = True,
 99    ) -> t.Set[SnapshotIdAndVersion]:
100        """Return the snapshot records for all versions of the specified snapshot names.
101
102        Args:
103            snapshot_names: Iterable of snapshot names to fetch all snapshot records for
104            current_ts: Sets the current time for identifying which snapshots have expired so they can be excluded (only relevant if :exclude_expired=True)
105            exclude_expired: Whether or not to return the snapshot id's of expired snapshots in the result
106
107        Returns:
108            A set containing all the matched snapshot records. To fetch full snapshots, pass it into StateSync.get_snapshots()
109        """
110
111    @abc.abstractmethod
112    def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
113        """Checks if multiple snapshots exist in the state sync.
114
115        Args:
116            snapshot_ids: Iterable of snapshot ids to bulk check.
117
118        Returns:
119            A set of all the existing snapshot ids.
120        """
121
122    @abc.abstractmethod
123    def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.List[Snapshot]:
124        """Updates given snapshots with latest intervals from the state.
125
126        Args:
127            snapshots: The snapshots to refresh.
128
129        Returns:
130            The updated snapshots.
131        """
132
133    @abc.abstractmethod
134    def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]:
135        """Returns the node names that exist in the state sync.
136
137        Args:
138            names: Iterable of node names to check.
139            exclude_external: Whether to exclude external models from the output.
140
141        Returns:
142            A set of all the existing node names.
143        """
144
145    @abc.abstractmethod
146    def get_environment(self, environment: str) -> t.Optional[Environment]:
147        """Fetches the environment if it exists.
148
149        Args:
150            environment: The environment
151
152        Returns:
153            The environment object.
154        """
155
156    @abc.abstractmethod
157    def get_environments(self) -> t.List[Environment]:
158        """Fetches all environments.
159
160        Returns:
161            A list of all environments.
162        """
163
164    @abc.abstractmethod
165    def get_environments_summary(self) -> t.List[EnvironmentSummary]:
166        """Fetches all environment names along with expiry datetime.
167
168        Returns:
169            A list of all environment summaries.
170        """
171
172    @abc.abstractmethod
173    def max_interval_end_per_model(
174        self,
175        environment: str,
176        models: t.Optional[t.Set[str]] = None,
177        ensure_finalized_snapshots: bool = False,
178    ) -> t.Dict[str, int]:
179        """Returns the max interval end per model for the given environment.
180
181        Args:
182            environment: The target environment.
183            models: The models to get the max interval end for. If None, all models are considered.
184            ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state,
185                or to use whatever snapshots are in the current environment state even if the environment is not finalized.
186
187        Returns:
188            A dictionary of model FQNs to their respective interval ends in milliseconds since epoch.
189        """
190
191    @abc.abstractmethod
192    def recycle(self) -> None:
193        """Closes all open connections and releases all allocated resources associated with any thread
194        except the calling one."""
195
196    @abc.abstractmethod
197    def close(self) -> None:
198        """Closes all open connections and releases all allocated resources."""
199
200    @abc.abstractmethod
201    def state_type(self) -> str:
202        """Returns the type of state sync."""
203
204    @abc.abstractmethod
205    def update_auto_restatements(
206        self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]]
207    ) -> None:
208        """Updates the next auto restatement timestamp for the snapshots.
209
210        Args:
211            next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp.
212        """
213
214    @abc.abstractmethod
215    def get_environment_statements(self, environment: str) -> t.List[EnvironmentStatements]:
216        """Fetches environment statements from the environment_statements table.
217
218        Returns:
219            A list of the Environment Statements.
220        """
221
222    def get_versions(self, validate: bool = True) -> Versions:
223        """Get the current versions of the SQLMesh schema and libraries.
224
225        Args:
226            validate: Whether or not to raise error if the running version is different from what's in state.
227
228        Returns:
229            The versions object.
230        """
231        from sqlmesh._version import __version__ as SQLMESH_VERSION
232
233        versions = self._get_versions()
234
235        if validate:
236
237            def raise_error(
238                lib: str,
239                local: str | int,
240                remote: str | int,
241                remote_package_version: t.Optional[str] = None,
242                ahead: bool = False,
243            ) -> None:
244                if ahead:
245                    raise SQLMeshError(
246                        f"{lib} (local) is using version '{local}' which is ahead of '{remote}' (remote). "
247                        "Please run a migration ('sqlmesh migrate' command)."
248                    )
249
250                if remote_package_version:
251                    upgrade_suggestion = f" Please upgrade {lib} ('pip install --upgrade \"{lib.lower()}=={remote_package_version}\"' command)."
252                else:
253                    upgrade_suggestion = ""
254
255                raise SQLMeshError(
256                    f"{lib} (local) is using version '{local}' which is behind '{remote}' (remote).{upgrade_suggestion}"
257                )
258
259            if major_minor(SQLMESH_VERSION) != major_minor(versions.sqlmesh_version):
260                raise_error(
261                    "SQLMesh",
262                    SQLMESH_VERSION,
263                    versions.sqlmesh_version,
264                    remote_package_version=versions.sqlmesh_version,
265                    ahead=major_minor(SQLMESH_VERSION) > major_minor(versions.sqlmesh_version),
266                )
267
268            if SCHEMA_VERSION != versions.schema_version:
269                raise_error(
270                    "SQLMesh",
271                    SCHEMA_VERSION,
272                    versions.schema_version,
273                    remote_package_version=versions.sqlmesh_version,
274                    ahead=SCHEMA_VERSION > versions.schema_version,
275                )
276
277            if major_minor(SQLGLOT_VERSION) != major_minor(versions.sqlglot_version):
278                raise_error(
279                    "SQLGlot",
280                    SQLGLOT_VERSION,
281                    versions.sqlglot_version,
282                    remote_package_version=versions.sqlglot_version,
283                    ahead=major_minor(SQLGLOT_VERSION) > major_minor(versions.sqlglot_version),
284                )
285
286        return versions
287
288    @abc.abstractmethod
289    def _get_versions(self) -> Versions:
290        """Queries the store to get the current versions of SQLMesh and deps.
291
292        Returns:
293            The versions object.
294        """
295
296    @abc.abstractmethod
297    def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream:
298        """Export the contents of this StateSync as a StateStream
299
300        Args:
301            environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
302        """
303
304    @abc.abstractmethod
305    def get_expired_snapshots(
306        self,
307        *,
308        batch_range: ExpiredBatchRange,
309        current_ts: t.Optional[int] = None,
310        ignore_ttl: bool = False,
311    ) -> t.Optional[ExpiredSnapshotBatch]:
312        """Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).
313
314        Args:
315            current_ts: Timestamp used to evaluate expiration.
316            ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
317            batch_range: The range of the batch to fetch.
318
319        Returns:
320            A batch describing expired snapshots or None if no snapshots are pending cleanup.
321        """
322
323    @abc.abstractmethod
324    def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
325        """Returns the expired environments.
326
327        Expired environments are environments that have exceeded their time-to-live value.
328        Returns:
329            The list of environment summaries to remove.
330        """
331
332
333class StateSync(StateReader, abc.ABC):
334    """Abstract base class for snapshot and environment state management."""
335
336    @abc.abstractmethod
337    def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None:
338        """Push snapshots into the state sync.
339
340        This method only allows for pushing new snapshots. If existing snapshots are found,
341        this method should raise an error.
342
343        Raises:
344            SQLMeshError when existing snapshots are pushed.
345
346        Args:
347            snapshots: A list of snapshots to save in the state sync.
348        """
349
350    @abc.abstractmethod
351    def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
352        """Delete snapshots from the state sync.
353
354        Args:
355            snapshot_ids: A list of snapshot like objects to delete.
356        """
357
358    @abc.abstractmethod
359    def delete_expired_snapshots(
360        self,
361        batch_range: ExpiredBatchRange,
362        ignore_ttl: bool = False,
363        current_ts: t.Optional[int] = None,
364    ) -> None:
365        """Removes expired snapshots.
366
367        Expired snapshots are snapshots that have exceeded their time-to-live
368        and are no longer in use within an environment.
369
370        Args:
371            batch_range: The range of snapshots to delete in this batch.
372            ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
373                all snapshots that are not referenced in any environment
374            current_ts: Timestamp used to evaluate expiration.
375        """
376
377    @abc.abstractmethod
378    def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
379        """Invalidates the target environment by setting its expiration timestamp to now.
380
381        Args:
382            name: The name of the environment to invalidate.
383            protect_prod: If True, prevents invalidation of the production environment.
384        """
385
386    @abc.abstractmethod
387    def remove_state(self, including_backup: bool = False) -> None:
388        """Removes the state store objects."""
389
390    @abc.abstractmethod
391    def remove_intervals(
392        self,
393        snapshot_intervals: t.Sequence[t.Tuple[SnapshotIdAndVersionLike, Interval]],
394        remove_shared_versions: bool = False,
395    ) -> None:
396        """Remove an interval from a list of snapshots and sync it to the store.
397
398        Because multiple snapshots can be pointing to the same version or physical table, this method
399        can also grab all snapshots tied to the passed in version.
400
401        Args:
402            snapshot_intervals: The snapshot intervals to remove.
403            remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots.
404        """
405
406    @abc.abstractmethod
407    def promote(
408        self,
409        environment: Environment,
410        no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
411        environment_statements: t.Optional[t.List[EnvironmentStatements]] = None,
412    ) -> PromotionResult:
413        """Update the environment to reflect the current state.
414
415        This method verifies that snapshots have been pushed.
416
417        Args:
418            environment: The environment to promote.
419            no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
420                all snapshots will be checked. The data gap check ensures that models that are already a
421                part of the target environment have no data gaps when compared against previous
422                snapshots for same models.
423
424        Returns:
425           A tuple of (added snapshot table infos, removed snapshot table infos)
426        """
427
428    @abc.abstractmethod
429    def finalize(self, environment: Environment) -> None:
430        """Finalize the target environment, indicating that this environment has been
431        fully promoted and is ready for use.
432
433        Args:
434            environment: The target environment to finalize.
435        """
436
437    @abc.abstractmethod
438    def delete_expired_environments(
439        self, current_ts: t.Optional[int] = None
440    ) -> t.List[EnvironmentSummary]:
441        """Removes expired environments.
442
443        Expired environments are environments that have exceeded their time-to-live value.
444
445        Returns:
446            The list of removed environments.
447        """
448
449    @abc.abstractmethod
450    def unpause_snapshots(
451        self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike
452    ) -> None:
453        """Unpauses target snapshots.
454
455        Unpaused snapshots are scheduled for evaluation on a recurring basis.
456        Once unpaused a snapshot can't be paused again.
457
458        Args:
459            snapshots: Target snapshots.
460            unpaused_dt: The datetime object which indicates when target snapshots
461                were unpaused.
462        """
463
464    @abc.abstractmethod
465    def compact_intervals(self) -> None:
466        """Compacts intervals for all snapshots.
467
468        Compaction process involves merging of existing interval records into new records and
469        then deleting the old ones.
470        """
471
472    @abc.abstractmethod
473    def migrate(
474        self,
475        skip_backup: bool = False,
476        promoted_snapshots_only: bool = True,
477    ) -> None:
478        """Migrate the state sync to the latest SQLMesh / SQLGlot version."""
479
480    @abc.abstractmethod
481    def rollback(self) -> None:
482        """Rollback to previous backed up state."""
483
484    @abc.abstractmethod
485    def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
486        """Add snapshot intervals to state
487
488        Args:
489            snapshots_intervals: The intervals to add.
490        """
491
492    def add_interval(
493        self,
494        snapshot: Snapshot,
495        start: TimeLike,
496        end: TimeLike,
497        is_dev: bool = False,
498        last_altered_ts: t.Optional[int] = None,
499    ) -> None:
500        """Add an interval to a snapshot and sync it to the store.
501
502        Args:
503            snapshot: The snapshot like object to add an interval to.
504            start: The start of the interval to add.
505            end: The end of the interval to add.
506            is_dev: Indicates whether the given interval is being added while in development mode
507            last_altered_ts: The timestamp of the last modification of the physical table
508        """
509        start_ts, end_ts = snapshot.inclusive_exclusive(start, end, strict=False, expand=False)
510        if not snapshot.version:
511            raise SQLMeshError("Snapshot version must be set to add an interval.")
512        intervals = [(start_ts, end_ts)]
513        snapshot_intervals = SnapshotIntervals(
514            name=snapshot.name,
515            identifier=snapshot.identifier,
516            version=snapshot.version,
517            dev_version=snapshot.dev_version,
518            intervals=intervals if not is_dev else [],
519            dev_intervals=intervals if is_dev else [],
520            last_altered_ts=last_altered_ts if not is_dev else None,
521            dev_last_altered_ts=last_altered_ts if is_dev else None,
522        )
523        self.add_snapshots_intervals([snapshot_intervals])
524
525    @abc.abstractmethod
526    def import_(self, stream: StateStream, clear: bool = True) -> None:
527        """
528        Replace the existing state with the state contained in the StateStream
529
530        Args:
531            stream: The stream of new state
532            clear: Whether or not to clear existing state before inserting state from the stream
533        """
534
535
536class DelegatingStateSync(StateSync):
537    def __init__(self, state_sync: StateSync) -> None:
538        self.state_sync = state_sync
539
540
541def _create_delegate_method(name: str) -> t.Callable:
542    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
543        return getattr(self.state_sync, name)(*args, **kwargs)
544
545    return delegate
546
547
548DelegatingStateSync.__abstractmethods__ = frozenset()
549for name in StateSync.__abstractmethods__:
550    setattr(DelegatingStateSync, name, _create_delegate_method(name))
logger = <Logger sqlmesh.core.state_sync.base (WARNING)>
class Versions(sqlmesh.utils.pydantic.PydanticModel):
42class Versions(PydanticModel):
43    """Represents the various versions of dependencies in the state sync."""
44
45    schema_version: int = 0
46    sqlglot_version: str = "0.0.0"
47    sqlmesh_version: str = "0.0.0"
48
49    @property
50    def minor_sqlglot_version(self) -> t.Tuple[int, int]:
51        return major_minor(self.sqlglot_version)
52
53    @property
54    def minor_sqlmesh_version(self) -> t.Tuple[int, int]:
55        return major_minor(self.sqlmesh_version)
56
57    @field_validator("sqlglot_version", "sqlmesh_version", mode="before")
58    @classmethod
59    def _package_version_validator(cls, v: t.Any) -> str:
60        return "0.0.0" if v is None else str(v)
61
62    @field_validator("schema_version", mode="before")
63    @classmethod
64    def _schema_version_validator(cls, v: t.Any) -> int:
65        return 0 if v is None else int(v)

Represents the various versions of dependencies in the state sync.

schema_version: int
sqlglot_version: str
sqlmesh_version: str
minor_sqlglot_version: Tuple[int, int]
49    @property
50    def minor_sqlglot_version(self) -> t.Tuple[int, int]:
51        return major_minor(self.sqlglot_version)
minor_sqlmesh_version: Tuple[int, int]
53    @property
54    def minor_sqlmesh_version(self) -> t.Tuple[int, int]:
55        return major_minor(self.sqlmesh_version)
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
MIN_SCHEMA_VERSION = 60
MIN_SQLMESH_VERSION = '0.134.0'
MIGRATIONS = [<module 'sqlmesh.migrations.v0000_baseline' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0000_baseline.py'>, <module 'sqlmesh.migrations.v0061_mysql_fix_blob_text_type' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0061_mysql_fix_blob_text_type.py'>, <module 'sqlmesh.migrations.v0062_add_model_gateway' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0062_add_model_gateway.py'>, <module 'sqlmesh.migrations.v0063_change_signals' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0063_change_signals.py'>, <module 'sqlmesh.migrations.v0064_join_when_matched_strings' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0064_join_when_matched_strings.py'>, <module 'sqlmesh.migrations.v0065_add_model_optimize' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0065_add_model_optimize.py'>, <module 'sqlmesh.migrations.v0066_add_auto_restatements' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0066_add_auto_restatements.py'>, <module 'sqlmesh.migrations.v0067_add_tsql_date_full_precision' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0067_add_tsql_date_full_precision.py'>, <module 'sqlmesh.migrations.v0068_include_unrendered_query_in_metadata_hash' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0068_include_unrendered_query_in_metadata_hash.py'>, <module 'sqlmesh.migrations.v0069_update_dev_table_suffix' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0069_update_dev_table_suffix.py'>, <module 'sqlmesh.migrations.v0070_include_grains_in_metadata_hash' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0070_include_grains_in_metadata_hash.py'>, <module 'sqlmesh.migrations.v0071_add_dev_version_to_intervals' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0071_add_dev_version_to_intervals.py'>, <module 'sqlmesh.migrations.v0072_add_environment_statements' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0072_add_environment_statements.py'>, <module 'sqlmesh.migrations.v0073_remove_symbolic_disable_restatement' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0073_remove_symbolic_disable_restatement.py'>, <module 'sqlmesh.migrations.v0074_add_partition_by_time_column_property' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0074_add_partition_by_time_column_property.py'>, <module 'sqlmesh.migrations.v0075_remove_validate_query' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0075_remove_validate_query.py'>, <module 'sqlmesh.migrations.v0076_add_cron_tz' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0076_add_cron_tz.py'>, <module 'sqlmesh.migrations.v0077_fix_column_type_hash_calculation' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0077_fix_column_type_hash_calculation.py'>, <module 'sqlmesh.migrations.v0078_warn_if_non_migratable_python_env' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0078_warn_if_non_migratable_python_env.py'>, <module 'sqlmesh.migrations.v0079_add_gateway_managed_property' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0079_add_gateway_managed_property.py'>, <module 'sqlmesh.migrations.v0080_add_batch_size_to_scd_type_2_models' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0080_add_batch_size_to_scd_type_2_models.py'>, <module 'sqlmesh.migrations.v0081_update_partitioned_by' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0081_update_partitioned_by.py'>, <module 'sqlmesh.migrations.v0082_warn_if_incorrectly_duplicated_statements' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0082_warn_if_incorrectly_duplicated_statements.py'>, <module 'sqlmesh.migrations.v0083_use_sql_for_scd_time_data_type_data_hash' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0083_use_sql_for_scd_time_data_type_data_hash.py'>, <module 'sqlmesh.migrations.v0084_normalize_quote_when_matched_and_merge_filter' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0084_normalize_quote_when_matched_and_merge_filter.py'>, <module 'sqlmesh.migrations.v0085_deterministic_repr' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0085_deterministic_repr.py'>, <module 'sqlmesh.migrations.v0086_check_deterministic_bug' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0086_check_deterministic_bug.py'>, <module 'sqlmesh.migrations.v0087_normalize_blueprint_variables' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0087_normalize_blueprint_variables.py'>, <module 'sqlmesh.migrations.v0088_warn_about_variable_python_env_diffs' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0088_warn_about_variable_python_env_diffs.py'>, <module 'sqlmesh.migrations.v0089_add_virtual_environment_mode' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0089_add_virtual_environment_mode.py'>, <module 'sqlmesh.migrations.v0090_add_forward_only_column' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0090_add_forward_only_column.py'>, <module 'sqlmesh.migrations.v0091_on_additive_change' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0091_on_additive_change.py'>, <module 'sqlmesh.migrations.v0092_warn_about_dbt_data_type_diff' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0092_warn_about_dbt_data_type_diff.py'>, <module 'sqlmesh.migrations.v0093_use_raw_sql_in_fingerprint' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0093_use_raw_sql_in_fingerprint.py'>, <module 'sqlmesh.migrations.v0094_add_dev_version_and_fingerprint_columns' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0094_add_dev_version_and_fingerprint_columns.py'>, <module 'sqlmesh.migrations.v0095_warn_about_dbt_raw_sql_diff' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0095_warn_about_dbt_raw_sql_diff.py'>, <module 'sqlmesh.migrations.v0096_remove_plan_dags_table' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0096_remove_plan_dags_table.py'>, <module 'sqlmesh.migrations.v0097_add_dbt_name_in_node' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0097_add_dbt_name_in_node.py'>, <module 'sqlmesh.migrations.v0098_add_dbt_node_info_in_node' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0098_add_dbt_node_info_in_node.py'>, <module 'sqlmesh.migrations.v0099_add_last_altered_to_intervals' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0099_add_last_altered_to_intervals.py'>, <module 'sqlmesh.migrations.v0100_add_grants_and_grants_target_layer' from '/home/docs/checkouts/readthedocs.org/user_builds/sqlmesh/checkouts/latest/sqlmesh/migrations/v0100_add_grants_and_grants_target_layer.py'>]
SCHEMA_VERSION: int = 100
class StateReader(abc.ABC):
 78class StateReader(abc.ABC):
 79    """Abstract base class for read-only operations on snapshot and environment state."""
 80
 81    @abc.abstractmethod
 82    def get_snapshots(
 83        self, snapshot_ids: t.Iterable[SnapshotIdLike]
 84    ) -> t.Dict[SnapshotId, Snapshot]:
 85        """Bulk fetch snapshots given the corresponding snapshot ids.
 86
 87        Args:
 88            snapshot_ids: Iterable of snapshot ids to get.
 89
 90        Returns:
 91            A dictionary of snapshot ids to snapshots for ones that could be found.
 92        """
 93
 94    @abc.abstractmethod
 95    def get_snapshots_by_names(
 96        self,
 97        snapshot_names: t.Iterable[str],
 98        current_ts: t.Optional[int] = None,
 99        exclude_expired: bool = True,
100    ) -> t.Set[SnapshotIdAndVersion]:
101        """Return the snapshot records for all versions of the specified snapshot names.
102
103        Args:
104            snapshot_names: Iterable of snapshot names to fetch all snapshot records for
105            current_ts: Sets the current time for identifying which snapshots have expired so they can be excluded (only relevant if :exclude_expired=True)
106            exclude_expired: Whether or not to return the snapshot id's of expired snapshots in the result
107
108        Returns:
109            A set containing all the matched snapshot records. To fetch full snapshots, pass it into StateSync.get_snapshots()
110        """
111
112    @abc.abstractmethod
113    def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
114        """Checks if multiple snapshots exist in the state sync.
115
116        Args:
117            snapshot_ids: Iterable of snapshot ids to bulk check.
118
119        Returns:
120            A set of all the existing snapshot ids.
121        """
122
123    @abc.abstractmethod
124    def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.List[Snapshot]:
125        """Updates given snapshots with latest intervals from the state.
126
127        Args:
128            snapshots: The snapshots to refresh.
129
130        Returns:
131            The updated snapshots.
132        """
133
134    @abc.abstractmethod
135    def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]:
136        """Returns the node names that exist in the state sync.
137
138        Args:
139            names: Iterable of node names to check.
140            exclude_external: Whether to exclude external models from the output.
141
142        Returns:
143            A set of all the existing node names.
144        """
145
146    @abc.abstractmethod
147    def get_environment(self, environment: str) -> t.Optional[Environment]:
148        """Fetches the environment if it exists.
149
150        Args:
151            environment: The environment
152
153        Returns:
154            The environment object.
155        """
156
157    @abc.abstractmethod
158    def get_environments(self) -> t.List[Environment]:
159        """Fetches all environments.
160
161        Returns:
162            A list of all environments.
163        """
164
165    @abc.abstractmethod
166    def get_environments_summary(self) -> t.List[EnvironmentSummary]:
167        """Fetches all environment names along with expiry datetime.
168
169        Returns:
170            A list of all environment summaries.
171        """
172
173    @abc.abstractmethod
174    def max_interval_end_per_model(
175        self,
176        environment: str,
177        models: t.Optional[t.Set[str]] = None,
178        ensure_finalized_snapshots: bool = False,
179    ) -> t.Dict[str, int]:
180        """Returns the max interval end per model for the given environment.
181
182        Args:
183            environment: The target environment.
184            models: The models to get the max interval end for. If None, all models are considered.
185            ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state,
186                or to use whatever snapshots are in the current environment state even if the environment is not finalized.
187
188        Returns:
189            A dictionary of model FQNs to their respective interval ends in milliseconds since epoch.
190        """
191
192    @abc.abstractmethod
193    def recycle(self) -> None:
194        """Closes all open connections and releases all allocated resources associated with any thread
195        except the calling one."""
196
197    @abc.abstractmethod
198    def close(self) -> None:
199        """Closes all open connections and releases all allocated resources."""
200
201    @abc.abstractmethod
202    def state_type(self) -> str:
203        """Returns the type of state sync."""
204
205    @abc.abstractmethod
206    def update_auto_restatements(
207        self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]]
208    ) -> None:
209        """Updates the next auto restatement timestamp for the snapshots.
210
211        Args:
212            next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp.
213        """
214
215    @abc.abstractmethod
216    def get_environment_statements(self, environment: str) -> t.List[EnvironmentStatements]:
217        """Fetches environment statements from the environment_statements table.
218
219        Returns:
220            A list of the Environment Statements.
221        """
222
223    def get_versions(self, validate: bool = True) -> Versions:
224        """Get the current versions of the SQLMesh schema and libraries.
225
226        Args:
227            validate: Whether or not to raise error if the running version is different from what's in state.
228
229        Returns:
230            The versions object.
231        """
232        from sqlmesh._version import __version__ as SQLMESH_VERSION
233
234        versions = self._get_versions()
235
236        if validate:
237
238            def raise_error(
239                lib: str,
240                local: str | int,
241                remote: str | int,
242                remote_package_version: t.Optional[str] = None,
243                ahead: bool = False,
244            ) -> None:
245                if ahead:
246                    raise SQLMeshError(
247                        f"{lib} (local) is using version '{local}' which is ahead of '{remote}' (remote). "
248                        "Please run a migration ('sqlmesh migrate' command)."
249                    )
250
251                if remote_package_version:
252                    upgrade_suggestion = f" Please upgrade {lib} ('pip install --upgrade \"{lib.lower()}=={remote_package_version}\"' command)."
253                else:
254                    upgrade_suggestion = ""
255
256                raise SQLMeshError(
257                    f"{lib} (local) is using version '{local}' which is behind '{remote}' (remote).{upgrade_suggestion}"
258                )
259
260            if major_minor(SQLMESH_VERSION) != major_minor(versions.sqlmesh_version):
261                raise_error(
262                    "SQLMesh",
263                    SQLMESH_VERSION,
264                    versions.sqlmesh_version,
265                    remote_package_version=versions.sqlmesh_version,
266                    ahead=major_minor(SQLMESH_VERSION) > major_minor(versions.sqlmesh_version),
267                )
268
269            if SCHEMA_VERSION != versions.schema_version:
270                raise_error(
271                    "SQLMesh",
272                    SCHEMA_VERSION,
273                    versions.schema_version,
274                    remote_package_version=versions.sqlmesh_version,
275                    ahead=SCHEMA_VERSION > versions.schema_version,
276                )
277
278            if major_minor(SQLGLOT_VERSION) != major_minor(versions.sqlglot_version):
279                raise_error(
280                    "SQLGlot",
281                    SQLGLOT_VERSION,
282                    versions.sqlglot_version,
283                    remote_package_version=versions.sqlglot_version,
284                    ahead=major_minor(SQLGLOT_VERSION) > major_minor(versions.sqlglot_version),
285                )
286
287        return versions
288
289    @abc.abstractmethod
290    def _get_versions(self) -> Versions:
291        """Queries the store to get the current versions of SQLMesh and deps.
292
293        Returns:
294            The versions object.
295        """
296
297    @abc.abstractmethod
298    def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream:
299        """Export the contents of this StateSync as a StateStream
300
301        Args:
302            environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
303        """
304
305    @abc.abstractmethod
306    def get_expired_snapshots(
307        self,
308        *,
309        batch_range: ExpiredBatchRange,
310        current_ts: t.Optional[int] = None,
311        ignore_ttl: bool = False,
312    ) -> t.Optional[ExpiredSnapshotBatch]:
313        """Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).
314
315        Args:
316            current_ts: Timestamp used to evaluate expiration.
317            ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
318            batch_range: The range of the batch to fetch.
319
320        Returns:
321            A batch describing expired snapshots or None if no snapshots are pending cleanup.
322        """
323
324    @abc.abstractmethod
325    def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
326        """Returns the expired environments.
327
328        Expired environments are environments that have exceeded their time-to-live value.
329        Returns:
330            The list of environment summaries to remove.
331        """

Abstract base class for read-only operations on snapshot and environment state.

81    @abc.abstractmethod
82    def get_snapshots(
83        self, snapshot_ids: t.Iterable[SnapshotIdLike]
84    ) -> t.Dict[SnapshotId, Snapshot]:
85        """Bulk fetch snapshots given the corresponding snapshot ids.
86
87        Args:
88            snapshot_ids: Iterable of snapshot ids to get.
89
90        Returns:
91            A dictionary of snapshot ids to snapshots for ones that could be found.
92        """

Bulk fetch snapshots given the corresponding snapshot ids.

Arguments:
  • snapshot_ids: Iterable of snapshot ids to get.
Returns:

A dictionary of snapshot ids to snapshots for ones that could be found.

@abc.abstractmethod
def get_snapshots_by_names( self, snapshot_names: Iterable[str], current_ts: Optional[int] = None, exclude_expired: bool = True) -> Set[sqlmesh.core.snapshot.definition.SnapshotIdAndVersion]:
 94    @abc.abstractmethod
 95    def get_snapshots_by_names(
 96        self,
 97        snapshot_names: t.Iterable[str],
 98        current_ts: t.Optional[int] = None,
 99        exclude_expired: bool = True,
100    ) -> t.Set[SnapshotIdAndVersion]:
101        """Return the snapshot records for all versions of the specified snapshot names.
102
103        Args:
104            snapshot_names: Iterable of snapshot names to fetch all snapshot records for
105            current_ts: Sets the current time for identifying which snapshots have expired so they can be excluded (only relevant if :exclude_expired=True)
106            exclude_expired: Whether or not to return the snapshot id's of expired snapshots in the result
107
108        Returns:
109            A set containing all the matched snapshot records. To fetch full snapshots, pass it into StateSync.get_snapshots()
110        """

Return the snapshot records for all versions of the specified snapshot names.

Arguments:
  • snapshot_names: Iterable of snapshot names to fetch all snapshot records for
  • current_ts: Sets the current time for identifying which snapshots have expired so they can be excluded (only relevant if :exclude_expired=True)
  • exclude_expired: Whether or not to return the snapshot id's of expired snapshots in the result
Returns:

A set containing all the matched snapshot records. To fetch full snapshots, pass it into StateSync.get_snapshots()

112    @abc.abstractmethod
113    def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
114        """Checks if multiple snapshots exist in the state sync.
115
116        Args:
117            snapshot_ids: Iterable of snapshot ids to bulk check.
118
119        Returns:
120            A set of all the existing snapshot ids.
121        """

Checks if multiple snapshots exist in the state sync.

Arguments:
  • snapshot_ids: Iterable of snapshot ids to bulk check.
Returns:

A set of all the existing snapshot ids.

@abc.abstractmethod
def refresh_snapshot_intervals( self, snapshots: Collection[sqlmesh.core.snapshot.definition.Snapshot]) -> List[sqlmesh.core.snapshot.definition.Snapshot]:
123    @abc.abstractmethod
124    def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.List[Snapshot]:
125        """Updates given snapshots with latest intervals from the state.
126
127        Args:
128            snapshots: The snapshots to refresh.
129
130        Returns:
131            The updated snapshots.
132        """

Updates given snapshots with latest intervals from the state.

Arguments:
  • snapshots: The snapshots to refresh.
Returns:

The updated snapshots.

@abc.abstractmethod
def nodes_exist(self, names: Iterable[str], exclude_external: bool = False) -> Set[str]:
134    @abc.abstractmethod
135    def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]:
136        """Returns the node names that exist in the state sync.
137
138        Args:
139            names: Iterable of node names to check.
140            exclude_external: Whether to exclude external models from the output.
141
142        Returns:
143            A set of all the existing node names.
144        """

Returns the node names that exist in the state sync.

Arguments:
  • names: Iterable of node names to check.
  • exclude_external: Whether to exclude external models from the output.
Returns:

A set of all the existing node names.

@abc.abstractmethod
def get_environment(self, environment: str) -> Optional[sqlmesh.core.environment.Environment]:
146    @abc.abstractmethod
147    def get_environment(self, environment: str) -> t.Optional[Environment]:
148        """Fetches the environment if it exists.
149
150        Args:
151            environment: The environment
152
153        Returns:
154            The environment object.
155        """

Fetches the environment if it exists.

Arguments:
  • environment: The environment
Returns:

The environment object.

@abc.abstractmethod
def get_environments(self) -> List[sqlmesh.core.environment.Environment]:
157    @abc.abstractmethod
158    def get_environments(self) -> t.List[Environment]:
159        """Fetches all environments.
160
161        Returns:
162            A list of all environments.
163        """

Fetches all environments.

Returns:

A list of all environments.

@abc.abstractmethod
def get_environments_summary(self) -> List[sqlmesh.core.environment.EnvironmentSummary]:
165    @abc.abstractmethod
166    def get_environments_summary(self) -> t.List[EnvironmentSummary]:
167        """Fetches all environment names along with expiry datetime.
168
169        Returns:
170            A list of all environment summaries.
171        """

Fetches all environment names along with expiry datetime.

Returns:

A list of all environment summaries.

@abc.abstractmethod
def max_interval_end_per_model( self, environment: str, models: Optional[Set[str]] = None, ensure_finalized_snapshots: bool = False) -> Dict[str, int]:
173    @abc.abstractmethod
174    def max_interval_end_per_model(
175        self,
176        environment: str,
177        models: t.Optional[t.Set[str]] = None,
178        ensure_finalized_snapshots: bool = False,
179    ) -> t.Dict[str, int]:
180        """Returns the max interval end per model for the given environment.
181
182        Args:
183            environment: The target environment.
184            models: The models to get the max interval end for. If None, all models are considered.
185            ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state,
186                or to use whatever snapshots are in the current environment state even if the environment is not finalized.
187
188        Returns:
189            A dictionary of model FQNs to their respective interval ends in milliseconds since epoch.
190        """

Returns the max interval end per model for the given environment.

Arguments:
  • environment: The target environment.
  • models: The models to get the max interval end for. If None, all models are considered.
  • ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:

A dictionary of model FQNs to their respective interval ends in milliseconds since epoch.

@abc.abstractmethod
def recycle(self) -> None:
192    @abc.abstractmethod
193    def recycle(self) -> None:
194        """Closes all open connections and releases all allocated resources associated with any thread
195        except the calling one."""

Closes all open connections and releases all allocated resources associated with any thread except the calling one.

@abc.abstractmethod
def close(self) -> None:
197    @abc.abstractmethod
198    def close(self) -> None:
199        """Closes all open connections and releases all allocated resources."""

Closes all open connections and releases all allocated resources.

@abc.abstractmethod
def state_type(self) -> str:
201    @abc.abstractmethod
202    def state_type(self) -> str:
203        """Returns the type of state sync."""

Returns the type of state sync.

@abc.abstractmethod
def update_auto_restatements( self, next_auto_restatement_ts: Dict[sqlmesh.core.snapshot.definition.SnapshotNameVersion, Optional[int]]) -> None:
205    @abc.abstractmethod
206    def update_auto_restatements(
207        self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]]
208    ) -> None:
209        """Updates the next auto restatement timestamp for the snapshots.
210
211        Args:
212            next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp.
213        """

Updates the next auto restatement timestamp for the snapshots.

Arguments:
  • next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp.
@abc.abstractmethod
def get_environment_statements( self, environment: str) -> List[sqlmesh.core.environment.EnvironmentStatements]:
215    @abc.abstractmethod
216    def get_environment_statements(self, environment: str) -> t.List[EnvironmentStatements]:
217        """Fetches environment statements from the environment_statements table.
218
219        Returns:
220            A list of the Environment Statements.
221        """

Fetches environment statements from the environment_statements table.

Returns:

A list of the Environment Statements.

def get_versions(self, validate: bool = True) -> Versions:
223    def get_versions(self, validate: bool = True) -> Versions:
224        """Get the current versions of the SQLMesh schema and libraries.
225
226        Args:
227            validate: Whether or not to raise error if the running version is different from what's in state.
228
229        Returns:
230            The versions object.
231        """
232        from sqlmesh._version import __version__ as SQLMESH_VERSION
233
234        versions = self._get_versions()
235
236        if validate:
237
238            def raise_error(
239                lib: str,
240                local: str | int,
241                remote: str | int,
242                remote_package_version: t.Optional[str] = None,
243                ahead: bool = False,
244            ) -> None:
245                if ahead:
246                    raise SQLMeshError(
247                        f"{lib} (local) is using version '{local}' which is ahead of '{remote}' (remote). "
248                        "Please run a migration ('sqlmesh migrate' command)."
249                    )
250
251                if remote_package_version:
252                    upgrade_suggestion = f" Please upgrade {lib} ('pip install --upgrade \"{lib.lower()}=={remote_package_version}\"' command)."
253                else:
254                    upgrade_suggestion = ""
255
256                raise SQLMeshError(
257                    f"{lib} (local) is using version '{local}' which is behind '{remote}' (remote).{upgrade_suggestion}"
258                )
259
260            if major_minor(SQLMESH_VERSION) != major_minor(versions.sqlmesh_version):
261                raise_error(
262                    "SQLMesh",
263                    SQLMESH_VERSION,
264                    versions.sqlmesh_version,
265                    remote_package_version=versions.sqlmesh_version,
266                    ahead=major_minor(SQLMESH_VERSION) > major_minor(versions.sqlmesh_version),
267                )
268
269            if SCHEMA_VERSION != versions.schema_version:
270                raise_error(
271                    "SQLMesh",
272                    SCHEMA_VERSION,
273                    versions.schema_version,
274                    remote_package_version=versions.sqlmesh_version,
275                    ahead=SCHEMA_VERSION > versions.schema_version,
276                )
277
278            if major_minor(SQLGLOT_VERSION) != major_minor(versions.sqlglot_version):
279                raise_error(
280                    "SQLGlot",
281                    SQLGLOT_VERSION,
282                    versions.sqlglot_version,
283                    remote_package_version=versions.sqlglot_version,
284                    ahead=major_minor(SQLGLOT_VERSION) > major_minor(versions.sqlglot_version),
285                )
286
287        return versions

Get the current versions of the SQLMesh schema and libraries.

Arguments:
  • validate: Whether or not to raise error if the running version is different from what's in state.
Returns:

The versions object.

@abc.abstractmethod
def export( self, environment_names: Optional[List[str]] = None) -> sqlmesh.core.state_sync.common.StateStream:
297    @abc.abstractmethod
298    def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream:
299        """Export the contents of this StateSync as a StateStream
300
301        Args:
302            environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
303        """

Export the contents of this StateSync as a StateStream

Arguments:
  • environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
@abc.abstractmethod
def get_expired_snapshots( self, *, batch_range: sqlmesh.core.state_sync.common.ExpiredBatchRange, current_ts: Optional[int] = None, ignore_ttl: bool = False) -> Optional[sqlmesh.core.state_sync.common.ExpiredSnapshotBatch]:
305    @abc.abstractmethod
306    def get_expired_snapshots(
307        self,
308        *,
309        batch_range: ExpiredBatchRange,
310        current_ts: t.Optional[int] = None,
311        ignore_ttl: bool = False,
312    ) -> t.Optional[ExpiredSnapshotBatch]:
313        """Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).
314
315        Args:
316            current_ts: Timestamp used to evaluate expiration.
317            ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
318            batch_range: The range of the batch to fetch.
319
320        Returns:
321            A batch describing expired snapshots or None if no snapshots are pending cleanup.
322        """

Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).

Arguments:
  • current_ts: Timestamp used to evaluate expiration.
  • ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
  • batch_range: The range of the batch to fetch.
Returns:

A batch describing expired snapshots or None if no snapshots are pending cleanup.

@abc.abstractmethod
def get_expired_environments( self, current_ts: int) -> List[sqlmesh.core.environment.EnvironmentSummary]:
324    @abc.abstractmethod
325    def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
326        """Returns the expired environments.
327
328        Expired environments are environments that have exceeded their time-to-live value.
329        Returns:
330            The list of environment summaries to remove.
331        """

Returns the expired environments.

Expired environments are environments that have exceeded their time-to-live value.

Returns:

The list of environment summaries to remove.

class StateSync(StateReader, abc.ABC):
334class StateSync(StateReader, abc.ABC):
335    """Abstract base class for snapshot and environment state management."""
336
337    @abc.abstractmethod
338    def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None:
339        """Push snapshots into the state sync.
340
341        This method only allows for pushing new snapshots. If existing snapshots are found,
342        this method should raise an error.
343
344        Raises:
345            SQLMeshError when existing snapshots are pushed.
346
347        Args:
348            snapshots: A list of snapshots to save in the state sync.
349        """
350
351    @abc.abstractmethod
352    def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
353        """Delete snapshots from the state sync.
354
355        Args:
356            snapshot_ids: A list of snapshot like objects to delete.
357        """
358
359    @abc.abstractmethod
360    def delete_expired_snapshots(
361        self,
362        batch_range: ExpiredBatchRange,
363        ignore_ttl: bool = False,
364        current_ts: t.Optional[int] = None,
365    ) -> None:
366        """Removes expired snapshots.
367
368        Expired snapshots are snapshots that have exceeded their time-to-live
369        and are no longer in use within an environment.
370
371        Args:
372            batch_range: The range of snapshots to delete in this batch.
373            ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
374                all snapshots that are not referenced in any environment
375            current_ts: Timestamp used to evaluate expiration.
376        """
377
378    @abc.abstractmethod
379    def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
380        """Invalidates the target environment by setting its expiration timestamp to now.
381
382        Args:
383            name: The name of the environment to invalidate.
384            protect_prod: If True, prevents invalidation of the production environment.
385        """
386
387    @abc.abstractmethod
388    def remove_state(self, including_backup: bool = False) -> None:
389        """Removes the state store objects."""
390
391    @abc.abstractmethod
392    def remove_intervals(
393        self,
394        snapshot_intervals: t.Sequence[t.Tuple[SnapshotIdAndVersionLike, Interval]],
395        remove_shared_versions: bool = False,
396    ) -> None:
397        """Remove an interval from a list of snapshots and sync it to the store.
398
399        Because multiple snapshots can be pointing to the same version or physical table, this method
400        can also grab all snapshots tied to the passed in version.
401
402        Args:
403            snapshot_intervals: The snapshot intervals to remove.
404            remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots.
405        """
406
407    @abc.abstractmethod
408    def promote(
409        self,
410        environment: Environment,
411        no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
412        environment_statements: t.Optional[t.List[EnvironmentStatements]] = None,
413    ) -> PromotionResult:
414        """Update the environment to reflect the current state.
415
416        This method verifies that snapshots have been pushed.
417
418        Args:
419            environment: The environment to promote.
420            no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
421                all snapshots will be checked. The data gap check ensures that models that are already a
422                part of the target environment have no data gaps when compared against previous
423                snapshots for same models.
424
425        Returns:
426           A tuple of (added snapshot table infos, removed snapshot table infos)
427        """
428
429    @abc.abstractmethod
430    def finalize(self, environment: Environment) -> None:
431        """Finalize the target environment, indicating that this environment has been
432        fully promoted and is ready for use.
433
434        Args:
435            environment: The target environment to finalize.
436        """
437
438    @abc.abstractmethod
439    def delete_expired_environments(
440        self, current_ts: t.Optional[int] = None
441    ) -> t.List[EnvironmentSummary]:
442        """Removes expired environments.
443
444        Expired environments are environments that have exceeded their time-to-live value.
445
446        Returns:
447            The list of removed environments.
448        """
449
450    @abc.abstractmethod
451    def unpause_snapshots(
452        self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike
453    ) -> None:
454        """Unpauses target snapshots.
455
456        Unpaused snapshots are scheduled for evaluation on a recurring basis.
457        Once unpaused a snapshot can't be paused again.
458
459        Args:
460            snapshots: Target snapshots.
461            unpaused_dt: The datetime object which indicates when target snapshots
462                were unpaused.
463        """
464
465    @abc.abstractmethod
466    def compact_intervals(self) -> None:
467        """Compacts intervals for all snapshots.
468
469        Compaction process involves merging of existing interval records into new records and
470        then deleting the old ones.
471        """
472
473    @abc.abstractmethod
474    def migrate(
475        self,
476        skip_backup: bool = False,
477        promoted_snapshots_only: bool = True,
478    ) -> None:
479        """Migrate the state sync to the latest SQLMesh / SQLGlot version."""
480
481    @abc.abstractmethod
482    def rollback(self) -> None:
483        """Rollback to previous backed up state."""
484
485    @abc.abstractmethod
486    def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
487        """Add snapshot intervals to state
488
489        Args:
490            snapshots_intervals: The intervals to add.
491        """
492
493    def add_interval(
494        self,
495        snapshot: Snapshot,
496        start: TimeLike,
497        end: TimeLike,
498        is_dev: bool = False,
499        last_altered_ts: t.Optional[int] = None,
500    ) -> None:
501        """Add an interval to a snapshot and sync it to the store.
502
503        Args:
504            snapshot: The snapshot like object to add an interval to.
505            start: The start of the interval to add.
506            end: The end of the interval to add.
507            is_dev: Indicates whether the given interval is being added while in development mode
508            last_altered_ts: The timestamp of the last modification of the physical table
509        """
510        start_ts, end_ts = snapshot.inclusive_exclusive(start, end, strict=False, expand=False)
511        if not snapshot.version:
512            raise SQLMeshError("Snapshot version must be set to add an interval.")
513        intervals = [(start_ts, end_ts)]
514        snapshot_intervals = SnapshotIntervals(
515            name=snapshot.name,
516            identifier=snapshot.identifier,
517            version=snapshot.version,
518            dev_version=snapshot.dev_version,
519            intervals=intervals if not is_dev else [],
520            dev_intervals=intervals if is_dev else [],
521            last_altered_ts=last_altered_ts if not is_dev else None,
522            dev_last_altered_ts=last_altered_ts if is_dev else None,
523        )
524        self.add_snapshots_intervals([snapshot_intervals])
525
526    @abc.abstractmethod
527    def import_(self, stream: StateStream, clear: bool = True) -> None:
528        """
529        Replace the existing state with the state contained in the StateStream
530
531        Args:
532            stream: The stream of new state
533            clear: Whether or not to clear existing state before inserting state from the stream
534        """

Abstract base class for snapshot and environment state management.

@abc.abstractmethod
def push_snapshots( self, snapshots: Iterable[sqlmesh.core.snapshot.definition.Snapshot]) -> None:
337    @abc.abstractmethod
338    def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None:
339        """Push snapshots into the state sync.
340
341        This method only allows for pushing new snapshots. If existing snapshots are found,
342        this method should raise an error.
343
344        Raises:
345            SQLMeshError when existing snapshots are pushed.
346
347        Args:
348            snapshots: A list of snapshots to save in the state sync.
349        """

Push snapshots into the state sync.

This method only allows for pushing new snapshots. If existing snapshots are found, this method should raise an error.

Raises:
  • SQLMeshError when existing snapshots are pushed.
Arguments:
  • snapshots: A list of snapshots to save in the state sync.
351    @abc.abstractmethod
352    def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
353        """Delete snapshots from the state sync.
354
355        Args:
356            snapshot_ids: A list of snapshot like objects to delete.
357        """

Delete snapshots from the state sync.

Arguments:
  • snapshot_ids: A list of snapshot like objects to delete.
@abc.abstractmethod
def delete_expired_snapshots( self, batch_range: sqlmesh.core.state_sync.common.ExpiredBatchRange, ignore_ttl: bool = False, current_ts: Optional[int] = None) -> None:
359    @abc.abstractmethod
360    def delete_expired_snapshots(
361        self,
362        batch_range: ExpiredBatchRange,
363        ignore_ttl: bool = False,
364        current_ts: t.Optional[int] = None,
365    ) -> None:
366        """Removes expired snapshots.
367
368        Expired snapshots are snapshots that have exceeded their time-to-live
369        and are no longer in use within an environment.
370
371        Args:
372            batch_range: The range of snapshots to delete in this batch.
373            ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
374                all snapshots that are not referenced in any environment
375            current_ts: Timestamp used to evaluate expiration.
376        """

Removes expired snapshots.

Expired snapshots are snapshots that have exceeded their time-to-live and are no longer in use within an environment.

Arguments:
  • batch_range: The range of snapshots to delete in this batch.
  • ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting all snapshots that are not referenced in any environment
  • current_ts: Timestamp used to evaluate expiration.
@abc.abstractmethod
def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
378    @abc.abstractmethod
379    def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
380        """Invalidates the target environment by setting its expiration timestamp to now.
381
382        Args:
383            name: The name of the environment to invalidate.
384            protect_prod: If True, prevents invalidation of the production environment.
385        """

Invalidates the target environment by setting its expiration timestamp to now.

Arguments:
  • name: The name of the environment to invalidate.
  • protect_prod: If True, prevents invalidation of the production environment.
@abc.abstractmethod
def remove_state(self, including_backup: bool = False) -> None:
387    @abc.abstractmethod
388    def remove_state(self, including_backup: bool = False) -> None:
389        """Removes the state store objects."""

Removes the state store objects.

@abc.abstractmethod
def remove_intervals( self, snapshot_intervals: Sequence[Tuple[Union[sqlmesh.core.snapshot.definition.SnapshotIdAndVersion, sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot], Tuple[int, int]]], remove_shared_versions: bool = False) -> None:
391    @abc.abstractmethod
392    def remove_intervals(
393        self,
394        snapshot_intervals: t.Sequence[t.Tuple[SnapshotIdAndVersionLike, Interval]],
395        remove_shared_versions: bool = False,
396    ) -> None:
397        """Remove an interval from a list of snapshots and sync it to the store.
398
399        Because multiple snapshots can be pointing to the same version or physical table, this method
400        can also grab all snapshots tied to the passed in version.
401
402        Args:
403            snapshot_intervals: The snapshot intervals to remove.
404            remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots.
405        """

Remove an interval from a list of snapshots and sync it to the store.

Because multiple snapshots can be pointing to the same version or physical table, this method can also grab all snapshots tied to the passed in version.

Arguments:
  • snapshot_intervals: The snapshot intervals to remove.
  • remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots.
@abc.abstractmethod
def promote( self, environment: sqlmesh.core.environment.Environment, no_gaps_snapshot_names: Optional[Set[str]] = None, environment_statements: Optional[List[sqlmesh.core.environment.EnvironmentStatements]] = None) -> sqlmesh.core.state_sync.common.PromotionResult:
407    @abc.abstractmethod
408    def promote(
409        self,
410        environment: Environment,
411        no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
412        environment_statements: t.Optional[t.List[EnvironmentStatements]] = None,
413    ) -> PromotionResult:
414        """Update the environment to reflect the current state.
415
416        This method verifies that snapshots have been pushed.
417
418        Args:
419            environment: The environment to promote.
420            no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
421                all snapshots will be checked. The data gap check ensures that models that are already a
422                part of the target environment have no data gaps when compared against previous
423                snapshots for same models.
424
425        Returns:
426           A tuple of (added snapshot table infos, removed snapshot table infos)
427        """

Update the environment to reflect the current state.

This method verifies that snapshots have been pushed.

Arguments:
  • environment: The environment to promote.
  • no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None, all snapshots will be checked. The data gap check ensures that models that are already a part of the target environment have no data gaps when compared against previous snapshots for same models.
Returns:

A tuple of (added snapshot table infos, removed snapshot table infos)

@abc.abstractmethod
def finalize(self, environment: sqlmesh.core.environment.Environment) -> None:
429    @abc.abstractmethod
430    def finalize(self, environment: Environment) -> None:
431        """Finalize the target environment, indicating that this environment has been
432        fully promoted and is ready for use.
433
434        Args:
435            environment: The target environment to finalize.
436        """

Finalize the target environment, indicating that this environment has been fully promoted and is ready for use.

Arguments:
  • environment: The target environment to finalize.
@abc.abstractmethod
def delete_expired_environments( self, current_ts: Optional[int] = None) -> List[sqlmesh.core.environment.EnvironmentSummary]:
438    @abc.abstractmethod
439    def delete_expired_environments(
440        self, current_ts: t.Optional[int] = None
441    ) -> t.List[EnvironmentSummary]:
442        """Removes expired environments.
443
444        Expired environments are environments that have exceeded their time-to-live value.
445
446        Returns:
447            The list of removed environments.
448        """

Removes expired environments.

Expired environments are environments that have exceeded their time-to-live value.

Returns:

The list of removed environments.

@abc.abstractmethod
def unpause_snapshots( self, snapshots: Collection[Union[sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot]], unpaused_dt: Union[datetime.date, datetime.datetime, str, int, float]) -> None:
450    @abc.abstractmethod
451    def unpause_snapshots(
452        self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike
453    ) -> None:
454        """Unpauses target snapshots.
455
456        Unpaused snapshots are scheduled for evaluation on a recurring basis.
457        Once unpaused a snapshot can't be paused again.
458
459        Args:
460            snapshots: Target snapshots.
461            unpaused_dt: The datetime object which indicates when target snapshots
462                were unpaused.
463        """

Unpauses target snapshots.

Unpaused snapshots are scheduled for evaluation on a recurring basis. Once unpaused a snapshot can't be paused again.

Arguments:
  • snapshots: Target snapshots.
  • unpaused_dt: The datetime object which indicates when target snapshots were unpaused.
@abc.abstractmethod
def compact_intervals(self) -> None:
465    @abc.abstractmethod
466    def compact_intervals(self) -> None:
467        """Compacts intervals for all snapshots.
468
469        Compaction process involves merging of existing interval records into new records and
470        then deleting the old ones.
471        """

Compacts intervals for all snapshots.

Compaction process involves merging of existing interval records into new records and then deleting the old ones.

@abc.abstractmethod
def migrate( self, skip_backup: bool = False, promoted_snapshots_only: bool = True) -> None:
473    @abc.abstractmethod
474    def migrate(
475        self,
476        skip_backup: bool = False,
477        promoted_snapshots_only: bool = True,
478    ) -> None:
479        """Migrate the state sync to the latest SQLMesh / SQLGlot version."""

Migrate the state sync to the latest SQLMesh / SQLGlot version.

@abc.abstractmethod
def rollback(self) -> None:
481    @abc.abstractmethod
482    def rollback(self) -> None:
483        """Rollback to previous backed up state."""

Rollback to previous backed up state.

@abc.abstractmethod
def add_snapshots_intervals( self, snapshots_intervals: Sequence[sqlmesh.core.snapshot.definition.SnapshotIntervals]) -> None:
485    @abc.abstractmethod
486    def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
487        """Add snapshot intervals to state
488
489        Args:
490            snapshots_intervals: The intervals to add.
491        """

Add snapshot intervals to state

Arguments:
  • snapshots_intervals: The intervals to add.
def add_interval( self, snapshot: sqlmesh.core.snapshot.definition.Snapshot, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], is_dev: bool = False, last_altered_ts: Optional[int] = None) -> None:
493    def add_interval(
494        self,
495        snapshot: Snapshot,
496        start: TimeLike,
497        end: TimeLike,
498        is_dev: bool = False,
499        last_altered_ts: t.Optional[int] = None,
500    ) -> None:
501        """Add an interval to a snapshot and sync it to the store.
502
503        Args:
504            snapshot: The snapshot like object to add an interval to.
505            start: The start of the interval to add.
506            end: The end of the interval to add.
507            is_dev: Indicates whether the given interval is being added while in development mode
508            last_altered_ts: The timestamp of the last modification of the physical table
509        """
510        start_ts, end_ts = snapshot.inclusive_exclusive(start, end, strict=False, expand=False)
511        if not snapshot.version:
512            raise SQLMeshError("Snapshot version must be set to add an interval.")
513        intervals = [(start_ts, end_ts)]
514        snapshot_intervals = SnapshotIntervals(
515            name=snapshot.name,
516            identifier=snapshot.identifier,
517            version=snapshot.version,
518            dev_version=snapshot.dev_version,
519            intervals=intervals if not is_dev else [],
520            dev_intervals=intervals if is_dev else [],
521            last_altered_ts=last_altered_ts if not is_dev else None,
522            dev_last_altered_ts=last_altered_ts if is_dev else None,
523        )
524        self.add_snapshots_intervals([snapshot_intervals])

Add an interval to a snapshot and sync it to the store.

Arguments:
  • snapshot: The snapshot like object to add an interval to.
  • start: The start of the interval to add.
  • end: The end of the interval to add.
  • is_dev: Indicates whether the given interval is being added while in development mode
  • last_altered_ts: The timestamp of the last modification of the physical table
@abc.abstractmethod
def import_( self, stream: sqlmesh.core.state_sync.common.StateStream, clear: bool = True) -> None:
526    @abc.abstractmethod
527    def import_(self, stream: StateStream, clear: bool = True) -> None:
528        """
529        Replace the existing state with the state contained in the StateStream
530
531        Args:
532            stream: The stream of new state
533            clear: Whether or not to clear existing state before inserting state from the stream
534        """

Replace the existing state with the state contained in the StateStream

Arguments:
  • stream: The stream of new state
  • clear: Whether or not to clear existing state before inserting state from the stream
class DelegatingStateSync(StateSync):
537class DelegatingStateSync(StateSync):
538    def __init__(self, state_sync: StateSync) -> None:
539        self.state_sync = state_sync

Abstract base class for snapshot and environment state management.

DelegatingStateSync(state_sync: StateSync)
538    def __init__(self, state_sync: StateSync) -> None:
539        self.state_sync = state_sync
state_sync
def push_snapshots(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Push snapshots into the state sync.

This method only allows for pushing new snapshots. If existing snapshots are found, this method should raise an error.

Raises:
  • SQLMeshError when existing snapshots are pushed.
Arguments:
  • snapshots: A list of snapshots to save in the state sync.
def delete_snapshots(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Delete snapshots from the state sync.

Arguments:
  • snapshot_ids: A list of snapshot like objects to delete.
def delete_expired_snapshots(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Removes expired snapshots.

Expired snapshots are snapshots that have exceeded their time-to-live and are no longer in use within an environment.

Arguments:
  • batch_range: The range of snapshots to delete in this batch.
  • ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting all snapshots that are not referenced in any environment
  • current_ts: Timestamp used to evaluate expiration.
def invalidate_environment(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Invalidates the target environment by setting its expiration timestamp to now.

Arguments:
  • name: The name of the environment to invalidate.
  • protect_prod: If True, prevents invalidation of the production environment.
def remove_state(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Removes the state store objects.

def remove_intervals(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Remove an interval from a list of snapshots and sync it to the store.

Because multiple snapshots can be pointing to the same version or physical table, this method can also grab all snapshots tied to the passed in version.

Arguments:
  • snapshot_intervals: The snapshot intervals to remove.
  • remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots.
def promote(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Update the environment to reflect the current state.

This method verifies that snapshots have been pushed.

Arguments:
  • environment: The environment to promote.
  • no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None, all snapshots will be checked. The data gap check ensures that models that are already a part of the target environment have no data gaps when compared against previous snapshots for same models.
Returns:

A tuple of (added snapshot table infos, removed snapshot table infos)

def finalize(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Finalize the target environment, indicating that this environment has been fully promoted and is ready for use.

Arguments:
  • environment: The target environment to finalize.
def delete_expired_environments(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Removes expired environments.

Expired environments are environments that have exceeded their time-to-live value.

Returns:

The list of removed environments.

def unpause_snapshots(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Unpauses target snapshots.

Unpaused snapshots are scheduled for evaluation on a recurring basis. Once unpaused a snapshot can't be paused again.

Arguments:
  • snapshots: Target snapshots.
  • unpaused_dt: The datetime object which indicates when target snapshots were unpaused.
def compact_intervals(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Compacts intervals for all snapshots.

Compaction process involves merging of existing interval records into new records and then deleting the old ones.

def migrate(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Migrate the state sync to the latest SQLMesh / SQLGlot version.

def rollback(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Rollback to previous backed up state.

def add_snapshots_intervals(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Add snapshot intervals to state

Arguments:
  • snapshots_intervals: The intervals to add.
def import_(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Replace the existing state with the state contained in the StateStream

Arguments:
  • stream: The stream of new state
  • clear: Whether or not to clear existing state before inserting state from the stream
def get_snapshots(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Bulk fetch snapshots given the corresponding snapshot ids.

Arguments:
  • snapshot_ids: Iterable of snapshot ids to get.
Returns:

A dictionary of snapshot ids to snapshots for ones that could be found.

def get_snapshots_by_names(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Return the snapshot records for all versions of the specified snapshot names.

Arguments:
  • snapshot_names: Iterable of snapshot names to fetch all snapshot records for
  • current_ts: Sets the current time for identifying which snapshots have expired so they can be excluded (only relevant if :exclude_expired=True)
  • exclude_expired: Whether or not to return the snapshot id's of expired snapshots in the result
Returns:

A set containing all the matched snapshot records. To fetch full snapshots, pass it into StateSync.get_snapshots()

def snapshots_exist(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Checks if multiple snapshots exist in the state sync.

Arguments:
  • snapshot_ids: Iterable of snapshot ids to bulk check.
Returns:

A set of all the existing snapshot ids.

def refresh_snapshot_intervals(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Updates given snapshots with latest intervals from the state.

Arguments:
  • snapshots: The snapshots to refresh.
Returns:

The updated snapshots.

def nodes_exist(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Returns the node names that exist in the state sync.

Arguments:
  • names: Iterable of node names to check.
  • exclude_external: Whether to exclude external models from the output.
Returns:

A set of all the existing node names.

def get_environment(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Fetches the environment if it exists.

Arguments:
  • environment: The environment
Returns:

The environment object.

def get_environments(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Fetches all environments.

Returns:

A list of all environments.

def get_environments_summary(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Fetches all environment names along with expiry datetime.

Returns:

A list of all environment summaries.

def max_interval_end_per_model(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Returns the max interval end per model for the given environment.

Arguments:
  • environment: The target environment.
  • models: The models to get the max interval end for. If None, all models are considered.
  • ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:

A dictionary of model FQNs to their respective interval ends in milliseconds since epoch.

def recycle(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Closes all open connections and releases all allocated resources associated with any thread except the calling one.

def close(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Closes all open connections and releases all allocated resources.

def state_type(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Returns the type of state sync.

def update_auto_restatements(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Updates the next auto restatement timestamp for the snapshots.

Arguments:
  • next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp.
def get_environment_statements(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Fetches environment statements from the environment_statements table.

Returns:

A list of the Environment Statements.

def export(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Export the contents of this StateSync as a StateStream

Arguments:
  • environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
def get_expired_snapshots(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).

Arguments:
  • current_ts: Timestamp used to evaluate expiration.
  • ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
  • batch_range: The range of the batch to fetch.
Returns:

A batch describing expired snapshots or None if no snapshots are pending cleanup.

def get_expired_environments(self: Any, *args: Any, **kwargs: Any) -> Any:
543    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
544        return getattr(self.state_sync, name)(*args, **kwargs)

Returns the expired environments.

Expired environments are environments that have exceeded their time-to-live value.

Returns:

The list of environment summaries to remove.