sqlmesh.core.state_sync.base
from __future__ import annotations

import abc
import importlib
import logging
import pkgutil
import typing as t

from sqlglot import __version__ as SQLGLOT_VERSION

from sqlmesh import migrations
from sqlmesh.core.environment import (
    Environment,
    EnvironmentStatements,
    EnvironmentSummary,
)
from sqlmesh.core.snapshot import (
    Snapshot,
    SnapshotId,
    SnapshotIdLike,
    SnapshotIdAndVersionLike,
    SnapshotInfoLike,
    SnapshotNameVersion,
    SnapshotIdAndVersion,
)
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
from sqlmesh.utils import major_minor
from sqlmesh.utils.date import TimeLike
from sqlmesh.utils.errors import SQLMeshError
from sqlmesh.utils.pydantic import PydanticModel, field_validator
from sqlmesh.core.state_sync.common import (
    StateStream,
    ExpiredSnapshotBatch,
    PromotionResult,
    ExpiredBatchRange,
)

logger = logging.getLogger(__name__)


class Versions(PydanticModel):
    """Represents the various versions of dependencies in the state sync."""

    schema_version: int = 0
    sqlglot_version: str = "0.0.0"
    sqlmesh_version: str = "0.0.0"

    @property
    def minor_sqlglot_version(self) -> t.Tuple[int, int]:
        """The result of ``major_minor`` applied to ``sqlglot_version``."""
        return major_minor(self.sqlglot_version)

    @property
    def minor_sqlmesh_version(self) -> t.Tuple[int, int]:
        """The result of ``major_minor`` applied to ``sqlmesh_version``."""
        return major_minor(self.sqlmesh_version)

    @field_validator("sqlglot_version", "sqlmesh_version", mode="before")
    @classmethod
    def _package_version_validator(cls, v: t.Any) -> str:
        # A missing (None) package version is normalized to the "0.0.0" placeholder.
        return "0.0.0" if v is None else str(v)

    @field_validator("schema_version", mode="before")
    @classmethod
    def _schema_version_validator(cls, v: t.Any) -> int:
        # A missing (None) schema version is normalized to 0.
        return 0 if v is None else int(v)


MIN_SCHEMA_VERSION = 60
MIN_SQLMESH_VERSION = "0.134.0"
# Every module in the sqlmesh.migrations package, imported in sorted name order.
MIGRATIONS = [
    importlib.import_module(f"sqlmesh.migrations.{migration}")
    for migration in sorted(info.name for info in pkgutil.iter_modules(migrations.__path__))
]
# -1 to account for the baseline script
SCHEMA_VERSION: int = MIN_SCHEMA_VERSION + len(MIGRATIONS) - 1


class StateReader(abc.ABC):
    """Abstract base class for read-only operations on snapshot and environment state."""

    @abc.abstractmethod
    def get_snapshots(
        self, snapshot_ids: t.Iterable[SnapshotIdLike]
    ) -> t.Dict[SnapshotId, Snapshot]:
        """Bulk fetch snapshots given the corresponding snapshot ids.

        Args:
            snapshot_ids: Iterable of snapshot ids to get.

        Returns:
            A dictionary of snapshot ids to snapshots for ones that could be found.
        """

    @abc.abstractmethod
    def get_snapshots_by_names(
        self,
        snapshot_names: t.Iterable[str],
        current_ts: t.Optional[int] = None,
        exclude_expired: bool = True,
    ) -> t.Set[SnapshotIdAndVersion]:
        """Return the snapshot records for all versions of the specified snapshot names.

        Args:
            snapshot_names: Iterable of snapshot names to fetch all snapshot records for.
            current_ts: The current time used to identify which snapshots have expired so that
                they can be excluded (only relevant if exclude_expired=True).
            exclude_expired: Whether to exclude expired snapshots from the result.

        Returns:
            A set containing all the matched snapshot records. To fetch full snapshots, pass it
            into StateSync.get_snapshots().
        """

    @abc.abstractmethod
    def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
        """Checks if multiple snapshots exist in the state sync.

        Args:
            snapshot_ids: Iterable of snapshot ids to bulk check.

        Returns:
            A set of all the existing snapshot ids.
        """

    @abc.abstractmethod
    def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.List[Snapshot]:
        """Updates given snapshots with latest intervals from the state.

        Args:
            snapshots: The snapshots to refresh.

        Returns:
            The updated snapshots.
        """

    @abc.abstractmethod
    def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]:
        """Returns the node names that exist in the state sync.

        Args:
            names: Iterable of node names to check.
            exclude_external: Whether to exclude external models from the output.

        Returns:
            A set of all the existing node names.
        """

    @abc.abstractmethod
    def get_environment(self, environment: str) -> t.Optional[Environment]:
        """Fetches the environment if it exists.

        Args:
            environment: The name of the environment.

        Returns:
            The environment object, or None if it does not exist.
        """

    @abc.abstractmethod
    def get_environments(self) -> t.List[Environment]:
        """Fetches all environments.

        Returns:
            A list of all environments.
        """

    @abc.abstractmethod
    def get_environments_summary(self) -> t.List[EnvironmentSummary]:
        """Fetches all environment names along with expiry datetime.

        Returns:
            A list of all environment summaries.
        """

    @abc.abstractmethod
    def max_interval_end_per_model(
        self,
        environment: str,
        models: t.Optional[t.Set[str]] = None,
        ensure_finalized_snapshots: bool = False,
    ) -> t.Dict[str, int]:
        """Returns the max interval end per model for the given environment.

        Args:
            environment: The target environment.
            models: The models to get the max interval end for. If None, all models are considered.
            ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state,
                or to use whatever snapshots are in the current environment state even if the environment is not finalized.

        Returns:
            A dictionary of model FQNs to their respective interval ends in milliseconds since epoch.
        """

    @abc.abstractmethod
    def recycle(self) -> None:
        """Closes all open connections and releases all allocated resources associated with any thread
        except the calling one."""

    @abc.abstractmethod
    def close(self) -> None:
        """Closes all open connections and releases all allocated resources."""

    @abc.abstractmethod
    def state_type(self) -> str:
        """Returns the type of state sync."""

    @abc.abstractmethod
    def update_auto_restatements(
        self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]]
    ) -> None:
        """Updates the next auto restatement timestamp for the snapshots.

        Args:
            next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp.
        """

    @abc.abstractmethod
    def get_environment_statements(self, environment: str) -> t.List[EnvironmentStatements]:
        """Fetches environment statements from the environment_statements table.

        Args:
            environment: The name of the environment whose statements should be fetched.

        Returns:
            A list of the Environment Statements.
        """

    def get_versions(self, validate: bool = True) -> Versions:
        """Get the current versions of the SQLMesh schema and libraries.

        Args:
            validate: Whether or not to raise error if the running version is different from what's in state.

        Returns:
            The versions object.

        Raises:
            SQLMeshError: If validate is True and the local SQLMesh major/minor version, the schema
                version, or the SQLGlot major/minor version differs from the one stored in state.
        """
        from sqlmesh._version import __version__ as SQLMESH_VERSION

        versions = self._get_versions()

        if validate:

            def raise_error(
                lib: str,
                local: str | int,
                remote: str | int,
                remote_package_version: t.Optional[str] = None,
                ahead: bool = False,
            ) -> t.NoReturn:
                # Both branches raise: "ahead" asks for a state migration, otherwise the local
                # install is behind and an upgrade is suggested.
                if ahead:
                    raise SQLMeshError(
                        f"{lib} (local) is using version '{local}' which is ahead of '{remote}' (remote). "
                        "Please run a migration ('sqlmesh migrate' command)."
                    )

                if remote_package_version:
                    upgrade_suggestion = f" Please upgrade {lib} ('pip install --upgrade \"{lib.lower()}=={remote_package_version}\"' command)."
                else:
                    upgrade_suggestion = ""

                raise SQLMeshError(
                    f"{lib} (local) is using version '{local}' which is behind '{remote}' (remote).{upgrade_suggestion}"
                )

            if major_minor(SQLMESH_VERSION) != major_minor(versions.sqlmesh_version):
                raise_error(
                    "SQLMesh",
                    SQLMESH_VERSION,
                    versions.sqlmesh_version,
                    remote_package_version=versions.sqlmesh_version,
                    ahead=major_minor(SQLMESH_VERSION) > major_minor(versions.sqlmesh_version),
                )

            if SCHEMA_VERSION != versions.schema_version:
                raise_error(
                    "SQLMesh",
                    SCHEMA_VERSION,
                    versions.schema_version,
                    remote_package_version=versions.sqlmesh_version,
                    ahead=SCHEMA_VERSION > versions.schema_version,
                )

            if major_minor(SQLGLOT_VERSION) != major_minor(versions.sqlglot_version):
                raise_error(
                    "SQLGlot",
                    SQLGLOT_VERSION,
                    versions.sqlglot_version,
                    remote_package_version=versions.sqlglot_version,
                    ahead=major_minor(SQLGLOT_VERSION) > major_minor(versions.sqlglot_version),
                )

        return versions

    @abc.abstractmethod
    def _get_versions(self) -> Versions:
        """Queries the store to get the current versions of SQLMesh and deps.

        Returns:
            The versions object.
        """

    @abc.abstractmethod
    def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream:
        """Export the contents of this StateSync as a StateStream.

        Args:
            environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
        """

    @abc.abstractmethod
    def get_expired_snapshots(
        self,
        *,
        batch_range: ExpiredBatchRange,
        current_ts: t.Optional[int] = None,
        ignore_ttl: bool = False,
    ) -> t.Optional[ExpiredSnapshotBatch]:
        """Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).

        Args:
            batch_range: The range of the batch to fetch.
            current_ts: Timestamp used to evaluate expiration.
            ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).

        Returns:
            A batch describing expired snapshots or None if no snapshots are pending cleanup.
        """

    @abc.abstractmethod
    def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
        """Returns the expired environments.

        Expired environments are environments that have exceeded their time-to-live value.

        Args:
            current_ts: Timestamp used to evaluate expiration.

        Returns:
            The list of environment summaries to remove.
        """


