sqlmesh.core.state_sync.db
class EngineAdapterStateSync(StateSync):
    """Manages state of nodes and snapshots with an existing engine adapter.

    This state sync is convenient to use because it requires no additional setup.
    You can reuse the same engine/warehouse that your data is stored in.

    Args:
        engine_adapter: The EngineAdapter to use to store and fetch snapshots.
        schema: The schema to store state metadata in. If None or an empty string,
            no schema is defined.
        console: The console to log information to.
        cache_dir: The cache path, used for caching snapshot models.
    """

    def __init__(
        self,
        engine_adapter: EngineAdapter,
        schema: t.Optional[str],
        console: t.Optional[Console] = None,
        cache_dir: Path = Path(),
    ):
        self.interval_state = IntervalState(engine_adapter, schema=schema)
        self.environment_state = EnvironmentState(engine_adapter, schema=schema)
        self.snapshot_state = SnapshotState(engine_adapter, schema=schema, cache_dir=cache_dir)
        self.version_state = VersionState(engine_adapter, schema=schema)
        # NOTE: the migrator receives the console exactly as passed in (possibly None);
        # only self.console below falls back to get_console().
        self.migrator = StateMigrator(
            engine_adapter,
            version_state=self.version_state,
            snapshot_state=self.snapshot_state,
            environment_state=self.environment_state,
            interval_state=self.interval_state,
            console=console,
        )
        # Make sure that if an empty string is provided that we treat it as None
        self.schema = schema or None
        self.engine_adapter = engine_adapter
        self.console = console or get_console()

    @transactional()
    def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None:
        """Pushes snapshots that are not yet present in the state store.

        Snapshots whose IDs already exist in the store are never overwritten: an
        error is logged for them and only the remaining snapshots are inserted.

        Args:
            snapshots: The snapshots to push.

        Raises:
            SQLMeshError: If any of the snapshots has not been versioned yet.
        """
        snapshots_by_id = {}
        for snapshot in snapshots:
            if not snapshot.version:
                raise SQLMeshError(
                    f"Snapshot {snapshot} has not been versioned yet. Create a plan before pushing a snapshot."
                )
            snapshots_by_id[snapshot.snapshot_id] = snapshot

        existing = self.snapshots_exist(snapshots_by_id)

        if existing:
            logger.error(
                "Snapshots %s already exists. This could be due to a concurrent plan or a hash collision. If this is a hash collision, add a stamp to your model.",
                str(existing),
            )

        # Only the snapshots that are not already stored are inserted.
        new_snapshots = [
            snapshot for sid, snapshot in snapshots_by_id.items() if sid not in existing
        ]
        if new_snapshots:
            self.snapshot_state.push_snapshots(new_snapshots)

    @transactional()
    def promote(
        self,
        environment: Environment,
        no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
        environment_statements: t.Optional[t.List[EnvironmentStatements]] = None,
    ) -> PromotionResult:
        """Update the environment to reflect the current state.

        This method verifies that snapshots have been pushed.

        Args:
            environment: The environment to promote.
            no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
                all snapshots will be checked; if an empty set, the check is skipped. The data
                gap check ensures that models that are already a part of the target environment
                have no data gaps when compared against previous snapshots for same models.
            environment_statements: The statements to store for this environment. If None, the
                stored statements are left untouched; an empty list is still written.

        Returns:
            A PromotionResult with the added snapshot table infos (sorted), the removed
            snapshot table infos, and the naming info of the existing environment when
            anything was removed (otherwise None).

        Raises:
            SQLMeshError: If some of the environment's snapshots are missing from the state,
                or if the data gap check detects missing intervals.
            ConflictingPlanError: If the existing, non-expired environment was last updated
                by a plan other than this environment's previous plan.
        """
        logger.info("Promoting environment '%s'", environment.name)

        missing = {s.snapshot_id for s in environment.snapshots} - self.snapshots_exist(
            environment.snapshots
        )
        if missing:
            raise SQLMeshError(
                f"Missing snapshots {missing}. Make sure to push and backfill your snapshots."
            )

        existing_environment = self.environment_state.get_environment(
            environment.name, lock_for_update=True
        )

        existing_table_infos = (
            {table_info.name: table_info for table_info in existing_environment.promoted_snapshots}
            if existing_environment
            else {}
        )
        table_infos = {table_info.name: table_info for table_info in environment.promoted_snapshots}
        views_that_changed_location: t.Set[SnapshotTableInfo] = set()
        if existing_environment:
            # Models present in both environments whose view name differs once the
            # respective naming info is applied.
            views_that_changed_location = {
                existing_table_info
                for name, existing_table_info in existing_table_infos.items()
                if name in table_infos
                and existing_table_info.qualified_view_name.for_environment(
                    existing_environment.naming_info
                )
                != table_infos[name].qualified_view_name.for_environment(environment.naming_info)
            }
            if not existing_environment.expired:
                if environment.previous_plan_id != existing_environment.plan_id:
                    raise ConflictingPlanError(
                        f"Another plan ({existing_environment.plan_id}) was applied to the target environment '{environment.name}' while your current plan "
                        f"({environment.plan_id}) was still in progress, interrupting it. Please re-apply your plan to resolve this error."
                    )
                # None means "check everything"; only an explicit empty set skips the check.
                if no_gaps_snapshot_names != set():
                    snapshots = self.get_snapshots(environment.snapshots).values()
                    self._ensure_no_gaps(
                        snapshots,
                        existing_environment,
                        no_gaps_snapshot_names,
                    )
            demoted_snapshots = set(existing_environment.snapshots) - set(environment.snapshots)
            # Update the updated_at attribute.
            self.snapshot_state.touch_snapshots(demoted_snapshots)

        # table_infos is keyed by the names of environment.promoted_snapshots.
        missing_models = set(existing_table_infos) - set(table_infos)

        added_table_infos = set(table_infos.values())
        if existing_environment and environment.can_partially_promote(existing_environment):
            # Only promote new snapshots.
            added_table_infos -= set(existing_environment.promoted_snapshots)

        self.environment_state.update_environment(environment)

        # If it is an empty list, we want to update the environment statements
        # to reflect there are no statements anymore in this environment.
        if environment_statements is not None:
            self.environment_state.update_environment_statements(
                environment.name, environment.plan_id, environment_statements
            )

        removed = {existing_table_infos[name] for name in missing_models}.union(
            views_that_changed_location
        )

        return PromotionResult(
            added=sorted(added_table_infos),
            removed=list(removed),
            removed_environment_naming_info=(
                existing_environment.naming_info if removed and existing_environment else None
            ),
        )

    @transactional()
    def finalize(self, environment: Environment) -> None:
        """Finalize the target environment, indicating that this environment has been
        fully promoted and is ready for use.

        Args:
            environment: The target environment to finalize.
        """
        self.environment_state.finalize(environment)

    @transactional()
    def unpause_snapshots(
        self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike
    ) -> None:
        """Unpauses target snapshots by delegating to the snapshot state.

        Args:
            snapshots: Target snapshots.
            unpaused_dt: Indicates when the target snapshots were unpaused.
        """
        self.snapshot_state.unpause_snapshots(snapshots, unpaused_dt)

    def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
        """Invalidates the target environment by delegating to the environment state.

        Args:
            name: The name of the environment to invalidate.
            protect_prod: Forwarded to the environment state as-is.
        """
        self.environment_state.invalidate_environment(name, protect_prod)

    def get_expired_snapshots(
        self,
        *,
        batch_range: ExpiredBatchRange,
        current_ts: t.Optional[int] = None,
        ignore_ttl: bool = False,
    ) -> t.Optional[ExpiredSnapshotBatch]:
        """Returns a single batch of expired snapshots.

        Args:
            batch_range: The range of the batch to fetch.
            current_ts: Timestamp used to evaluate expiration. A falsy value
                (None or 0) is replaced with the current timestamp.
            ignore_ttl: Forwarded to the snapshot state as-is.

        Returns:
            Whatever the snapshot state returns for the given range.
        """
        current_ts = current_ts or now_timestamp()
        return self.snapshot_state.get_expired_snapshots(
            environments=self.environment_state.get_environments(),
            current_ts=current_ts,
            ignore_ttl=ignore_ttl,
            batch_range=batch_range,
        )

    def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
        """Returns the summaries of environments considered expired at ``current_ts``."""
        return self.environment_state.get_expired_environments(current_ts=current_ts)

    @transactional()
    def delete_expired_snapshots(
        self,
        batch_range: ExpiredBatchRange,
        ignore_ttl: bool = False,
        current_ts: t.Optional[int] = None,
    ) -> None:
        """Removes one batch of expired snapshots and cleans up their intervals.

        Args:
            batch_range: The range of snapshots to delete in this batch.
            ignore_ttl: Forwarded to get_expired_snapshots.
            current_ts: Timestamp used to evaluate expiration.
        """
        batch = self.get_expired_snapshots(
            ignore_ttl=ignore_ttl,
            current_ts=current_ts,
            batch_range=batch_range,
        )
        if batch and batch.expired_snapshot_ids:
            self.snapshot_state.delete_snapshots(batch.expired_snapshot_ids)
            self.interval_state.cleanup_intervals(batch.cleanup_tasks, batch.expired_snapshot_ids)

    @transactional()
    def delete_expired_environments(
        self, current_ts: t.Optional[int] = None
    ) -> t.List[EnvironmentSummary]:
        """Removes expired environments and returns what the environment state reports.

        Args:
            current_ts: Timestamp used to evaluate expiration. A falsy value
                (None or 0) is replaced with the current timestamp.
        """
        current_ts = current_ts or now_timestamp()
        return self.environment_state.delete_expired_environments(current_ts=current_ts)

    def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
        """Deletes the given snapshots from the snapshot state."""
        self.snapshot_state.delete_snapshots(snapshot_ids)

    def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
        """Returns the subset of the given snapshot IDs that exist in the state."""
        return self.snapshot_state.snapshots_exist(snapshot_ids)

    def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]:
        """Returns the node names, out of ``names``, that exist in the state."""
        return self.snapshot_state.nodes_exist(names, exclude_external)

    def remove_state(self, including_backup: bool = False) -> None:
        """Removes the state store objects.

        Args:
            including_backup: Whether to also drop the backup table of every state table.
        """
        for table in (
            self.snapshot_state.snapshots_table,
            self.snapshot_state.auto_restatements_table,
            self.environment_state.environments_table,
            self.environment_state.environment_statements_table,
            self.interval_state.intervals_table,
            self.version_state.versions_table,
        ):
            self.engine_adapter.drop_table(table)
            if including_backup:
                self.engine_adapter.drop_table(_backup_table_name(table))

        self.snapshot_state.clear_cache()

    def reset(self, default_catalog: t.Optional[str]) -> None:
        """Resets the state store to the state when it was first initialized.

        Args:
            default_catalog: Accepted for interface compatibility only. It is not
                forwarded because ``migrate`` has no catalog parameter.
        """
        self.remove_state()
        # Passing default_catalog positionally would bind it to migrate()'s
        # ``skip_backup`` flag, so migrate is called with its defaults instead.
        self.migrate()

    @transactional()
    def update_auto_restatements(
        self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]]
    ) -> None:
        """Updates the next auto restatement timestamp for the snapshots.

        Args:
            next_auto_restatement_ts: Maps snapshot name / version pairs to the next
                auto restatement timestamp.
        """
        self.snapshot_state.update_auto_restatements(next_auto_restatement_ts)

    def get_environment(self, environment: str) -> t.Optional[Environment]:
        """Fetches the environment with the given name from the environment state."""
        return self.environment_state.get_environment(environment)

    def get_environment_statements(self, environment: str) -> t.List[EnvironmentStatements]:
        """Fetches the statements stored for the given environment."""
        return self.environment_state.get_environment_statements(environment)

    def get_environments(self) -> t.List[Environment]:
        """Fetches all environments.

        Returns:
            A list of all environments.
        """
        return self.environment_state.get_environments()

    def get_environments_summary(self) -> t.List[EnvironmentSummary]:
        """Fetches all environment names along with expiry datetime.

        Returns:
            A list of all environment summaries.
        """
        return self.environment_state.get_environments_summary()

    def get_snapshots(
        self,
        snapshot_ids: t.Iterable[SnapshotIdLike],
    ) -> t.Dict[SnapshotId, Snapshot]:
        """Fetches snapshots from the state.

        The fetched snapshots are hydrated with their intervals before being returned.

        Args:
            snapshot_ids: The snapshot IDs to fetch.

        Returns:
            A dict of snapshots.
        """
        snapshots = self.snapshot_state.get_snapshots(snapshot_ids)
        intervals = self.interval_state.get_snapshot_intervals(snapshots.values())
        Snapshot.hydrate_with_intervals_by_version(snapshots.values(), intervals)
        return snapshots

    def get_snapshots_by_names(
        self,
        snapshot_names: t.Iterable[str],
        current_ts: t.Optional[int] = None,
        exclude_expired: bool = True,
    ) -> t.Set[SnapshotIdAndVersion]:
        """Looks up snapshots by name via the snapshot state; all arguments are forwarded."""
        return self.snapshot_state.get_snapshots_by_names(
            snapshot_names=snapshot_names, current_ts=current_ts, exclude_expired=exclude_expired
        )

    @transactional()
    def add_interval(
        self,
        snapshot: Snapshot,
        start: TimeLike,
        end: TimeLike,
        is_dev: bool = False,
        last_altered_ts: t.Optional[int] = None,
    ) -> None:
        """Runs the base class implementation of add_interval inside a transaction."""
        super().add_interval(snapshot, start, end, is_dev, last_altered_ts)

    @transactional()
    def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
        """Stores the given intervals after passing them through _remove_partial_intervals.

        Entries that are empty after that step are not inserted.
        """
        intervals_to_insert = []
        for snapshot_intervals in snapshots_intervals:
            snapshot_intervals = snapshot_intervals.copy(
                update={
                    "intervals": _remove_partial_intervals(
                        snapshot_intervals.intervals, snapshot_intervals.snapshot_id, is_dev=False
                    ),
                    "dev_intervals": _remove_partial_intervals(
                        snapshot_intervals.dev_intervals,
                        snapshot_intervals.snapshot_id,
                        is_dev=True,
                    ),
                }
            )
            if not snapshot_intervals.is_empty():
                intervals_to_insert.append(snapshot_intervals)
        if intervals_to_insert:
            self.interval_state.add_snapshots_intervals(intervals_to_insert)

    @transactional()
    def remove_intervals(
        self,
        snapshot_intervals: t.Sequence[t.Tuple[SnapshotIdAndVersionLike, Interval]],
        remove_shared_versions: bool = False,
    ) -> None:
        """Removes intervals via the interval state; all arguments are forwarded."""
        self.interval_state.remove_intervals(snapshot_intervals, remove_shared_versions)

    @transactional()
    def compact_intervals(self) -> None:
        """Compacts stored intervals via the interval state."""
        self.interval_state.compact_intervals()

    def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.List[Snapshot]:
        """Refreshes the intervals of the given snapshots via the interval state."""
        return self.interval_state.refresh_snapshot_intervals(snapshots)

    def max_interval_end_per_model(
        self,
        environment: str,
        models: t.Optional[t.Set[str]] = None,
        ensure_finalized_snapshots: bool = False,
    ) -> t.Dict[str, int]:
        """Returns the max interval end per model for the given environment.

        Args:
            environment: The name of the environment.
            models: Optional set of model names to restrict the lookup to.
            ensure_finalized_snapshots: Whether to use the environment's
                ``finalized_or_current_snapshots`` instead of its ``snapshots``.

        Returns:
            A dict produced by the interval state, or an empty dict when the
            environment does not exist or no snapshots are selected.
        """
        env = self.get_environment(environment)
        if not env:
            return {}

        snapshots = (
            env.snapshots if not ensure_finalized_snapshots else env.finalized_or_current_snapshots
        )
        if models is not None:
            snapshots = [s for s in snapshots if s.name in models]

        if not snapshots:
            return {}

        return self.interval_state.max_interval_end_per_model(snapshots)

    def recycle(self) -> None:
        """Recycles the underlying engine adapter."""
        self.engine_adapter.recycle()

    def close(self) -> None:
        """Closes the underlying engine adapter."""
        self.engine_adapter.close()

    @transactional()
    def migrate(
        self,
        skip_backup: bool = False,
        promoted_snapshots_only: bool = True,
    ) -> None:
        """Migrate the state sync to the latest SQLMesh / SQLGlot version."""
        self.migrator.migrate(
            self.schema,
            skip_backup=skip_backup,
            promoted_snapshots_only=promoted_snapshots_only,
        )

    @transactional()
    def rollback(self) -> None:
        """Rollback to the previous migration."""
        self.migrator.rollback()

    @transactional()
    def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream:
        """Exports versions, snapshots and environments as a StateStream.

        Args:
            environment_names: The environments to export. If None or empty, all
                environments are exported.

        Raises:
            SQLMeshError: If one of the requested environments does not exist.
        """
        # Will throw if the state db hasn't been created or there is a version mismatch.
        versions = self.get_versions(validate=True)

        snapshot_ids_to_export: t.Set[SnapshotId] = set()
        selected_environments: t.List[Environment] = []
        if environment_names:
            for env_name in environment_names:
                environment = self.get_environment(env_name)
                if not environment:
                    raise SQLMeshError(f"No such environment: {env_name}")
                selected_environments.append(environment)
        else:
            selected_environments = self.get_environments()

        for env in selected_environments:
            snapshot_ids_to_export.update(s.snapshot_id for s in env.snapshots or [])

        def _export_snapshots() -> t.Iterator[Snapshot]:
            for chunk in chunk_iterable(snapshot_ids_to_export, SnapshotState.SNAPSHOT_BATCH_SIZE):
                yield from self.get_snapshots(chunk).values()

        def _export_environments() -> t.Iterator[EnvironmentWithStatements]:
            for env in selected_environments:
                yield EnvironmentWithStatements(
                    environment=env, statements=self.get_environment_statements(env.name)
                )

        return StateStream.from_iterators(
            versions=versions,
            snapshots=_export_snapshots(),
            environments=_export_environments(),
        )

    @transactional()
    def import_(self, stream: StateStream, clear: bool = True) -> None:
        """Imports the chunks of a StateStream into this state store.

        Args:
            stream: The stream of versions, snapshots and environments to import.
            clear: Whether to reset the state store (when the versions chunk is seen)
                before importing. If False, existing snapshots are overwritten instead.

        Raises:
            SQLMeshError: If the minor SQLMesh version of the stream differs from the
                one of the existing state.
        """
        existing_versions = self.get_versions()

        for state_chunk in stream:
            if isinstance(state_chunk, VersionsChunk):
                # SQLMesh major/minor version must match so that we can be sure the JSON contained in the state file
                # is compatible with our Pydantic model definitions. Patch versions don't need to match because the
                # assumption is that they don't contain any breaking changes.
                incoming_versions = state_chunk.versions
                if (
                    incoming_versions.minor_sqlmesh_version
                    != existing_versions.minor_sqlmesh_version
                ):
                    raise SQLMeshError(
                        f"SQLMesh version mismatch. You are running '{existing_versions.sqlmesh_version}' but the state file was created with '{incoming_versions.sqlmesh_version}'.\n"
                        "Please upgrade/downgrade your SQLMesh version to match the state file before performing the import."
                    )

                if clear:
                    self.reset(default_catalog=None)

            if isinstance(state_chunk, SnapshotsChunk):
                auto_restatements: t.Dict[SnapshotNameVersion, t.Optional[int]] = {}

                for snapshot_chunk in chunk_iterable(
                    state_chunk, SnapshotState.SNAPSHOT_BATCH_SIZE
                ):
                    snapshot_chunk = list(snapshot_chunk)
                    # If clear=True, all existing snapshots were dropped anyway.
                    overwrite_existing_snapshots = not clear
                    self.snapshot_state.push_snapshots(
                        snapshot_chunk, overwrite=overwrite_existing_snapshots
                    )
                    # A list matches the Sequence annotation of add_snapshots_intervals.
                    self.add_snapshots_intervals([s.snapshot_intervals for s in snapshot_chunk])

                    auto_restatements.update(
                        {
                            s.name_version: s.next_auto_restatement_ts
                            for s in snapshot_chunk
                            if s.next_auto_restatement_ts
                        }
                    )

                self.update_auto_restatements(auto_restatements)

            if isinstance(state_chunk, EnvironmentsChunk):
                for environment_with_statements in state_chunk:
                    environment = environment_with_statements.environment
                    self.environment_state.update_environment(environment)
                    self.environment_state.update_environment_statements(
                        environment.name,
                        environment.plan_id,
                        environment_with_statements.statements,
                    )

    def state_type(self) -> str:
        """Returns the dialect of the underlying engine adapter."""
        return self.engine_adapter.dialect

    def _get_versions(self) -> Versions:
        """Reads the stored versions from the version state."""
        return self.version_state.get_versions()

    def _ensure_no_gaps(
        self,
        target_snapshots: t.Iterable[Snapshot],
        target_environment: Environment,
        snapshot_names: t.Optional[t.Set[str]],
    ) -> None:
        """Raises if a re-versioned incremental snapshot would introduce a data gap.

        Args:
            target_snapshots: The snapshots that are about to be promoted.
            target_environment: The environment whose current snapshots are compared against.
            snapshot_names: The names to check; None means every name is checked.

        Raises:
            SQLMeshError: If a target snapshot has missing intervals between its start
                and the end of the last interval of the snapshot it replaces.
        """
        target_snapshots_by_name = {s.name: s for s in target_snapshots}

        # Snapshots of the environment that are being replaced by a different version.
        changed_version_prev_snapshots_by_name = {
            s.name: s
            for s in target_environment.snapshots
            if s.name in target_snapshots_by_name
            and target_snapshots_by_name[s.name].version != s.version
        }

        prev_snapshots = self.get_snapshots(
            changed_version_prev_snapshots_by_name.values()
        ).values()
        cache: t.Dict[str, datetime] = {}

        for prev_snapshot in prev_snapshots:
            target_snapshot = target_snapshots_by_name[prev_snapshot.name]
            if (
                (snapshot_names is None or prev_snapshot.name in snapshot_names)
                and target_snapshot.is_incremental
                and prev_snapshot.is_incremental
                and prev_snapshot.intervals
            ):
                start = to_timestamp(
                    start_date(target_snapshot, target_snapshots_by_name.values(), cache)
                )
                end = prev_snapshot.intervals[-1][1]

                if start < end:
                    missing_intervals = target_snapshot.missing_intervals(
                        start, end, end_bounded=True
                    )

                    if missing_intervals:
                        raise SQLMeshError(
                            f"Detected missing intervals for model {target_snapshot.name}, interrupting your current plan. "
                            "Please re-apply your plan to resolve this error."
                        )

    @contextlib.contextmanager
    def _transaction(self) -> t.Iterator[None]:
        """Context manager that runs its body inside an engine adapter transaction."""
        with self.engine_adapter.transaction():
            yield
