Edit on GitHub

sqlmesh.core.state_sync.db

1from sqlmesh.core.state_sync.db.facade import EngineAdapterStateSync
2
3__all__ = ["EngineAdapterStateSync"]
class EngineAdapterStateSync(sqlmesh.core.state_sync.base.StateSync):
 75class EngineAdapterStateSync(StateSync):
 76    """Manages state of nodes and snapshot with an existing engine adapter.
 77
 78    This state sync is convenient to use because it requires no additional setup.
 79    You can reuse the same engine/warehouse that your data is stored in.
 80
 81    Args:
 82        engine_adapter: The EngineAdapter to use to store and fetch snapshots.
 83        schema: The schema to store state metadata in. If None or empty string then no schema is defined
 84        console: The console to log information to.
 85        cache_dir: The cache path, used for caching snapshot models.
 86    """
 87
 88    def __init__(
 89        self,
 90        engine_adapter: EngineAdapter,
 91        schema: t.Optional[str],
 92        console: t.Optional[Console] = None,
 93        cache_dir: Path = Path(),
 94    ):
 95        self.interval_state = IntervalState(engine_adapter, schema=schema)
 96        self.environment_state = EnvironmentState(engine_adapter, schema=schema)
 97        self.snapshot_state = SnapshotState(engine_adapter, schema=schema, cache_dir=cache_dir)
 98        self.version_state = VersionState(engine_adapter, schema=schema)
 99        self.migrator = StateMigrator(
100            engine_adapter,
101            version_state=self.version_state,
102            snapshot_state=self.snapshot_state,
103            environment_state=self.environment_state,
104            interval_state=self.interval_state,
105            console=console,
106        )
107        # Make sure that if an empty string is provided that we treat it as None
108        self.schema = schema or None
109        self.engine_adapter = engine_adapter
110        self.console = console or get_console()
111
112    @transactional()
113    def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None:
114        """Pushes snapshots to the state store, merging them with existing ones.
115
116        This method first finds all existing snapshots in the store and merges them with
117        the local snapshots. It will then delete all existing snapshots and then
118        insert all the local snapshots. This can be made safer with locks or merge/upsert.
119
120        Args:
121            snapshots: The snapshots to push.
122        """
123        snapshots_by_id = {}
124        for snapshot in snapshots:
125            if not snapshot.version:
126                raise SQLMeshError(
127                    f"Snapshot {snapshot} has not been versioned yet. Create a plan before pushing a snapshot."
128                )
129            snapshots_by_id[snapshot.snapshot_id] = snapshot
130
131        existing = self.snapshots_exist(snapshots_by_id)
132
133        if existing:
134            logger.error(
135                "Snapshots %s already exists. This could be due to a concurrent plan or a hash collision. If this is a hash collision, add a stamp to your model.",
136                str(existing),
137            )
138
139            for sid in tuple(snapshots_by_id):
140                if sid in existing:
141                    snapshots_by_id.pop(sid)
142
143        snapshots = snapshots_by_id.values()
144        if snapshots:
145            self.snapshot_state.push_snapshots(snapshots)
146
    @transactional()
    def promote(
        self,
        environment: Environment,
        no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
        environment_statements: t.Optional[t.List[EnvironmentStatements]] = None,
    ) -> PromotionResult:
        """Update the environment to reflect the current state.

        This method verifies that snapshots have been pushed.

        Args:
            environment: The environment to promote.
            no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
                all snapshots will be checked. If an empty set, the check is skipped. The data gap
                check ensures that models that are already a part of the target environment have
                no data gaps when compared against previous snapshots for same models. It only
                runs when the environment already exists and has not expired.
            environment_statements: The statements to store for the environment. If None, the
                stored statements are not updated; an empty list is still written.

        Returns:
            A PromotionResult holding the added snapshot table infos, the removed snapshot table
            infos, and the naming info of the existing environment (set only when something was removed).

        Raises:
            SQLMeshError: If some of the environment's snapshots are missing from the state,
                or if the data gap check detects missing intervals.
            ConflictingPlanError: If the existing environment has not expired and its plan ID
                differs from the previous plan ID of the environment being promoted.
        """
        logger.info("Promoting environment '%s'", environment.name)

        missing = {s.snapshot_id for s in environment.snapshots} - self.snapshots_exist(
            environment.snapshots
        )
        if missing:
            raise SQLMeshError(
                f"Missing snapshots {missing}. Make sure to push and backfill your snapshots."
            )

        existing_environment = self.environment_state.get_environment(
            environment.name, lock_for_update=True
        )

        existing_table_infos = (
            {table_info.name: table_info for table_info in existing_environment.promoted_snapshots}
            if existing_environment
            else {}
        )
        table_infos = {table_info.name: table_info for table_info in environment.promoted_snapshots}
        views_that_changed_location: t.Set[SnapshotTableInfo] = set()
        if existing_environment:
            # Table infos present in both environments whose qualified view name differs
            # between the old and the new naming info; they are reported as removed below.
            views_that_changed_location = {
                existing_table_info
                for name, existing_table_info in existing_table_infos.items()
                if name in table_infos
                and existing_table_info.qualified_view_name.for_environment(
                    existing_environment.naming_info
                )
                != table_infos[name].qualified_view_name.for_environment(environment.naming_info)
            }
            if not existing_environment.expired:
                if environment.previous_plan_id != existing_environment.plan_id:
                    raise ConflictingPlanError(
                        f"Another plan ({existing_environment.plan_id}) was applied to the target environment '{environment.name}' while your current plan "
                        f"({environment.plan_id}) was still in progress, interrupting it. Please re-apply your plan to resolve this error."
                    )
                # None (check everything) also passes this test; only an empty set skips the check.
                if no_gaps_snapshot_names != set():
                    snapshots = self.get_snapshots(environment.snapshots).values()
                    self._ensure_no_gaps(
                        snapshots,
                        existing_environment,
                        no_gaps_snapshot_names,
                    )
            demoted_snapshots = set(existing_environment.snapshots) - set(environment.snapshots)
            # Update the updated_at attribute.
            self.snapshot_state.touch_snapshots(demoted_snapshots)

        # Names promoted in the existing environment but no longer promoted in the new one.
        missing_models = set(existing_table_infos) - {
            snapshot.name for snapshot in environment.promoted_snapshots
        }

        added_table_infos = set(table_infos.values())
        if existing_environment and environment.can_partially_promote(existing_environment):
            # Only promote new snapshots.
            added_table_infos -= set(existing_environment.promoted_snapshots)

        self.environment_state.update_environment(environment)

        # If it is an empty list, we want to update the environment statements
        # To reflect there are no statements anymore in this environment
        if environment_statements is not None:
            self.environment_state.update_environment_statements(
                environment.name, environment.plan_id, environment_statements
            )

        removed = {existing_table_infos[name] for name in missing_models}.union(
            views_that_changed_location
        )

        return PromotionResult(
            added=sorted(added_table_infos),
            removed=list(removed),
            removed_environment_naming_info=(
                existing_environment.naming_info if removed and existing_environment else None
            ),
        )
