Edit on GitHub

sqlmesh.utils.cache

  1from __future__ import annotations
  2
  3import gzip
  4import logging
  5import pickle
  6import shutil
  7import typing as t
  8from pathlib import Path
  9
 10from sqlglot import __version__ as SQLGLOT_VERSION
 11
 12from sqlmesh.utils import sanitize_name
 13from sqlmesh.utils.date import to_datetime
 14from sqlmesh.utils.errors import SQLMeshError
 15from sqlmesh.utils.windows import IS_WINDOWS, fix_windows_path
 16
 17logger = logging.getLogger(__name__)
 18
 19T = t.TypeVar("T")
 20
 21
 22SQLGLOT_VERSION_TUPLE = tuple(SQLGLOT_VERSION.split("."))
 23SQLGLOT_MAJOR_VERSION = SQLGLOT_VERSION_TUPLE[0]
 24SQLGLOT_MINOR_VERSION = SQLGLOT_VERSION_TUPLE[1]
 25
 26
 27class FileCache(t.Generic[T]):
 28    """Generic file-based cache implementation.
 29
 30    Args:
 31        path: The path to the cache folder.
 32        entry_class: The type of cached entries.
 33        prefix: The prefix shared between all entries to distinguish them from other entries
 34            stored in the same cache folder.
 35    """
 36
 37    def __init__(self, path: Path, prefix: t.Optional[str] = None):
 38        self._path = path / prefix if prefix else path
 39
 40        from sqlmesh.core.state_sync.base import SCHEMA_VERSION
 41
 42        try:
 43            from sqlmesh._version import __version_tuple__
 44
 45            major, minor = __version_tuple__[0], __version_tuple__[1]
 46        except ImportError:
 47            major, minor = 0, 0
 48
 49        self._cache_version = "_".join(
 50            [
 51                str(major),
 52                str(minor),
 53                SQLGLOT_MAJOR_VERSION,
 54                SQLGLOT_MINOR_VERSION,
 55                str(SCHEMA_VERSION),
 56            ]
 57        )
 58
 59        threshold = to_datetime("1 week ago").timestamp()
 60        # delete all old cache files
 61        for file in self._path.glob("*"):
 62            if IS_WINDOWS:
 63                # the file.stat() call below will fail on windows if the :file name is longer than 260 chars
 64                file = fix_windows_path(file)
 65
 66            if not file.stem.startswith(self._cache_version) or file.stat().st_atime < threshold:
 67                file.unlink(missing_ok=True)
 68
 69    def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
 70        """Returns an existing cached entry or loads and caches a new one.
 71
 72        Args:
 73            name: The name of the entry.
 74            entry_id: The unique entry identifier. Used for cache invalidation.
 75            loader: Used to load a new entry when no cached instance was found.
 76
 77        Returns:
 78            The entry.
 79        """
 80        cached_entry = self.get(name, entry_id)
 81        if cached_entry is not None:
 82            return cached_entry
 83
 84        loaded_entry = loader()
 85        self.put(name, entry_id, value=loaded_entry)
 86        return loaded_entry
 87
 88    def get(self, name: str, entry_id: str = "") -> t.Optional[T]:
 89        """Returns a cached entry if exists.
 90
 91        Args:
 92            name: The name of the entry.
 93            entry_id: The unique entry identifier. Used for cache invalidation.
 94
 95        Returns:
 96            The entry or None if no entry was found in the cache.
 97        """
 98        cache_entry_path = self._cache_entry_path(name, entry_id)
 99        if cache_entry_path.exists():