Manages state of nodes and snapshots with an existing engine adapter.
This state sync is convenient to use because it requires no additional setup. You can reuse the same engine/warehouse that your data is stored in.
Arguments:
- engine_adapter: The EngineAdapter to use to store and fetch snapshots.
- schema: The schema to store state metadata in. If None or an empty string, no schema is defined.
- console: The console to log information to.
- cache_dir: The cache path, used for caching snapshot models.
def __init__(
    self,
    engine_adapter: EngineAdapter,
    schema: t.Optional[str],
    console: t.Optional[Console] = None,
    cache_dir: Path = Path(),
):
    """Builds the per-concern state helpers on top of a single engine adapter.

    Args:
        engine_adapter: The EngineAdapter to use to store and fetch snapshots.
        schema: The schema to store state metadata in. An empty string is
            stored on this object as None.
        console: The console to log information to. When falsy, the result of
            get_console() is stored on this object instead.
        cache_dir: The cache path handed to the snapshot state.
    """
    intervals = IntervalState(engine_adapter, schema=schema)
    self.interval_state = intervals

    environments = EnvironmentState(engine_adapter, schema=schema)
    self.environment_state = environments

    snapshots = SnapshotState(engine_adapter, schema=schema, cache_dir=cache_dir)
    self.snapshot_state = snapshots

    versions = VersionState(engine_adapter, schema=schema)
    self.version_state = versions

    # The migrator is given the console exactly as it was passed in.
    self.migrator = StateMigrator(
        engine_adapter,
        version_state=versions,
        snapshot_state=snapshots,
        environment_state=environments,
        interval_state=intervals,
        console=console,
    )

    # Make sure that if an empty string is provided that we treat it as None
    self.schema = schema if schema else None
    self.engine_adapter = engine_adapter
    self.console = console if console else get_console()