245
246    @transactional()
247    def finalize(self, environment: Environment) -> None:
248        """Finalize the target environment, indicating that this environment has been
249        fully promoted and is ready for use.
250
251        Args:
252            environment: The target environment to finalize.
253        """
254        self.environment_state.finalize(environment)
255
256    @transactional()
257    def unpause_snapshots(
258        self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike
259    ) -> None:
260        self.snapshot_state.unpause_snapshots(snapshots, unpaused_dt)
261
262    def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
263        self.environment_state.invalidate_environment(name, protect_prod)
264
265    def get_expired_snapshots(
266        self,
267        *,
268        batch_range: ExpiredBatchRange,
269        current_ts: t.Optional[int] = None,
270        ignore_ttl: bool = False,
271    ) -> t.Optional[ExpiredSnapshotBatch]:
272        current_ts = current_ts or now_timestamp()
273        return self.snapshot_state.get_expired_snapshots(
274            environments=self.environment_state.get_environments(),
275            current_ts=current_ts,
276            ignore_ttl=ignore_ttl,
277            batch_range=batch_range,
278        )
279
280    def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
281        return self.environment_state.get_expired_environments(current_ts=current_ts)
282
283    @transactional()
284    def delete_expired_snapshots(
285        self,
286        batch_range: ExpiredBatchRange,
287        ignore_ttl: bool = False,
288        current_ts: t.Optional[int] = None,
289    ) -> None:
290        batch = self.get_expired_snapshots(
291            ignore_ttl=ignore_ttl,
292            current_ts=current_ts,
293            batch_range=batch_range,
294        )
295        if batch and batch.expired_snapshot_ids:
296            self.snapshot_state.delete_snapshots(batch.expired_snapshot_ids)
297            self.interval_state.cleanup_intervals(batch.cleanup_tasks, batch.expired_snapshot_ids)
298
299    @transactional()
300    def delete_expired_environments(
301        self, current_ts: t.Optional[int] = None
302    ) -> t.List[EnvironmentSummary]:
303        current_ts = current_ts or now_timestamp()
304        return self.environment_state.delete_expired_environments(current_ts=current_ts)
305
306    def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
307        self.snapshot_state.delete_snapshots(snapshot_ids)
308
309    def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
310        return self.snapshot_state.snapshots_exist(snapshot_ids)
311
312    def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]:
313        return self.snapshot_state.nodes_exist(names, exclude_external)
314
315    def remove_state(self, including_backup: bool = False) -> None:
316        """Removes the state store objects."""
317        for table in (
318            self.snapshot_state.snapshots_table,
319            self.snapshot_state.auto_restatements_table,
320            self.environment_state.environments_table,
321            self.environment_state.environment_statements_table,
322            self.interval_state.intervals_table,
323            self.version_state.versions_table,
324        ):
325            self.engine_adapter.drop_table(table)
326            if including_backup:
327                self.engine_adapter.drop_table(_backup_table_name(table))
328
329        self.snapshot_state.clear_cache()
330
331    def reset(self, default_catalog: t.Optional[str]) -> None:
332        """Resets the state store to the state when it was first initialized."""
333        self.remove_state()
334        self.migrate(default_catalog)
335
336    @transactional()
337    def update_auto_restatements(
338        self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]]
339    ) -> None:
340        self.snapshot_state.update_auto_restatements(next_auto_restatement_ts)
341
342    def get_environment(self, environment: str) -> t.Optional[Environment]:
343        return self.environment_state.get_environment(environment)
344
345    def get_environment_statements(self, environment: str) -> t.List[EnvironmentStatements]:
346        return self.environment_state.get_environment_statements(environment)
347
348    def get_environments(self) -> t.List[Environment]:
349        """Fetches all environments.
350
351        Returns:
352            A list of all environments.
353        """
354        return self.environment_state.get_environments()
355
356    def get_environments_summary(self) -> t.List[EnvironmentSummary]:
357        """Fetches all environment names along with expiry datetime.
358
359        Returns:
360            A list of all environment summaries.
361        """
362        return self.environment_state.get_environments_summary()
363
364    def get_snapshots(
365        self,
366        snapshot_ids: t.Iterable[SnapshotIdLike],
367    ) -> t.Dict[SnapshotId, Snapshot]:
368        """Fetches snapshots from the state.
369
370        Args:
371            snapshot_ids: The snapshot IDs to fetch.
372
373        Returns:
374            A dict of snapshots.
375        """
376        snapshots = self.snapshot_state.get_snapshots(snapshot_ids)
377        intervals = self.interval_state.get_snapshot_intervals(snapshots.values())
378        Snapshot.hydrate_with_intervals_by_version(snapshots.values(), intervals)
379        return snapshots
380
381    def get_snapshots_by_names(
382        self,
383        snapshot_names: t.Iterable[str],
384        current_ts: t.Optional[int] = None,
385        exclude_expired: bool = True,
386    ) -> t.Set[SnapshotIdAndVersion]:
387        return self.snapshot_state.get_snapshots_by_names(
388            snapshot_names=snapshot_names, current_ts=current_ts, exclude_expired=exclude_expired
389        )
390
391    @transactional()
392    def add_interval(
393        self,
394        snapshot: Snapshot,
395        start: TimeLike,
396        end: TimeLike,
397        is_dev: bool = False,
398        last_altered_ts: t.Optional[int] = None,
399    ) -> None:
400        super().add_interval(snapshot, start, end, is_dev, last_altered_ts)
401
402    @transactional()
403    def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
404        intervals_to_insert = []
405        for snapshot_intervals in snapshots_intervals:
406            snapshot_intervals = snapshot_intervals.copy(
407                update={
408                    "intervals": _remove_partial_intervals(
409                        snapshot_intervals.intervals, snapshot_intervals.snapshot_id, is_dev=False
410                    ),
411                    "dev_intervals": _remove_partial_intervals(
412                        snapshot_intervals.dev_intervals,
413                        snapshot_intervals.snapshot_id,
414                        is_dev=True,
415                    ),
416                }
417            )
418            if not snapshot_intervals.is_empty():
419                intervals_to_insert.append(snapshot_intervals)
420        if intervals_to_insert:
421            self.interval_state.add_snapshots_intervals(intervals_to_insert)
422
423    @transactional()
424    def remove_intervals(
425        self,
426        snapshot_intervals: t.Sequence[t.Tuple[SnapshotIdAndVersionLike, Interval]],
427        remove_shared_versions: bool = False,
428    ) -> None:
429        self.interval_state.remove_intervals(snapshot_intervals, remove_shared_versions)
430
431    @transactional()
432    def compact_intervals(self) -> None:
433        self.interval_state.compact_intervals()
434
435    def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.List[Snapshot]:
436        return self.interval_state.refresh_snapshot_intervals(snapshots)
437
438    def max_interval_end_per_model(
439        self,
440        environment: str,
441        models: t.Optional[t.Set[str]] = None,
442        ensure_finalized_snapshots: bool = False,
443    ) -> t.Dict[str, int]:
444        env = self.get_environment(environment)
445        if not env:
446            return {}
447
448        snapshots = (
449            env.snapshots if not ensure_finalized_snapshots else env.finalized_or_current_snapshots
450        )
451        if models is not None:
452            snapshots = [s for s in snapshots if s.name in models]
453
454        if not snapshots:
455            return {}
456
457        return self.interval_state.max_interval_end_per_model(snapshots)
458
459    def recycle(self) -> None:
460        self.engine_adapter.recycle()
461
462    def close(self) -> None:
463        self.engine_adapter.close()
464
465    @transactional()
466    def migrate(
467        self,
468        skip_backup: bool = False,
469        promoted_snapshots_only: bool = True,
470    ) -> None:
471        """Migrate the state sync to the latest SQLMesh / SQLGlot version."""
472        self.migrator.migrate(
473            self.schema,
474            skip_backup=skip_backup,
475            promoted_snapshots_only=promoted_snapshots_only,
476        )
477
478    @transactional()
479    def rollback(self) -> None:
480        """Rollback to the previous migration."""
481        self.migrator.rollback()
482
483    @transactional()
484    def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream:
485        versions = self.get_versions(
486            validate=True
487        )  # will throw if the state db hasnt been created or there is a version mismatch
488
489        snapshot_ids_to_export: t.Set[SnapshotId] = set()
490        selected_environments: t.List[Environment] = []
491        if environment_names:
492            for env_name in environment_names:
493                environment = self.get_environment(env_name)
494                if not environment:
495                    raise SQLMeshError(f"No such environment: {env_name}")
496                selected_environments.append(environment)
497        else:
498            selected_environments = self.get_environments()
499
500        for env in selected_environments:
501            snapshot_ids_to_export |= set([s.snapshot_id for s in env.snapshots or []])
502
503        def _export_snapshots() -> t.Iterator[Snapshot]:
504            for chunk in chunk_iterable(snapshot_ids_to_export, SnapshotState.SNAPSHOT_BATCH_SIZE):
505                yield from self.get_snapshots(chunk).values()
506
507        def _export_environments() -> t.Iterator[EnvironmentWithStatements]:
508            for env in selected_environments:
509                yield EnvironmentWithStatements(
510                    environment=env, statements=self.get_environment_statements(env.name)
511                )
512
513        return StateStream.from_iterators(
514            versions=versions,
515            snapshots=_export_snapshots(),
516            environments=_export_environments(),
517        )
518
    @transactional()
    def import_(self, stream: StateStream, clear: bool = True) -> None:
        """Imports the state carried by the given stream into the state store.

        Chunks are handled in the order in which the stream yields them.

        Args:
            stream: The state stream to import.
            clear: If True, the state is reset when the versions chunk is processed and
                snapshots are pushed without overwriting. If False, the existing state is
                kept and existing snapshots are overwritten.

        Raises:
            SQLMeshError: If the minor SQLMesh version of the stream differs from the
                minor SQLMesh version found in the state store.
        """
        existing_versions = self.get_versions()

        for state_chunk in stream:
            if isinstance(state_chunk, VersionsChunk):
                # SQLMesh major/minor version must match so that we can be sure the JSON contained in the state file
                # is compatible with our Pydantic model definitions. Patch versions don't need to match because the assumption
                # is that they don't contain any breaking changes
                incoming_versions = state_chunk.versions
                if (
                    incoming_versions.minor_sqlmesh_version
                    != existing_versions.minor_sqlmesh_version
                ):
                    raise SQLMeshError(
                        f"SQLMesh version mismatch. You are running '{existing_versions.sqlmesh_version}' but the state file was created with '{incoming_versions.sqlmesh_version}'.\n"
                        "Please upgrade/downgrade your SQLMesh version to match the state file before performing the import."
                    )

                # The reset only happens after the version check above has passed.
                if clear:
                    self.reset(default_catalog=None)

            if isinstance(state_chunk, SnapshotsChunk):
                # Collected across all batches and written once after the last batch.
                auto_restatements: t.Dict[SnapshotNameVersion, t.Optional[int]] = {}

                for snapshot_chunk in chunk_iterable(
                    state_chunk, SnapshotState.SNAPSHOT_BATCH_SIZE
                ):
                    # Materialized because the batch is iterated more than once below.
                    snapshot_chunk = list(snapshot_chunk)
                    overwrite_existing_snapshots = (
                        not clear
                    )  # if clear=True, all existing snapshots were dropped anyway
                    self.snapshot_state.push_snapshots(
                        snapshot_chunk, overwrite=overwrite_existing_snapshots
                    )
                    self.add_snapshots_intervals((s.snapshot_intervals for s in snapshot_chunk))

                    auto_restatements.update(
                        {
                            s.name_version: s.next_auto_restatement_ts
                            for s in snapshot_chunk
                            if s.next_auto_restatement_ts
                        }
                    )

                self.update_auto_restatements(auto_restatements)

            if isinstance(state_chunk, EnvironmentsChunk):
                for environment_with_statements in state_chunk:
                    environment = environment_with_statements.environment
                    self.environment_state.update_environment(environment)
                    self.environment_state.update_environment_statements(
                        environment.name,
                        environment.plan_id,
                        environment_with_statements.statements,
                    )
