Edit on GitHub

sqlmesh.core.snapshot.cache

  1from __future__ import annotations
  2
  3import logging
  4import typing as t
  5
  6from pathlib import Path
  7from sqlmesh.core.model.cache import (
  8    OptimizedQueryCache,
  9    optimized_query_cache_pool,
 10    load_optimized_query,
 11)
 12from sqlmesh.core import constants as c
 13from sqlmesh.core.snapshot.definition import Snapshot, SnapshotId
 14from sqlmesh.utils.cache import FileCache
 15
 16
 17logger = logging.getLogger(__name__)
 18
 19
 20class SnapshotCache:
 21    def __init__(self, path: Path):
 22        self._snapshot_cache: FileCache[Snapshot] = FileCache(path, prefix="snapshot")
 23        self._optimized_query_cache = OptimizedQueryCache(path)
 24
 25    def get_or_load(
 26        self,
 27        snapshot_ids: t.Set[SnapshotId],
 28        loader: t.Callable[[t.Set[SnapshotId]], t.Collection[Snapshot]],
 29    ) -> t.Tuple[t.Dict[SnapshotId, Snapshot], t.Set[SnapshotId]]:
 30        """Fetches the target snapshots from cache or loads them using the provided loader on cache miss.
 31
 32        Args:
 33            snapshot_ids: Target snapshot IDs to fetch.
 34            loader: The loader to load snapshot records that are missing in the cache.
 35
 36        Returns:
 37            A tuple where the first value represents the fetched snapshots, and the second value is a set of
 38            snapshot IDs for which records were retrieved from the cache.
 39
 40        """
 41        snapshots = {}
 42        cache_hits: t.Set[SnapshotId] = set()
 43
 44        for s_id in snapshot_ids:
 45            snapshot = self._snapshot_cache.get(self._entry_name(s_id))
 46            if snapshot:
 47                snapshot.intervals = []
 48                snapshot.dev_intervals = []
 49                snapshots[s_id] = snapshot
 50                cache_hits.add(s_id)
 51
 52        snapshot_ids_to_load = snapshot_ids - snapshots.keys()
 53        if snapshot_ids_to_load:
 54            loaded_snapshots = loader(snapshot_ids_to_load)
 55            for snapshot in loaded_snapshots:
 56                snapshots[snapshot.snapshot_id] = snapshot
 57
 58        with optimized_query_cache_pool(self._optimized_query_cache) as executor:
 59            for key, entry_name in executor.map(
 60                load_optimized_query,
 61                (
 62                    (snapshot.model, s_id)
 63                    for s_id, snapshot in snapshots.items()
 64                    if snapshot.is_model
 65                ),
 66            ):
 67                if entry_name:
 68                    self._optimized_query_cache.with_optimized_query(
 69                        snapshots[key].model, entry_name
 70                    )
 71
 72        for snapshot in snapshots.values():
 73            self._update_node_hash_cache(snapshot)
 74
 75            if snapshot.is_model and c.MAX_FORK_WORKERS == 1:
 76                try:
 77                    self._optimized_query_cache.with_optimized_query(snapshot.model)
 78                except Exception:
 79                    logger.exception(
 80                        "Failed to cache optimized query for snapshot %s", snapshot.snapshot_id
 81                    )
 82
 83            self.put(snapshot)
 84
 85        return snapshots, cache_hits
 86
 87    def put(self, snapshot: Snapshot) -> None:
 88        entry_name = self._entry_name(snapshot.snapshot_id)
 89
 90        if self._snapshot_cache.exists(entry_name):
 91            return
 92
 93        try:
 94            if snapshot.is_model:
 95                # make sure we preload full_depends_on
 96                snapshot.model.full_depends_on
 97            self._snapshot_cache.put(entry_name, value=snapshot)
 98        except Exception:
 99            logger.exception("Failed to cache snapshot %s", snapshot.snapshot_id)