@transactional()
def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None:
    """Pushes snapshots that are not yet present in the state store.

    Snapshots whose IDs already exist in the store are never overwritten: an
    error is logged for them and only the remaining snapshots are inserted.

    Args:
        snapshots: The snapshots to push.

    Raises:
        SQLMeshError: If any of the snapshots has not been versioned yet.
    """
    snapshots_by_id = {}
    for snapshot in snapshots:
        if not snapshot.version:
            raise SQLMeshError(
                f"Snapshot {snapshot} has not been versioned yet. Create a plan before pushing a snapshot."
            )
        snapshots_by_id[snapshot.snapshot_id] = snapshot

    existing = self.snapshots_exist(snapshots_by_id)

    if existing:
        logger.error(
            "Snapshots %s already exists. This could be due to a concurrent plan or a hash collision. If this is a hash collision, add a stamp to your model.",
            str(existing),
        )

    # Only the snapshots that are not already stored are inserted.
    new_snapshots = [
        snapshot for sid, snapshot in snapshots_by_id.items() if sid not in existing
    ]
    if new_snapshots:
        self.snapshot_state.push_snapshots(new_snapshots)
Pushes snapshots to the state store, skipping the ones that already exist.
This method first finds which of the given snapshots already exist in the store. For those it logs an error (the cause may be a concurrent plan or a hash collision) and leaves the stored versions untouched. Only the snapshots that are not yet in the store are inserted.
Arguments:
- snapshots: The snapshots to push.
@transactional()
def promote(
    self,
    environment: Environment,
    no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
    environment_statements: t.Optional[t.List[EnvironmentStatements]] = None,
) -> PromotionResult:
    """Update the environment to reflect the current state.

    This method verifies that snapshots have been pushed.

    Args:
        environment: The environment to promote.
        no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
            all snapshots will be checked; if an empty set, the check is skipped. The data
            gap check ensures that models that are already a part of the target environment
            have no data gaps when compared against previous snapshots for same models.
        environment_statements: The statements to store for this environment. If None, the
            stored statements are left untouched; an empty list is still written.

    Returns:
        A PromotionResult with the added snapshot table infos (sorted), the removed
        snapshot table infos, and the naming info of the existing environment when
        anything was removed (otherwise None).

    Raises:
        SQLMeshError: If some of the environment's snapshots are missing from the state,
            or if the data gap check detects missing intervals.
        ConflictingPlanError: If the existing, non-expired environment was last updated
            by a plan other than this environment's previous plan.
    """
    logger.info("Promoting environment '%s'", environment.name)

    missing = {s.snapshot_id for s in environment.snapshots} - self.snapshots_exist(
        environment.snapshots
    )
    if missing:
        raise SQLMeshError(
            f"Missing snapshots {missing}. Make sure to push and backfill your snapshots."
        )

    existing_environment = self.environment_state.get_environment(
        environment.name, lock_for_update=True
    )

    existing_table_infos = (
        {table_info.name: table_info for table_info in existing_environment.promoted_snapshots}
        if existing_environment
        else {}
    )
    table_infos = {table_info.name: table_info for table_info in environment.promoted_snapshots}
    views_that_changed_location: t.Set[SnapshotTableInfo] = set()
    if existing_environment:
        # Models present in both environments whose view name differs once the
        # respective naming info is applied.
        views_that_changed_location = {
            existing_table_info
            for name, existing_table_info in existing_table_infos.items()
            if name in table_infos
            and existing_table_info.qualified_view_name.for_environment(
                existing_environment.naming_info
            )
            != table_infos[name].qualified_view_name.for_environment(environment.naming_info)
        }
        if not existing_environment.expired:
            if environment.previous_plan_id != existing_environment.plan_id:
                raise ConflictingPlanError(
                    f"Another plan ({existing_environment.plan_id}) was applied to the target environment '{environment.name}' while your current plan "
                    f"({environment.plan_id}) was still in progress, interrupting it. Please re-apply your plan to resolve this error."
                )
            # None means "check everything"; only an explicit empty set skips the check.
            if no_gaps_snapshot_names != set():
                snapshots = self.get_snapshots(environment.snapshots).values()
                self._ensure_no_gaps(
                    snapshots,
                    existing_environment,
                    no_gaps_snapshot_names,
                )
        demoted_snapshots = set(existing_environment.snapshots) - set(environment.snapshots)
        # Update the updated_at attribute.
        self.snapshot_state.touch_snapshots(demoted_snapshots)

    # table_infos is keyed by the names of environment.promoted_snapshots.
    missing_models = set(existing_table_infos) - set(table_infos)

    added_table_infos = set(table_infos.values())
    if existing_environment and environment.can_partially_promote(existing_environment):
        # Only promote new snapshots.
        added_table_infos -= set(existing_environment.promoted_snapshots)

    self.environment_state.update_environment(environment)

    # If it is an empty list, we want to update the environment statements
    # to reflect there are no statements anymore in this environment.
    if environment_statements is not None:
        self.environment_state.update_environment_statements(
            environment.name, environment.plan_id, environment_statements
        )

    removed = {existing_table_infos[name] for name in missing_models}.union(
        views_that_changed_location
    )

    return PromotionResult(
        added=sorted(added_table_infos),
        removed=list(removed),
        removed_environment_naming_info=(
            existing_environment.naming_info if removed and existing_environment else None
        ),
    )