class StateSync(StateReader, abc.ABC):
    """Abstract base class for snapshot and environment state management."""

    @abc.abstractmethod
    def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None:
        """Push snapshots into the state sync.

        This method only allows for pushing new snapshots. If existing snapshots are found,
        this method should raise an error.

        Args:
            snapshots: A list of snapshots to save in the state sync.

        Raises:
            SQLMeshError: When existing snapshots are pushed.
        """

    @abc.abstractmethod
    def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
        """Delete snapshots from the state sync.

        Args:
            snapshot_ids: A list of snapshot like objects to delete.
        """

    @abc.abstractmethod
    def delete_expired_snapshots(
        self,
        batch_range: ExpiredBatchRange,
        ignore_ttl: bool = False,
        current_ts: t.Optional[int] = None,
    ) -> None:
        """Removes expired snapshots.

        Expired snapshots are snapshots that have exceeded their time-to-live
        and are no longer in use within an environment.

        Args:
            batch_range: The range of snapshots to delete in this batch.
            ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
                all snapshots that are not referenced in any environment.
            current_ts: Timestamp used to evaluate expiration.
        """

    @abc.abstractmethod
    def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
        """Invalidates the target environment by setting its expiration timestamp to now.

        Args:
            name: The name of the environment to invalidate.
            protect_prod: If True, prevents invalidation of the production environment.
        """

    @abc.abstractmethod
    def remove_state(self, including_backup: bool = False) -> None:
        """Removes the state store objects."""

    @abc.abstractmethod
    def remove_intervals(
        self,
        snapshot_intervals: t.Sequence[t.Tuple[SnapshotIdAndVersionLike, Interval]],
        remove_shared_versions: bool = False,
    ) -> None:
        """Remove an interval from a list of snapshots and sync it to the store.

        Because multiple snapshots can be pointing to the same version or physical table, this method
        can also grab all snapshots tied to the passed in version.

        Args:
            snapshot_intervals: The snapshot intervals to remove.
            remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots.
        """

    @abc.abstractmethod
    def promote(
        self,
        environment: Environment,
        no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
        environment_statements: t.Optional[t.List[EnvironmentStatements]] = None,
    ) -> PromotionResult:
        """Update the environment to reflect the current state.

        This method verifies that snapshots have been pushed.

        Args:
            environment: The environment to promote.
            no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
                all snapshots will be checked. The data gap check ensures that models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots for same models.
            environment_statements: Environment statements associated with the environment, if any.

        Returns:
            A PromotionResult describing the outcome of the promotion (presumably including the
            added and removed snapshot table infos; see PromotionResult).
        """

    @abc.abstractmethod
    def finalize(self, environment: Environment) -> None:
        """Finalize the target environment, indicating that this environment has been
        fully promoted and is ready for use.

        Args:
            environment: The target environment to finalize.
        """

    @abc.abstractmethod
    def delete_expired_environments(
        self, current_ts: t.Optional[int] = None
    ) -> t.List[EnvironmentSummary]:
        """Removes expired environments.

        Expired environments are environments that have exceeded their time-to-live value.

        Args:
            current_ts: Optional timestamp used to evaluate expiration.

        Returns:
            The list of removed environments.
        """

    @abc.abstractmethod
    def unpause_snapshots(
        self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike
    ) -> None:
        """Unpauses target snapshots.

        Unpaused snapshots are scheduled for evaluation on a recurring basis.
        Once unpaused a snapshot can't be paused again.

        Args:
            snapshots: Target snapshots.
            unpaused_dt: The datetime object which indicates when target snapshots
                were unpaused.
        """

    @abc.abstractmethod
    def compact_intervals(self) -> None:
        """Compacts intervals for all snapshots.

        Compaction process involves merging of existing interval records into new records and
        then deleting the old ones.
        """

    @abc.abstractmethod
    def migrate(
        self,
        skip_backup: bool = False,
        promoted_snapshots_only: bool = True,
    ) -> None:
        """Migrate the state sync to the latest SQLMesh / SQLGlot version."""

    @abc.abstractmethod
    def rollback(self) -> None:
        """Rollback to previous backed up state."""

    @abc.abstractmethod
    def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
        """Add snapshot intervals to state.

        Args:
            snapshots_intervals: The intervals to add.
        """

    def add_interval(
        self,
        snapshot: Snapshot,
        start: TimeLike,
        end: TimeLike,
        is_dev: bool = False,
        last_altered_ts: t.Optional[int] = None,
    ) -> None:
        """Add an interval to a snapshot and sync it to the store.

        Args:
            snapshot: The snapshot to add an interval to.
            start: The start of the interval to add.
            end: The end of the interval to add.
            is_dev: Indicates whether the given interval is being added while in development mode.
            last_altered_ts: The timestamp of the last modification of the physical table.

        Raises:
            SQLMeshError: If the snapshot has no version.
        """
        start_ts, end_ts = snapshot.inclusive_exclusive(start, end, strict=False, expand=False)
        if not snapshot.version:
            raise SQLMeshError("Snapshot version must be set to add an interval.")
        intervals = [(start_ts, end_ts)]
        # The interval (and last-altered timestamp) goes into either the dev or the prod slot,
        # never both.
        snapshot_intervals = SnapshotIntervals(
            name=snapshot.name,
            identifier=snapshot.identifier,
            version=snapshot.version,
            dev_version=snapshot.dev_version,
            intervals=intervals if not is_dev else [],
            dev_intervals=intervals if is_dev else [],
            last_altered_ts=last_altered_ts if not is_dev else None,
            dev_last_altered_ts=last_altered_ts if is_dev else None,
        )
        self.add_snapshots_intervals([snapshot_intervals])

    @abc.abstractmethod
    def import_(self, stream: StateStream, clear: bool = True) -> None:
        """
        Replace the existing state with the state contained in the StateStream.

        Args:
            stream: The stream of new state.
            clear: Whether or not to clear existing state before inserting state from the stream.
        """


