sqlmesh.core.snapshot.cache
from __future__ import annotations

import logging
import typing as t

from pathlib import Path
from sqlmesh.core.model.cache import (
    OptimizedQueryCache,
    optimized_query_cache_pool,
    load_optimized_query,
)
from sqlmesh.core import constants as c
from sqlmesh.core.snapshot.definition import Snapshot, SnapshotId
from sqlmesh.utils.cache import FileCache


logger = logging.getLogger(__name__)


class SnapshotCache:
    """File-backed cache of snapshot records, paired with an optimized query cache."""

    def __init__(self, path: Path):
        # Both caches are created from the same ``path``; the snapshot cache
        # uses the "snapshot" prefix.
        self._snapshot_cache: FileCache[Snapshot] = FileCache(path, prefix="snapshot")
        self._optimized_query_cache = OptimizedQueryCache(path)

    def get_or_load(
        self,
        snapshot_ids: t.Set[SnapshotId],
        loader: t.Callable[[t.Set[SnapshotId]], t.Collection[Snapshot]],
    ) -> t.Tuple[t.Dict[SnapshotId, Snapshot], t.Set[SnapshotId]]:
        """Returns the requested snapshots, reading the cache first and the loader second.

        Snapshots served from the cache have their ``intervals`` and ``dev_intervals``
        reset to empty lists. Every returned snapshot is written back through ``put``.

        Args:
            snapshot_ids: Target snapshot IDs to fetch.
            loader: Called once with the IDs that were not found in the cache.

        Returns:
            A tuple of (snapshots keyed by ID, set of IDs that were served from the cache).
        """
        snapshots = {}
        cache_hits: t.Set[SnapshotId] = set()

        for snapshot_id in snapshot_ids:
            cached = self._snapshot_cache.get(self._entry_name(snapshot_id))
            if not cached:
                continue
            cached.intervals = []
            cached.dev_intervals = []
            snapshots[snapshot_id] = cached
            cache_hits.add(snapshot_id)

        missing_ids = snapshot_ids - snapshots.keys()
        if missing_ids:
            for loaded in loader(missing_ids):
                snapshots[loaded.snapshot_id] = loaded

        with optimized_query_cache_pool(self._optimized_query_cache) as executor:
            model_args = (
                (candidate.model, candidate_id)
                for candidate_id, candidate in snapshots.items()
                if candidate.is_model
            )
            for key, entry_name in executor.map(load_optimized_query, model_args):
                if not entry_name:
                    continue
                self._optimized_query_cache.with_optimized_query(
                    snapshots[key].model, entry_name
                )

        for snapshot in snapshots.values():
            self._update_node_hash_cache(snapshot)

            if snapshot.is_model and c.MAX_FORK_WORKERS == 1:
                # Best effort: a failure here is logged and the snapshot is still cached.
                try:
                    self._optimized_query_cache.with_optimized_query(snapshot.model)
                except Exception:
                    logger.exception(
                        "Failed to cache optimized query for snapshot %s", snapshot.snapshot_id
                    )

            self.put(snapshot)

        return snapshots, cache_hits

    def put(self, snapshot: Snapshot) -> None:
        """Writes the snapshot to the cache unless an entry for it already exists.

        Failures are logged and never propagated to the caller.

        Args:
            snapshot: The snapshot to cache.
        """
        entry_name = self._entry_name(snapshot.snapshot_id)

        if not self._snapshot_cache.exists(entry_name):
            try:
                if snapshot.is_model:
                    # make sure we preload full_depends_on
                    _ = snapshot.model.full_depends_on
                self._snapshot_cache.put(entry_name, value=snapshot)
            except Exception:
                logger.exception("Failed to cache snapshot %s", snapshot.snapshot_id)

    def clear(self) -> None:
        """Clears the snapshot cache."""
        self._snapshot_cache.clear()

    @staticmethod
    def _entry_name(snapshot_id: SnapshotId) -> str:
        """Builds the cache entry name from the snapshot ID's name and identifier."""
        return f"{snapshot_id.name}_{snapshot_id.identifier}"

    @staticmethod
    def _update_node_hash_cache(snapshot: Snapshot) -> None:
        """Copies the fingerprint's data and metadata hashes onto the snapshot's node."""
        fingerprint = snapshot.fingerprint
        node = snapshot.node
        node._data_hash = fingerprint.data_hash
        node._metadata_hash = fingerprint.metadata_hash