Update the environment to reflect the current state.
This method verifies that snapshots have been pushed.
Arguments:
- environment: The environment to promote.
- no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None, all snapshots will be checked. The data gap check ensures that models that are already a part of the target environment have no data gaps when compared against previous snapshots for same models.
Returns:
A PromotionResult containing the added snapshot table infos, the removed snapshot table infos, and the environment naming info for the removed table infos.
@transactional()
def finalize(self, environment: Environment) -> None:
    """Marks the target environment as fully promoted and ready for use.

    The work is delegated to the environment state.

    Args:
        environment: The target environment to finalize.
    """
    environment_state = self.environment_state
    environment_state.finalize(environment)
Finalize the target environment, indicating that this environment has been fully promoted and is ready for use.
Arguments:
- environment: The target environment to finalize.
@transactional()
def unpause_snapshots(
    self,
    snapshots: t.Collection[SnapshotInfoLike],
    unpaused_dt: TimeLike,
) -> None:
    """Unpauses the target snapshots by delegating to the snapshot state.

    Args:
        snapshots: Target snapshots.
        unpaused_dt: Indicates when the target snapshots were unpaused.
    """
    snapshot_state = self.snapshot_state
    snapshot_state.unpause_snapshots(snapshots, unpaused_dt)
Unpauses target snapshots.
Unpaused snapshots are scheduled for evaluation on a recurring basis. Once unpaused a snapshot can't be paused again.
Arguments:
- snapshots: Target snapshots.
- unpaused_dt: The datetime object which indicates when target snapshots were unpaused.
def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
    """Invalidates the target environment by delegating to the environment state.

    Args:
        name: The name of the environment to invalidate.
        protect_prod: Forwarded to the environment state as-is.
    """
    environment_state = self.environment_state
    environment_state.invalidate_environment(name, protect_prod)
