Edit on GitHub

sqlmesh.core.janitor

  1from __future__ import annotations
  2
  3import typing as t
  4
  5from sqlglot import exp
  6
  7from sqlmesh.core.engine_adapter import EngineAdapter
  8from sqlmesh.core.console import Console
  9from sqlmesh.core.dialect import schema_
 10from sqlmesh.core.environment import Environment
 11from sqlmesh.core.snapshot import SnapshotEvaluator
 12from sqlmesh.core.state_sync import StateSync
 13from sqlmesh.core.state_sync.common import (
 14    logger,
 15    iter_expired_snapshot_batches,
 16    RowBoundary,
 17    ExpiredBatchRange,
 18)
 19from sqlmesh.utils.errors import SQLMeshError
 20
 21
 22def cleanup_expired_views(
 23    default_adapter: EngineAdapter,
 24    engine_adapters: t.Dict[str, EngineAdapter],
 25    environments: t.List[Environment],
 26    warn_on_delete_failure: bool = False,
 27    console: t.Optional[Console] = None,
 28) -> None:
 29    expired_schema_or_catalog_environments = [
 30        environment
 31        for environment in environments
 32        if environment.suffix_target.is_schema or environment.suffix_target.is_catalog
 33    ]
 34    expired_table_environments = [
 35        environment for environment in environments if environment.suffix_target.is_table
 36    ]
 37
 38    # We have to use the corresponding adapter if the virtual layer is gateway managed
 39    def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> EngineAdapter:
 40        if gateway_managed and gateway:
 41            return engine_adapters.get(gateway, default_adapter)
 42        return default_adapter
 43
 44    catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set()
 45    schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()
 46
 47    # Collect schemas and catalogs to drop
 48    for engine_adapter, expired_catalog, expired_schema, suffix_target in {
 49        (
 50            (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
 51            snapshot.qualified_view_name.catalog_for_environment(
 52                environment.naming_info, dialect=engine_adapter.dialect
 53            ),
 54            snapshot.qualified_view_name.schema_for_environment(
 55                environment.naming_info, dialect=engine_adapter.dialect
 56            ),
 57            environment.suffix_target,
 58        )
 59        for environment in expired_schema_or_catalog_environments
 60        for snapshot in environment.snapshots
 61        if snapshot.is_model and not snapshot.is_symbolic
 62    }:
 63        if suffix_target.is_catalog:
 64            if expired_catalog:
 65                catalogs_to_drop.add((engine_adapter, expired_catalog))
 66        else:
 67            schema = schema_(expired_schema, expired_catalog)
 68            schemas_to_drop.add((engine_adapter, schema))
 69
 70    # Drop the views for the expired environments
 71    for engine_adapter, expired_view in {
 72        (
 73            (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
 74            snapshot.qualified_view_name.for_environment(
 75                environment.naming_info, dialect=engine_adapter.dialect
 76            ),
 77        )
 78        for environment in expired_table_environments
 79        for snapshot in environment.snapshots
 80        if snapshot.is_model and not snapshot.is_symbolic
 81    }:
 82        try:
 83            engine_adapter.drop_view(expired_view, ignore_if_not_exists=True)
 84            if console:
 85                console.update_cleanup_progress(expired_view)
 86        except Exception as e:
 87            message = f"Failed to drop the expired environment view '{expired_view}': {e}"
 88            if warn_on_delete_failure:
 89                logger.warning(message)
 90            else:
 91                raise SQLMeshError(message) from e
 92
 93    # Drop the schemas for the expired environments
 94    for engine_adapter, schema in schemas_to_drop:
 95        try:
 96            engine_adapter.drop_schema(
 97                schema,
 98                ignore_if_not_exists=True,
 99                cascade=True,
100            )
101            if console:
102                console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
103        except Exception as e:
104            message = f"Failed to drop the expired environment schema '{schema}': {e}"
105            if warn_on_delete_failure:
106                logger.warning(message)
107            else:
108                raise SQLMeshError(message) from e
109
110    # Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
111    # catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
112    for engine_adapter, catalog in catalogs_to_drop:
113        if engine_adapter.SUPPORTS_CREATE_DROP_CATALOG:
114            try:
115                engine_adapter.drop_catalog(catalog)
116                if console:
117                    console.update_cleanup_progress(catalog)
118            except Exception as e:
119                message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
120                if warn_on_delete_failure:
121                    logger.warning(message)
122                else:
123                    raise SQLMeshError(message) from e
124
125
def delete_expired_snapshots(
    state_sync: StateSync,
    snapshot_evaluator: SnapshotEvaluator,
    *,
    current_ts: int,
    ignore_ttl: bool = False,
    batch_size: t.Optional[int] = None,
    console: t.Optional[Console] = None,
) -> None:
    """Delete all expired snapshots in batches.

    For each batch yielded by ``iter_expired_snapshot_batches`` the batch's cleanup
    tasks are handed to the snapshot evaluator first, and only afterwards are the
    matching records deleted from state.

    Args:
        state_sync: StateSync instance to query and delete expired snapshots from.
        snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
        current_ts: Timestamp used to evaluate expiration.
        ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
        batch_size: Maximum number of snapshots to fetch per batch.
        console: Optional console for reporting progress.

    Returns:
        None. The total number of processed snapshots is only written to the log.
    """
    # The progress callback does not change between batches, so resolve it once.
    on_complete = console.update_cleanup_progress if console else None

    num_expired_snapshots = 0
    for batch in iter_expired_snapshot_batches(
        state_reader=state_sync,
        current_ts=current_ts,
        ignore_ttl=ignore_ttl,
        batch_size=batch_size,
    ):
        batch_end = batch.batch_range.end
        # The end of a range is either a row boundary or a limit; describe whichever it is.
        end_info = (
            f"updated_ts={batch_end.updated_ts}"
            if isinstance(batch_end, RowBoundary)
            else f"limit={batch_end.batch_size}"
        )
        logger.info(
            "Processing batch of size %s with end %s",
            len(batch.expired_snapshot_ids),
            end_info,
        )
        snapshot_evaluator.cleanup(
            target_snapshots=batch.cleanup_tasks,
            on_complete=on_complete,
        )
        # NOTE(review): the deletion range always starts at the lowest boundary rather
        # than at the batch's own start -- presumably because earlier rows are already
        # gone by this point; confirm against iter_expired_snapshot_batches.
        state_sync.delete_expired_snapshots(
            batch_range=ExpiredBatchRange(
                start=RowBoundary.lowest_boundary(),
                end=batch_end,
            ),
            ignore_ttl=ignore_ttl,
        )
        logger.info("Cleaned up expired snapshots batch")
        num_expired_snapshots += len(batch.expired_snapshot_ids)
    logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
def cleanup_expired_views( default_adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, engine_adapters: Dict[str, sqlmesh.core.engine_adapter.base.EngineAdapter], environments: List[sqlmesh.core.environment.Environment], warn_on_delete_failure: bool = False, console: Optional[sqlmesh.core.console.Console] = None) -> None:
 23def cleanup_expired_views(
 24    default_adapter: EngineAdapter,
 25    engine_adapters: t.Dict[str, EngineAdapter],
 26    environments: t.List[Environment],
 27    warn_on_delete_failure: bool = False,
 28    console: t.Optional[Console] = None,
 29) -> None:
 30    expired_schema_or_catalog_environments = [
 31        environment
 32        for environment in environments
 33        if environment.suffix_target.is_schema or environment.suffix_target.is_catalog
 34    ]
 35    expired_table_environments = [
 36        environment for environment in environments if environment.suffix_target.is_table
 37    ]
 38
 39    # We have to use the corresponding adapter if the virtual layer is gateway managed
 40    def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> EngineAdapter:
 41        if gateway_managed and gateway:
 42            return engine_adapters.get(gateway, default_adapter)
 43        return default_adapter
 44
 45    catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set()
 46    schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()
 47
 48    # Collect schemas and catalogs to drop
 49    for engine_adapter, expired_catalog, expired_schema, suffix_target in {
 50        (
 51            (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
 52            snapshot.qualified_view_name.catalog_for_environment(
 53                environment.naming_info, dialect=engine_adapter.dialect
 54            ),
 55            snapshot.qualified_view_name.schema_for_environment(
 56                environment.naming_info, dialect=engine_adapter.dialect
 57            ),
 58            environment.suffix_target,
 59        )
 60        for environment in expired_schema_or_catalog_environments
 61        for snapshot in environment.snapshots
 62        if snapshot.is_model and not snapshot.is_symbolic
 63    }:
 64        if suffix_target.is_catalog:
 65            if expired_catalog:
 66                catalogs_to_drop.add((engine_adapter, expired_catalog))
 67        else:
 68            schema = schema_(expired_schema, expired_catalog)
 69            schemas_to_drop.add((engine_adapter, schema))
 70
 71    # Drop the views for the expired environments
 72    for engine_adapter, expired_view in {
 73        (
 74            (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
 75            snapshot.qualified_view_name.for_environment(
 76                environment.naming_info, dialect=engine_adapter.dialect
 77            ),
 78        )
 79        for environment in expired_table_environments
 80        for snapshot in environment.snapshots
 81        if snapshot.is_model and not snapshot.is_symbolic
 82    }:
 83        try:
 84            engine_adapter.drop_view(expired_view, ignore_if_not_exists=True)
 85            if console:
 86                console.update_cleanup_progress(expired_view)
 87        except Exception as e:
 88            message = f"Failed to drop the expired environment view '{expired_view}': {e}"
 89            if warn_on_delete_failure:
 90                logger.warning(message)
 91            else:
 92                raise SQLMeshError(message) from e
 93
 94    # Drop the schemas for the expired environments
 95    for engine_adapter, schema in schemas_to_drop:
 96        try:
 97            engine_adapter.drop_schema(
 98                schema,
 99                ignore_if_not_exists=True,
100                cascade=True,
101            )
102            if console:
103                console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
104        except Exception as e:
105            message = f"Failed to drop the expired environment schema '{schema}': {e}"
106            if warn_on_delete_failure:
107                logger.warning(message)
108            else:
109                raise SQLMeshError(message) from e
110
111    # Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
112    # catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
113    for engine_adapter, catalog in catalogs_to_drop:
114        if engine_adapter.SUPPORTS_CREATE_DROP_CATALOG:
115            try:
116                engine_adapter.drop_catalog(catalog)
117                if console:
118                    console.update_cleanup_progress(catalog)
119            except Exception as e:
120                message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
121                if warn_on_delete_failure:
122                    logger.warning(message)
123                else:
124                    raise SQLMeshError(message) from e
def delete_expired_snapshots( state_sync: sqlmesh.core.state_sync.base.StateSync, snapshot_evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, *, current_ts: int, ignore_ttl: bool = False, batch_size: Optional[int] = None, console: Optional[sqlmesh.core.console.Console] = None) -> None:
def delete_expired_snapshots(
    state_sync: StateSync,
    snapshot_evaluator: SnapshotEvaluator,
    *,
    current_ts: int,
    ignore_ttl: bool = False,
    batch_size: t.Optional[int] = None,
    console: t.Optional[Console] = None,
) -> None:
    """Delete all expired snapshots in batches.

    For each batch yielded by ``iter_expired_snapshot_batches`` the batch's cleanup
    tasks are handed to the snapshot evaluator first, and only afterwards are the
    matching records deleted from state.

    Args:
        state_sync: StateSync instance to query and delete expired snapshots from.
        snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
        current_ts: Timestamp used to evaluate expiration.
        ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
        batch_size: Maximum number of snapshots to fetch per batch.
        console: Optional console for reporting progress.

    Returns:
        None. The total number of processed snapshots is only written to the log.
    """
    # The progress callback does not change between batches, so resolve it once.
    on_complete = console.update_cleanup_progress if console else None

    num_expired_snapshots = 0
    for batch in iter_expired_snapshot_batches(
        state_reader=state_sync,
        current_ts=current_ts,
        ignore_ttl=ignore_ttl,
        batch_size=batch_size,
    ):
        batch_end = batch.batch_range.end
        # The end of a range is either a row boundary or a limit; describe whichever it is.
        end_info = (
            f"updated_ts={batch_end.updated_ts}"
            if isinstance(batch_end, RowBoundary)
            else f"limit={batch_end.batch_size}"
        )
        logger.info(
            "Processing batch of size %s with end %s",
            len(batch.expired_snapshot_ids),
            end_info,
        )
        snapshot_evaluator.cleanup(
            target_snapshots=batch.cleanup_tasks,
            on_complete=on_complete,
        )
        # NOTE(review): the deletion range always starts at the lowest boundary rather
        # than at the batch's own start -- presumably because earlier rows are already
        # gone by this point; confirm against iter_expired_snapshot_batches.
        state_sync.delete_expired_snapshots(
            batch_range=ExpiredBatchRange(
                start=RowBoundary.lowest_boundary(),
                end=batch_end,
            ),
            ignore_ttl=ignore_ttl,
        )
        logger.info("Cleaned up expired snapshots batch")
        num_expired_snapshots += len(batch.expired_snapshot_ids)
    logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)

Delete all expired snapshots in batches.

This helper function encapsulates the logic for deleting expired snapshots in batches, eliminating code duplication across different use cases.

Arguments:
  • state_sync: StateSync instance to query and delete expired snapshots from.
  • snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
  • current_ts: Timestamp used to evaluate expiration.
  • ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
  • batch_size: Maximum number of snapshots to fetch per batch.
  • console: Optional console for reporting progress.
Returns:

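None. The function is annotated `-> None` and has no return statement; the total number of expired snapshots it processed is only written to the log.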