575
576    def state_type(self) -> str:
577        return self.engine_adapter.dialect
578
579    def _get_versions(self) -> Versions:
580        return self.version_state.get_versions()
581
    def _ensure_no_gaps(
        self,
        target_snapshots: t.Iterable[Snapshot],
        target_environment: Environment,
        snapshot_names: t.Optional[t.Set[str]],
    ) -> None:
        """Raises if a re-versioned incremental model is missing intervals its predecessor had.

        For every snapshot in `target_environment` whose version differs from the target
        snapshot of the same name, the target snapshot is checked for missing intervals
        between its start date and the end of the previous snapshot's last interval. Only
        pairs in which both snapshots are incremental and the previous one has intervals
        are checked.

        Args:
            target_snapshots: The snapshots that are about to be promoted.
            target_environment: The environment whose current snapshots act as the
                previous versions.
            snapshot_names: The names to restrict the check to. If None, all names are checked.

        Raises:
            SQLMeshError: If missing intervals are detected for any checked model.
        """
        target_snapshots_by_name = {s.name: s for s in target_snapshots}

        # Previous snapshots of the models whose version changed in the target snapshots.
        changed_version_prev_snapshots_by_name = {
            s.name: s
            for s in target_environment.snapshots
            if s.name in target_snapshots_by_name
            and target_snapshots_by_name[s.name].version != s.version
        }

        prev_snapshots = self.get_snapshots(
            changed_version_prev_snapshots_by_name.values()
        ).values()
        # Shared across `start_date` calls in the loop below.
        cache: t.Dict[str, datetime] = {}

        for prev_snapshot in prev_snapshots:
            target_snapshot = target_snapshots_by_name[prev_snapshot.name]
            if (
                (snapshot_names is None or prev_snapshot.name in snapshot_names)
                and target_snapshot.is_incremental
                and prev_snapshot.is_incremental
                and prev_snapshot.intervals
            ):
                start = to_timestamp(
                    start_date(target_snapshot, target_snapshots_by_name.values(), cache)
                )
                # End of the last interval recorded for the previous snapshot.
                end = prev_snapshot.intervals[-1][1]

                if start < end:
                    missing_intervals = target_snapshot.missing_intervals(
                        start, end, end_bounded=True
                    )

                    if missing_intervals:
                        raise SQLMeshError(
                            f"Detected missing intervals for model {target_snapshot.name}, interrupting your current plan. "
                            "Please re-apply your plan to resolve this error."
                        )