Invalidates the target environment by setting its expiration timestamp to now.
Arguments:
- name: The name of the environment to invalidate.
- protect_prod: If True, prevents invalidation of the production environment.
def get_expired_snapshots(
    self,
    *,
    batch_range: ExpiredBatchRange,
    current_ts: t.Optional[int] = None,
    ignore_ttl: bool = False,
) -> t.Optional[ExpiredSnapshotBatch]:
    """Returns a single batch of expired snapshots.

    Args:
        batch_range: The range of the batch to fetch.
        current_ts: Timestamp used to evaluate expiration. A falsy value
            (None or 0) is replaced with the current timestamp.
        ignore_ttl: Forwarded to the snapshot state as-is.

    Returns:
        Whatever the snapshot state returns for the given range.
    """
    effective_ts = current_ts or now_timestamp()
    all_environments = self.environment_state.get_environments()
    return self.snapshot_state.get_expired_snapshots(
        environments=all_environments,
        current_ts=effective_ts,
        ignore_ttl=ignore_ttl,
        batch_range=batch_range,
    )
Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).
Arguments:
- current_ts: Timestamp used to evaluate expiration.
- ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
- batch_range: The range of the batch to fetch.
Returns:
A batch describing expired snapshots or None if no snapshots are pending cleanup.
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
    """Returns the summaries of environments considered expired at ``current_ts``.

    Args:
        current_ts: Timestamp forwarded to the environment state.

    Returns:
        The list of environment summaries reported by the environment state.
    """
    expired_environments = self.environment_state.get_expired_environments(
        current_ts=current_ts
    )
    return expired_environments