logger =
<Logger sqlmesh.core.snapshot.cache (WARNING)>
class
SnapshotCache:
class SnapshotCache:
    """File-backed cache of snapshot records, paired with an optimized query cache."""

    def __init__(self, path: Path):
        # Both caches are created from the same ``path``; the snapshot cache
        # uses the "snapshot" prefix.
        self._snapshot_cache: FileCache[Snapshot] = FileCache(path, prefix="snapshot")
        self._optimized_query_cache = OptimizedQueryCache(path)

    def get_or_load(
        self,
        snapshot_ids: t.Set[SnapshotId],
        loader: t.Callable[[t.Set[SnapshotId]], t.Collection[Snapshot]],
    ) -> t.Tuple[t.Dict[SnapshotId, Snapshot], t.Set[SnapshotId]]:
        """Returns the requested snapshots, reading the cache first and the loader second.

        Snapshots served from the cache have their ``intervals`` and ``dev_intervals``
        reset to empty lists. Every returned snapshot is written back through ``put``.

        Args:
            snapshot_ids: Target snapshot IDs to fetch.
            loader: Called once with the IDs that were not found in the cache.

        Returns:
            A tuple of (snapshots keyed by ID, set of IDs that were served from the cache).
        """
        snapshots = {}
        cache_hits: t.Set[SnapshotId] = set()

        for snapshot_id in snapshot_ids:
            cached = self._snapshot_cache.get(self._entry_name(snapshot_id))
            if not cached:
                continue
            cached.intervals = []
            cached.dev_intervals = []
            snapshots[snapshot_id] = cached
            cache_hits.add(snapshot_id)

        missing_ids = snapshot_ids - snapshots.keys()
        if missing_ids:
            for loaded in loader(missing_ids):
                snapshots[loaded.snapshot_id] = loaded

        with optimized_query_cache_pool(self._optimized_query_cache) as executor:
            model_args = (
                (candidate.model, candidate_id)
                for candidate_id, candidate in snapshots.items()
                if candidate.is_model
            )
            for key, entry_name in executor.map(load_optimized_query, model_args):
                if not entry_name:
                    continue
                self._optimized_query_cache.with_optimized_query(
                    snapshots[key].model, entry_name
                )

        for snapshot in snapshots.values():
            self._update_node_hash_cache(snapshot)

            if snapshot.is_model and c.MAX_FORK_WORKERS == 1:
                # Best effort: a failure here is logged and the snapshot is still cached.
                try:
                    self._optimized_query_cache.with_optimized_query(snapshot.model)
                except Exception:
                    logger.exception(
                        "Failed to cache optimized query for snapshot %s", snapshot.snapshot_id
                    )

            self.put(snapshot)

        return snapshots, cache_hits

    def put(self, snapshot: Snapshot) -> None:
        """Writes the snapshot to the cache unless an entry for it already exists.

        Failures are logged and never propagated to the caller.

        Args:
            snapshot: The snapshot to cache.
        """
        entry_name = self._entry_name(snapshot.snapshot_id)

        if not self._snapshot_cache.exists(entry_name):
            try:
                if snapshot.is_model:
                    # make sure we preload full_depends_on
                    _ = snapshot.model.full_depends_on
                self._snapshot_cache.put(entry_name, value=snapshot)
            except Exception:
                logger.exception("Failed to cache snapshot %s", snapshot.snapshot_id)

    def clear(self) -> None:
        """Clears the snapshot cache."""
        self._snapshot_cache.clear()

    @staticmethod
    def _entry_name(snapshot_id: SnapshotId) -> str:
        """Builds the cache entry name from the snapshot ID's name and identifier."""
        return f"{snapshot_id.name}_{snapshot_id.identifier}"

    @staticmethod
    def _update_node_hash_cache(snapshot: Snapshot) -> None:
        """Copies the fingerprint's data and metadata hashes onto the snapshot's node."""
        fingerprint = snapshot.fingerprint
        node = snapshot.node
        node._data_hash = fingerprint.data_hash
        node._metadata_hash = fingerprint.metadata_hash