class DelegatingStateSync(StateSync):
    """A StateSync that forwards every abstract StateSync method to the wrapped ``state_sync``.

    The forwarding methods are attached below this class definition. Concrete methods such as
    ``get_versions`` and ``add_interval`` are inherited unchanged; they reach the wrapped
    instance through the abstract methods they call (``_get_versions`` and
    ``add_snapshots_intervals``), which are forwarded.
    """

    def __init__(self, state_sync: StateSync) -> None:
        self.state_sync = state_sync


def _create_delegate_method(name: str) -> t.Callable:
    """Build a method that forwards calls to ``self.state_sync.<name>``.

    Args:
        name: The name of the StateSync method to forward.

    Returns:
        A function that can be attached to DelegatingStateSync as the method ``name``.
    """

    def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
        return getattr(self.state_sync, name)(*args, **kwargs)

    # Give the generated method the identity and documentation of the method it forwards to,
    # so tracebacks, repr() and help() show e.g. "DelegatingStateSync.get_snapshots"
    # instead of an anonymous "delegate" with no docstring.
    delegate.__name__ = name
    delegate.__qualname__ = f"DelegatingStateSync.{name}"
    delegate.__doc__ = getattr(getattr(StateSync, name, None), "__doc__", None)

    return delegate


# Clearing the abstract-method set lets DelegatingStateSync be instantiated; every abstract
# StateSync method is then replaced by a forwarding implementation.
DelegatingStateSync.__abstractmethods__ = frozenset()
for name in StateSync.__abstractmethods__:
    setattr(DelegatingStateSync, name, _create_delegate_method(name))
class Versions(PydanticModel):
    """Versions of the SQLMesh state schema and of the libraries recorded alongside it."""

    schema_version: int = 0
    sqlglot_version: str = "0.0.0"
    sqlmesh_version: str = "0.0.0"

    @property
    def minor_sqlglot_version(self) -> t.Tuple[int, int]:
        """``major_minor`` applied to the stored SQLGlot version."""
        version = self.sqlglot_version
        return major_minor(version)

    @property
    def minor_sqlmesh_version(self) -> t.Tuple[int, int]:
        """``major_minor`` applied to the stored SQLMesh version."""
        version = self.sqlmesh_version
        return major_minor(version)

    @field_validator("sqlglot_version", "sqlmesh_version", mode="before")
    @classmethod
    def _package_version_validator(cls, v: t.Any) -> str:
        # A missing package version falls back to the "0.0.0" placeholder.
        if v is None:
            return "0.0.0"
        return str(v)

    @field_validator("schema_version", mode="before")
    @classmethod
    def _schema_version_validator(cls, v: t.Any) -> int:
        # A missing schema version falls back to 0.
        if v is None:
            return 0
        return int(v)
