Edit on GitHub

sqlmesh.core.model.cache

  1from __future__ import annotations
  2
  3import logging
  4import typing as t
  5from pathlib import Path
  6
  7from sqlglot import exp
  8from sqlglot.helper import seq_get
  9from sqlglot.optimizer.simplify import gen
 10from sqlglot.schema import MappingSchema
 11
 12from sqlmesh.core import constants as c
 13from sqlmesh.core.model.definition import ExternalModel, Model, SqlModel, _Model
 14from sqlmesh.utils.cache import FileCache
 15from sqlmesh.utils.hashing import crc32
 16from sqlmesh.utils.process import PoolExecutor, create_process_pool_executor
 17
 18from dataclasses import dataclass
 19
 20logger = logging.getLogger(__name__)
 21
 22if t.TYPE_CHECKING:
 23    from sqlmesh.core.snapshot import SnapshotId
 24    from sqlmesh.core.linter.rule import Rule
 25
 26    T = t.TypeVar("T")
 27
 28
 29class ModelCache:
 30    """File-based cache implementation for model definitions.
 31
 32    Args:
 33        path: The path to the cache folder.
 34    """
 35
 36    def __init__(self, path: Path):
 37        self.path = path
 38        self._file_cache: FileCache[t.List[Model]] = FileCache(
 39            path,
 40            prefix="model_definition",
 41        )
 42
 43    def get_or_load(
 44        self, name: str, entry_id: str = "", *, loader: t.Callable[[], t.List[Model]]
 45    ) -> t.List[Model]:
 46        """Returns an existing cached model definition or loads and caches a new one.
 47        Args:
 48            name: The name of the entry.
 49            entry_id: The unique entry identifier. Used for cache invalidation.
 50            loader: Used to load a new model definition when no cached instance was found.
 51        Returns:
 52            The model definition.
 53        """
 54        cache_entry = self._file_cache.get(name, entry_id)
 55        if isinstance(cache_entry, list) and isinstance(seq_get(cache_entry, 0), _Model):
 56            return cache_entry
 57
 58        models = loader()
 59        if isinstance(models, list) and isinstance(seq_get(models, 0), (SqlModel, ExternalModel)):
 60            # make sure we preload full_depends_on
 61            for model in models:
 62                model.full_depends_on
 63
 64            self._file_cache.put(name, entry_id, value=models)
 65        return models
 66
 67    def put(self, models: t.List[Model], name: str, entry_id: str = "") -> bool:
 68        if models and isinstance(seq_get(models, 0), (SqlModel, ExternalModel)):
 69            # make sure we preload full_depends_on
 70            for model in models:
 71                model.full_depends_on
 72
 73            self._file_cache.put(name, entry_id, value=models)
 74            return True
 75
 76        return False
 77
 78    def get(self, name: str, entry_id: str = "") -> t.List[Model]:
 79        return self._file_cache.get(name, entry_id) or []
 80
 81
@dataclass
class OptimizedQueryCacheEntry:
    """Value stored in the optimized query cache for a single SQL model.

    Instances are created by `OptimizedQueryCache._put` and read back by
    `OptimizedQueryCache.with_optimized_query`.
    """

    # Result of `model.render_query()` when the entry was written. May be None; the reader
    # treats None as "the query could not be rendered at load time".
    optimized_rendered_query: t.Optional[exp.Query]
    # Value of `model.violated_rules_for_query` captured right after rendering.
    renderer_violations: t.Optional[t.Dict[type[Rule], t.Any]]
 86
 87
 88class OptimizedQueryCache:
 89    """File-based cache implementation for optimized model queries.
 90
 91    Args:
 92        path: The path to the cache folder.
 93    """
 94
 95    def __init__(self, path: Path):
 96        self.path = path
 97        self._file_cache: FileCache[OptimizedQueryCacheEntry] = FileCache(
 98            path, prefix="optimized_query"
 99        )