Returns the expired environments.
Expired environments are environments that have exceeded their time-to-live value.
Returns:
The list of environment summaries to remove.
@transactional()
def delete_expired_snapshots(
    self,
    batch_range: ExpiredBatchRange,
    ignore_ttl: bool = False,
    current_ts: t.Optional[int] = None,
) -> None:
    """Removes one batch of expired snapshots and cleans up their intervals.

    Args:
        batch_range: The range of snapshots to delete in this batch.
        ignore_ttl: Forwarded to get_expired_snapshots.
        current_ts: Timestamp used to evaluate expiration.
    """
    expired_batch = self.get_expired_snapshots(
        batch_range=batch_range,
        current_ts=current_ts,
        ignore_ttl=ignore_ttl,
    )

    # Nothing to do when there is no batch or the batch has no expired IDs.
    if not expired_batch or not expired_batch.expired_snapshot_ids:
        return

    self.snapshot_state.delete_snapshots(expired_batch.expired_snapshot_ids)
    self.interval_state.cleanup_intervals(
        expired_batch.cleanup_tasks, expired_batch.expired_snapshot_ids
    )
Removes expired snapshots.
Expired snapshots are snapshots that have exceeded their time-to-live and are no longer in use within an environment.
Arguments:
- batch_range: The range of snapshots to delete in this batch.
- ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting all snapshots that are not referenced in any environment.
- current_ts: Timestamp used to evaluate expiration.
@transactional()
def delete_expired_environments(
    self, current_ts: t.Optional[int] = None
) -> t.List[EnvironmentSummary]:
    """Removes expired environments via the environment state.

    Args:
        current_ts: Timestamp used to evaluate expiration. A falsy value
            (None or 0) is replaced with the current timestamp.

    Returns:
        The list of environment summaries reported by the environment state.
    """
    effective_ts = current_ts or now_timestamp()
    removed = self.environment_state.delete_expired_environments(current_ts=effective_ts)
    return removed
Removes expired environments.
Expired environments are environments that have exceeded their time-to-live value.
Returns:
The list of removed environments.
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
    """Deletes the given snapshots by delegating to the snapshot state.

    Args:
        snapshot_ids: Snapshot-like objects identifying what to delete.
    """
    snapshot_state = self.snapshot_state
    snapshot_state.delete_snapshots(snapshot_ids)
Delete snapshots from the state sync.
Arguments:
- snapshot_ids: A list of snapshot like objects to delete.
def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
    """Checks which of the given snapshots exist, via the snapshot state.

    Args:
        snapshot_ids: Iterable of snapshot ids to bulk check.

    Returns:
        The set of snapshot ids reported as existing by the snapshot state.
    """
    existing_ids = self.snapshot_state.snapshots_exist(snapshot_ids)
    return existing_ids
Checks if multiple snapshots exist in the state sync.
Arguments:
- snapshot_ids: Iterable of snapshot ids to bulk check.
Returns:
A set of all the existing snapshot ids.
def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]:
    """Checks which of the given node names exist, via the snapshot state.

    Args:
        names: Iterable of node names to check.
        exclude_external: Forwarded to the snapshot state as-is.

    Returns:
        The set of node names reported as existing by the snapshot state.
    """
    existing_names = self.snapshot_state.nodes_exist(names, exclude_external)
    return existing_names
Returns the node names that exist in the state sync.
Arguments:
- names: Iterable of node names to check.
- exclude_external: Whether to exclude external models from the output.
Returns:
A set of all the existing node names.
def remove_state(self, including_backup: bool = False) -> None:
    """Drop every state table and clear the snapshot cache.

    Args:
        including_backup: When True, the backup table of each state table
            (named via ``_backup_table_name``) is dropped as well.
    """
    state_tables = (
        self.snapshot_state.snapshots_table,
        self.snapshot_state.auto_restatements_table,
        self.environment_state.environments_table,
        self.environment_state.environment_statements_table,
        self.interval_state.intervals_table,
        self.version_state.versions_table,
    )
    for state_table in state_tables:
        adapter = self.engine_adapter
        adapter.drop_table(state_table)
        if not including_backup:
            continue
        adapter.drop_table(_backup_table_name(state_table))

    self.snapshot_state.clear_cache()
Removes the state store objects.
def reset(self, default_catalog: t.Optional[str]) -> None:
    """Reset the state store to the state when it was first initialized.

    All state tables are removed and then recreated by running the
    migration with its default options.

    Args:
        default_catalog: Accepted for interface compatibility only. It is
            intentionally not forwarded: ``migrate`` takes
            ``(skip_backup, promoted_snapshots_only)``, so passing the
            catalog positionally would bind it to ``skip_backup``.
    """
    self.remove_state()
    # Do not pass default_catalog here; see the docstring above.
    self.migrate()
Resets the state store to the state when it was first initialized.
@transactional()
def update_auto_restatements(
    self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]]
) -> None:
    """Update the next auto restatement timestamp for the snapshots.

    Args:
        next_auto_restatement_ts: A dictionary of snapshot name / version
            pairs to the next auto restatement timestamp.
    """
    snapshot_store = self.snapshot_state
    snapshot_store.update_auto_restatements(next_auto_restatement_ts)
Updates the next auto restatement timestamp for the snapshots.
Arguments:
- next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp.
def get_environment(self, environment: str) -> t.Optional[Environment]:
    """Fetch an environment from the environment state.

    Args:
        environment: The environment to look up.

    Returns:
        Whatever the environment state returns for this lookup; the return
        type allows ``None``.
    """
    environment_store = self.environment_state
    found = environment_store.get_environment(environment)
    return found
Fetches the environment if it exists.
Arguments:
- environment: The name of the environment to fetch.
Returns:
The environment object, or None if the environment does not exist.
def get_environment_statements(self, environment: str) -> t.List[EnvironmentStatements]:
    """Fetch the environment statements stored for an environment.

    Args:
        environment: The environment whose statements are fetched.

    Returns:
        A list of the environment statements.
    """
    environment_store = self.environment_state
    statements = environment_store.get_environment_statements(environment)
    return statements