625
626    @contextlib.contextmanager
627    def _transaction(self) -> t.Iterator[None]:
628        with self.engine_adapter.transaction():
629            yield

Manages the state of nodes and snapshots with an existing engine adapter.

This state sync is convenient to use because it requires no additional setup. You can reuse the same engine/warehouse that your data is stored in.

Arguments:
  • engine_adapter: The EngineAdapter to use to store and fetch snapshots.
  • schema: The schema to store state metadata in. If None or empty string then no schema is defined
  • console: The console to log information to.
  • cache_dir: The cache path, used for caching snapshot models.
EngineAdapterStateSync( engine_adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, schema: Optional[str], console: Optional[sqlmesh.core.console.Console] = None, cache_dir: pathlib.Path = PosixPath('.'))
 88    def __init__(
 89        self,
 90        engine_adapter: EngineAdapter,
 91        schema: t.Optional[str],
 92        console: t.Optional[Console] = None,
 93        cache_dir: Path = Path(),
 94    ):
 95        self.interval_state = IntervalState(engine_adapter, schema=schema)
 96        self.environment_state = EnvironmentState(engine_adapter, schema=schema)
 97        self.snapshot_state = SnapshotState(engine_adapter, schema=schema, cache_dir=cache_dir)
 98        self.version_state = VersionState(engine_adapter, schema=schema)
 99        self.migrator = StateMigrator(
100            engine_adapter,
101            version_state=self.version_state,
102            snapshot_state=self.snapshot_state,
103            environment_state=self.environment_state,
104            interval_state=self.interval_state,
105            console=console,
106        )
107        # Make sure that if an empty string is provided that we treat it as None
108        self.schema = schema or None
109        self.engine_adapter = engine_adapter
110        self.console = console or get_console()
interval_state
environment_state
snapshot_state
version_state
migrator
schema
engine_adapter
console
@transactional()
def push_snapshots( self, snapshots: Iterable[sqlmesh.core.snapshot.definition.Snapshot]) -> None:
112    @transactional()
113    def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None:
114        """Pushes snapshots to the state store, merging them with existing ones.
115
116        This method first finds all existing snapshots in the store and merges them with
117        the local snapshots. It will then delete all existing snapshots and then
118        insert all the local snapshots. This can be made safer with locks or merge/upsert.
119
120        Args:
121            snapshots: The snapshots to push.
122        """
123        snapshots_by_id = {}
124        for snapshot in snapshots:
125            if not snapshot.version:
126                raise SQLMeshError(
127                    f"Snapshot {snapshot} has not been versioned yet. Create a plan before pushing a snapshot."
128                )
129            snapshots_by_id[snapshot.snapshot_id] = snapshot
130
131        existing = self.snapshots_exist(snapshots_by_id)
132
133        if existing:
134            logger.error(
135                "Snapshots %s already exists. This could be due to a concurrent plan or a hash collision. If this is a hash collision, add a stamp to your model.",
136                str(existing),
137            )
138
139            for sid in tuple(snapshots_by_id):
140                if sid in existing:
141                    snapshots_by_id.pop(sid)
142
143        snapshots = snapshots_by_id.values()
144        if snapshots:
145            self.snapshot_state.push_snapshots(snapshots)

Pushes snapshots to the state store, skipping those that already exist.