Represents the various versions of dependencies in the state sync.
Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class StateReader(abc.ABC):
    """Abstract base class for read-only operations on snapshot and environment state."""

    @abc.abstractmethod
    def get_snapshots(
        self, snapshot_ids: t.Iterable[SnapshotIdLike]
    ) -> t.Dict[SnapshotId, Snapshot]:
        """Bulk fetch snapshots given the corresponding snapshot ids.

        Args:
            snapshot_ids: Iterable of snapshot ids to get.

        Returns:
            A dictionary of snapshot ids to snapshots for ones that could be found.
        """

    @abc.abstractmethod
    def get_snapshots_by_names(
        self,
        snapshot_names: t.Iterable[str],
        current_ts: t.Optional[int] = None,
        exclude_expired: bool = True,
    ) -> t.Set[SnapshotIdAndVersion]:
        """Return the snapshot records for all versions of the specified snapshot names.

        Args:
            snapshot_names: Iterable of snapshot names to fetch all snapshot records for.
            current_ts: The current time used to identify which snapshots have expired so that
                they can be excluded (only relevant if exclude_expired=True).
            exclude_expired: Whether to exclude expired snapshots from the result.

        Returns:
            A set containing all the matched snapshot records. To fetch full snapshots, pass it
            into StateSync.get_snapshots().
        """

    @abc.abstractmethod
    def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
        """Checks if multiple snapshots exist in the state sync.

        Args:
            snapshot_ids: Iterable of snapshot ids to bulk check.

        Returns:
            A set of all the existing snapshot ids.
        """

    @abc.abstractmethod
    def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.List[Snapshot]:
        """Updates given snapshots with latest intervals from the state.

        Args:
            snapshots: The snapshots to refresh.

        Returns:
            The updated snapshots.
        """

    @abc.abstractmethod
    def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]:
        """Returns the node names that exist in the state sync.

        Args:
            names: Iterable of node names to check.
            exclude_external: Whether to exclude external models from the output.

        Returns:
            A set of all the existing node names.
        """

    @abc.abstractmethod
    def get_environment(self, environment: str) -> t.Optional[Environment]:
        """Fetches the environment if it exists.

        Args:
            environment: The name of the environment.

        Returns:
            The environment object, or None if it does not exist.
        """

    @abc.abstractmethod
    def get_environments(self) -> t.List[Environment]:
        """Fetches all environments.

        Returns:
            A list of all environments.
        """

    @abc.abstractmethod
    def get_environments_summary(self) -> t.List[EnvironmentSummary]:
        """Fetches all environment names along with expiry datetime.

        Returns:
            A list of all environment summaries.
        """

    @abc.abstractmethod
    def max_interval_end_per_model(
        self,
        environment: str,
        models: t.Optional[t.Set[str]] = None,
        ensure_finalized_snapshots: bool = False,
    ) -> t.Dict[str, int]:
        """Returns the max interval end per model for the given environment.

        Args:
            environment: The target environment.
            models: The models to get the max interval end for. If None, all models are considered.
            ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state,
                or to use whatever snapshots are in the current environment state even if the environment is not finalized.

        Returns:
            A dictionary of model FQNs to their respective interval ends in milliseconds since epoch.
        """

    @abc.abstractmethod
    def recycle(self) -> None:
        """Closes all open connections and releases all allocated resources associated with any thread
        except the calling one."""

    @abc.abstractmethod
    def close(self) -> None:
        """Closes all open connections and releases all allocated resources."""

    @abc.abstractmethod
    def state_type(self) -> str:
        """Returns the type of state sync."""

    @abc.abstractmethod
    def update_auto_restatements(
        self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]]
    ) -> None:
        """Updates the next auto restatement timestamp for the snapshots.

        Args:
            next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp.
        """

    @abc.abstractmethod
    def get_environment_statements(self, environment: str) -> t.List[EnvironmentStatements]:
        """Fetches environment statements from the environment_statements table.

        Args:
            environment: The name of the environment whose statements should be fetched.

        Returns:
            A list of the Environment Statements.
        """

    def get_versions(self, validate: bool = True) -> Versions:
        """Get the current versions of the SQLMesh schema and libraries.

        Args:
            validate: Whether or not to raise error if the running version is different from what's in state.

        Returns:
            The versions object.

        Raises:
            SQLMeshError: If validate is True and the local SQLMesh major/minor version, the schema
                version, or the SQLGlot major/minor version differs from the one stored in state.
        """
        from sqlmesh._version import __version__ as SQLMESH_VERSION

        versions = self._get_versions()

        if validate:

            def raise_error(
                lib: str,
                local: str | int,
                remote: str | int,
                remote_package_version: t.Optional[str] = None,
                ahead: bool = False,
            ) -> t.NoReturn:
                # Both branches raise: "ahead" asks for a state migration, otherwise the local
                # install is behind and an upgrade is suggested.
                if ahead:
                    raise SQLMeshError(
                        f"{lib} (local) is using version '{local}' which is ahead of '{remote}' (remote). "
                        "Please run a migration ('sqlmesh migrate' command)."
                    )

                if remote_package_version:
                    upgrade_suggestion = f" Please upgrade {lib} ('pip install --upgrade \"{lib.lower()}=={remote_package_version}\"' command)."
                else:
                    upgrade_suggestion = ""

                raise SQLMeshError(
                    f"{lib} (local) is using version '{local}' which is behind '{remote}' (remote).{upgrade_suggestion}"
                )

            if major_minor(SQLMESH_VERSION) != major_minor(versions.sqlmesh_version):
                raise_error(
                    "SQLMesh",
                    SQLMESH_VERSION,
                    versions.sqlmesh_version,
                    remote_package_version=versions.sqlmesh_version,
                    ahead=major_minor(SQLMESH_VERSION) > major_minor(versions.sqlmesh_version),
                )

            if SCHEMA_VERSION != versions.schema_version:
                raise_error(
                    "SQLMesh",
                    SCHEMA_VERSION,
                    versions.schema_version,
                    remote_package_version=versions.sqlmesh_version,
                    ahead=SCHEMA_VERSION > versions.schema_version,
                )

            if major_minor(SQLGLOT_VERSION) != major_minor(versions.sqlglot_version):
                raise_error(
                    "SQLGlot",
                    SQLGLOT_VERSION,
                    versions.sqlglot_version,
                    remote_package_version=versions.sqlglot_version,
                    ahead=major_minor(SQLGLOT_VERSION) > major_minor(versions.sqlglot_version),
                )

        return versions

    @abc.abstractmethod
    def _get_versions(self) -> Versions:
        """Queries the store to get the current versions of SQLMesh and deps.

        Returns:
            The versions object.
        """

    @abc.abstractmethod
    def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream:
        """Export the contents of this StateSync as a StateStream.

        Args:
            environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
        """

    @abc.abstractmethod
    def get_expired_snapshots(
        self,
        *,
        batch_range: ExpiredBatchRange,
        current_ts: t.Optional[int] = None,
        ignore_ttl: bool = False,
    ) -> t.Optional[ExpiredSnapshotBatch]:
        """Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).

        Args:
            batch_range: The range of the batch to fetch.
            current_ts: Timestamp used to evaluate expiration.
            ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).

        Returns:
            A batch describing expired snapshots or None if no snapshots are pending cleanup.
        """

    @abc.abstractmethod
    def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
        """Returns the expired environments.

        Expired environments are environments that have exceeded their time-to-live value.

        Args:
            current_ts: Timestamp used to evaluate expiration.

        Returns:
            The list of environment summaries to remove.
        """
Abstract base class for read-only operations on snapshot and environment state.
@abc.abstractmethod
def get_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Dict[SnapshotId, Snapshot]:
    """Fetch several snapshots at once by their ids.

    Args:
        snapshot_ids: The ids of the snapshots to look up.

    Returns:
        A mapping from snapshot id to snapshot, containing only the snapshots that were found.
    """
Bulk fetch snapshots given the corresponding snapshot ids.
Arguments:
- snapshot_ids: Iterable of snapshot ids to get.
Returns:
A dictionary of snapshot ids to snapshots for ones that could be found.
@abc.abstractmethod
def get_snapshots_by_names(
    self,
    snapshot_names: t.Iterable[str],
    current_ts: t.Optional[int] = None,
    exclude_expired: bool = True,
) -> t.Set[SnapshotIdAndVersion]:
    """Return the snapshot records for all versions of the specified snapshot names.

    Args:
        snapshot_names: Iterable of snapshot names to fetch all snapshot records for.
        current_ts: The current time used to identify which snapshots have expired so that
            they can be excluded (only relevant if exclude_expired=True).
        exclude_expired: Whether to exclude expired snapshots from the result.

    Returns:
        A set containing all the matched snapshot records. To fetch full snapshots, pass it
        into StateSync.get_snapshots().
    """
Return the snapshot records for all versions of the specified snapshot names.
Arguments:
- snapshot_names: Iterable of snapshot names to fetch all snapshot records for
- current_ts: The current time used to identify which snapshots have expired so that they can be excluded (only relevant if exclude_expired=True).
- exclude_expired: Whether to exclude expired snapshots from the result.
Returns:
A set containing all the matched snapshot records. To fetch full snapshots, pass it into StateSync.get_snapshots().
@abc.abstractmethod
def snapshots_exist(
    self,
    snapshot_ids: t.Iterable[SnapshotIdLike],
) -> t.Set[SnapshotId]:
    """Check in bulk which of the given snapshots are present in the state sync.

    Args:
        snapshot_ids: The snapshot ids to check.

    Returns:
        The subset of snapshot ids that exist.
    """
Checks if multiple snapshots exist in the state sync.
Arguments:
- snapshot_ids: Iterable of snapshot ids to bulk check.
Returns:
A set of all the existing snapshot ids.
@abc.abstractmethod
def refresh_snapshot_intervals(
    self,
    snapshots: t.Collection[Snapshot],
) -> t.List[Snapshot]:
    """Bring the intervals of the given snapshots up to date with what is stored in state.

    Args:
        snapshots: Snapshots whose intervals should be refreshed.

    Returns:
        The refreshed snapshots.
    """