Fetches environment statements from the environment_statements table.
Returns:
A list of the Environment Statements.
def get_environments(self) -> t.List[Environment]:
    """Fetch every environment held by the environment state.

    Returns:
        A list of all environments.
    """
    environment_store = self.environment_state
    all_environments = environment_store.get_environments()
    return all_environments
Fetches all environments.
Returns:
A list of all environments.
def get_environments_summary(self) -> t.List[EnvironmentSummary]:
    """Fetch the summary of every environment (names along with expiry datetime).

    Returns:
        A list of all environment summaries.
    """
    environment_store = self.environment_state
    summaries = environment_store.get_environments_summary()
    return summaries
Fetches all environment names along with expiry datetime.
Returns:
A list of all environment summaries.
def get_snapshots(
    self,
    snapshot_ids: t.Iterable[SnapshotIdLike],
) -> t.Dict[SnapshotId, Snapshot]:
    """Fetch snapshots from the state and hydrate them with their intervals.

    Args:
        snapshot_ids: The snapshot IDs to fetch.

    Returns:
        A dict of snapshots keyed by snapshot id.
    """
    fetched = self.snapshot_state.get_snapshots(snapshot_ids)
    fetched_snapshots = fetched.values()
    # Intervals are stored separately from snapshots, so attach them here.
    stored_intervals = self.interval_state.get_snapshot_intervals(fetched_snapshots)
    Snapshot.hydrate_with_intervals_by_version(fetched_snapshots, stored_intervals)
    return fetched
Fetches snapshots from the state.
Arguments:
- snapshot_ids: The snapshot IDs to fetch.
Returns:
A dict of snapshots.
def get_snapshots_by_names(
    self,
    snapshot_names: t.Iterable[str],
    current_ts: t.Optional[int] = None,
    exclude_expired: bool = True,
) -> t.Set[SnapshotIdAndVersion]:
    """Return the snapshot records for all versions of the given snapshot names.

    Args:
        snapshot_names: Iterable of snapshot names to fetch all snapshot records for.
        current_ts: The current time used for identifying which snapshots have
            expired (only relevant if ``exclude_expired`` is True).
        exclude_expired: Whether to exclude expired snapshots from the result.

    Returns:
        A set containing all the matched snapshot records.
    """
    snapshot_store = self.snapshot_state
    matched_records = snapshot_store.get_snapshots_by_names(
        snapshot_names=snapshot_names,
        current_ts=current_ts,
        exclude_expired=exclude_expired,
    )
    return matched_records
Return the snapshot records for all versions of the specified snapshot names.
Arguments:
- snapshot_names: Iterable of snapshot names to fetch all snapshot records for
- current_ts: Sets the current time for identifying which snapshots have expired so they can be excluded (only relevant if exclude_expired=True).
- exclude_expired: Whether or not to exclude the snapshot IDs of expired snapshots from the result.
Returns:
A set containing all the matched snapshot records. To fetch full snapshots, pass it into StateSync.get_snapshots()
@transactional()
def add_interval(
    self,
    snapshot: Snapshot,
    start: TimeLike,
    end: TimeLike,
    is_dev: bool = False,
    last_altered_ts: t.Optional[int] = None,
) -> None:
    """Add an interval to a snapshot via the parent implementation.

    This override only wraps the inherited behaviour with ``@transactional()``.

    Args:
        snapshot: The snapshot to add an interval to.
        start: The start of the interval to add.
        end: The end of the interval to add.
        is_dev: Indicates whether the given interval is being added while in
            development mode.
        last_altered_ts: The timestamp of the last modification of the
            physical table.
    """
    parent = super()
    parent.add_interval(snapshot, start, end, is_dev, last_altered_ts)
Add an interval to a snapshot and sync it to the store.
Arguments:
- snapshot: The snapshot like object to add an interval to.
- start: The start of the interval to add.
- end: The end of the interval to add.
- is_dev: Indicates whether the given interval is being added while in development mode
- last_altered_ts: The timestamp of the last modification of the physical table
@transactional()
def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
    """Add snapshot intervals to the interval state.

    Each entry is copied with its ``intervals`` and ``dev_intervals`` passed
    through ``_remove_partial_intervals``; entries that end up empty are
    skipped, and nothing is written when no entries remain.

    Args:
        snapshots_intervals: The intervals to add.
    """
    pending = []
    for original in snapshots_intervals:
        cleaned_intervals = _remove_partial_intervals(
            original.intervals, original.snapshot_id, is_dev=False
        )
        cleaned_dev_intervals = _remove_partial_intervals(
            original.dev_intervals,
            original.snapshot_id,
            is_dev=True,
        )
        cleaned = original.copy(
            update={
                "intervals": cleaned_intervals,
                "dev_intervals": cleaned_dev_intervals,
            }
        )
        if cleaned.is_empty():
            continue
        pending.append(cleaned)

    if not pending:
        return
    self.interval_state.add_snapshots_intervals(pending)
Add snapshot intervals to the state.
Arguments:
- snapshots_intervals: The intervals to add.
@transactional()
def remove_intervals(
    self,
    snapshot_intervals: t.Sequence[t.Tuple[SnapshotIdAndVersionLike, Interval]],
    remove_shared_versions: bool = False,
) -> None:
    """Remove intervals from snapshots by delegating to the interval state.

    Args:
        snapshot_intervals: The snapshot intervals to remove.
        remove_shared_versions: Whether to remove intervals for snapshots that
            share the same version with the target snapshots.
    """
    interval_store = self.interval_state
    interval_store.remove_intervals(snapshot_intervals, remove_shared_versions)
Remove an interval from a list of snapshots and sync it to the store.
Because multiple snapshots can be pointing to the same version or physical table, this method can also grab all snapshots tied to the passed in version.
Arguments:
- snapshot_intervals: The snapshot intervals to remove.
- remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots.
@transactional()
def compact_intervals(self) -> None:
    """Compact intervals for all snapshots by delegating to the interval state."""
    interval_store = self.interval_state
    interval_store.compact_intervals()
Compacts intervals for all snapshots.
Compaction process involves merging of existing interval records into new records and then deleting the old ones.
def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.List[Snapshot]:
    """Update the given snapshots with the latest intervals from the state.

    Args:
        snapshots: The snapshots to refresh.

    Returns:
        The updated snapshots.
    """
    interval_store = self.interval_state
    refreshed = interval_store.refresh_snapshot_intervals(snapshots)
    return refreshed
