Edit on GitHub

sqlmesh.utils.cache

  1from __future__ import annotations
  2
  3import gzip
  4import logging
  5import pickle
  6import typing as t
  7from pathlib import Path
  8
  9from sqlglot import __version__ as SQLGLOT_VERSION
 10
 11from sqlmesh.utils import sanitize_name
 12from sqlmesh.utils.date import to_datetime
 13from sqlmesh.utils.errors import SQLMeshError
 14from sqlmesh.utils.pydantic import PydanticModel
 15
 16logger = logging.getLogger(__name__)
 17
 18T = t.TypeVar("T", bound=PydanticModel)
 19
 20
 21SQLGLOT_VERSION_TUPLE = tuple(SQLGLOT_VERSION.split("."))
 22SQLGLOT_MAJOR_VERSION = SQLGLOT_VERSION_TUPLE[0]
 23SQLGLOT_MINOR_VERSION = SQLGLOT_VERSION_TUPLE[1]
 24
 25
 26class FileCache(t.Generic[T]):
 27    """Generic file-based cache implementation.
 28
 29    Args:
 30        path: The path to the cache folder.
 31        entry_class: The type of cached entries.
 32        prefix: The prefix shared between all entries to distinguish them from other entries
 33            stored in the same cache folder.
 34    """
 35
 36    def __init__(
 37        self,
 38        path: Path,
 39        entry_class: t.Type[T],
 40        prefix: t.Optional[str] = None,
 41    ):
 42        self._path = path / prefix if prefix else path
 43        self._entry_class = entry_class
 44
 45        from sqlmesh.core.state_sync.base import SCHEMA_VERSION
 46
 47        try:
 48            from sqlmesh._version import __version_tuple__
 49
 50            major, minor = __version_tuple__[0], __version_tuple__[1]
 51        except ImportError:
 52            major, minor = 0, 0
 53
 54        self._cache_version = "_".join(
 55            [
 56                str(major),
 57                str(minor),
 58                SQLGLOT_MAJOR_VERSION,
 59                SQLGLOT_MINOR_VERSION,
 60                str(SCHEMA_VERSION),
 61            ]
 62        )
 63
 64        threshold = to_datetime("1 week ago").timestamp()
 65        # delete all old cache files
 66        for file in self._path.glob("*"):
 67            if not file.stem.startswith(self._cache_version) or file.stat().st_mtime < threshold:
 68                file.unlink(missing_ok=True)
 69
 70    def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
 71        """Returns an existing cached entry or loads and caches a new one.
 72
 73        Args:
 74            name: The name of the entry.
 75            entry_id: The unique entry identifier. Used for cache invalidation.
 76            loader: Used to load a new entry when no cached instance was found.
 77
 78        Returns:
 79            The entry.
 80        """
 81        cached_entry = self.get(name, entry_id)
 82        if cached_entry:
 83            return cached_entry
 84
 85        loaded_entry = loader()
 86        self.put(name, entry_id, value=loaded_entry)
 87        return loaded_entry
 88
 89    def get(self, name: str, entry_id: str = "") -> t.Optional[T]:
 90        """Returns a cached entry if exists.
 91
 92        Args:
 93            name: The name of the entry.
 94            entry_id: The unique entry identifier. Used for cache invalidation.
 95
 96        Returns:
 97            The entry or None if no entry was found in the cache.
 98        """
 99        cache_entry_path = self._cache_entry_path(name, entry_id)