Updates given snapshots with latest intervals from the state.
Arguments:
- snapshots: The snapshots to refresh.
Returns:
The updated snapshots.
@abc.abstractmethod
def nodes_exist(
    self,
    names: t.Iterable[str],
    exclude_external: bool = False,
) -> t.Set[str]:
    """Report which of the given node names are present in the state sync.

    Args:
        names: Node names to look for.
        exclude_external: When True, external models are left out of the result.

    Returns:
        The node names that exist.
    """
Returns the node names that exist in the state sync.
Arguments:
- names: Iterable of node names to check.
- exclude_external: Whether to exclude external models from the output.
Returns:
A set of all the existing node names.
@abc.abstractmethod
def get_environment(self, environment: str) -> t.Optional[Environment]:
    """Fetches the environment if it exists.

    Args:
        environment: The name of the environment.

    Returns:
        The environment object, or None if it does not exist.
    """
Fetches the environment if it exists.
Arguments:
- environment: The name of the environment.
Returns:
The environment object, or None if it does not exist.
@abc.abstractmethod
def get_environments(
    self,
) -> t.List[Environment]:
    """Load every environment stored in state.

    Returns:
        All environments, as a list.
    """
Fetches all environments.
Returns:
A list of all environments.
@abc.abstractmethod
def get_environments_summary(
    self,
) -> t.List[EnvironmentSummary]:
    """Load the name and expiry datetime of every environment.

    Returns:
        One summary per environment.
    """
Fetches all environment names along with expiry datetime.
Returns:
A list of all environment summaries.
@abc.abstractmethod
def max_interval_end_per_model(
    self,
    environment: str,
    models: t.Optional[t.Set[str]] = None,
    ensure_finalized_snapshots: bool = False,
) -> t.Dict[str, int]:
    """Compute, for each model in an environment, the latest interval end.

    Args:
        environment: The environment to inspect.
        models: Restrict the result to these models; None means every model.
        ensure_finalized_snapshots: When True, use the snapshots of the latest finalized
            environment state; otherwise use the snapshots currently in the environment state,
            even if the environment has not been finalized.

    Returns:
        A mapping from model FQN to its max interval end, in milliseconds since epoch.
    """