This method first checks which of the given snapshots already exist in the store. Existing snapshots are reported with an error log entry and left untouched; only the snapshots that are not yet in the store are inserted.

Arguments:
  • snapshots: The snapshots to push.
@transactional()
def promote( self, environment: sqlmesh.core.environment.Environment, no_gaps_snapshot_names: Optional[Set[str]] = None, environment_statements: Optional[List[sqlmesh.core.environment.EnvironmentStatements]] = None) -> sqlmesh.core.state_sync.common.PromotionResult:
    @transactional()
    def promote(
        self,
        environment: Environment,
        no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
        environment_statements: t.Optional[t.List[EnvironmentStatements]] = None,
    ) -> PromotionResult:
        """Update the environment to reflect the current state.

        This method verifies that snapshots have been pushed.

        Args:
            environment: The environment to promote.
            no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
                all snapshots will be checked. If an empty set, the check is skipped. The data gap
                check ensures that models that are already a part of the target environment have
                no data gaps when compared against previous snapshots for same models. It only
                runs when the environment already exists and has not expired.
            environment_statements: The statements to store for the environment. If None, the
                stored statements are not updated; an empty list is still written.

        Returns:
            A PromotionResult holding the added snapshot table infos, the removed snapshot table
            infos, and the naming info of the existing environment (set only when something was removed).

        Raises:
            SQLMeshError: If some of the environment's snapshots are missing from the state,
                or if the data gap check detects missing intervals.
            ConflictingPlanError: If the existing environment has not expired and its plan ID
                differs from the previous plan ID of the environment being promoted.
        """
        logger.info("Promoting environment '%s'", environment.name)

        missing = {s.snapshot_id for s in environment.snapshots} - self.snapshots_exist(
            environment.snapshots
        )
        if missing:
            raise SQLMeshError(
                f"Missing snapshots {missing}. Make sure to push and backfill your snapshots."
            )

        existing_environment = self.environment_state.get_environment(
            environment.name, lock_for_update=True
        )

        existing_table_infos = (
            {table_info.name: table_info for table_info in existing_environment.promoted_snapshots}
            if existing_environment
            else {}
        )
        table_infos = {table_info.name: table_info for table_info in environment.promoted_snapshots}
        views_that_changed_location: t.Set[SnapshotTableInfo] = set()
        if existing_environment:
            # Table infos present in both environments whose qualified view name differs
            # between the old and the new naming info; they are reported as removed below.
            views_that_changed_location = {
                existing_table_info
                for name, existing_table_info in existing_table_infos.items()
                if name in table_infos
                and existing_table_info.qualified_view_name.for_environment(
                    existing_environment.naming_info
                )
                != table_infos[name].qualified_view_name.for_environment(environment.naming_info)
            }
            if not existing_environment.expired:
                if environment.previous_plan_id != existing_environment.plan_id:
                    raise ConflictingPlanError(
                        f"Another plan ({existing_environment.plan_id}) was applied to the target environment '{environment.name}' while your current plan "
                        f"({environment.plan_id}) was still in progress, interrupting it. Please re-apply your plan to resolve this error."
                    )
                # None (check everything) also passes this test; only an empty set skips the check.
                if no_gaps_snapshot_names != set():
                    snapshots = self.get_snapshots(environment.snapshots).values()
                    self._ensure_no_gaps(
                        snapshots,
                        existing_environment,
                        no_gaps_snapshot_names,
                    )
            demoted_snapshots = set(existing_environment.snapshots) - set(environment.snapshots)
            # Update the updated_at attribute.
            self.snapshot_state.touch_snapshots(demoted_snapshots)

        # Names promoted in the existing environment but no longer promoted in the new one.
        missing_models = set(existing_table_infos) - {
            snapshot.name for snapshot in environment.promoted_snapshots
        }

        added_table_infos = set(table_infos.values())
        if existing_environment and environment.can_partially_promote(existing_environment):
            # Only promote new snapshots.
            added_table_infos -= set(existing_environment.promoted_snapshots)

        self.environment_state.update_environment(environment)

        # If it is an empty list, we want to update the environment statements
        # To reflect there are no statements anymore in this environment
        if environment_statements is not None:
            self.environment_state.update_environment_statements(
                environment.name, environment.plan_id, environment_statements
            )

        removed = {existing_table_infos[name] for name in missing_models}.union(
            views_that_changed_location
        )

        return PromotionResult(
            added=sorted(added_table_infos),
            removed=list(removed),
            removed_environment_naming_info=(
                existing_environment.naming_info if removed and existing_environment else None
            ),
        )

Update the environment to reflect the current state.

This method verifies that snapshots have been pushed.

Arguments:
  • environment: The environment to promote.
  • no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None, all snapshots will be checked. The data gap check ensures that models that are already a part of the target environment have no data gaps when compared against previous snapshots for same models.
Returns:

A PromotionResult holding the added snapshot table infos, the removed snapshot table infos, and the naming info of the existing environment for the removed table infos (None when nothing was removed).

@transactional()
def finalize(self, environment: sqlmesh.core.environment.Environment) -> None:
246    @transactional()
247    def finalize(self, environment: Environment) -> None:
248        """Finalize the target environment, indicating that this environment has been
249        fully promoted and is ready for use.
250
251        Args:
252            environment: The target environment to finalize.
253        """
254        self.environment_state.finalize(environment)

Finalize the target environment, indicating that this environment has been fully promoted and is ready for use.

Arguments:
  • environment: The target environment to finalize.
@transactional()
def unpause_snapshots( self, snapshots: Collection[Union[sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot]], unpaused_dt: Union[datetime.date, datetime.datetime, str, int, float]) -> None:
256    @transactional()
257    def unpause_snapshots(
258        self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike
259    ) -> None:
260        self.snapshot_state.unpause_snapshots(snapshots, unpaused_dt)

Unpauses target snapshots.

Unpaused snapshots are scheduled for evaluation on a recurring basis. Once unpaused a snapshot can't be paused again.

Arguments:
  • snapshots: Target snapshots.
  • unpaused_dt: The datetime object which indicates when target snapshots were unpaused.
def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
262    def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
263        self.environment_state.invalidate_environment(name, protect_prod)

Invalidates the target environment by setting its expiration timestamp to now.

