sqlmesh.utils.cache
1from __future__ import annotations 2 3import gzip 4import logging 5import pickle 6import shutil 7import typing as t 8from pathlib import Path 9 10from sqlglot import __version__ as SQLGLOT_VERSION 11 12from sqlmesh.utils import sanitize_name 13from sqlmesh.utils.date import to_datetime 14from sqlmesh.utils.errors import SQLMeshError 15from sqlmesh.utils.windows import IS_WINDOWS, fix_windows_path 16 17logger = logging.getLogger(__name__) 18 19T = t.TypeVar("T") 20 21 22SQLGLOT_VERSION_TUPLE = tuple(SQLGLOT_VERSION.split(".")) 23SQLGLOT_MAJOR_VERSION = SQLGLOT_VERSION_TUPLE[0] 24SQLGLOT_MINOR_VERSION = SQLGLOT_VERSION_TUPLE[1] 25 26 27class FileCache(t.Generic[T]): 28 """Generic file-based cache implementation. 29 30 Args: 31 path: The path to the cache folder. 32 entry_class: The type of cached entries. 33 prefix: The prefix shared between all entries to distinguish them from other entries 34 stored in the same cache folder. 35 """ 36 37 def __init__(self, path: Path, prefix: t.Optional[str] = None): 38 self._path = path / prefix if prefix else path 39 40 from sqlmesh.core.state_sync.base import SCHEMA_VERSION 41 42 try: 43 from sqlmesh._version import __version_tuple__ 44 45 major, minor = __version_tuple__[0], __version_tuple__[1] 46 except ImportError: 47 major, minor = 0, 0 48 49 self._cache_version = "_".join( 50 [ 51 str(major), 52 str(minor), 53 SQLGLOT_MAJOR_VERSION, 54 SQLGLOT_MINOR_VERSION, 55 str(SCHEMA_VERSION), 56 ] 57 ) 58 59 threshold = to_datetime("1 week ago").timestamp() 60 # delete all old cache files 61 for file in self._path.glob("*"): 62 if IS_WINDOWS: 63 # the file.stat() call below will fail on windows if the :file name is longer than 260 chars 64 file = fix_windows_path(file) 65 66 if not file.stem.startswith(self._cache_version) or file.stat().st_atime < threshold: 67 file.unlink(missing_ok=True) 68 69 def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T: 70 """Returns an existing cached entry or loads 
and caches a new one. 71 72 Args: 73 name: The name of the entry. 74 entry_id: The unique entry identifier. Used for cache invalidation. 75 loader: Used to load a new entry when no cached instance was found. 76 77 Returns: 78 The entry. 79 """ 80 cached_entry = self.get(name, entry_id) 81 if cached_entry is not None: 82 return cached_entry 83 84 loaded_entry = loader() 85 self.put(name, entry_id, value=loaded_entry) 86 return loaded_entry 87 88 def get(self, name: str, entry_id: str = "") -> t.Optional[T]: 89 """Returns a cached entry if exists. 90 91 Args: 92 name: The name of the entry. 93 entry_id: The unique entry identifier. Used for cache invalidation. 94 95 Returns: 96 The entry or None if no entry was found in the cache. 97 """ 98 cache_entry_path = self._cache_entry_path(name, entry_id) 99 if cache_entry_path.exists(): 100 with gzip.open(cache_entry_path, "rb") as fd: 101 try: 102 return pickle.load(fd) 103 except Exception as ex: 104 logger.warning("Failed to load a cache entry '%s': %s", name, ex) 105 106 return None 107 108 def put(self, name: str, entry_id: str = "", *, value: T) -> None: 109 """Stores the given value in the cache. 110 111 Args: 112 name: The name of the entry. 113 entry_id: The unique entry identifier. Used for cache invalidation. 114 value: The value to store in the cache. 115 """ 116 self._path.mkdir(parents=True, exist_ok=True) 117 if not self._path.is_dir(): 118 raise SQLMeshError(f"Cache path '{self._path}' is not a directory.") 119 120 with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd: 121 pickle.dump(value, fd) 122 123 def exists(self, name: str, entry_id: str = "") -> bool: 124 """Returns true if the cache entry with the given name and ID exists, false otherwise. 125 126 Args: 127 name: The name of the entry. 128 entry_id: The unique entry identifier. Used for cache invalidation. 
129 """ 130 return self._cache_entry_path(name, entry_id).exists() 131 132 def clear(self) -> None: 133 try: 134 shutil.rmtree(str(self._path.absolute())) 135 except Exception: 136 pass 137 138 def _cache_entry_path(self, name: str, entry_id: str = "") -> Path: 139 entry_file_name = "__".join(p for p in (self._cache_version, name, entry_id) if p) 140 full_path = self._path / sanitize_name(entry_file_name, include_unicode=True) 141 if IS_WINDOWS: 142 # handle paths longer than 260 chars 143 full_path = fix_windows_path(full_path) 144 return full_path
logger =
<Logger sqlmesh.utils.cache (WARNING)>
SQLGLOT_VERSION_TUPLE =
('30', '0', '3')
SQLGLOT_MAJOR_VERSION =
'30'
SQLGLOT_MINOR_VERSION =
'0'
class
FileCache(typing.Generic[~T]):
class FileCache(t.Generic[T]):
    """Generic file-based cache implementation.

    Every entry is pickled and gzip-compressed into its own file inside the cache
    folder. File names start with a version string built from the SQLMesh
    major/minor version, the SQLGlot major/minor version and the state schema
    version. Files that do not carry the current version string, or that have not
    been accessed for a week, are removed when a cache instance is created.

    The type of cached entries is given by the generic parameter ``T``.

    Args:
        path: The path to the cache folder.
        prefix: The prefix shared between all entries to distinguish them from other entries
            stored in the same cache folder. When given, entries are stored in the
            ``path / prefix`` sub-folder.
    """

    def __init__(self, path: Path, prefix: t.Optional[str] = None):
        self._path = path / prefix if prefix else path

        # Imported at call time rather than at module import time
        # (presumably to avoid a circular import -- TODO confirm).
        from sqlmesh.core.state_sync.base import SCHEMA_VERSION

        try:
            from sqlmesh._version import __version_tuple__

            major, minor = __version_tuple__[0], __version_tuple__[1]
        except ImportError:
            # No generated version module available: fall back to 0.0.
            major, minor = 0, 0

        self._cache_version = "_".join(
            [
                str(major),
                str(minor),
                SQLGLOT_MAJOR_VERSION,
                SQLGLOT_MINOR_VERSION,
                str(SCHEMA_VERSION),
            ]
        )

        threshold = to_datetime("1 week ago").timestamp()
        # delete all old cache files
        for file in self._path.glob("*"):
            if IS_WINDOWS:
                # the file.stat() call below will fail on windows if the file name is longer than 260 chars
                file = fix_windows_path(file)

            try:
                if file.is_dir():
                    # Sub-folders (e.g. caches created with a prefix) are not cache
                    # entries and cannot be removed with unlink().
                    continue

                if not file.stem.startswith(self._cache_version) or file.stat().st_atime < threshold:
                    file.unlink(missing_ok=True)
            except OSError as ex:
                # Cleanup is best-effort: a file that vanished between glob() and
                # stat(), or that cannot be removed, must not prevent the cache
                # from being created.
                logger.debug("Failed to clean up the cache file '%s': %s", file, ex)

    def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
        """Returns an existing cached entry or loads and caches a new one.

        A cached value of ``None`` is indistinguishable from a miss, so such an
        entry is loaded again on every call.

        Args:
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.
            loader: Used to load a new entry when no cached instance was found.

        Returns:
            The entry.
        """
        cached_entry = self.get(name, entry_id)
        if cached_entry is not None:
            return cached_entry

        loaded_entry = loader()
        self.put(name, entry_id, value=loaded_entry)
        return loaded_entry

    def get(self, name: str, entry_id: str = "") -> t.Optional[T]:
        """Returns a cached entry if it exists.

        Args:
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.

        Returns:
            The entry or None if no entry was found in the cache or the entry
            could not be unpickled (a warning is logged in the latter case).
        """
        cache_entry_path = self._cache_entry_path(name, entry_id)
        try:
            # Open first instead of checking exists(): the entry may be removed by
            # another cache instance between the check and the open.
            fd = gzip.open(cache_entry_path, "rb")
        except FileNotFoundError:
            return None

        with fd:
            try:
                # NOTE: unpickling can execute arbitrary code; the cache folder must
                # only contain files written by a trusted process.
                return pickle.load(fd)
            except Exception as ex:
                logger.warning("Failed to load a cache entry '%s': %s", name, ex)

        return None

    def put(self, name: str, entry_id: str = "", *, value: T) -> None:
        """Stores the given value in the cache.

        Args:
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.
            value: The value to store in the cache.

        Raises:
            SQLMeshError: If the cache path cannot be used as a directory.
        """
        try:
            self._path.mkdir(parents=True, exist_ok=True)
        except (FileExistsError, NotADirectoryError) as ex:
            # mkdir(exist_ok=True) itself raises when the path exists but is not a
            # directory, so the error has to be translated here.
            raise SQLMeshError(f"Cache path '{self._path}' is not a directory.") from ex

        cache_entry_path = self._cache_entry_path(name, entry_id)
        try:
            with gzip.open(cache_entry_path, "wb", compresslevel=1) as fd:
                pickle.dump(value, fd)
        except Exception:
            # Do not leave a truncated entry behind: it would make exists() report
            # an entry that get() cannot load.
            cache_entry_path.unlink(missing_ok=True)
            raise

    def exists(self, name: str, entry_id: str = "") -> bool:
        """Returns true if the cache entry with the given name and ID exists, false otherwise.

        Only the presence of the entry file is checked; its content is not read.

        Args:
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.
        """
        return self._cache_entry_path(name, entry_id).exists()

    def clear(self) -> None:
        """Removes the cache folder and everything in it. Failures are not raised."""
        try:
            shutil.rmtree(str(self._path.absolute()))
        except FileNotFoundError:
            # Nothing has been cached yet.
            pass
        except Exception as ex:
            # Clearing stays best-effort, but the reason is no longer lost.
            logger.debug("Failed to clear the cache folder '%s': %s", self._path, ex)

    def _cache_entry_path(self, name: str, entry_id: str = "") -> Path:
        """Returns the path of the file that backs the given entry.

        The file name is the "__"-joined sequence of the cache version, the name
        and the entry ID (empty parts are skipped), passed through sanitize_name.
        """
        entry_file_name = "__".join(p for p in (self._cache_version, name, entry_id) if p)
        full_path = self._path / sanitize_name(entry_file_name, include_unicode=True)
        if IS_WINDOWS:
            # handle paths longer than 260 chars
            full_path = fix_windows_path(full_path)
        return full_path
Generic file-based cache implementation.
Arguments:
- path: The path to the cache folder.
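- entry_class: The type of cached entries. (The constructor signature shown below does not accept this argument; the entry type is carried by the generic type parameter T.)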
- prefix: The prefix shared between all entries to distinguish them from other entries stored in the same cache folder.
FileCache(path: pathlib.Path, prefix: Optional[str] = None)
def __init__(self, path: Path, prefix: t.Optional[str] = None):
    """Sets up the cache folder location and version tag, then prunes stale files.

    Args:
        path: The path to the cache folder.
        prefix: Optional sub-folder of ``path`` in which this cache keeps its entries.
    """
    self._path = path / prefix if prefix else path

    # Imported at call time rather than at module import time
    # (presumably to avoid a circular import -- TODO confirm).
    from sqlmesh.core.state_sync.base import SCHEMA_VERSION

    try:
        from sqlmesh._version import __version_tuple__

        major, minor = __version_tuple__[0], __version_tuple__[1]
    except ImportError:
        # No generated version module available: fall back to 0.0.
        major, minor = 0, 0

    # Every cache file name starts with this tag, so entries written under a
    # different SQLMesh / SQLGlot / schema version are never picked up.
    self._cache_version = "_".join(
        [
            str(major),
            str(minor),
            SQLGLOT_MAJOR_VERSION,
            SQLGLOT_MINOR_VERSION,
            str(SCHEMA_VERSION),
        ]
    )

    threshold = to_datetime("1 week ago").timestamp()
    # delete all old cache files
    for file in self._path.glob("*"):
        if IS_WINDOWS:
            # the file.stat() call below will fail on windows if the file name is longer than 260 chars
            file = fix_windows_path(file)

        try:
            if file.is_dir():
                # Sub-folders (e.g. caches created with a prefix) are not cache
                # entries and cannot be removed with unlink().
                continue

            if not file.stem.startswith(self._cache_version) or file.stat().st_atime < threshold:
                file.unlink(missing_ok=True)
        except OSError as ex:
            # Cleanup is best-effort: a file that vanished between glob() and
            # stat(), or that cannot be removed, must not prevent the cache
            # from being created.
            logger.debug("Failed to clean up the cache file '%s': %s", file, ex)
def
get_or_load(self, name: str, entry_id: str = '', *, loader: Callable[[], ~T]) -> ~T:
def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
    """Fetches an entry from the cache, falling back to ``loader`` on a miss.

    A cached value of ``None`` counts as a miss, so ``loader`` is invoked and
    its result is stored in that case as well.

    Args:
        name: The name of the entry.
        entry_id: The unique entry identifier. Used for cache invalidation.
        loader: Called without arguments to produce the entry when the cache has none.

    Returns:
        The cached entry, or the freshly loaded one.
    """
    hit = self.get(name, entry_id)
    if hit is None:
        # Cache miss: build the entry and persist it before handing it back.
        fresh = loader()
        self.put(name, entry_id, value=fresh)
        return fresh

    return hit
Returns an existing cached entry or loads and caches a new one.
Arguments:
- name: The name of the entry.
- entry_id: The unique entry identifier. Used for cache invalidation.
- loader: Used to load a new entry when no cached instance was found.
Returns:
The entry.
def
get(self, name: str, entry_id: str = '') -> Optional[~T]:
def get(self, name: str, entry_id: str = "") -> t.Optional[T]:
    """Returns a cached entry if it exists.

    Args:
        name: The name of the entry.
        entry_id: The unique entry identifier. Used for cache invalidation.

    Returns:
        The entry or None if no entry was found in the cache or the entry
        could not be unpickled (a warning is logged in the latter case).
    """
    cache_entry_path = self._cache_entry_path(name, entry_id)
    try:
        # Open first instead of checking exists(): the entry may be removed by
        # another cache instance between the check and the open.
        fd = gzip.open(cache_entry_path, "rb")
    except FileNotFoundError:
        return None

    with fd:
        try:
            # NOTE: unpickling can execute arbitrary code; the cache folder must
            # only contain files written by a trusted process.
            return pickle.load(fd)
        except Exception as ex:
            logger.warning("Failed to load a cache entry '%s': %s", name, ex)

    return None
Returns a cached entry if it exists.
Arguments:
- name: The name of the entry.
- entry_id: The unique entry identifier. Used for cache invalidation.
Returns:
The entry or None if no entry was found in the cache.
def
put(self, name: str, entry_id: str = '', *, value: ~T) -> None:
def put(self, name: str, entry_id: str = "", *, value: T) -> None:
    """Stores the given value in the cache.

    The value is pickled and written gzip-compressed to the entry's file,
    replacing any previous content. The cache folder is created if needed.

    Args:
        name: The name of the entry.
        entry_id: The unique entry identifier. Used for cache invalidation.
        value: The value to store in the cache.

    Raises:
        SQLMeshError: If the cache path cannot be used as a directory.
    """
    try:
        self._path.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError) as ex:
        # mkdir(exist_ok=True) itself raises when the path exists but is not a
        # directory, so the error has to be translated here.
        raise SQLMeshError(f"Cache path '{self._path}' is not a directory.") from ex

    cache_entry_path = self._cache_entry_path(name, entry_id)
    try:
        with gzip.open(cache_entry_path, "wb", compresslevel=1) as fd:
            pickle.dump(value, fd)
    except Exception:
        # Do not leave a truncated entry behind: it would make exists() report
        # an entry that get() cannot load.
        cache_entry_path.unlink(missing_ok=True)
        raise
Stores the given value in the cache.
Arguments:
- name: The name of the entry.
- entry_id: The unique entry identifier. Used for cache invalidation.
- value: The value to store in the cache.
def
exists(self, name: str, entry_id: str = '') -> bool:
def exists(self, name: str, entry_id: str = "") -> bool:
    """Tells whether a file for the given cache entry is present.

    Args:
        name: The name of the entry.
        entry_id: The unique entry identifier. Used for cache invalidation.

    Returns:
        True if the entry's file exists, False otherwise.
    """
    entry_path = self._cache_entry_path(name, entry_id)
    return entry_path.exists()
Returns true if the cache entry with the given name and ID exists, false otherwise.
Arguments:
- name: The name of the entry.
- entry_id: The unique entry identifier. Used for cache invalidation.