Returns the max interval end per model for the given environment.
Arguments:
- environment: The target environment.
- models: The models to get the max interval end for. If None, all models are considered.
- ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:
A dictionary of model FQNs to their respective interval ends in milliseconds since epoch.
192 @abc.abstractmethod 193 def recycle(self) -> None: 194 """Closes all open connections and releases all allocated resources associated with any thread 195 except the calling one."""
Closes all open connections and releases all allocated resources associated with any thread except the calling one.
197 @abc.abstractmethod 198 def close(self) -> None: 199 """Closes all open connections and releases all allocated resources."""
Closes all open connections and releases all allocated resources.
205 @abc.abstractmethod 206 def update_auto_restatements( 207 self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]] 208 ) -> None: 209 """Updates the next auto restatement timestamp for the snapshots. 210 211 Args: 212 next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp. 213 """
Updates the next auto restatement timestamp for the snapshots.
Arguments:
- next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp.
215 @abc.abstractmethod 216 def get_environment_statements(self, environment: str) -> t.List[EnvironmentStatements]: 217 """Fetches environment statements from the environment_statements table. 218 219 Returns: 220 A list of the Environment Statements. 221 """
Fetches environment statements from the environment_statements table.
Arguments:
- environment: The name of the environment whose statements are fetched.
Returns:
A list of the Environment Statements.
223 def get_versions(self, validate: bool = True) -> Versions: 224 """Get the current versions of the SQLMesh schema and libraries. 225 226 Args: 227 validate: Whether or not to raise error if the running version is different from what's in state. 228 229 Returns: 230 The versions object. 231 """ 232 from sqlmesh._version import __version__ as SQLMESH_VERSION 233 234 versions = self._get_versions() 235 236 if validate: 237 238 def raise_error( 239 lib: str, 240 local: str | int, 241 remote: str | int, 242 remote_package_version: t.Optional[str] = None, 243 ahead: bool = False, 244 ) -> None: 245 if ahead: 246 raise SQLMeshError( 247 f"{lib} (local) is using version '{local}' which is ahead of '{remote}' (remote). " 248 "Please run a migration ('sqlmesh migrate' command)." 249 ) 250 251 if remote_package_version: 252 upgrade_suggestion = f" Please upgrade {lib} ('pip install --upgrade \"{lib.lower()}=={remote_package_version}\"' command)." 253 else: 254 upgrade_suggestion = "" 255 256 raise SQLMeshError( 257 f"{lib} (local) is using version '{local}' which is behind '{remote}' (remote).{upgrade_suggestion}" 258 ) 259 260 if major_minor(SQLMESH_VERSION) != major_minor(versions.sqlmesh_version): 261 raise_error( 262 "SQLMesh", 263 SQLMESH_VERSION, 264 versions.sqlmesh_version, 265 remote_package_version=versions.sqlmesh_version, 266 ahead=major_minor(SQLMESH_VERSION) > major_minor(versions.sqlmesh_version), 267 ) 268 269 if SCHEMA_VERSION != versions.schema_version: 270 raise_error( 271 "SQLMesh", 272 SCHEMA_VERSION, 273 versions.schema_version, 274 remote_package_version=versions.sqlmesh_version, 275 ahead=SCHEMA_VERSION > versions.schema_version, 276 ) 277 278 if major_minor(SQLGLOT_VERSION) != major_minor(versions.sqlglot_version): 279 raise_error( 280 "SQLGlot", 281 SQLGLOT_VERSION, 282 versions.sqlglot_version, 283 remote_package_version=versions.sqlglot_version, 284 ahead=major_minor(SQLGLOT_VERSION) > major_minor(versions.sqlglot_version), 285 ) 286 287 return 
versions
Get the current versions of the SQLMesh schema and libraries.
Arguments:
- validate: Whether or not to raise error if the running version is different from what's in state.
Returns:
The versions object.
297 @abc.abstractmethod 298 def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream: 299 """Export the contents of this StateSync as a StateStream 300 301 Args: 302 environment_names: An optional list of environment names to export. If not specified, all environments will be exported. 303 """
Export the contents of this StateSync as a StateStream.
Arguments:
- environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
305 @abc.abstractmethod 306 def get_expired_snapshots( 307 self, 308 *, 309 batch_range: ExpiredBatchRange, 310 current_ts: t.Optional[int] = None, 311 ignore_ttl: bool = False, 312 ) -> t.Optional[ExpiredSnapshotBatch]: 313 """Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier). 314 315 Args: 316 current_ts: Timestamp used to evaluate expiration. 317 ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced). 318 batch_range: The range of the batch to fetch. 319 320 Returns: 321 A batch describing expired snapshots or None if no snapshots are pending cleanup. 322 """
Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).
Arguments:
- current_ts: Timestamp used to evaluate expiration.
- ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
- batch_range: The range of the batch to fetch.
Returns:
A batch describing expired snapshots or None if no snapshots are pending cleanup.
324 @abc.abstractmethod 325 def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]: 326 """Returns the expired environments. 327 328 Expired environments are environments that have exceeded their time-to-live value. 329 Returns: 330 The list of environment summaries to remove. 331 """
Returns the expired environments.
Expired environments are environments that have exceeded their time-to-live value.
Arguments:
- current_ts: Timestamp used to evaluate expiration.
Returns:
The list of environment summaries to remove.
334class StateSync(StateReader, abc.ABC): 335 """Abstract base class for snapshot and environment state management.""" 336 337 @abc.abstractmethod 338 def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None: 339 """Push snapshots into the state sync. 340 341 This method only allows for pushing new snapshots. If existing snapshots are found, 342 this method should raise an error. 343 344 Raises: 345 SQLMeshError when existing snapshots are pushed. 346 347 Args: 348 snapshots: A list of snapshots to save in the state sync. 349 """ 350 351 @abc.abstractmethod 352 def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None: 353 """Delete snapshots from the state sync. 354 355 Args: 356 snapshot_ids: A list of snapshot like objects to delete. 357 """ 358 359 @abc.abstractmethod 360 def delete_expired_snapshots( 361 self, 362 batch_range: ExpiredBatchRange, 363 ignore_ttl: bool = False, 364 current_ts: t.Optional[int] = None, 365 ) -> None: 366 """Removes expired snapshots. 367 368 Expired snapshots are snapshots that have exceeded their time-to-live 369 and are no longer in use within an environment. 370 371 Args: 372 batch_range: The range of snapshots to delete in this batch. 373 ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting 374 all snapshots that are not referenced in any environment 375 current_ts: Timestamp used to evaluate expiration. 376 """ 377 378 @abc.abstractmethod 379 def invalidate_environment(self, name: str, protect_prod: bool = True) -> None: 380 """Invalidates the target environment by setting its expiration timestamp to now. 381 382 Args: 383 name: The name of the environment to invalidate. 384 protect_prod: If True, prevents invalidation of the production environment. 
385 """ 386 387 @abc.abstractmethod 388 def remove_state(self, including_backup: bool = False) -> None: 389 """Removes the state store objects.""" 390 391 @abc.abstractmethod 392 def remove_intervals( 393 self, 394 snapshot_intervals: t.Sequence[t.Tuple[SnapshotIdAndVersionLike, Interval]], 395 remove_shared_versions: bool = False, 396 ) -> None: 397 """Remove an interval from a list of snapshots and sync it to the store. 398 399 Because multiple snapshots can be pointing to the same version or physical table, this method 400 can also grab all snapshots tied to the passed in version. 401 402 Args: 403 snapshot_intervals: The snapshot intervals to remove. 404 remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots. 405 """ 406 407 @abc.abstractmethod 408 def promote( 409 self, 410 environment: Environment, 411 no_gaps_snapshot_names: t.Optional[t.Set[str]] = None, 412 environment_statements: t.Optional[t.List[EnvironmentStatements]] = None, 413 ) -> PromotionResult: 414 """Update the environment to reflect the current state. 415 416 This method verifies that snapshots have been pushed. 417 418 Args: 419 environment: The environment to promote. 420 no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None, 421 all snapshots will be checked. The data gap check ensures that models that are already a 422 part of the target environment have no data gaps when compared against previous 423 snapshots for same models. 424 425 Returns: 426 A tuple of (added snapshot table infos, removed snapshot table infos) 427 """ 428 429 @abc.abstractmethod 430 def finalize(self, environment: Environment) -> None: 431 """Finalize the target environment, indicating that this environment has been 432 fully promoted and is ready for use. 433 434 Args: 435 environment: The target environment to finalize. 
436 """ 437 438 @abc.abstractmethod 439 def delete_expired_environments( 440 self, current_ts: t.Optional[int] = None 441 ) -> t.List[EnvironmentSummary]: 442 """Removes expired environments. 443 444 Expired environments are environments that have exceeded their time-to-live value. 445 446 Returns: 447 The list of removed environments. 448 """ 449 450 @abc.abstractmethod 451 def unpause_snapshots( 452 self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike 453 ) -> None: 454 """Unpauses target snapshots. 455 456 Unpaused snapshots are scheduled for evaluation on a recurring basis. 457 Once unpaused a snapshot can't be paused again. 458 459 Args: 460 snapshots: Target snapshots. 461 unpaused_dt: The datetime object which indicates when target snapshots 462 were unpaused. 463 """ 464 465 @abc.abstractmethod 466 def compact_intervals(self) -> None: 467 """Compacts intervals for all snapshots. 468 469 Compaction process involves merging of existing interval records into new records and 470 then deleting the old ones. 471 """ 472 473 @abc.abstractmethod 474 def migrate( 475 self, 476 skip_backup: bool = False, 477 promoted_snapshots_only: bool = True, 478 ) -> None: 479 """Migrate the state sync to the latest SQLMesh / SQLGlot version.""" 480 481 @abc.abstractmethod 482 def rollback(self) -> None: 483 """Rollback to previous backed up state.""" 484 485 @abc.abstractmethod 486 def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None: 487 """Add snapshot intervals to state 488 489 Args: 490 snapshots_intervals: The intervals to add. 491 """ 492 493 def add_interval( 494 self, 495 snapshot: Snapshot, 496 start: TimeLike, 497 end: TimeLike, 498 is_dev: bool = False, 499 last_altered_ts: t.Optional[int] = None, 500 ) -> None: 501 """Add an interval to a snapshot and sync it to the store. 502 503 Args: 504 snapshot: The snapshot like object to add an interval to. 505 start: The start of the interval to add. 
506 end: The end of the interval to add. 507 is_dev: Indicates whether the given interval is being added while in development mode 508 last_altered_ts: The timestamp of the last modification of the physical table 509 """ 510 start_ts, end_ts = snapshot.inclusive_exclusive(start, end, strict=False, expand=False) 511 if not snapshot.version: 512 raise SQLMeshError("Snapshot version must be set to add an interval.") 513 intervals = [(start_ts, end_ts)] 514 snapshot_intervals = SnapshotIntervals( 515 name=snapshot.name, 516 identifier=snapshot.identifier, 517 version=snapshot.version, 518 dev_version=snapshot.dev_version, 519 intervals=intervals if not is_dev else [], 520 dev_intervals=intervals if is_dev else [], 521 last_altered_ts=last_altered_ts if not is_dev else None, 522 dev_last_altered_ts=last_altered_ts if is_dev else None, 523 ) 524 self.add_snapshots_intervals([snapshot_intervals]) 525 526 @abc.abstractmethod 527 def import_(self, stream: StateStream, clear: bool = True) -> None: 528 """ 529 Replace the existing state with the state contained in the StateStream 530 531 Args: 532 stream: The stream of new state 533 clear: Whether or not to clear existing state before inserting state from the stream 534 """
Abstract base class for snapshot and environment state management.
337 @abc.abstractmethod 338 def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None: 339 """Push snapshots into the state sync. 340 341 This method only allows for pushing new snapshots. If existing snapshots are found, 342 this method should raise an error. 343 344 Raises: 345 SQLMeshError when existing snapshots are pushed. 346 347 Args: 348 snapshots: A list of snapshots to save in the state sync. 349 """
Push snapshots into the state sync.
This method only allows for pushing new snapshots. If existing snapshots are found, this method should raise an error.
Raises:
- SQLMeshError when existing snapshots are pushed.
Arguments:
- snapshots: A list of snapshots to save in the state sync.
351 @abc.abstractmethod 352 def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None: 353 """Delete snapshots from the state sync. 354 355 Args: 356 snapshot_ids: A list of snapshot like objects to delete. 357 """
Delete snapshots from the state sync.
Arguments:
- snapshot_ids: A list of snapshot like objects to delete.
359 @abc.abstractmethod 360 def delete_expired_snapshots( 361 self, 362 batch_range: ExpiredBatchRange, 363 ignore_ttl: bool = False, 364 current_ts: t.Optional[int] = None, 365 ) -> None: 366 """Removes expired snapshots. 367 368 Expired snapshots are snapshots that have exceeded their time-to-live 369 and are no longer in use within an environment. 370 371 Args: 372 batch_range: The range of snapshots to delete in this batch. 373 ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting 374 all snapshots that are not referenced in any environment 375 current_ts: Timestamp used to evaluate expiration. 376 """
Removes expired snapshots.
Expired snapshots are snapshots that have exceeded their time-to-live and are no longer in use within an environment.
Arguments:
- batch_range: The range of snapshots to delete in this batch.
- ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting all snapshots that are not referenced in any environment.
- current_ts: Timestamp used to evaluate expiration.
378 @abc.abstractmethod 379 def invalidate_environment(self, name: str, protect_prod: bool = True) -> None: 380 """Invalidates the target environment by setting its expiration timestamp to now. 381 382 Args: 383 name: The name of the environment to invalidate. 384 protect_prod: If True, prevents invalidation of the production environment. 385 """
Invalidates the target environment by setting its expiration timestamp to now.
Arguments:
- name: The name of the environment to invalidate.
- protect_prod: If True, prevents invalidation of the production environment.
387 @abc.abstractmethod 388 def remove_state(self, including_backup: bool = False) -> None: 389 """Removes the state store objects."""
Removes the state store objects.
391 @abc.abstractmethod 392 def remove_intervals( 393 self, 394 snapshot_intervals: t.Sequence[t.Tuple[SnapshotIdAndVersionLike, Interval]], 395 remove_shared_versions: bool = False, 396 ) -> None: 397 """Remove an interval from a list of snapshots and sync it to the store. 398 399 Because multiple snapshots can be pointing to the same version or physical table, this method 400 can also grab all snapshots tied to the passed in version. 401 402 Args: 403 snapshot_intervals: The snapshot intervals to remove. 404 remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots. 405 """
Remove an interval from a list of snapshots and sync it to the store.
Because multiple snapshots can point to the same version or physical table, this method can also include all snapshots tied to the passed-in version.
Arguments:
- snapshot_intervals: The snapshot intervals to remove.
- remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots.
407 @abc.abstractmethod 408 def promote( 409 self, 410 environment: Environment, 411 no_gaps_snapshot_names: t.Optional[t.Set[str]] = None, 412 environment_statements: t.Optional[t.List[EnvironmentStatements]] = None, 413 ) -> PromotionResult: 414 """Update the environment to reflect the current state. 415 416 This method verifies that snapshots have been pushed. 417 418 Args: 419 environment: The environment to promote. 420 no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None, 421 all snapshots will be checked. The data gap check ensures that models that are already a 422 part of the target environment have no data gaps when compared against previous 423 snapshots for same models. 424 425 Returns: 426 A tuple of (added snapshot table infos, removed snapshot table infos) 427 """
Update the environment to reflect the current state.
This method verifies that snapshots have been pushed.
Arguments:
- environment: The environment to promote.
- no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None, all snapshots will be checked. The data gap check ensures that models that are already a part of the target environment have no data gaps when compared against previous snapshots for the same models.
- environment_statements: Optional list of environment statements associated with the environment being promoted.
Returns:
A PromotionResult describing the outcome of the promotion, such as the added and removed snapshot table infos.
429 @abc.abstractmethod 430 def finalize(self, environment: Environment) -> None: 431 """Finalize the target environment, indicating that this environment has been 432 fully promoted and is ready for use. 433 434 Args: 435 environment: The target environment to finalize. 436 """
Finalize the target environment, indicating that this environment has been fully promoted and is ready for use.
Arguments:
- environment: The target environment to finalize.
438 @abc.abstractmethod 439 def delete_expired_environments( 440 self, current_ts: t.Optional[int] = None 441 ) -> t.List[EnvironmentSummary]: 442 """Removes expired environments. 443 444 Expired environments are environments that have exceeded their time-to-live value. 445 446 Returns: 447 The list of removed environments. 448 """
Removes expired environments.
Expired environments are environments that have exceeded their time-to-live value.
Arguments:
- current_ts: Timestamp used to evaluate expiration.
Returns:
The list of removed environments.
450 @abc.abstractmethod 451 def unpause_snapshots( 452 self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike 453 ) -> None: 454 """Unpauses target snapshots. 455 456 Unpaused snapshots are scheduled for evaluation on a recurring basis. 457 Once unpaused a snapshot can't be paused again. 458 459 Args: 460 snapshots: Target snapshots. 461 unpaused_dt: The datetime object which indicates when target snapshots 462 were unpaused. 463 """
Unpauses target snapshots.
Unpaused snapshots are scheduled for evaluation on a recurring basis. Once unpaused, a snapshot can't be paused again.
Arguments:
- snapshots: Target snapshots.
- unpaused_dt: The datetime object which indicates when target snapshots were unpaused.
465 @abc.abstractmethod 466 def compact_intervals(self) -> None: 467 """Compacts intervals for all snapshots. 468 469 Compaction process involves merging of existing interval records into new records and 470 then deleting the old ones. 471 """
Compacts intervals for all snapshots.
Compaction process involves merging of existing interval records into new records and then deleting the old ones.
473 @abc.abstractmethod 474 def migrate( 475 self, 476 skip_backup: bool = False, 477 promoted_snapshots_only: bool = True, 478 ) -> None: 479 """Migrate the state sync to the latest SQLMesh / SQLGlot version."""
Migrate the state sync to the latest SQLMesh / SQLGlot version.
481 @abc.abstractmethod 482 def rollback(self) -> None: 483 """Rollback to previous backed up state."""
Rollback to previous backed up state.
485 @abc.abstractmethod 486 def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None: 487 """Add snapshot intervals to state 488 489 Args: 490 snapshots_intervals: The intervals to add. 491 """
Add snapshot intervals to the state.
Arguments:
- snapshots_intervals: The intervals to add.
493 def add_interval( 494 self, 495 snapshot: Snapshot, 496 start: TimeLike, 497 end: TimeLike, 498 is_dev: bool = False, 499 last_altered_ts: t.Optional[int] = None, 500 ) -> None: 501 """Add an interval to a snapshot and sync it to the store. 502 503 Args: 504 snapshot: The snapshot like object to add an interval to. 505 start: The start of the interval to add. 506 end: The end of the interval to add. 507 is_dev: Indicates whether the given interval is being added while in development mode 508 last_altered_ts: The timestamp of the last modification of the physical table 509 """ 510 start_ts, end_ts = snapshot.inclusive_exclusive(start, end, strict=False, expand=False) 511 if not snapshot.version: 512 raise SQLMeshError("Snapshot version must be set to add an interval.") 513 intervals = [(start_ts, end_ts)] 514 snapshot_intervals = SnapshotIntervals( 515 name=snapshot.name, 516 identifier=snapshot.identifier, 517 version=snapshot.version, 518 dev_version=snapshot.dev_version, 519 intervals=intervals if not is_dev else [], 520 dev_intervals=intervals if is_dev else [], 521 last_altered_ts=last_altered_ts if not is_dev else None, 522 dev_last_altered_ts=last_altered_ts if is_dev else None, 523 ) 524 self.add_snapshots_intervals([snapshot_intervals])
Add an interval to a snapshot and sync it to the store.
Arguments:
- snapshot: The snapshot like object to add an interval to.
- start: The start of the interval to add.
- end: The end of the interval to add.
- is_dev: Indicates whether the given interval is being added while in development mode
- last_altered_ts: The timestamp of the last modification of the physical table
526 @abc.abstractmethod 527 def import_(self, stream: StateStream, clear: bool = True) -> None: 528 """ 529 Replace the existing state with the state contained in the StateStream 530 531 Args: 532 stream: The stream of new state 533 clear: Whether or not to clear existing state before inserting state from the stream 534 """
Replace the existing state with the state contained in the StateStream.
Arguments:
- stream: The stream of new state
- clear: Whether or not to clear existing state before inserting state from the stream
Inherited Members
- StateReader
- get_snapshots
- get_snapshots_by_names
- snapshots_exist
- refresh_snapshot_intervals
- nodes_exist
- get_environment
- get_environments
- get_environments_summary
- max_interval_end_per_model
- recycle
- close
- state_type
- update_auto_restatements
- get_environment_statements
- get_versions
- export
- get_expired_snapshots
- get_expired_environments
537class DelegatingStateSync(StateSync): 538 def __init__(self, state_sync: StateSync) -> None: 539 self.state_sync = state_sync
A StateSync that delegates its method calls to the wrapped `state_sync` instance passed to its constructor.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Push snapshots into the state sync.
This method only allows for pushing new snapshots. If existing snapshots are found, this method should raise an error.
Raises:
- SQLMeshError when existing snapshots are pushed.
Arguments:
- snapshots: A list of snapshots to save in the state sync.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Delete snapshots from the state sync.
Arguments:
- snapshot_ids: A list of snapshot like objects to delete.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Removes expired snapshots.
Expired snapshots are snapshots that have exceeded their time-to-live and are no longer in use within an environment.
Arguments:
- batch_range: The range of snapshots to delete in this batch.
- ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting all snapshots that are not referenced in any environment.
- current_ts: Timestamp used to evaluate expiration.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Invalidates the target environment by setting its expiration timestamp to now.
Arguments:
- name: The name of the environment to invalidate.
- protect_prod: If True, prevents invalidation of the production environment.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Removes the state store objects.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Remove an interval from a list of snapshots and sync it to the store.
Because multiple snapshots can point to the same version or physical table, this method can also include all snapshots tied to the passed-in version.
Arguments:
- snapshot_intervals: The snapshot intervals to remove.
- remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Update the environment to reflect the current state.
This method verifies that snapshots have been pushed.
Arguments:
- environment: The environment to promote.
- no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None, all snapshots will be checked. The data gap check ensures that models that are already a part of the target environment have no data gaps when compared against previous snapshots for the same models.
- environment_statements: Optional list of environment statements associated with the environment being promoted.
Returns:
A PromotionResult describing the outcome of the promotion, such as the added and removed snapshot table infos.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Finalize the target environment, indicating that this environment has been fully promoted and is ready for use.
Arguments:
- environment: The target environment to finalize.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Removes expired environments.
Expired environments are environments that have exceeded their time-to-live value.
Arguments:
- current_ts: Timestamp used to evaluate expiration.
Returns:
The list of removed environments.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Unpauses target snapshots.
Unpaused snapshots are scheduled for evaluation on a recurring basis. Once unpaused, a snapshot can't be paused again.
Arguments:
- snapshots: Target snapshots.
- unpaused_dt: The datetime object which indicates when target snapshots were unpaused.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Compacts intervals for all snapshots.
Compaction process involves merging of existing interval records into new records and then deleting the old ones.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Migrate the state sync to the latest SQLMesh / SQLGlot version.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Rollback to previous backed up state.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Add snapshot intervals to the state.
Arguments:
- snapshots_intervals: The intervals to add.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Replace the existing state with the state contained in the StateStream.
Arguments:
- stream: The stream of new state
- clear: Whether or not to clear existing state before inserting state from the stream
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Bulk fetch snapshots given the corresponding snapshot ids.
Arguments:
- snapshot_ids: Iterable of snapshot ids to get.
Returns:
A dictionary of snapshot ids to snapshots for ones that could be found.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Return the snapshot records for all versions of the specified snapshot names.
Arguments:
- snapshot_names: Iterable of snapshot names to fetch all snapshot records for
- current_ts: Sets the current time for identifying which snapshots have expired so they can be excluded (only relevant if exclude_expired=True).
- exclude_expired: Whether to exclude the snapshot ids of expired snapshots from the result.
Returns:
A set containing all the matched snapshot records. To fetch full snapshots, pass it into StateSync.get_snapshots().
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Checks if multiple snapshots exist in the state sync.
Arguments:
- snapshot_ids: Iterable of snapshot ids to bulk check.
Returns:
A set of all the existing snapshot ids.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Updates given snapshots with latest intervals from the state.
Arguments:
- snapshots: The snapshots to refresh.
Returns:
The updated snapshots.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Returns the node names that exist in the state sync.
Arguments:
- names: Iterable of node names to check.
- exclude_external: Whether to exclude external models from the output.
Returns:
A set of all the existing node names.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Fetches the environment if it exists.
Arguments:
- environment: The name of the environment to fetch.
Returns:
The environment object.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Fetches all environments.
Returns:
A list of all environments.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Fetches all environment names along with expiry datetime.
Returns:
A list of all environment summaries.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Returns the max interval end per model for the given environment.
Arguments:
- environment: The target environment.
- models: The models to get the max interval end for. If None, all models are considered.
- ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:
A dictionary mapping model FQNs to their respective interval ends, in milliseconds since the epoch.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Closes all open connections and releases all allocated resources associated with any thread except the calling one.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Closes all open connections and releases all allocated resources.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Returns the type of state sync.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Updates the next auto restatement timestamp for the snapshots.
Arguments:
- next_auto_restatement_ts: A dictionary mapping snapshot name/version pairs to their next auto-restatement timestamps.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Fetches environment statements from the environment_statements table.
Returns:
A list of environment statements.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Exports the contents of this StateSync as a StateStream.
Arguments:
- environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).
Arguments:
- current_ts: Timestamp used to evaluate expiration.
- ignore_ttl: If True, include snapshots regardless of TTL (only checks whether they are unreferenced).
- batch_range: The range of the batch to fetch.
Returns:
A batch describing expired snapshots, or None if no snapshots are pending cleanup.
543 def delegate(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 544 return getattr(self.state_sync, name)(*args, **kwargs)
Returns the expired environments.
Expired environments are environments that have exceeded their time-to-live value.
Returns:
The list of environment summaries to remove.