Arguments:
  • name: The name of the environment to invalidate.
  • protect_prod: If True, prevents invalidation of the production environment.
def get_expired_snapshots( self, *, batch_range: sqlmesh.core.state_sync.common.ExpiredBatchRange, current_ts: Optional[int] = None, ignore_ttl: bool = False) -> Optional[sqlmesh.core.state_sync.common.ExpiredSnapshotBatch]:
265    def get_expired_snapshots(
266        self,
267        *,
268        batch_range: ExpiredBatchRange,
269        current_ts: t.Optional[int] = None,
270        ignore_ttl: bool = False,
271    ) -> t.Optional[ExpiredSnapshotBatch]:
272        current_ts = current_ts or now_timestamp()
273        return self.snapshot_state.get_expired_snapshots(
274            environments=self.environment_state.get_environments(),
275            current_ts=current_ts,
276            ignore_ttl=ignore_ttl,
277            batch_range=batch_range,
278        )

Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).

Arguments:
  • current_ts: Timestamp used to evaluate expiration.
  • ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
  • batch_range: The range of the batch to fetch.
Returns:

A batch describing expired snapshots or None if no snapshots are pending cleanup.

def get_expired_environments( self, current_ts: int) -> List[sqlmesh.core.environment.EnvironmentSummary]:
280    def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
281        return self.environment_state.get_expired_environments(current_ts=current_ts)

Returns the expired environments.

Expired environments are environments that have exceeded their time-to-live value.

Returns:

The list of environment summaries to remove.

@transactional()
def delete_expired_snapshots( self, batch_range: sqlmesh.core.state_sync.common.ExpiredBatchRange, ignore_ttl: bool = False, current_ts: Optional[int] = None) -> None:
283    @transactional()
284    def delete_expired_snapshots(
285        self,
286        batch_range: ExpiredBatchRange,
287        ignore_ttl: bool = False,
288        current_ts: t.Optional[int] = None,
289    ) -> None:
290        batch = self.get_expired_snapshots(
291            ignore_ttl=ignore_ttl,
292            current_ts=current_ts,
293            batch_range=batch_range,
294        )
295        if batch and batch.expired_snapshot_ids:
296            self.snapshot_state.delete_snapshots(batch.expired_snapshot_ids)
297            self.interval_state.cleanup_intervals(batch.cleanup_tasks, batch.expired_snapshot_ids)

Removes expired snapshots.

Expired snapshots are snapshots that have exceeded their time-to-live and are no longer in use within an environment.

Arguments:
  • batch_range: The range of snapshots to delete in this batch.
  • ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting all snapshots that are not referenced in any environment.
  • current_ts: Timestamp used to evaluate expiration.
@transactional()
def delete_expired_environments( self, current_ts: Optional[int] = None) -> List[sqlmesh.core.environment.EnvironmentSummary]:
299    @transactional()
300    def delete_expired_environments(
301        self, current_ts: t.Optional[int] = None
302    ) -> t.List[EnvironmentSummary]:
303        current_ts = current_ts or now_timestamp()
304        return self.environment_state.delete_expired_environments(current_ts=current_ts)

Removes expired environments.

Expired environments are environments that have exceeded their time-to-live value.

Returns:

The list of removed environments.

306    def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
307        self.snapshot_state.delete_snapshots(snapshot_ids)

Delete snapshots from the state sync.

Arguments:
  • snapshot_ids: A list of snapshot like objects to delete.
309    def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
310        return self.snapshot_state.snapshots_exist(snapshot_ids)

Checks if multiple snapshots exist in the state sync.

Arguments:
  • snapshot_ids: Iterable of snapshot ids to bulk check.
Returns:

A set of all the existing snapshot ids.

def nodes_exist(self, names: Iterable[str], exclude_external: bool = False) -> Set[str]:
312    def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]:
313        return self.snapshot_state.nodes_exist(names, exclude_external)

Returns the node names that exist in the state sync.

Arguments:
  • names: Iterable of node names to check.
  • exclude_external: Whether to exclude external models from the output.
Returns:

A set of all the existing node names.

def remove_state(self, including_backup: bool = False) -> None:
315    def remove_state(self, including_backup: bool = False) -> None:
316        """Removes the state store objects."""
317        for table in (
318            self.snapshot_state.snapshots_table,
319            self.snapshot_state.auto_restatements_table,
320            self.environment_state.environments_table,
321            self.environment_state.environment_statements_table,
322            self.interval_state.intervals_table,
323            self.version_state.versions_table,
324        ):
325            self.engine_adapter.drop_table(table)
326            if including_backup:
327                self.engine_adapter.drop_table(_backup_table_name(table))
328
329        self.snapshot_state.clear_cache()

Removes the state store objects.

def reset(self, default_catalog: Optional[str]) -> None:
331    def reset(self, default_catalog: t.Optional[str]) -> None:
332        """Resets the state store to the state when it was first initialized."""
333        self.remove_state()
334        self.migrate(default_catalog)

Resets the state store to the state when it was first initialized.

@transactional()
def update_auto_restatements( self, next_auto_restatement_ts: Dict[sqlmesh.core.snapshot.definition.SnapshotNameVersion, Optional[int]]) -> None:
336    @transactional()
337    def update_auto_restatements(
338        self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]]
339    ) -> None:
340        self.snapshot_state.update_auto_restatements(next_auto_restatement_ts)

Updates the next auto restatement timestamp for the snapshots.

Arguments:
  • next_auto_restatement_ts: A dictionary of snapshot name / version pairs to the next auto restatement timestamp.
def get_environment(self, environment: str) -> Optional[sqlmesh.core.environment.Environment]:
342    def get_environment(self, environment: str) -> t.Optional[Environment]:
343        return self.environment_state.get_environment(environment)

Fetches the environment if it exists.

Arguments:
  • environment: The environment
Returns:

The environment object.

def get_environment_statements( self, environment: str) -> List[sqlmesh.core.environment.EnvironmentStatements]:
345    def get_environment_statements(self, environment: str) -> t.List[EnvironmentStatements]:
346        return self.environment_state.get_environment_statements(environment)

Fetches environment statements from the environment_statements table.

Returns:

A list of the Environment Statements.