100
101    def with_optimized_query(self, model: Model, name: t.Optional[str] = None) -> bool:
102        """Adds an optimized query to the model's in-memory cache.
103
104        Args:
105            model: The model to add the optimized query to.
106            name: The cache entry name of the model.
107        """
108        if not isinstance(model, SqlModel):
109            return False
110
111        name = self._entry_name(model) if name is None else name
112        cache_entry = self._file_cache.get(name)
113        if cache_entry:
114            try:
115                # If the optimized rendered query is None, then there are likely adapter calls in the query
116                # that prevent us from rendering it at load time. This means that we can safely set the
117                # unoptimized cache to None as well to prevent attempts to render it downstream.
118                optimized = cache_entry.optimized_rendered_query is not None
119                model._query_renderer.update_cache(
120                    cache_entry.optimized_rendered_query,
121                    cache_entry.renderer_violations,
122                    optimized=optimized,
123                )
124                return True
125            except Exception as ex:
126                logger.warning("Failed to load a cache entry '%s': %s", name, ex)
127
128        self._put(name, model)
129        return False
130
131    def put(self, model: Model) -> t.Optional[str]:
132        if not isinstance(model, SqlModel):
133            return None
134
135        name = self._entry_name(model)
136
137        if self._file_cache.exists(name):
138            return name
139
140        self._put(name, model)
141        return name
142
143    def _put(self, name: str, model: SqlModel) -> None:
144        optimized_query = model.render_query()
145
146        new_entry = OptimizedQueryCacheEntry(
147            optimized_rendered_query=optimized_query,
148            renderer_violations=model.violated_rules_for_query,
149        )
150        self._file_cache.put(name, value=new_entry)
151
152    @staticmethod
153    def _entry_name(model: SqlModel) -> str:
154        hash_data = _mapping_schema_hash_data(model.mapping_schema)
155        hash_data.append(gen(model.query, comments=True))
156        hash_data.append(str([gen(d) for d in model.macro_definitions]))
157        hash_data.append(str([(k, v) for k, v in model.sorted_python_env]))
158        hash_data.extend(model.jinja_macros.data_hash_values)
159        return f"{model.name}_{crc32(hash_data)}"
160
161
def optimized_query_cache_pool(optimized_query_cache: OptimizedQueryCache) -> PoolExecutor:
    """Creates a pool executor set up to use the given optimized query cache.

    The cache is handed to `_init_optimized_query_cache` through the executor's
    initializer arguments, and the pool size is capped at `c.MAX_FORK_WORKERS`.

    Args:
        optimized_query_cache: The cache instance the pool's initializer receives.

    Returns:
        The executor returned by `create_process_pool_executor`.
    """
    pool = create_process_pool_executor(
        max_workers=c.MAX_FORK_WORKERS,
        initargs=(optimized_query_cache,),
        initializer=_init_optimized_query_cache,
    )
    return pool
168
169
# Module-level cache instance read by `load_optimized_query` and
# `load_optimized_query_and_mapping`. It stays None until `_init_optimized_query_cache`
# is called; `optimized_query_cache_pool` passes that function as the pool initializer.
_optimized_query_cache: t.Optional[OptimizedQueryCache] = None


def _init_optimized_query_cache(optimized_query_cache: OptimizedQueryCache) -> None:
    """Stores the given cache in the module-level `_optimized_query_cache` global.

    Args:
        optimized_query_cache: The cache instance later used by the `load_optimized_query*`
            functions in this module.
    """
    global _optimized_query_cache
    _optimized_query_cache = optimized_query_cache
176
177
def load_optimized_query(
    model_snapshot_id: t.Tuple[Model, SnapshotId],
) -> t.Tuple[SnapshotId, t.Optional[str]]:
    """Makes sure the optimized query of a model is cached and reports the entry name.

    Requires the module-level `_optimized_query_cache` to have been set beforehand
    (see `_init_optimized_query_cache`).

    Args:
        model_snapshot_id: A `(model, snapshot_id)` pair.

    Returns:
        A `(snapshot_id, entry_name)` pair. `entry_name` is None when the model is not a
        SQL model or when caching its optimized query failed; failures are logged, not raised.
    """
    assert _optimized_query_cache
    model, snapshot_id = model_snapshot_id

    entry_name = None

    if isinstance(model, SqlModel):
        try:
            entry_name = _optimized_query_cache.put(model)
        except Exception:
            # This can happen if there is a query rendering error.
            # For example, the model query references some python library or function that was
            # available at the time the model was created but has since been removed locally.
            #
            # Only `Exception` is caught: KeyboardInterrupt and SystemExit must keep
            # propagating so the process can still be interrupted or shut down.
            logger.exception("Failed to cache optimized query for model '%s'", model.name)

    return snapshot_id, entry_name
