sqlmesh.utils.cache
1from __future__ import annotations 2 3import gzip 4import logging 5import pickle 6import typing as t 7from pathlib import Path 8 9from sqlglot import __version__ as SQLGLOT_VERSION 10 11from sqlmesh.utils import sanitize_name 12from sqlmesh.utils.date import to_datetime 13from sqlmesh.utils.errors import SQLMeshError 14from sqlmesh.utils.pydantic import PydanticModel 15 16logger = logging.getLogger(__name__) 17 18T = t.TypeVar("T", bound=PydanticModel) 19 20 21SQLGLOT_VERSION_TUPLE = tuple(SQLGLOT_VERSION.split(".")) 22SQLGLOT_MAJOR_VERSION = SQLGLOT_VERSION_TUPLE[0] 23SQLGLOT_MINOR_VERSION = SQLGLOT_VERSION_TUPLE[1] 24 25 26class FileCache(t.Generic[T]): 27 """Generic file-based cache implementation. 28 29 Args: 30 path: The path to the cache folder. 31 entry_class: The type of cached entries. 32 prefix: The prefix shared between all entries to distinguish them from other entries 33 stored in the same cache folder. 34 """ 35 36 def __init__( 37 self, 38 path: Path, 39 entry_class: t.Type[T], 40 prefix: t.Optional[str] = None, 41 ): 42 self._path = path / prefix if prefix else path 43 self._entry_class = entry_class 44 45 from sqlmesh.core.state_sync.base import SCHEMA_VERSION 46 47 try: 48 from sqlmesh._version import __version_tuple__ 49 50 major, minor = __version_tuple__[0], __version_tuple__[1] 51 except ImportError: 52 major, minor = 0, 0 53 54 self._cache_version = "_".join( 55 [ 56 str(major), 57 str(minor), 58 SQLGLOT_MAJOR_VERSION, 59 SQLGLOT_MINOR_VERSION, 60 str(SCHEMA_VERSION), 61 ] 62 ) 63 64 threshold = to_datetime("1 week ago").timestamp() 65 # delete all old cache files 66 for file in self._path.glob("*"): 67 if not file.stem.startswith(self._cache_version) or file.stat().st_mtime < threshold: 68 file.unlink(missing_ok=True) 69 70 def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T: 71 """Returns an existing cached entry or loads and caches a new one. 72 73 Args: 74 name: The name of the entry. 
75 entry_id: The unique entry identifier. Used for cache invalidation. 76 loader: Used to load a new entry when no cached instance was found. 77 78 Returns: 79 The entry. 80 """ 81 cached_entry = self.get(name, entry_id) 82 if cached_entry: 83 return cached_entry 84 85 loaded_entry = loader() 86 self.put(name, entry_id, value=loaded_entry) 87 return loaded_entry 88 89 def get(self, name: str, entry_id: str = "") -> t.Optional[T]: 90 """Returns a cached entry if exists. 91 92 Args: 93 name: The name of the entry. 94 entry_id: The unique entry identifier. Used for cache invalidation. 95 96 Returns: 97 The entry or None if no entry was found in the cache. 98 """ 99 cache_entry_path = self._cache_entry_path(name, entry_id) 100 if cache_entry_path.exists(): 101 with gzip.open(cache_entry_path, "rb") as fd: 102 try: 103 return self._entry_class.parse_obj(pickle.load(fd)) 104 except Exception as ex: 105 logger.warning("Failed to load a cache entry '%s': %s", name, ex) 106 107 return None 108 109 def put(self, name: str, entry_id: str = "", *, value: T) -> None: 110 """Stores the given value in the cache. 111 112 Args: 113 name: The name of the entry. 114 entry_id: The unique entry identifier. Used for cache invalidation. 115 value: The value to store in the cache. 116 """ 117 self._path.mkdir(parents=True, exist_ok=True) 118 if not self._path.is_dir(): 119 raise SQLMeshError(f"Cache path '{self._path}' is not a directory.") 120 121 with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd: 122 pickle.dump(value.dict(), fd) 123 124 def _cache_entry_path(self, name: str, entry_id: str = "") -> Path: 125 entry_file_name = "__".join(p for p in (self._cache_version, name, entry_id) if p) 126 return self._path / sanitize_name(entry_file_name)
class
FileCache(typing.Generic[~T]):
class FileCache(t.Generic[T]):
    """On-disk cache of pydantic models, one gzipped pickle file per entry.

    Args:
        path: The path to the cache folder.
        entry_class: The type of cached entries.
        prefix: The prefix shared between all entries to distinguish them from other entries
            stored in the same cache folder.
    """

    def __init__(
        self,
        path: Path,
        entry_class: t.Type[T],
        prefix: t.Optional[str] = None,
    ):
        self._path = (path / prefix) if prefix else path
        self._entry_class = entry_class

        # Deferred import — appears intended to sidestep an import cycle.
        from sqlmesh.core.state_sync.base import SCHEMA_VERSION

        try:
            from sqlmesh._version import __version_tuple__
        except ImportError:
            sqlmesh_major, sqlmesh_minor = 0, 0
        else:
            sqlmesh_major, sqlmesh_minor = __version_tuple__[0], __version_tuple__[1]

        self._cache_version = (
            f"{sqlmesh_major}_{sqlmesh_minor}"
            f"_{SQLGLOT_MAJOR_VERSION}_{SQLGLOT_MINOR_VERSION}_{SCHEMA_VERSION}"
        )

        cutoff = to_datetime("1 week ago").timestamp()
        # Drop files belonging to other cache versions, then stale ones.
        for cached_file in self._path.glob("*"):
            if not cached_file.stem.startswith(self._cache_version):
                cached_file.unlink(missing_ok=True)
            elif cached_file.stat().st_mtime < cutoff:
                cached_file.unlink(missing_ok=True)

    def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
        """Fetches the cached entry, computing and storing it on a miss.

        Args:
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.
            loader: Produces a fresh entry when the cache has none.

        Returns:
            The cached or freshly loaded entry.
        """
        entry = self.get(name, entry_id)
        if not entry:
            entry = loader()
            self.put(name, entry_id, value=entry)
        return entry

    def get(self, name: str, entry_id: str = "") -> t.Optional[T]:
        """Reads an entry from the cache.

        Args:
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.

        Returns:
            The entry, or None on a miss or an unreadable cache file.
        """
        entry_path = self._cache_entry_path(name, entry_id)
        if not entry_path.exists():
            return None

        with gzip.open(entry_path, "rb") as fd:
            try:
                payload = pickle.load(fd)
                return self._entry_class.parse_obj(payload)
            except Exception as ex:
                # Unreadable entries are logged and treated as misses.
                logger.warning("Failed to load a cache entry '%s': %s", name, ex)
        return None

    def put(self, name: str, entry_id: str = "", *, value: T) -> None:
        """Writes the given value to the cache.

        Args:
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.
            value: The value to store in the cache.
        """
        self._path.mkdir(parents=True, exist_ok=True)
        if not self._path.is_dir():
            raise SQLMeshError(f"Cache path '{self._path}' is not a directory.")

        target = self._cache_entry_path(name, entry_id)
        with gzip.open(target, "wb", compresslevel=1) as fd:
            pickle.dump(value.dict(), fd)

    def _cache_entry_path(self, name: str, entry_id: str = "") -> Path:
        # Join only the non-empty parts; entry_id is frequently empty.
        parts = (self._cache_version, name, entry_id)
        return self._path / sanitize_name("__".join(filter(None, parts)))
Generic file-based cache implementation.
Arguments:
- path: The path to the cache folder.
- entry_class: The type of cached entries.
- prefix: The prefix shared between all entries to distinguish them from other entries stored in the same cache folder.
FileCache( path: pathlib.Path, entry_class: Type[~T], prefix: Union[str, NoneType] = None)
def __init__(
    self,
    path: Path,
    entry_class: t.Type[T],
    prefix: t.Optional[str] = None,
):
    """Initializes the cache folder and evicts outdated entries.

    Args:
        path: The path to the cache folder.
        entry_class: The type of cached entries.
        prefix: The prefix shared between all entries to distinguish them from
            other entries stored in the same cache folder.
    """
    self._path = path / prefix if prefix else path
    self._entry_class = entry_class

    # Imported here rather than at module scope — presumably to avoid an
    # import cycle; confirm against sqlmesh.core.state_sync.base.
    from sqlmesh.core.state_sync.base import SCHEMA_VERSION

    try:
        from sqlmesh._version import __version_tuple__

        version_pair = (__version_tuple__[0], __version_tuple__[1])
    except ImportError:
        # No generated _version module (e.g. a raw source checkout).
        version_pair = (0, 0)

    version_parts = [
        str(version_pair[0]),
        str(version_pair[1]),
        SQLGLOT_MAJOR_VERSION,
        SQLGLOT_MINOR_VERSION,
        str(SCHEMA_VERSION),
    ]
    self._cache_version = "_".join(version_parts)

    expiry = to_datetime("1 week ago").timestamp()
    # Evict anything written by a different version, plus anything stale.
    for candidate in self._path.glob("*"):
        outdated = not candidate.stem.startswith(self._cache_version)
        if outdated or candidate.stat().st_mtime < expiry:
            candidate.unlink(missing_ok=True)
def
get_or_load(self, name: str, entry_id: str = '', *, loader: Callable[[], ~T]) -> ~T:
def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
    """Returns the cached entry, invoking ``loader`` and caching its result on a miss.

    Args:
        name: The name of the entry.
        entry_id: The unique entry identifier. Used for cache invalidation.
        loader: Used to load a new entry when no cached instance was found.

    Returns:
        The entry.
    """
    entry = self.get(name, entry_id)
    if not entry:
        entry = loader()
        self.put(name, entry_id, value=entry)
    return entry
Returns an existing cached entry or loads and caches a new one.
Arguments:
- name: The name of the entry.
- entry_id: The unique entry identifier. Used for cache invalidation.
- loader: Used to load a new entry when no cached instance was found.
Returns:
The entry.
def
get(self, name: str, entry_id: str = '') -> Union[~T, NoneType]:
def get(self, name: str, entry_id: str = "") -> t.Optional[T]:
    """Returns a cached entry if one exists.

    Args:
        name: The name of the entry.
        entry_id: The unique entry identifier. Used for cache invalidation.

    Returns:
        The entry or None if no entry was found in the cache.
    """
    entry_path = self._cache_entry_path(name, entry_id)
    if not entry_path.exists():
        return None

    with gzip.open(entry_path, "rb") as fd:
        try:
            payload = pickle.load(fd)
            return self._entry_class.parse_obj(payload)
        except Exception as ex:
            # Corrupt/incompatible entries are logged and treated as misses.
            logger.warning("Failed to load a cache entry '%s': %s", name, ex)
    return None
Returns a cached entry if one exists.
Arguments:
- name: The name of the entry.
- entry_id: The unique entry identifier. Used for cache invalidation.
Returns:
The entry or None if no entry was found in the cache.
def
put(self, name: str, entry_id: str = '', *, value: ~T) -> None:
def put(self, name: str, entry_id: str = "", *, value: T) -> None:
    """Stores the given value in the cache.

    Args:
        name: The name of the entry.
        entry_id: The unique entry identifier. Used for cache invalidation.
        value: The value to store in the cache.

    Raises:
        SQLMeshError: If the cache path exists but is not a directory.
    """
    # Fix: check before mkdir. With exist_ok=True, Path.mkdir raises
    # FileExistsError when the path exists as a non-directory, so the original
    # post-mkdir is_dir() check (and its SQLMeshError) was unreachable.
    if self._path.exists() and not self._path.is_dir():
        raise SQLMeshError(f"Cache path '{self._path}' is not a directory.")
    self._path.mkdir(parents=True, exist_ok=True)

    with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd:
        pickle.dump(value.dict(), fd)
Stores the given value in the cache.
Arguments:
- name: The name of the entry.
- entry_id: The unique entry identifier. Used for cache invalidation.
- value: The value to store in the cache.