def get_environments(self) -> List[sqlmesh.core.environment.Environment]:
348    def get_environments(self) -> t.List[Environment]:
349        """Fetches all environments.
350
351        Returns:
352            A list of all environments.
353        """
354        return self.environment_state.get_environments()

Fetches all environments.

Returns:

A list of all environments.

def get_environments_summary(self) -> List[sqlmesh.core.environment.EnvironmentSummary]:
356    def get_environments_summary(self) -> t.List[EnvironmentSummary]:
357        """Fetches all environment names along with expiry datetime.
358
359        Returns:
360            A list of all environment summaries.
361        """
362        return self.environment_state.get_environments_summary()

Fetches all environment names along with expiry datetime.

Returns:

A list of all environment summaries.

364    def get_snapshots(
365        self,
366        snapshot_ids: t.Iterable[SnapshotIdLike],
367    ) -> t.Dict[SnapshotId, Snapshot]:
368        """Fetches snapshots from the state.
369
370        Args:
371            snapshot_ids: The snapshot IDs to fetch.
372
373        Returns:
374            A dict of snapshots.
375        """
376        snapshots = self.snapshot_state.get_snapshots(snapshot_ids)
377        intervals = self.interval_state.get_snapshot_intervals(snapshots.values())
378        Snapshot.hydrate_with_intervals_by_version(snapshots.values(), intervals)
379        return snapshots

Fetches snapshots from the state.

Arguments:
  • snapshot_ids: The snapshot IDs to fetch.
Returns:

A dict of snapshots.

def get_snapshots_by_names( self, snapshot_names: Iterable[str], current_ts: Optional[int] = None, exclude_expired: bool = True) -> Set[sqlmesh.core.snapshot.definition.SnapshotIdAndVersion]:
381    def get_snapshots_by_names(
382        self,
383        snapshot_names: t.Iterable[str],
384        current_ts: t.Optional[int] = None,
385        exclude_expired: bool = True,
386    ) -> t.Set[SnapshotIdAndVersion]:
387        return self.snapshot_state.get_snapshots_by_names(
388            snapshot_names=snapshot_names, current_ts=current_ts, exclude_expired=exclude_expired
389        )

Return the snapshot records for all versions of the specified snapshot names.

Arguments:
  • snapshot_names: Iterable of snapshot names to fetch all snapshot records for
  • current_ts: Sets the current time for identifying which snapshots have expired so they can be excluded (only relevant if exclude_expired=True)
  • exclude_expired: Whether or not to exclude the snapshot ids of expired snapshots from the result
Returns:

A set containing all the matched snapshot records. To fetch full snapshots, pass it into StateSync.get_snapshots()

@transactional()
def add_interval( self, snapshot: sqlmesh.core.snapshot.definition.Snapshot, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], is_dev: bool = False, last_altered_ts: Optional[int] = None) -> None:
391    @transactional()
392    def add_interval(
393        self,
394        snapshot: Snapshot,
395        start: TimeLike,
396        end: TimeLike,
397        is_dev: bool = False,
398        last_altered_ts: t.Optional[int] = None,
399    ) -> None:
400        super().add_interval(snapshot, start, end, is_dev, last_altered_ts)

Add an interval to a snapshot and sync it to the store.

Arguments:
  • snapshot: The snapshot like object to add an interval to.
  • start: The start of the interval to add.
  • end: The end of the interval to add.
  • is_dev: Indicates whether the given interval is being added while in development mode
  • last_altered_ts: The timestamp of the last modification of the physical table
@transactional()
def add_snapshots_intervals( self, snapshots_intervals: Sequence[sqlmesh.core.snapshot.definition.SnapshotIntervals]) -> None:
402    @transactional()
403    def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
404        intervals_to_insert = []
405        for snapshot_intervals in snapshots_intervals:
406            snapshot_intervals = snapshot_intervals.copy(
407                update={
408                    "intervals": _remove_partial_intervals(
409                        snapshot_intervals.intervals, snapshot_intervals.snapshot_id, is_dev=False
410                    ),
411                    "dev_intervals": _remove_partial_intervals(
412                        snapshot_intervals.dev_intervals,
413                        snapshot_intervals.snapshot_id,
414                        is_dev=True,
415                    ),
416                }
417            )
418            if not snapshot_intervals.is_empty():
419                intervals_to_insert.append(snapshot_intervals)
420        if intervals_to_insert:
421            self.interval_state.add_snapshots_intervals(intervals_to_insert)

Add snapshot intervals to state

Arguments:
  • snapshots_intervals: The intervals to add.
@transactional()
def remove_intervals( self, snapshot_intervals: Sequence[Tuple[Union[sqlmesh.core.snapshot.definition.SnapshotIdAndVersion, sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot], Tuple[int, int]]], remove_shared_versions: bool = False) -> None:
423    @transactional()
424    def remove_intervals(
425        self,
426        snapshot_intervals: t.Sequence[t.Tuple[SnapshotIdAndVersionLike, Interval]],
427        remove_shared_versions: bool = False,
428    ) -> None:
429        self.interval_state.remove_intervals(snapshot_intervals, remove_shared_versions)

Remove an interval from a list of snapshots and sync it to the store.

Because multiple snapshots can be pointing to the same version or physical table, this method can also grab all snapshots tied to the passed in version.

Arguments:
  • snapshot_intervals: The snapshot intervals to remove.
  • remove_shared_versions: Whether to remove intervals for snapshots that share the same version with the target snapshots.
@transactional()
def compact_intervals(self) -> None:
431    @transactional()
432    def compact_intervals(self) -> None:
433        self.interval_state.compact_intervals()

Compacts intervals for all snapshots.

Compaction process involves merging of existing interval records into new records and then deleting the old ones.

def refresh_snapshot_intervals( self, snapshots: Collection[sqlmesh.core.snapshot.definition.Snapshot]) -> List[sqlmesh.core.snapshot.definition.Snapshot]:
435    def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.List[Snapshot]:
436        return self.interval_state.refresh_snapshot_intervals(snapshots)

Updates given snapshots with latest intervals from the state.

Arguments:
  • snapshots: The snapshots to refresh.
Returns:

The updated snapshots.