Updates given snapshots with latest intervals from the state.
Arguments:
- snapshots: The snapshots to refresh.
Returns:
The updated snapshots.
def max_interval_end_per_model(
    self,
    environment: str,
    models: t.Optional[t.Set[str]] = None,
    ensure_finalized_snapshots: bool = False,
) -> t.Dict[str, int]:
    """Return the max interval end per model for the given environment.

    Args:
        environment: The target environment.
        models: The models to get the max interval end for. If None, all
            models are considered.
        ensure_finalized_snapshots: When True, use the environment's
            ``finalized_or_current_snapshots``; otherwise use its ``snapshots``.

    Returns:
        The result of the interval state lookup for the selected snapshots, or
        an empty dict when the environment does not exist or no snapshots
        remain after filtering.
    """
    target_env = self.get_environment(environment)
    if not target_env:
        return {}

    if ensure_finalized_snapshots:
        candidates = target_env.finalized_or_current_snapshots
    else:
        candidates = target_env.snapshots

    if models is not None:
        candidates = [candidate for candidate in candidates if candidate.name in models]

    if candidates:
        return self.interval_state.max_interval_end_per_model(candidates)
    return {}
Returns the max interval end per model for the given environment.
Arguments:
- environment: The target environment.
- models: The models to get the max interval end for. If None, all models are considered.
- ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:
A dictionary of model FQNs to their respective interval ends in milliseconds since epoch.
Closes all open connections and releases all allocated resources associated with any thread except the calling one.
@transactional()
def migrate(
    self,
    skip_backup: bool = False,
    promoted_snapshots_only: bool = True,
) -> None:
    """Migrate the state sync to the latest SQLMesh / SQLGlot version.

    Args:
        skip_backup: Forwarded unchanged to the migrator.
        promoted_snapshots_only: Forwarded unchanged to the migrator.
    """
    migration_options = {
        "skip_backup": skip_backup,
        "promoted_snapshots_only": promoted_snapshots_only,
    }
    self.migrator.migrate(self.schema, **migration_options)
Migrate the state sync to the latest SQLMesh / SQLGlot version.
@transactional()
def rollback(self) -> None:
    """Roll back to the previous migration by delegating to the migrator."""
    state_migrator = self.migrator
    state_migrator.rollback()
Rollback to the previous migration.
@transactional()
def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream:
    """Export the contents of this StateSync as a StateStream.

    Args:
        environment_names: An optional list of environment names to export.
            If not specified, all environments will be exported.

    Returns:
        A StateStream built from the versions, the snapshots referenced by the
        selected environments, and the environments with their statements.

    Raises:
        SQLMeshError: If a requested environment does not exist.
    """
    # Will throw if the state db hasn't been created or there is a version mismatch.
    versions = self.get_versions(validate=True)

    selected_environments: t.List[Environment]
    if environment_names:
        selected_environments = []
        for env_name in environment_names:
            environment = self.get_environment(env_name)
            if not environment:
                raise SQLMeshError(f"No such environment: {env_name}")
            selected_environments.append(environment)
    else:
        selected_environments = self.get_environments()

    # Only snapshots referenced by the selected environments are exported.
    snapshot_ids_to_export: t.Set[SnapshotId] = {
        env_snapshot.snapshot_id
        for selected in selected_environments
        for env_snapshot in (selected.snapshots or [])
    }

    def _export_snapshots() -> t.Iterator[Snapshot]:
        batches = chunk_iterable(snapshot_ids_to_export, SnapshotState.SNAPSHOT_BATCH_SIZE)
        for batch in batches:
            yield from self.get_snapshots(batch).values()

    def _export_environments() -> t.Iterator[EnvironmentWithStatements]:
        for selected in selected_environments:
            statements = self.get_environment_statements(selected.name)
            yield EnvironmentWithStatements(environment=selected, statements=statements)

    return StateStream.from_iterators(
        versions=versions,
        snapshots=_export_snapshots(),
        environments=_export_environments(),
    )
Export the contents of this StateSync as a StateStream
Arguments:
- environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
@transactional()
def import_(self, stream: StateStream, clear: bool = True) -> None:
    """Load the state contained in a StateStream into this state sync.

    Args:
        stream: The stream of new state.
        clear: Whether or not to clear existing state before inserting state
            from the stream. When False, incoming snapshots overwrite
            existing ones.

    Raises:
        SQLMeshError: If the minor SQLMesh version of the stream differs from
            the one recorded in the existing state.
    """
    existing_versions = self.get_versions()
    # If clear=True, all existing snapshots were dropped anyway, so there is
    # nothing to overwrite.
    overwrite_existing_snapshots = not clear

    for state_chunk in stream:
        if isinstance(state_chunk, VersionsChunk):
            # SQLMesh major/minor version must match so that we can be sure the JSON
            # contained in the state file is compatible with our Pydantic model
            # definitions. Patch versions don't need to match because the assumption
            # is that they don't contain any breaking changes.
            incoming_versions = state_chunk.versions
            versions_match = (
                incoming_versions.minor_sqlmesh_version
                == existing_versions.minor_sqlmesh_version
            )
            if not versions_match:
                raise SQLMeshError(
                    f"SQLMesh version mismatch. You are running '{existing_versions.sqlmesh_version}' but the state file was created with '{incoming_versions.sqlmesh_version}'.\n"
                    "Please upgrade/downgrade your SQLMesh version to match the state file before performing the import."
                )

            if clear:
                self.reset(default_catalog=None)

        if isinstance(state_chunk, SnapshotsChunk):
            auto_restatements: t.Dict[SnapshotNameVersion, t.Optional[int]] = {}

            batches = chunk_iterable(state_chunk, SnapshotState.SNAPSHOT_BATCH_SIZE)
            for batch in batches:
                # Materialize the batch because it is traversed several times below.
                batch_snapshots = list(batch)
                self.snapshot_state.push_snapshots(
                    batch_snapshots, overwrite=overwrite_existing_snapshots
                )
                self.add_snapshots_intervals((s.snapshot_intervals for s in batch_snapshots))

                for pushed in batch_snapshots:
                    if pushed.next_auto_restatement_ts:
                        auto_restatements[pushed.name_version] = pushed.next_auto_restatement_ts

            self.update_auto_restatements(auto_restatements)

        if isinstance(state_chunk, EnvironmentsChunk):
            for environment_with_statements in state_chunk:
                imported_env = environment_with_statements.environment
                self.environment_state.update_environment(imported_env)
                self.environment_state.update_environment_statements(
                    imported_env.name,
                    imported_env.plan_id,
                    environment_with_statements.statements,
                )
Replace the existing state with the state contained in the StateStream
Arguments:
- stream: The stream of new state
- clear: Whether or not to clear existing state before inserting state from the stream