100            with gzip.open(cache_entry_path, "rb") as fd:
101                try:
102                    return pickle.load(fd)
103                except Exception as ex:
104                    logger.warning("Failed to load a cache entry '%s': %s", name, ex)
105
106        return None
107
108    def put(self, name: str, entry_id: str = "", *, value: T) -> None:
109        """Stores the given value in the cache.
110
111        Args:
112            name: The name of the entry.
113            entry_id: The unique entry identifier. Used for cache invalidation.
114            value: The value to store in the cache.
115        """
116        self._path.mkdir(parents=True, exist_ok=True)
117        if not self._path.is_dir():
118            raise SQLMeshError(f"Cache path '{self._path}' is not a directory.")
119
120        with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd:
121            pickle.dump(value, fd)
122
123    def exists(self, name: str, entry_id: str = "") -> bool:
124        """Returns true if the cache entry with the given name and ID exists, false otherwise.
125
126        Args:
127            name: The name of the entry.
128            entry_id: The unique entry identifier. Used for cache invalidation.
129        """
130        return self._cache_entry_path(name, entry_id).exists()
131
132    def clear(self) -> None:
133        try:
134            shutil.rmtree(str(self._path.absolute()))
135        except Exception:
136            pass
137
138    def _cache_entry_path(self, name: str, entry_id: str = "") -> Path:
139        entry_file_name = "__".join(p for p in (self._cache_version, name, entry_id) if p)
140        full_path = self._path / sanitize_name(entry_file_name, include_unicode=True)
141        if IS_WINDOWS:
142            # handle paths longer than 260 chars
143            full_path = fix_windows_path(full_path)
144        return full_path
logger = <Logger sqlmesh.utils.cache (WARNING)>
SQLGLOT_VERSION_TUPLE = ('30', '0', '3')
SQLGLOT_MAJOR_VERSION = '30'
SQLGLOT_MINOR_VERSION = '0'
class FileCache(typing.Generic[~T]):
 28class FileCache(t.Generic[T]):
 29    """Generic file-based cache implementation.
 30
 31    Args:
 32        path: The path to the cache folder.
 33        entry_class: The type of cached entries.
 34        prefix: The prefix shared between all entries to distinguish them from other entries
 35            stored in the same cache folder.
 36    """
 37
 38    def __init__(self, path: Path, prefix: t.Optional[str] = None):
 39        self._path = path / prefix if prefix else path
 40
 41        from sqlmesh.core.state_sync.base import SCHEMA_VERSION
 42
 43        try:
 44            from sqlmesh._version import __version_tuple__
 45
 46            major, minor = __version_tuple__[0], __version_tuple__[1]
 47        except ImportError:
 48            major, minor = 0, 0
 49
 50        self._cache_version = "_".join(
 51            [
 52                str(major),
 53                str(minor),
 54                SQLGLOT_MAJOR_VERSION,
 55                SQLGLOT_MINOR_VERSION,
 56                str(SCHEMA_VERSION),
 57            ]
 58        )
 59
 60        threshold = to_datetime("1 week ago").timestamp()
 61        # delete all old cache files
 62        for file in self._path.glob("*"):
 63            if IS_WINDOWS:
 64                # the file.stat() call below will fail on windows if the :file name is longer than 260 chars
 65                file = fix_windows_path(file)
 66
 67            if not file.stem.startswith(self._cache_version) or file.stat().st_atime < threshold:
 68                file.unlink(missing_ok=True)
 69
 70    def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
 71        """Returns an existing cached entry or loads and caches a new one.
 72
 73        Args:
 74            name: The name of the entry.
 75            entry_id: The unique entry identifier. Used for cache invalidation.
 76            loader: Used to load a new entry when no cached instance was found.
 77
 78        Returns:
 79            The entry.
 80        """
 81        cached_entry = self.get(name, entry_id)
 82        if cached_entry is not None:
 83            return cached_entry
 84
 85        loaded_entry = loader()
 86        self.put(name, entry_id, value=loaded_entry)
 87        return loaded_entry
 88
 89    def get(self, name: str, entry_id: str = "") -> t.Optional[T]:
 90        """Returns a cached entry if exists.
 91
 92        Args:
 93            name: The name of the entry.
 94            entry_id: The unique entry identifier. Used for cache invalidation.
 95
 96        Returns:
 97            The entry or None if no entry was found in the cache.
 98        """
 99        cache_entry_path = self._cache_entry_path(name, entry_id)
100        if cache_entry_path.exists():
101            with gzip.open(cache_entry_path, "rb") as fd:
102                try:
103                    return pickle.load(fd)
104                except Exception as ex:
105                    logger.warning("Failed to load a cache entry '%s': %s", name, ex)
106
107        return None
108
109    def put(self, name: str, entry_id: str = "", *, value: T) -> None:
110        """Stores the given value in the cache.
111
112        Args:
113            name: The name of the entry.
114            entry_id: The unique entry identifier. Used for cache invalidation.
115            value: The value to store in the cache.
116        """
117        self._path.mkdir(parents=True, exist_ok=True)
118        if not self._path.is_dir():
119            raise SQLMeshError(f"Cache path '{self._path}' is not a directory.")
120
121        with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd:
122            pickle.dump(value, fd)
123
124    def exists(self, name: str, entry_id: str = "") -> bool:
125        """Returns true if the cache entry with the given name and ID exists, false otherwise.
126
127        Args:
128            name: The name of the entry.
129            entry_id: The unique entry identifier. Used for cache invalidation.
130        """
131        return self._cache_entry_path(name, entry_id).exists()
132
133    def clear(self) -> None:
134        try:
135            shutil.rmtree(str(self._path.absolute()))
136        except Exception:
137            pass
138
139    def _cache_entry_path(self, name: str, entry_id: str = "") -> Path:
140        entry_file_name = "__".join(p for p in (self._cache_version, name, entry_id) if p)
141        full_path = self._path / sanitize_name(entry_file_name, include_unicode=True)
142        if IS_WINDOWS:
143            # handle paths longer than 260 chars
144            full_path = fix_windows_path(full_path)
145        return full_path

Generic file-based cache implementation.