def
get_or_load( self, snapshot_ids: Set[sqlmesh.core.snapshot.definition.SnapshotId], loader: Callable[[Set[sqlmesh.core.snapshot.definition.SnapshotId]], Collection[sqlmesh.core.snapshot.definition.Snapshot]]) -> Tuple[Dict[sqlmesh.core.snapshot.definition.SnapshotId, sqlmesh.core.snapshot.definition.Snapshot], Set[sqlmesh.core.snapshot.definition.SnapshotId]]:
def get_or_load(
    self,
    snapshot_ids: t.Set[SnapshotId],
    loader: t.Callable[[t.Set[SnapshotId]], t.Collection[Snapshot]],
) -> t.Tuple[t.Dict[SnapshotId, Snapshot], t.Set[SnapshotId]]:
    """Returns the requested snapshots, reading the cache first and the loader second.

    Snapshots served from the cache have their ``intervals`` and ``dev_intervals``
    reset to empty lists. Every returned snapshot is written back through ``put``.

    Args:
        snapshot_ids: Target snapshot IDs to fetch.
        loader: Called once with the IDs that were not found in the cache.

    Returns:
        A tuple of (snapshots keyed by ID, set of IDs that were served from the cache).
    """
    snapshots = {}
    cache_hits: t.Set[SnapshotId] = set()

    for snapshot_id in snapshot_ids:
        cached = self._snapshot_cache.get(self._entry_name(snapshot_id))
        if not cached:
            continue
        cached.intervals = []
        cached.dev_intervals = []
        snapshots[snapshot_id] = cached
        cache_hits.add(snapshot_id)

    missing_ids = snapshot_ids - snapshots.keys()
    if missing_ids:
        for loaded in loader(missing_ids):
            snapshots[loaded.snapshot_id] = loaded

    with optimized_query_cache_pool(self._optimized_query_cache) as executor:
        model_args = (
            (candidate.model, candidate_id)
            for candidate_id, candidate in snapshots.items()
            if candidate.is_model
        )
        for key, entry_name in executor.map(load_optimized_query, model_args):
            if not entry_name:
                continue
            self._optimized_query_cache.with_optimized_query(
                snapshots[key].model, entry_name
            )

    for snapshot in snapshots.values():
        self._update_node_hash_cache(snapshot)

        if snapshot.is_model and c.MAX_FORK_WORKERS == 1:
            # Best effort: a failure here is logged and the snapshot is still cached.
            try:
                self._optimized_query_cache.with_optimized_query(snapshot.model)
            except Exception:
                logger.exception(
                    "Failed to cache optimized query for snapshot %s", snapshot.snapshot_id
                )

        self.put(snapshot)

    return snapshots, cache_hits
Fetches the target snapshots from cache or loads them using the provided loader on cache miss.
Arguments:
- snapshot_ids: Target snapshot IDs to fetch.
- loader: The loader to load snapshot records that are missing in the cache.
Returns:
A tuple where the first value represents the fetched snapshots, and the second value is a set of snapshot IDs for which records were retrieved from the cache.
def put(self, snapshot: Snapshot) -> None:
    """Writes the snapshot to the cache unless an entry for it already exists.

    Failures are logged and never propagated to the caller.

    Args:
        snapshot: The snapshot to cache.
    """
    entry_name = self._entry_name(snapshot.snapshot_id)

    if not self._snapshot_cache.exists(entry_name):
        try:
            if snapshot.is_model:
                # make sure we preload full_depends_on
                _ = snapshot.model.full_depends_on
            self._snapshot_cache.put(entry_name, value=snapshot)
        except Exception:
            logger.exception("Failed to cache snapshot %s", snapshot.snapshot_id)