sqlmesh.core.janitor
from __future__ import annotations

import typing as t

from sqlglot import exp

from sqlmesh.core.engine_adapter import EngineAdapter
from sqlmesh.core.console import Console
from sqlmesh.core.dialect import schema_
from sqlmesh.core.environment import Environment
from sqlmesh.core.snapshot import SnapshotEvaluator
from sqlmesh.core.state_sync import StateSync
from sqlmesh.core.state_sync.common import (
    logger,
    iter_expired_snapshot_batches,
    RowBoundary,
    ExpiredBatchRange,
)
from sqlmesh.utils.errors import SQLMeshError


def cleanup_expired_views(
    default_adapter: EngineAdapter,
    engine_adapters: t.Dict[str, EngineAdapter],
    environments: t.List[Environment],
    warn_on_delete_failure: bool = False,
    console: t.Optional[Console] = None,
) -> None:
    """Drop the virtual-layer views, schemas and catalogs of expired environments.

    Only non-symbolic model snapshots are considered. Environments whose suffix target is a
    table get each view dropped individually; schema targets get the environment schema dropped
    with ``cascade=True``; catalog targets get the environment catalog dropped, but only when the
    adapter sets ``SUPPORTS_CREATE_DROP_CATALOG``. Views are dropped first, then schemas, then
    catalogs.

    Args:
        default_adapter: Adapter used unless the environment is gateway managed and the snapshot
            names a gateway present in ``engine_adapters``.
        engine_adapters: Adapters keyed by gateway name.
        environments: The expired environments to clean up.
        warn_on_delete_failure: If True, a failed drop is logged as a warning and cleanup
            continues; otherwise it is raised.
        console: Optional console notified via ``update_cleanup_progress`` after each drop.

    Raises:
        SQLMeshError: If a drop fails and ``warn_on_delete_failure`` is False.
    """
    expired_schema_or_catalog_environments = [
        environment
        for environment in environments
        if environment.suffix_target.is_schema or environment.suffix_target.is_catalog
    ]
    expired_table_environments = [
        environment for environment in environments if environment.suffix_target.is_table
    ]

    # We have to use the corresponding adapter if the virtual layer is gateway managed
    def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> EngineAdapter:
        if gateway_managed and gateway:
            return engine_adapters.get(gateway, default_adapter)
        return default_adapter

    catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set()
    schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()

    # Collect schemas and catalogs to drop. The set de-duplicates identical
    # (adapter, catalog, schema, suffix target) combinations across snapshots.
    for engine_adapter, expired_catalog, expired_schema, suffix_target in {
        (
            (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
            snapshot.qualified_view_name.catalog_for_environment(
                environment.naming_info, dialect=engine_adapter.dialect
            ),
            snapshot.qualified_view_name.schema_for_environment(
                environment.naming_info, dialect=engine_adapter.dialect
            ),
            environment.suffix_target,
        )
        for environment in expired_schema_or_catalog_environments
        for snapshot in environment.snapshots
        if snapshot.is_model and not snapshot.is_symbolic
    }:
        if suffix_target.is_catalog:
            if expired_catalog:
                catalogs_to_drop.add((engine_adapter, expired_catalog))
        else:
            schema = schema_(expired_schema, expired_catalog)
            schemas_to_drop.add((engine_adapter, schema))

    # Drop the views for the expired environments
    for engine_adapter, expired_view in {
        (
            (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
            snapshot.qualified_view_name.for_environment(
                environment.naming_info, dialect=engine_adapter.dialect
            ),
        )
        for environment in expired_table_environments
        for snapshot in environment.snapshots
        if snapshot.is_model and not snapshot.is_symbolic
    }:
        try:
            engine_adapter.drop_view(expired_view, ignore_if_not_exists=True)
            if console:
                console.update_cleanup_progress(expired_view)
        except Exception as e:
            message = f"Failed to drop the expired environment view '{expired_view}': {e}"
            if warn_on_delete_failure:
                logger.warning(message)
            else:
                raise SQLMeshError(message) from e

    # Drop the schemas for the expired environments
    for engine_adapter, schema in schemas_to_drop:
        try:
            engine_adapter.drop_schema(
                schema,
                ignore_if_not_exists=True,
                cascade=True,
            )
            if console:
                console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
        except Exception as e:
            message = f"Failed to drop the expired environment schema '{schema}': {e}"
            if warn_on_delete_failure:
                logger.warning(message)
            else:
                raise SQLMeshError(message) from e

    # Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
    # catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
    for engine_adapter, catalog in catalogs_to_drop:
        if engine_adapter.SUPPORTS_CREATE_DROP_CATALOG:
            try:
                engine_adapter.drop_catalog(catalog)
                if console:
                    console.update_cleanup_progress(catalog)
            except Exception as e:
                message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
                if warn_on_delete_failure:
                    logger.warning(message)
                else:
                    raise SQLMeshError(message) from e


def delete_expired_snapshots(
    state_sync: StateSync,
    snapshot_evaluator: SnapshotEvaluator,
    *,
    current_ts: int,
    ignore_ttl: bool = False,
    batch_size: t.Optional[int] = None,
    console: t.Optional[Console] = None,
) -> int:
    """Delete all expired snapshots in batches.

    For each batch yielded by ``iter_expired_snapshot_batches``, the snapshots' tables are
    cleaned up through ``snapshot_evaluator.cleanup`` first, and only afterwards are the
    matching records deleted from state. If the table cleanup raises, the exception propagates
    and that batch's state records are not deleted.

    Args:
        state_sync: StateSync instance to query and delete expired snapshots from.
        snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
        current_ts: Timestamp used to evaluate expiration.
        ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
        batch_size: Maximum number of snapshots to fetch per batch.
        console: Optional console for reporting progress.

    Returns:
        The total number of deleted expired snapshots (the sum of ``expired_snapshot_ids``
        over all processed batches).
    """
    num_expired_snapshots = 0
    for batch in iter_expired_snapshot_batches(
        state_reader=state_sync,
        current_ts=current_ts,
        ignore_ttl=ignore_ttl,
        batch_size=batch_size,
    ):
        end_info = (
            f"updated_ts={batch.batch_range.end.updated_ts}"
            if isinstance(batch.batch_range.end, RowBoundary)
            else f"limit={batch.batch_range.end.batch_size}"
        )
        logger.info(
            "Processing batch of size %s with end %s",
            len(batch.expired_snapshot_ids),
            end_info,
        )
        snapshot_evaluator.cleanup(
            target_snapshots=batch.cleanup_tasks,
            on_complete=console.update_cleanup_progress if console else None,
        )
        # NOTE(review): the delete range starts at the lowest boundary rather than at this
        # batch's start — presumably so anything before this batch is also covered; confirm
        # against StateSync.delete_expired_snapshots.
        state_sync.delete_expired_snapshots(
            batch_range=ExpiredBatchRange(
                start=RowBoundary.lowest_boundary(),
                end=batch.batch_range.end,
            ),
            ignore_ttl=ignore_ttl,
        )
        logger.info("Cleaned up expired snapshots batch")
        num_expired_snapshots += len(batch.expired_snapshot_ids)
    logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
    # Previously the docstring promised this count but the function returned None.
    return num_expired_snapshots
def
cleanup_expired_views( default_adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, engine_adapters: Dict[str, sqlmesh.core.engine_adapter.base.EngineAdapter], environments: List[sqlmesh.core.environment.Environment], warn_on_delete_failure: bool = False, console: Optional[sqlmesh.core.console.Console] = None) -> None:
def cleanup_expired_views(
    default_adapter: EngineAdapter,
    engine_adapters: t.Dict[str, EngineAdapter],
    environments: t.List[Environment],
    warn_on_delete_failure: bool = False,
    console: t.Optional[Console] = None,
) -> None:
    """Drop the virtual-layer objects owned by expired environments.

    Only non-symbolic model snapshots are considered. What gets dropped depends on the
    environment's suffix target:

    * table   -- every snapshot view is dropped individually (``ignore_if_not_exists=True``);
    * schema  -- the environment schema is dropped with ``cascade=True``;
    * catalog -- the environment catalog is dropped, but only if it is non-empty and the
      adapter sets ``SUPPORTS_CREATE_DROP_CATALOG``.

    Targets are de-duplicated per adapter. All views are dropped first, then schemas,
    then catalogs.

    Args:
        default_adapter: Adapter used unless the environment is gateway managed and the
            snapshot names a gateway found in ``engine_adapters``.
        engine_adapters: Adapters keyed by gateway name.
        environments: The expired environments to clean up.
        warn_on_delete_failure: If True, a failed drop is logged as a warning and the
            cleanup continues; otherwise the failure is raised.
        console: Optional console whose ``update_cleanup_progress`` is called after each
            successful drop.

    Raises:
        SQLMeshError: If a drop fails and ``warn_on_delete_failure`` is False.
    """

    def adapter_for(gateway_managed: bool, gateway: t.Optional[str] = None) -> EngineAdapter:
        # A gateway-managed virtual layer has to be cleaned up through that gateway's adapter.
        if not (gateway_managed and gateway):
            return default_adapter
        return engine_adapters.get(gateway, default_adapter)

    views: t.Set[t.Tuple[EngineAdapter, str]] = set()
    schemas: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()
    catalogs: t.Set[t.Tuple[EngineAdapter, str]] = set()

    # Phase 1: gather every drop target; nothing is dropped until all targets are known.
    for env in environments:
        target = env.suffix_target

        if target.is_schema or target.is_catalog:
            for snapshot in env.snapshots:
                if not snapshot.is_model or snapshot.is_symbolic:
                    continue
                adapter = adapter_for(env.gateway_managed, snapshot.model_gateway)
                qualified = snapshot.qualified_view_name
                catalog_name = qualified.catalog_for_environment(
                    env.naming_info, dialect=adapter.dialect
                )
                schema_name = qualified.schema_for_environment(
                    env.naming_info, dialect=adapter.dialect
                )
                if not target.is_catalog:
                    schemas.add((adapter, schema_(schema_name, catalog_name)))
                elif catalog_name:
                    catalogs.add((adapter, catalog_name))

        if target.is_table:
            for snapshot in env.snapshots:
                if not snapshot.is_model or snapshot.is_symbolic:
                    continue
                adapter = adapter_for(env.gateway_managed, snapshot.model_gateway)
                views.add(
                    (
                        adapter,
                        snapshot.qualified_view_name.for_environment(
                            env.naming_info, dialect=adapter.dialect
                        ),
                    )
                )

    # Phase 2: drop views, then schemas, then catalogs.
    for adapter, view_name in views:
        try:
            adapter.drop_view(view_name, ignore_if_not_exists=True)
            if console:
                console.update_cleanup_progress(view_name)
        except Exception as e:
            message = f"Failed to drop the expired environment view '{view_name}': {e}"
            if not warn_on_delete_failure:
                raise SQLMeshError(message) from e
            logger.warning(message)

    for adapter, schema in schemas:
        try:
            adapter.drop_schema(schema, ignore_if_not_exists=True, cascade=True)
            if console:
                console.update_cleanup_progress(schema.sql(dialect=adapter.dialect))
        except Exception as e:
            message = f"Failed to drop the expired environment schema '{schema}': {e}"
            if not warn_on_delete_failure:
                raise SQLMeshError(message) from e
            logger.warning(message)

    for adapter, catalog in catalogs:
        # Catalogs are only collected for catalog suffix targets; adapters that cannot
        # drop catalogs are skipped silently.
        if not adapter.SUPPORTS_CREATE_DROP_CATALOG:
            continue
        try:
            adapter.drop_catalog(catalog)
            if console:
                console.update_cleanup_progress(catalog)
        except Exception as e:
            message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
            if not warn_on_delete_failure:
                raise SQLMeshError(message) from e
            logger.warning(message)
def
delete_expired_snapshots( state_sync: sqlmesh.core.state_sync.base.StateSync, snapshot_evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, *, current_ts: int, ignore_ttl: bool = False, batch_size: Optional[int] = None, console: Optional[sqlmesh.core.console.Console] = None) -> None:
def delete_expired_snapshots(
    state_sync: StateSync,
    snapshot_evaluator: SnapshotEvaluator,
    *,
    current_ts: int,
    ignore_ttl: bool = False,
    batch_size: t.Optional[int] = None,
    console: t.Optional[Console] = None,
) -> int:
    """Delete all expired snapshots in batches.

    For each batch yielded by ``iter_expired_snapshot_batches``, the snapshots' tables are
    cleaned up through ``snapshot_evaluator.cleanup`` first, and only afterwards are the
    matching records deleted from state. If the table cleanup raises, the exception propagates
    and that batch's state records are not deleted.

    Args:
        state_sync: StateSync instance to query and delete expired snapshots from.
        snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
        current_ts: Timestamp used to evaluate expiration.
        ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
        batch_size: Maximum number of snapshots to fetch per batch.
        console: Optional console for reporting progress.

    Returns:
        The total number of deleted expired snapshots (the sum of ``expired_snapshot_ids``
        over all processed batches).
    """
    num_expired_snapshots = 0
    for batch in iter_expired_snapshot_batches(
        state_reader=state_sync,
        current_ts=current_ts,
        ignore_ttl=ignore_ttl,
        batch_size=batch_size,
    ):
        end_info = (
            f"updated_ts={batch.batch_range.end.updated_ts}"
            if isinstance(batch.batch_range.end, RowBoundary)
            else f"limit={batch.batch_range.end.batch_size}"
        )
        logger.info(
            "Processing batch of size %s with end %s",
            len(batch.expired_snapshot_ids),
            end_info,
        )
        snapshot_evaluator.cleanup(
            target_snapshots=batch.cleanup_tasks,
            on_complete=console.update_cleanup_progress if console else None,
        )
        # NOTE(review): the delete range starts at the lowest boundary rather than at this
        # batch's start — presumably so anything before this batch is also covered; confirm
        # against StateSync.delete_expired_snapshots.
        state_sync.delete_expired_snapshots(
            batch_range=ExpiredBatchRange(
                start=RowBoundary.lowest_boundary(),
                end=batch.batch_range.end,
            ),
            ignore_ttl=ignore_ttl,
        )
        logger.info("Cleaned up expired snapshots batch")
        num_expired_snapshots += len(batch.expired_snapshot_ids)
    logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
    # Previously the docstring promised this count but the function returned None.
    return num_expired_snapshots
Delete all expired snapshots in batches.
This helper function encapsulates the logic for deleting expired snapshots in batches, eliminating code duplication across different use cases.
Arguments:
- state_sync: StateSync instance to query and delete expired snapshots from.
- snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
- current_ts: Timestamp used to evaluate expiration.
- ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
- batch_size: Maximum number of snapshots to fetch per batch.
- console: Optional console for reporting progress.
Returns:
The total number of deleted expired snapshots.