196
197
def load_optimized_query_and_mapping(
    model: Model, mapping: t.Dict
) -> t.Tuple[str, t.Optional[str], str, str, t.Dict]:
    """Updates the model's schema from its parents, then loads or creates its cached query.

    Requires the module-level `_optimized_query_cache` to have been set beforehand
    (see `_init_optimized_query_cache`).

    Args:
        model: The model to update.
        mapping: Maps each parent table to its columns-to-types mapping.

    Returns:
        A tuple of the model's fqn, the cache entry name (None unless the model is a SQL
        model), the data hash, the metadata hash and the model's mapping schema.
    """
    assert _optimized_query_cache

    # Build a non-normalizing schema from the parents and apply it to the model.
    parent_schema = MappingSchema(normalize=False)
    for parent_table, parent_columns in mapping.items():
        parent_schema.add_table(parent_table, parent_columns, dialect=model.dialect)
    model.update_schema(parent_schema)

    entry_name: t.Optional[str] = None
    if isinstance(model, SqlModel):
        # The entry name is computed only after the schema update above.
        entry_name = _optimized_query_cache._entry_name(model)
        _optimized_query_cache.with_optimized_query(model, entry_name)

    return (
        model.fqn,
        entry_name,
        model.data_hash,
        model.metadata_hash,
        model.mapping_schema,
    )
221
222
223def _mapping_schema_hash_data(schema: t.Dict[str, t.Any]) -> t.List[str]:
224    keys = sorted(schema) if all(isinstance(v, dict) for v in schema.values()) else schema
225
226    data = []
227    for k in keys:
228        data.append(k)
229        if isinstance(schema[k], dict):
230            data.extend(_mapping_schema_hash_data(schema[k]))
231        else:
232            data.append(str(schema[k]))
233
234    return data
logger = <Logger sqlmesh.core.model.cache (WARNING)>
class ModelCache:
30class ModelCache:
31    """File-based cache implementation for model definitions.
32
33    Args:
34        path: The path to the cache folder.
35    """
36
37    def __init__(self, path: Path):
38        self.path = path
39        self._file_cache: FileCache[t.List[Model]] = FileCache(
40            path,
41            prefix="model_definition",
42        )
43
44    def get_or_load(
45        self, name: str, entry_id: str = "", *, loader: t.Callable[[], t.List[Model]]
46    ) -> t.List[Model]:
47        """Returns an existing cached model definition or loads and caches a new one.
48        Args:
49            name: The name of the entry.
50            entry_id: The unique entry identifier. Used for cache invalidation.
51            loader: Used to load a new model definition when no cached instance was found.
52        Returns:
53            The model definition.
54        """
55        cache_entry = self._file_cache.get(name, entry_id)
56        if isinstance(cache_entry, list) and isinstance(seq_get(cache_entry, 0), _Model):
57            return cache_entry
58
59        models = loader()
60        if isinstance(models, list) and isinstance(seq_get(models, 0), (SqlModel, ExternalModel)):
61            # make sure we preload full_depends_on
62            for model in models:
63                model.full_depends_on
64
65            self._file_cache.put(name, entry_id, value=models)
66        return models
67
68    def put(self, models: t.List[Model], name: str, entry_id: str = "") -> bool:
69        if models and isinstance(seq_get(models, 0), (SqlModel, ExternalModel)):
70            # make sure we preload full_depends_on
71            for model in models:
72                model.full_depends_on
73
74            self._file_cache.put(name, entry_id, value=models)
75            return True
76
77        return False
78
79    def get(self, name: str, entry_id: str = "") -> t.List[Model]:
80        return self._file_cache.get(name, entry_id) or []

File-based cache implementation for model definitions.

Arguments:
  • path: The path to the cache folder.