def max_interval_end_per_model( self, environment: str, models: Optional[Set[str]] = None, ensure_finalized_snapshots: bool = False) -> Dict[str, int]:
438    def max_interval_end_per_model(
439        self,
440        environment: str,
441        models: t.Optional[t.Set[str]] = None,
442        ensure_finalized_snapshots: bool = False,
443    ) -> t.Dict[str, int]:
444        env = self.get_environment(environment)
445        if not env:
446            return {}
447
448        snapshots = (
449            env.snapshots if not ensure_finalized_snapshots else env.finalized_or_current_snapshots
450        )
451        if models is not None:
452            snapshots = [s for s in snapshots if s.name in models]
453
454        if not snapshots:
455            return {}
456
457        return self.interval_state.max_interval_end_per_model(snapshots)

Returns the max interval end per model for the given environment.

Arguments:
  • environment: The target environment.
  • models: The models to get the max interval end for. If None, all models are considered.
  • ensure_finalized_snapshots: Whether to use snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:

A dictionary of model FQNs to their respective interval ends in milliseconds since epoch.

def recycle(self) -> None:
459    def recycle(self) -> None:
460        self.engine_adapter.recycle()

Closes all open connections and releases all allocated resources associated with any thread except the calling one.

def close(self) -> None:
462    def close(self) -> None:
463        self.engine_adapter.close()

Closes all open connections and releases all allocated resources.

@transactional()
def migrate( self, skip_backup: bool = False, promoted_snapshots_only: bool = True) -> None:
465    @transactional()
466    def migrate(
467        self,
468        skip_backup: bool = False,
469        promoted_snapshots_only: bool = True,
470    ) -> None:
471        """Migrate the state sync to the latest SQLMesh / SQLGlot version."""
472        self.migrator.migrate(
473            self.schema,
474            skip_backup=skip_backup,
475            promoted_snapshots_only=promoted_snapshots_only,
476        )

Migrate the state sync to the latest SQLMesh / SQLGlot version.

@transactional()
def rollback(self) -> None:
478    @transactional()
479    def rollback(self) -> None:
480        """Rollback to the previous migration."""
481        self.migrator.rollback()

Rollback to the previous migration.

@transactional()
def export( self, environment_names: Optional[List[str]] = None) -> sqlmesh.core.state_sync.common.StateStream:
483    @transactional()
484    def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream:
485        versions = self.get_versions(
486            validate=True
487        )  # will throw if the state db hasnt been created or there is a version mismatch
488
489        snapshot_ids_to_export: t.Set[SnapshotId] = set()
490        selected_environments: t.List[Environment] = []
491        if environment_names:
492            for env_name in environment_names:
493                environment = self.get_environment(env_name)
494                if not environment:
495                    raise SQLMeshError(f"No such environment: {env_name}")
496                selected_environments.append(environment)
497        else:
498            selected_environments = self.get_environments()
499
500        for env in selected_environments:
501            snapshot_ids_to_export |= set([s.snapshot_id for s in env.snapshots or []])
502
503        def _export_snapshots() -> t.Iterator[Snapshot]:
504            for chunk in chunk_iterable(snapshot_ids_to_export, SnapshotState.SNAPSHOT_BATCH_SIZE):
505                yield from self.get_snapshots(chunk).values()
506
507        def _export_environments() -> t.Iterator[EnvironmentWithStatements]:
508            for env in selected_environments:
509                yield EnvironmentWithStatements(
510                    environment=env, statements=self.get_environment_statements(env.name)
511                )
512
513        return StateStream.from_iterators(
514            versions=versions,
515            snapshots=_export_snapshots(),
516            environments=_export_environments(),
517        )

Export the contents of this StateSync as a StateStream

Arguments:
  • environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
@transactional()
def import_( self, stream: sqlmesh.core.state_sync.common.StateStream, clear: bool = True) -> None:
519    @transactional()
520    def import_(self, stream: StateStream, clear: bool = True) -> None:
521        existing_versions = self.get_versions()
522
523        for state_chunk in stream:
524            if isinstance(state_chunk, VersionsChunk):
525                # SQLMesh major/minor version must match so that we can be sure the JSON contained in the state file
526                # is compatible with our Pydantic model definitions. Patch versions dont need to match because the assumption
527                # is that they dont contain any breaking changes
528                incoming_versions = state_chunk.versions
529                if (
530                    incoming_versions.minor_sqlmesh_version
531                    != existing_versions.minor_sqlmesh_version
532                ):
533                    raise SQLMeshError(
534                        f"SQLMesh version mismatch. You are running '{existing_versions.sqlmesh_version}' but the state file was created with '{incoming_versions.sqlmesh_version}'.\n"
535                        "Please upgrade/downgrade your SQLMesh version to match the state file before performing the import."
536                    )
537
538                if clear:
539                    self.reset(default_catalog=None)
540
541            if isinstance(state_chunk, SnapshotsChunk):
542                auto_restatements: t.Dict[SnapshotNameVersion, t.Optional[int]] = {}
543
544                for snapshot_chunk in chunk_iterable(
545                    state_chunk, SnapshotState.SNAPSHOT_BATCH_SIZE
546                ):
547                    snapshot_chunk = list(snapshot_chunk)
548                    overwrite_existing_snapshots = (
549                        not clear
550                    )  # if clear=True, all existing snapshots were dropped anyway
551                    self.snapshot_state.push_snapshots(
552                        snapshot_chunk, overwrite=overwrite_existing_snapshots
553                    )
554                    self.add_snapshots_intervals((s.snapshot_intervals for s in snapshot_chunk))
555
556                    auto_restatements.update(
557                        {
558                            s.name_version: s.next_auto_restatement_ts
559                            for s in snapshot_chunk
560                            if s.next_auto_restatement_ts
561                        }
562                    )
563
564                self.update_auto_restatements(auto_restatements)
565
566            if isinstance(state_chunk, EnvironmentsChunk):
567                for environment_with_statements in state_chunk:
568                    environment = environment_with_statements.environment
569                    self.environment_state.update_environment(environment)
570                    self.environment_state.update_environment_statements(
571                        environment.name,
572                        environment.plan_id,
573                        environment_with_statements.statements,
574                    )

Replace the existing state with the state contained in the StateStream

Arguments:
  • stream: The stream of new state
  • clear: Whether or not to clear existing state before inserting state from the stream
def state_type(self) -> str:
576    def state_type(self) -> str:
577        return self.engine_adapter.dialect

Returns the type of state sync.