Arguments:
  • path: The path to the cache folder.
  • prefix: The prefix shared between all entries to distinguish them from other entries stored in the same cache folder.

The type of cached entries is given by the generic type parameter T (the constructor has no entry_class argument).
FileCache(path: pathlib.Path, prefix: Optional[str] = None)
38    def __init__(self, path: Path, prefix: t.Optional[str] = None):
39        self._path = path / prefix if prefix else path
40
41        from sqlmesh.core.state_sync.base import SCHEMA_VERSION
42
43        try:
44            from sqlmesh._version import __version_tuple__
45
46            major, minor = __version_tuple__[0], __version_tuple__[1]
47        except ImportError:
48            major, minor = 0, 0
49
50        self._cache_version = "_".join(
51            [
52                str(major),
53                str(minor),
54                SQLGLOT_MAJOR_VERSION,
55                SQLGLOT_MINOR_VERSION,
56                str(SCHEMA_VERSION),
57            ]
58        )
59
60        threshold = to_datetime("1 week ago").timestamp()
61        # delete all old cache files
62        for file in self._path.glob("*"):
63            if IS_WINDOWS:
64                # the file.stat() call below will fail on windows if the :file name is longer than 260 chars
65                file = fix_windows_path(file)
66
67            if not file.stem.startswith(self._cache_version) or file.stat().st_atime < threshold:
68                file.unlink(missing_ok=True)
def get_or_load(self, name: str, entry_id: str = '', *, loader: Callable[[], ~T]) -> ~T:
70    def get_or_load(self, name: str, entry_id: str = "", *, loader: t.Callable[[], T]) -> T:
71        """Returns an existing cached entry or loads and caches a new one.
72
73        Args:
74            name: The name of the entry.
75            entry_id: The unique entry identifier. Used for cache invalidation.
76            loader: Used to load a new entry when no cached instance was found.
77
78        Returns:
79            The entry.
80        """
81        cached_entry = self.get(name, entry_id)
82        if cached_entry is not None:
83            return cached_entry
84
85        loaded_entry = loader()
86        self.put(name, entry_id, value=loaded_entry)
87        return loaded_entry

Returns an existing cached entry or loads and caches a new one.

Arguments:
  • name: The name of the entry.
  • entry_id: The unique entry identifier. Used for cache invalidation.
  • loader: Used to load a new entry when no cached instance was found.
Returns:

The entry.

def get(self, name: str, entry_id: str = '') -> Optional[~T]:
 89    def get(self, name: str, entry_id: str = "") -> t.Optional[T]:
 90        """Returns a cached entry if exists.
 91
 92        Args:
 93            name: The name of the entry.
 94            entry_id: The unique entry identifier. Used for cache invalidation.
 95
 96        Returns:
 97            The entry or None if no entry was found in the cache.
 98        """
 99        cache_entry_path = self._cache_entry_path(name, entry_id)
100        if cache_entry_path.exists():
101            with gzip.open(cache_entry_path, "rb") as fd:
102                try:
103                    return pickle.load(fd)
104                except Exception as ex:
105                    logger.warning("Failed to load a cache entry '%s': %s", name, ex)
106
107        return None

Returns a cached entry if it exists.

Arguments:
  • name: The name of the entry.
  • entry_id: The unique entry identifier. Used for cache invalidation.
Returns:

The entry or None if no entry was found in the cache.

def put(self, name: str, entry_id: str = '', *, value: ~T) -> None:
109    def put(self, name: str, entry_id: str = "", *, value: T) -> None:
110        """Stores the given value in the cache.
111
112        Args:
113            name: The name of the entry.
114            entry_id: The unique entry identifier. Used for cache invalidation.
115            value: The value to store in the cache.
116        """
117        self._path.mkdir(parents=True, exist_ok=True)
118        if not self._path.is_dir():
119            raise SQLMeshError(f"Cache path '{self._path}' is not a directory.")
120
121        with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd:
122            pickle.dump(value, fd)

Stores the given value in the cache.

Arguments:
  • name: The name of the entry.
  • entry_id: The unique entry identifier. Used for cache invalidation.
  • value: The value to store in the cache.
def exists(self, name: str, entry_id: str = '') -> bool:
124    def exists(self, name: str, entry_id: str = "") -> bool:
125        """Returns true if the cache entry with the given name and ID exists, false otherwise.
126
127        Args:
128            name: The name of the entry.
129            entry_id: The unique entry identifier. Used for cache invalidation.
130        """
131        return self._cache_entry_path(name, entry_id).exists()

Returns true if the cache entry with the given name and ID exists, false otherwise.

Arguments:
  • name: The name of the entry.
  • entry_id: The unique entry identifier. Used for cache invalidation.
def clear(self) -> None:
133    def clear(self) -> None:
134        try:
135            shutil.rmtree(str(self._path.absolute()))
136        except Exception:
137            pass