ModelCache(path: pathlib.Path)
37    def __init__(self, path: Path):
38        self.path = path
39        self._file_cache: FileCache[t.List[Model]] = FileCache(
40            path,
41            prefix="model_definition",
42        )
path
44    def get_or_load(
45        self, name: str, entry_id: str = "", *, loader: t.Callable[[], t.List[Model]]
46    ) -> t.List[Model]:
47        """Returns an existing cached model definition or loads and caches a new one.
48        Args:
49            name: The name of the entry.
50            entry_id: The unique entry identifier. Used for cache invalidation.
51            loader: Used to load a new model definition when no cached instance was found.
52        Returns:
53            The model definition.
54        """
55        cache_entry = self._file_cache.get(name, entry_id)
56        if isinstance(cache_entry, list) and isinstance(seq_get(cache_entry, 0), _Model):
57            return cache_entry
58
59        models = loader()
60        if isinstance(models, list) and isinstance(seq_get(models, 0), (SqlModel, ExternalModel)):
61            # make sure we preload full_depends_on
62            for model in models:
63                model.full_depends_on
64
65            self._file_cache.put(name, entry_id, value=models)
66        return models

Returns an existing cached model definition or loads and caches a new one.

Arguments:
  • name: The name of the entry.
  • entry_id: The unique entry identifier. Used for cache invalidation.
  • loader: Used to load a new model definition when no cached instance was found.
Returns:

The model definition.

68    def put(self, models: t.List[Model], name: str, entry_id: str = "") -> bool:
69        if models and isinstance(seq_get(models, 0), (SqlModel, ExternalModel)):
70            # make sure we preload full_depends_on
71            for model in models:
72                model.full_depends_on
73
74            self._file_cache.put(name, entry_id, value=models)
75            return True
76
77        return False
79    def get(self, name: str, entry_id: str = "") -> t.List[Model]:
80        return self._file_cache.get(name, entry_id) or []
@dataclass
class OptimizedQueryCacheEntry:
83@dataclass
84class OptimizedQueryCacheEntry:
85    optimized_rendered_query: t.Optional[exp.Query]
86    renderer_violations: t.Optional[t.Dict[type[Rule], t.Any]]
OptimizedQueryCacheEntry( optimized_rendered_query: Optional[sqlglot.expressions.query.Query], renderer_violations: Optional[Dict[type[sqlmesh.core.linter.rule.Rule], Any]])
optimized_rendered_query: Optional[sqlglot.expressions.query.Query]
renderer_violations: Optional[Dict[type[sqlmesh.core.linter.rule.Rule], Any]]
class OptimizedQueryCache:
 89class OptimizedQueryCache:
 90    """File-based cache implementation for optimized model queries.
 91
 92    Args:
 93        path: The path to the cache folder.
 94    """
 95
 96    def __init__(self, path: Path):
 97        self.path = path
 98        self._file_cache: FileCache[OptimizedQueryCacheEntry] = FileCache(
 99            path, prefix="optimized_query"
100        )
101
102    def with_optimized_query(self, model: Model, name: t.Optional[str] = None) -> bool:
103        """Adds an optimized query to the model's in-memory cache.
104
105        Args:
106            model: The model to add the optimized query to.
107            name: The cache entry name of the model.
108        """
109        if not isinstance(model, SqlModel):
110            return False
111
112        name = self._entry_name(model) if name is None else name
113        cache_entry = self._file_cache.get(name)
114        if cache_entry:
115            try:
116                # If the optimized rendered query is None, then there are likely adapter calls in the query
117                # that prevent us from rendering it at load time. This means that we can safely set the
118                # unoptimized cache to None as well to prevent attempts to render it downstream.
119                optimized = cache_entry.optimized_rendered_query is not None
120                model._query_renderer.update_cache(
121                    cache_entry.optimized_rendered_query,
122                    cache_entry.renderer_violations,
123                    optimized=optimized,
124                )
125                return True
126            except Exception as ex:
127                logger.warning("Failed to load a cache entry '%s': %s", name, ex)
128
129        self._put(name, model)
130        return False
131
132    def put(self, model: Model) -> t.Optional[str]:
133        if not isinstance(model, SqlModel):
134            return None
135
136        name = self._entry_name(model)
137
138        if self._file_cache.exists(name):
139            return name
140
141        self._put(name, model)
142        return name
143
144    def _put(self, name: str, model: SqlModel) -> None:
145        optimized_query = model.render_query()
146
147        new_entry = OptimizedQueryCacheEntry(
148            optimized_rendered_query=optimized_query,
149            renderer_violations=model.violated_rules_for_query,
150        )
151        self._file_cache.put(name, value=new_entry)
152
153    @staticmethod
154    def _entry_name(model: SqlModel) -> str:
155        hash_data = _mapping_schema_hash_data(model.mapping_schema)
156        hash_data.append(gen(model.query, comments=True))
157        hash_data.append(str([gen(d) for d in model.macro_definitions]))
158        hash_data.append(str([(k, v) for k, v in model.sorted_python_env]))
159        hash_data.extend(model.jinja_macros.data_hash_values)
160        return f"{model.name}_{crc32(hash_data)}"