100        if cache_entry_path.exists():
101            with gzip.open(cache_entry_path, "rb") as fd:
102                try:
103                    return self._entry_class.parse_obj(pickle.load(fd))
104                except Exception as ex:
105                    logger.warning("Failed to load a cache entry '%s': %s", name, ex)
106
107        return None
108
109    def put(self, name: str, entry_id: str = "", *, value: T) -> None:
110        """Stores the given value in the cache.
111
112        Args:
113            name: The name of the entry.
114            entry_id: The unique entry identifier. Used for cache invalidation.
115            value: The value to store in the cache.
116        """
117        self._path.mkdir(parents=True, exist_ok=True)
118        if not self._path.is_dir():
119            raise SQLMeshError(f"Cache path '{self._path}' is not a directory.")
120
121        with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd:
122            pickle.dump(value.dict(), fd)
123
124    def _cache_entry_path(self, name: str, entry_id: str = "") -> Path:
125        entry_file_name = "__".join(p for p in (self._cache_version, name, entry_id) if p)
126        return self._path / sanitize_name(entry_file_name)
class FileCache(typing.Generic[~T]):
 27class FileCache(t.Generic[T]):
 28    """Generic file-based cache implementation.
 29
 30    Args:
 31        path: The path to the cache folder.
 32        entry_class: The type of cached entries.
 33        prefix: The prefix shared between all entries to distinguish them from other entries
 34            stored in the same cache folder.
 35    """
 36
 37    def __init__(
 38        self,
 39        path: Path,
 40        entry_class: t.Type[T],
 41        prefix: t.Optional[str] = None,
 42    ):
 43        self._path = path / prefix if prefix else path
 44        self._entry_class = entry_class
 45
 46        from sqlmesh.core.state_sync.base import SCHEMA_VERSION
 47
 48        try:
 49            from sqlmesh._version import __version_tuple__
 50
 51            major, minor = __version_tuple__[0], __version_tuple__[1]
 52        except ImportError:
 53            major, minor = 0, 0
 54
 55        self._cache_version = "_".join(
 56            [
 57                str(major),
 58                str(minor),
 59                SQLGLOT_MAJOR_VERSION,
 60                SQLGLOT_MINOR_VERSION,
 61                str(SCHEMA_VERSION),
 62            ]
 63        )
 64
 65        threshold = to_datetime("1 week ago").timestamp()
 66        # delete all old cache files
 67        for file in self._path.glob("*"):
 68            if not file.stem.startswith(self._cache_version) or file.stat().st_mtime < threshold:
 69                file.unlink(missing_ok=True)
 70
 71    def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
 72        """Returns an existing cached entry or loads and caches a new one.
 73
 74        Args:
 75            name: The name of the entry.
 76            entry_id: The unique entry identifier. Used for cache invalidation.
 77            loader: Used to load a new entry when no cached instance was found.
 78
 79        Returns:
 80            The entry.
 81        """
 82        cached_entry = self.get(name, entry_id)
 83        if cached_entry:
 84            return cached_entry
 85
 86        loaded_entry = loader()
 87        self.put(name, entry_id, value=loaded_entry)
 88        return loaded_entry
 89
 90    def get(self, name: str, entry_id: str = "") -> t.Optional[T]:
 91        """Returns a cached entry if exists.
 92
 93        Args:
 94            name: The name of the entry.
 95            entry_id: The unique entry identifier. Used for cache invalidation.
 96
 97        Returns:
 98            The entry or None if no entry was found in the cache.
 99        """
100        cache_entry_path = self._cache_entry_path(name, entry_id)
101        if cache_entry_path.exists():
102            with gzip.open(cache_entry_path, "rb") as fd:
103                try:
104                    return self._entry_class.parse_obj(pickle.load(fd))
105                except Exception as ex:
106                    logger.warning("Failed to load a cache entry '%s': %s", name, ex)
107
108        return None
109
110    def put(self, name: str, entry_id: str = "", *, value: T) -> None:
111        """Stores the given value in the cache.
112
113        Args:
114            name: The name of the entry.
115            entry_id: The unique entry identifier. Used for cache invalidation.
116            value: The value to store in the cache.
117        """
118        self._path.mkdir(parents=True, exist_ok=True)
119        if not self._path.is_dir():
120            raise SQLMeshError(f"Cache path '{self._path}' is not a directory.")
121
122        with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd:
123            pickle.dump(value.dict(), fd)
124
125    def _cache_entry_path(self, name: str, entry_id: str = "") -> Path:
126        entry_file_name = "__".join(p for p in (self._cache_version, name, entry_id) if p)
127        return self._path / sanitize_name(entry_file_name)