100
101    def clear(self) -> None:
102        self._snapshot_cache.clear()
103
104    @staticmethod
105    def _entry_name(snapshot_id: SnapshotId) -> str:
106        return f"{snapshot_id.name}_{snapshot_id.identifier}"
107
108    @staticmethod
109    def _update_node_hash_cache(snapshot: Snapshot) -> None:
110        snapshot.node._data_hash = snapshot.fingerprint.data_hash
111        snapshot.node._metadata_hash = snapshot.fingerprint.metadata_hash
logger = <Logger sqlmesh.core.snapshot.cache (WARNING)>
class SnapshotCache:
 21class SnapshotCache:
 22    def __init__(self, path: Path):
 23        self._snapshot_cache: FileCache[Snapshot] = FileCache(path, prefix="snapshot")
 24        self._optimized_query_cache = OptimizedQueryCache(path)
 25
 26    def get_or_load(
 27        self,
 28        snapshot_ids: t.Set[SnapshotId],
 29        loader: t.Callable[[t.Set[SnapshotId]], t.Collection[Snapshot]],
 30    ) -> t.Tuple[t.Dict[SnapshotId, Snapshot], t.Set[SnapshotId]]:
 31        """Fetches the target snapshots from cache or loads them using the provided loader on cache miss.
 32
 33        Args:
 34            snapshot_ids: Target snapshot IDs to fetch.
 35            loader: The loader to load snapshot records that are missing in the cache.
 36
 37        Returns:
 38            A tuple where the first value represents the fetched snapshots, and the second value is a set of
 39            snapshot IDs for which records were retrieved from the cache.
 40
 41        """
 42        snapshots = {}
 43        cache_hits: t.Set[SnapshotId] = set()
 44
 45        for s_id in snapshot_ids:
 46            snapshot = self._snapshot_cache.get(self._entry_name(s_id))
 47            if snapshot:
 48                snapshot.intervals = []
 49                snapshot.dev_intervals = []
 50                snapshots[s_id] = snapshot
 51                cache_hits.add(s_id)
 52
 53        snapshot_ids_to_load = snapshot_ids - snapshots.keys()
 54        if snapshot_ids_to_load:
 55            loaded_snapshots = loader(snapshot_ids_to_load)
 56            for snapshot in loaded_snapshots:
 57                snapshots[snapshot.snapshot_id] = snapshot
 58
 59        with optimized_query_cache_pool(self._optimized_query_cache) as executor:
 60            for key, entry_name in executor.map(
 61                load_optimized_query,
 62                (
 63                    (snapshot.model, s_id)
 64                    for s_id, snapshot in snapshots.items()
 65                    if snapshot.is_model
 66                ),
 67            ):
 68                if entry_name:
 69                    self._optimized_query_cache.with_optimized_query(
 70                        snapshots[key].model, entry_name
 71                    )
 72
 73        for snapshot in snapshots.values():
 74            self._update_node_hash_cache(snapshot)
 75
 76            if snapshot.is_model and c.MAX_FORK_WORKERS == 1:
 77                try:
 78                    self._optimized_query_cache.with_optimized_query(snapshot.model)
 79                except Exception:
 80                    logger.exception(
 81                        "Failed to cache optimized query for snapshot %s", snapshot.snapshot_id
 82                    )
 83
 84            self.put(snapshot)
 85
 86        return snapshots, cache_hits
 87
 88    def put(self, snapshot: Snapshot) -> None:
 89        entry_name = self._entry_name(snapshot.snapshot_id)
 90
 91        if self._snapshot_cache.exists(entry_name):
 92            return
 93
 94        try:
 95            if snapshot.is_model:
 96                # make sure we preload full_depends_on
 97                snapshot.model.full_depends_on
 98            self._snapshot_cache.put(entry_name, value=snapshot)
 99        except Exception:
100            logger.exception("Failed to cache snapshot %s", snapshot.snapshot_id)
101
102    def clear(self) -> None:
103        self._snapshot_cache.clear()
104
105    @staticmethod
106    def _entry_name(snapshot_id: SnapshotId) -> str:
107        return f"{snapshot_id.name}_{snapshot_id.identifier}"
108
109    @staticmethod
110    def _update_node_hash_cache(snapshot: Snapshot) -> None:
111        snapshot.node._data_hash = snapshot.fingerprint.data_hash
112        snapshot.node._metadata_hash = snapshot.fingerprint.metadata_hash
SnapshotCache(path: pathlib.Path)
22    def __init__(self, path: Path):
23        self._snapshot_cache: FileCache[Snapshot] = FileCache(path, prefix="snapshot")
24        self._optimized_query_cache = OptimizedQueryCache(path)
26    def get_or_load(
27        self,
28        snapshot_ids: t.Set[SnapshotId],
29        loader: t.Callable[[t.Set[SnapshotId]], t.Collection[Snapshot]],
30    ) -> t.Tuple[t.Dict[SnapshotId, Snapshot], t.Set[SnapshotId]]:
31        """Fetches the target snapshots from cache or loads them using the provided loader on cache miss.
32
33        Args:
34            snapshot_ids: Target snapshot IDs to fetch.
35            loader: The loader to load snapshot records that are missing in the cache.
36
37        Returns:
38            A tuple where the first value represents the fetched snapshots, and the second value is a set of
39            snapshot IDs for which records were retrieved from the cache.
40
41        """
42        snapshots = {}
43        cache_hits: t.Set[SnapshotId] = set()
44
45        for s_id in snapshot_ids:
46            snapshot = self._snapshot_cache.get(self._entry_name(s_id))
47            if snapshot:
48                snapshot.intervals = []
49                snapshot.dev_intervals = []
50                snapshots[s_id] = snapshot
51                cache_hits.add(s_id)
52
53        snapshot_ids_to_load = snapshot_ids - snapshots.keys()
54        if snapshot_ids_to_load:
55            loaded_snapshots = loader(snapshot_ids_to_load)
56            for snapshot in loaded_snapshots:
57                snapshots[snapshot.snapshot_id] = snapshot
58
59        with optimized_query_cache_pool(self._optimized_query_cache) as executor:
60            for key, entry_name in executor.map(
61                load_optimized_query,
62                (
63                    (snapshot.model, s_id)
64                    for s_id, snapshot in snapshots.items()
65                    if snapshot.is_model
66                ),
67            ):
68                if entry_name:
69                    self._optimized_query_cache.with_optimized_query(
70                        snapshots[key].model, entry_name
71                    )
72
73        for snapshot in snapshots.values():
74            self._update_node_hash_cache(snapshot)
75
76            if snapshot.is_model and c.MAX_FORK_WORKERS == 1:
77                try:
78                    self._optimized_query_cache.with_optimized_query(snapshot.model)
79                except Exception:
80                    logger.exception(
81                        "Failed to cache optimized query for snapshot %s", snapshot.snapshot_id
82                    )
83
84            self.put(snapshot)
85
86        return snapshots, cache_hits

Fetches the target snapshots from cache or loads them using the provided loader on cache miss.

Arguments:
  • snapshot_ids: Target snapshot IDs to fetch.
  • loader: The loader to load snapshot records that are missing in the cache.
Returns:

A tuple where the first value represents the fetched snapshots, and the second value is a set of snapshot IDs for which records were retrieved from the cache.

def put(self, snapshot: sqlmesh.core.snapshot.definition.Snapshot) -> None:
 88    def put(self, snapshot: Snapshot) -> None:
 89        entry_name = self._entry_name(snapshot.snapshot_id)
 90
 91        if self._snapshot_cache.exists(entry_name):
 92            return
 93
 94        try:
 95            if snapshot.is_model:
 96                # make sure we preload full_depends_on
 97                snapshot.model.full_depends_on
 98            self._snapshot_cache.put(entry_name, value=snapshot)
 99        except Exception:
100            logger.exception("Failed to cache snapshot %s", snapshot.snapshot_id)
def clear(self) -> None:
102    def clear(self) -> None:
103        self._snapshot_cache.clear()