File-based cache implementation for optimized model queries.

Arguments:
  • path: The path to the cache folder.
OptimizedQueryCache(path: pathlib.Path)
 96    def __init__(self, path: Path):
 97        self.path = path
 98        self._file_cache: FileCache[OptimizedQueryCacheEntry] = FileCache(
 99            path, prefix="optimized_query"
100        )
path
102    def with_optimized_query(self, model: Model, name: t.Optional[str] = None) -> bool:
103        """Adds an optimized query to the model's in-memory cache.
104
105        Args:
106            model: The model to add the optimized query to.
107            name: The cache entry name of the model.
108        """
109        if not isinstance(model, SqlModel):
110            return False
111
112        name = self._entry_name(model) if name is None else name
113        cache_entry = self._file_cache.get(name)
114        if cache_entry:
115            try:
116                # If the optimized rendered query is None, then there are likely adapter calls in the query
117                # that prevent us from rendering it at load time. This means that we can safely set the
118                # unoptimized cache to None as well to prevent attempts to render it downstream.
119                optimized = cache_entry.optimized_rendered_query is not None
120                model._query_renderer.update_cache(
121                    cache_entry.optimized_rendered_query,
122                    cache_entry.renderer_violations,
123                    optimized=optimized,
124                )
125                return True
126            except Exception as ex:
127                logger.warning("Failed to load a cache entry '%s': %s", name, ex)
128
129        self._put(name, model)
130        return False

Adds an optimized query to the model's in-memory cache.

Arguments:
  • model: The model to add the optimized query to.
  • name: The cache entry name of the model.
132    def put(self, model: Model) -> t.Optional[str]:
133        if not isinstance(model, SqlModel):
134            return None
135
136        name = self._entry_name(model)
137
138        if self._file_cache.exists(name):
139            return name
140
141        self._put(name, model)
142        return name
def optimized_query_cache_pool( optimized_query_cache: OptimizedQueryCache) -> Union[sqlmesh.utils.process.SynchronousPoolExecutor, concurrent.futures.process.ProcessPoolExecutor]:
163def optimized_query_cache_pool(optimized_query_cache: OptimizedQueryCache) -> PoolExecutor:
164    return create_process_pool_executor(
165        initializer=_init_optimized_query_cache,
166        initargs=(optimized_query_cache,),
167        max_workers=c.MAX_FORK_WORKERS,
168    )
179def load_optimized_query(
180    model_snapshot_id: t.Tuple[Model, SnapshotId],
181) -> t.Tuple[SnapshotId, t.Optional[str]]:
182    assert _optimized_query_cache
183    model, snapshot_id = model_snapshot_id
184
185    entry_name = None
186
187    if isinstance(model, SqlModel):
188        try:
189            entry_name = _optimized_query_cache.put(model)
190        except:
191            # this can happen if there is a query rendering error.
192            # for example, the model query references some python library or function that was available
193            # at the time the model was created but has since been removed locally
194            logger.exception(f"Failed to cache optimized query for model '{model.name}'")
195
196    return snapshot_id, entry_name
def load_optimized_query_and_mapping( model: Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel], mapping: Dict) -> Tuple[str, Optional[str], str, str, Dict]:
199def load_optimized_query_and_mapping(
200    model: Model, mapping: t.Dict
201) -> t.Tuple[str, t.Optional[str], str, str, t.Dict]:
202    assert _optimized_query_cache
203
204    schema = MappingSchema(normalize=False)
205    for parent, columns_to_types in mapping.items():
206        schema.add_table(parent, columns_to_types, dialect=model.dialect)
207    model.update_schema(schema)
208
209    if isinstance(model, SqlModel):
210        entry_name = _optimized_query_cache._entry_name(model)
211        _optimized_query_cache.with_optimized_query(model, entry_name)
212    else:
213        entry_name = None
214
215    return (
216        model.fqn,
217        entry_name,
218        model.data_hash,
219        model.metadata_hash,
220        model.mapping_schema,
221    )