Generic file-based cache implementation.

Arguments:
  • path: The path to the cache folder.
  • entry_class: The type of cached entries.
  • prefix: The prefix shared between all entries to distinguish them from other entries stored in the same cache folder.
FileCache( path: pathlib.Path, entry_class: Type[~T], prefix: Union[str, NoneType] = None)
37    def __init__(
38        self,
39        path: Path,
40        entry_class: t.Type[T],
41        prefix: t.Optional[str] = None,
42    ):
43        self._path = path / prefix if prefix else path
44        self._entry_class = entry_class
45
46        from sqlmesh.core.state_sync.base import SCHEMA_VERSION
47
48        try:
49            from sqlmesh._version import __version_tuple__
50
51            major, minor = __version_tuple__[0], __version_tuple__[1]
52        except ImportError:
53            major, minor = 0, 0
54
55        self._cache_version = "_".join(
56            [
57                str(major),
58                str(minor),
59                SQLGLOT_MAJOR_VERSION,
60                SQLGLOT_MINOR_VERSION,
61                str(SCHEMA_VERSION),
62            ]
63        )
64
65        threshold = to_datetime("1 week ago").timestamp()
66        # delete all old cache files
67        for file in self._path.glob("*"):
68            if not file.stem.startswith(self._cache_version) or file.stat().st_mtime < threshold:
69                file.unlink(missing_ok=True)
def get_or_load(self, name: str, entry_id: str = '', *, loader: Callable[[], ~T]) -> ~T:
71    def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
72        """Returns an existing cached entry or loads and caches a new one.
73
74        Args:
75            name: The name of the entry.
76            entry_id: The unique entry identifier. Used for cache invalidation.
77            loader: Used to load a new entry when no cached instance was found.
78
79        Returns:
80            The entry.
81        """
82        cached_entry = self.get(name, entry_id)
83        if cached_entry:
84            return cached_entry
85
86        loaded_entry = loader()
87        self.put(name, entry_id, value=loaded_entry)
88        return loaded_entry

Returns an existing cached entry or loads and caches a new one.

Arguments:
  • name: The name of the entry.
  • entry_id: The unique entry identifier. Used for cache invalidation.
  • loader: Used to load a new entry when no cached instance was found.
Returns:

The entry.

def get(self, name: str, entry_id: str = '') -> Union[~T, NoneType]:
 90    def get(self, name: str, entry_id: str = "") -> t.Optional[T]:
 91        """Returns a cached entry if exists.
 92
 93        Args:
 94            name: The name of the entry.
 95            entry_id: The unique entry identifier. Used for cache invalidation.
 96
 97        Returns:
 98            The entry or None if no entry was found in the cache.
 99        """
100        cache_entry_path = self._cache_entry_path(name, entry_id)
101        if cache_entry_path.exists():
102            with gzip.open(cache_entry_path, "rb") as fd:
103                try:
104                    return self._entry_class.parse_obj(pickle.load(fd))
105                except Exception as ex:
106                    logger.warning("Failed to load a cache entry '%s': %s", name, ex)
107
108        return None

Returns a cached entry if it exists.

Arguments:
  • name: The name of the entry.
  • entry_id: The unique entry identifier. Used for cache invalidation.
Returns:

The entry or None if no entry was found in the cache.

def put(self, name: str, entry_id: str = '', *, value: ~T) -> None:
110    def put(self, name: str, entry_id: str = "", *, value: T) -> None:
111        """Stores the given value in the cache.
112
113        Args:
114            name: The name of the entry.
115            entry_id: The unique entry identifier. Used for cache invalidation.
116            value: The value to store in the cache.
117        """
118        self._path.mkdir(parents=True, exist_ok=True)
119        if not self._path.is_dir():
120            raise SQLMeshError(f"Cache path '{self._path}' is not a directory.")
121
122        with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd:
123            pickle.dump(value.dict(), fd)

Stores the given value in the cache.

Arguments:
  • name: The name of the entry.
  • entry_id: The unique entry identifier. Used for cache invalidation.
  • value: The value to store in the cache.