sqlmesh.core.model.cache
1from __future__ import annotations 2 3import logging 4import typing as t 5from pathlib import Path 6 7from sqlglot import exp 8from sqlglot.helper import seq_get 9from sqlglot.optimizer.simplify import gen 10from sqlglot.schema import MappingSchema 11 12from sqlmesh.core import constants as c 13from sqlmesh.core.model.definition import ExternalModel, Model, SqlModel, _Model 14from sqlmesh.utils.cache import FileCache 15from sqlmesh.utils.hashing import crc32 16from sqlmesh.utils.process import PoolExecutor, create_process_pool_executor 17 18from dataclasses import dataclass 19 20logger = logging.getLogger(__name__) 21 22if t.TYPE_CHECKING: 23 from sqlmesh.core.snapshot import SnapshotId 24 from sqlmesh.core.linter.rule import Rule 25 26 T = t.TypeVar("T") 27 28 29class ModelCache: 30 """File-based cache implementation for model definitions. 31 32 Args: 33 path: The path to the cache folder. 34 """ 35 36 def __init__(self, path: Path): 37 self.path = path 38 self._file_cache: FileCache[t.List[Model]] = FileCache( 39 path, 40 prefix="model_definition", 41 ) 42 43 def get_or_load( 44 self, name: str, entry_id: str = "", *, loader: t.Callable[[], t.List[Model]] 45 ) -> t.List[Model]: 46 """Returns an existing cached model definition or loads and caches a new one. 47 Args: 48 name: The name of the entry. 49 entry_id: The unique entry identifier. Used for cache invalidation. 50 loader: Used to load a new model definition when no cached instance was found. 51 Returns: 52 The model definition. 
53 """ 54 cache_entry = self._file_cache.get(name, entry_id) 55 if isinstance(cache_entry, list) and isinstance(seq_get(cache_entry, 0), _Model): 56 return cache_entry 57 58 models = loader() 59 if isinstance(models, list) and isinstance(seq_get(models, 0), (SqlModel, ExternalModel)): 60 # make sure we preload full_depends_on 61 for model in models: 62 model.full_depends_on 63 64 self._file_cache.put(name, entry_id, value=models) 65 return models 66 67 def put(self, models: t.List[Model], name: str, entry_id: str = "") -> bool: 68 if models and isinstance(seq_get(models, 0), (SqlModel, ExternalModel)): 69 # make sure we preload full_depends_on 70 for model in models: 71 model.full_depends_on 72 73 self._file_cache.put(name, entry_id, value=models) 74 return True 75 76 return False 77 78 def get(self, name: str, entry_id: str = "") -> t.List[Model]: 79 return self._file_cache.get(name, entry_id) or [] 80 81 82@dataclass 83class OptimizedQueryCacheEntry: 84 optimized_rendered_query: t.Optional[exp.Query] 85 renderer_violations: t.Optional[t.Dict[type[Rule], t.Any]] 86 87 88class OptimizedQueryCache: 89 """File-based cache implementation for optimized model queries. 90 91 Args: 92 path: The path to the cache folder. 93 """ 94 95 def __init__(self, path: Path): 96 self.path = path 97 self._file_cache: FileCache[OptimizedQueryCacheEntry] = FileCache( 98 path, prefix="optimized_query" 99 ) 100 101 def with_optimized_query(self, model: Model, name: t.Optional[str] = None) -> bool: 102 """Adds an optimized query to the model's in-memory cache. 103 104 Args: 105 model: The model to add the optimized query to. 106 name: The cache entry name of the model. 
107 """ 108 if not isinstance(model, SqlModel): 109 return False 110 111 name = self._entry_name(model) if name is None else name 112 cache_entry = self._file_cache.get(name) 113 if cache_entry: 114 try: 115 # If the optimized rendered query is None, then there are likely adapter calls in the query 116 # that prevent us from rendering it at load time. This means that we can safely set the 117 # unoptimized cache to None as well to prevent attempts to render it downstream. 118 optimized = cache_entry.optimized_rendered_query is not None 119 model._query_renderer.update_cache( 120 cache_entry.optimized_rendered_query, 121 cache_entry.renderer_violations, 122 optimized=optimized, 123 ) 124 return True 125 except Exception as ex: 126 logger.warning("Failed to load a cache entry '%s': %s", name, ex) 127 128 self._put(name, model) 129 return False 130 131 def put(self, model: Model) -> t.Optional[str]: 132 if not isinstance(model, SqlModel): 133 return None 134 135 name = self._entry_name(model) 136 137 if self._file_cache.exists(name): 138 return name 139 140 self._put(name, model) 141 return name 142 143 def _put(self, name: str, model: SqlModel) -> None: 144 optimized_query = model.render_query() 145 146 new_entry = OptimizedQueryCacheEntry( 147 optimized_rendered_query=optimized_query, 148 renderer_violations=model.violated_rules_for_query, 149 ) 150 self._file_cache.put(name, value=new_entry) 151 152 @staticmethod 153 def _entry_name(model: SqlModel) -> str: 154 hash_data = _mapping_schema_hash_data(model.mapping_schema) 155 hash_data.append(gen(model.query, comments=True)) 156 hash_data.append(str([gen(d) for d in model.macro_definitions])) 157 hash_data.append(str([(k, v) for k, v in model.sorted_python_env])) 158 hash_data.extend(model.jinja_macros.data_hash_values) 159 return f"{model.name}_{crc32(hash_data)}" 160 161 162def optimized_query_cache_pool(optimized_query_cache: OptimizedQueryCache) -> PoolExecutor: 163 return create_process_pool_executor( 164 
initializer=_init_optimized_query_cache, 165 initargs=(optimized_query_cache,), 166 max_workers=c.MAX_FORK_WORKERS, 167 ) 168 169 170_optimized_query_cache: t.Optional[OptimizedQueryCache] = None 171 172 173def _init_optimized_query_cache(optimized_query_cache: OptimizedQueryCache) -> None: 174 global _optimized_query_cache 175 _optimized_query_cache = optimized_query_cache 176 177 178def load_optimized_query( 179 model_snapshot_id: t.Tuple[Model, SnapshotId], 180) -> t.Tuple[SnapshotId, t.Optional[str]]: 181 assert _optimized_query_cache 182 model, snapshot_id = model_snapshot_id 183 184 entry_name = None 185 186 if isinstance(model, SqlModel): 187 try: 188 entry_name = _optimized_query_cache.put(model) 189 except: 190 # this can happen if there is a query rendering error. 191 # for example, the model query references some python library or function that was available 192 # at the time the model was created but has since been removed locally 193 logger.exception(f"Failed to cache optimized query for model '{model.name}'") 194 195 return snapshot_id, entry_name 196 197 198def load_optimized_query_and_mapping( 199 model: Model, mapping: t.Dict 200) -> t.Tuple[str, t.Optional[str], str, str, t.Dict]: 201 assert _optimized_query_cache 202 203 schema = MappingSchema(normalize=False) 204 for parent, columns_to_types in mapping.items(): 205 schema.add_table(parent, columns_to_types, dialect=model.dialect) 206 model.update_schema(schema) 207 208 if isinstance(model, SqlModel): 209 entry_name = _optimized_query_cache._entry_name(model) 210 _optimized_query_cache.with_optimized_query(model, entry_name) 211 else: 212 entry_name = None 213 214 return ( 215 model.fqn, 216 entry_name, 217 model.data_hash, 218 model.metadata_hash, 219 model.mapping_schema, 220 ) 221 222 223def _mapping_schema_hash_data(schema: t.Dict[str, t.Any]) -> t.List[str]: 224 keys = sorted(schema) if all(isinstance(v, dict) for v in schema.values()) else schema 225 226 data = [] 227 for k in keys: 228 
data.append(k) 229 if isinstance(schema[k], dict): 230 data.extend(_mapping_schema_hash_data(schema[k])) 231 else: 232 data.append(str(schema[k])) 233 234 return data
logger =
<Logger sqlmesh.core.model.cache (WARNING)>
class
ModelCache:
class ModelCache:
    """File-based cache implementation for model definitions.

    Args:
        path: The path to the cache folder.
    """

    def __init__(self, path: Path):
        self.path = path
        self._file_cache: FileCache[t.List[Model]] = FileCache(
            path,
            prefix="model_definition",
        )

    def get_or_load(
        self, name: str, entry_id: str = "", *, loader: t.Callable[[], t.List[Model]]
    ) -> t.List[Model]:
        """Returns an existing cached model definition or loads and caches a new one.

        Args:
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.
            loader: Used to load a new model definition when no cached instance was found.

        Returns:
            The list of model definitions, either from the cache or produced by ``loader``.
        """
        cache_entry = self._file_cache.get(name, entry_id)
        if isinstance(cache_entry, list) and isinstance(seq_get(cache_entry, 0), _Model):
            return cache_entry

        models = loader()
        if isinstance(models, list):
            # `put` only stores lists whose first element is a SQL or external model;
            # anything else is returned to the caller without being cached.
            self.put(models, name, entry_id)
        return models

    def put(self, models: t.List[Model], name: str, entry_id: str = "") -> bool:
        """Stores models in the cache when they are cacheable.

        Models are only stored when ``models`` is non-empty and its first element is a
        ``SqlModel`` or ``ExternalModel``.

        Args:
            models: The models to cache.
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.

        Returns:
            True if the models were written to the cache, False otherwise.
        """
        if models and isinstance(seq_get(models, 0), (SqlModel, ExternalModel)):
            # make sure we preload full_depends_on before the models are written to the cache
            for model in models:
                model.full_depends_on

            self._file_cache.put(name, entry_id, value=models)
            return True

        return False

    def get(self, name: str, entry_id: str = "") -> t.List[Model]:
        """Returns the cached models for an entry.

        Args:
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.

        Returns:
            The cached models, or an empty list if the cache returned nothing.
        """
        return self._file_cache.get(name, entry_id) or []
File-based cache implementation for model definitions.
Arguments:
- path: The path to the cache folder.
def
get_or_load( self, name: str, entry_id: str = '', *, loader: Callable[[], List[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]]]) -> List[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]]:
def get_or_load(
    self, name: str, entry_id: str = "", *, loader: t.Callable[[], t.List[Model]]
) -> t.List[Model]:
    """Returns an existing cached model definition or loads and caches a new one.

    Args:
        name: The name of the entry.
        entry_id: The unique entry identifier. Used for cache invalidation.
        loader: Used to load a new model definition when no cached instance was found.

    Returns:
        The list of model definitions, either from the cache or produced by ``loader``.
    """
    cache_entry = self._file_cache.get(name, entry_id)
    if isinstance(cache_entry, list) and isinstance(seq_get(cache_entry, 0), _Model):
        return cache_entry

    models = loader()
    if isinstance(models, list):
        # Reuse `put` so the "preload full_depends_on, then store" logic lives in one place.
        # `put` only stores lists whose first element is a SQL or external model.
        self.put(models, name, entry_id)
    return models
Returns an existing cached model definition or loads and caches a new one.
Arguments:
- name: The name of the entry.
- entry_id: The unique entry identifier. Used for cache invalidation.
- loader: Used to load a new model definition when no cached instance was found.
Returns:
The list of model definitions, either loaded from the cache or produced by `loader`.
def
put( self, models: List[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]], name: str, entry_id: str = '') -> bool:
def put(self, models: t.List[Model], name: str, entry_id: str = "") -> bool:
    """Writes ``models`` to the file cache if they are of a cacheable kind.

    Only a non-empty ``models`` whose first element is a ``SqlModel`` or ``ExternalModel``
    is written.

    Args:
        models: The models to cache.
        name: The name of the entry.
        entry_id: The unique entry identifier. Used for cache invalidation.

    Returns:
        True if the models were written, False if they were skipped.
    """
    head = seq_get(models, 0) if models else None
    if not isinstance(head, (SqlModel, ExternalModel)):
        return False

    # Preload full_depends_on on every model before the list is written to the cache.
    for each_model in models:
        each_model.full_depends_on
    self._file_cache.put(name, entry_id, value=models)
    return True
def
get( self, name: str, entry_id: str = '') -> List[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]]:
@dataclass
class
OptimizedQueryCacheEntry:
@dataclass
class OptimizedQueryCacheEntry:
    """Value stored in the optimized query file cache for one model."""

    # Rendered (and optimized) query. None likely means it could not be rendered at load
    # time, e.g. because the query contains adapter calls.
    optimized_rendered_query: t.Optional[exp.Query]

    # Rule violations reported for the model's query, keyed by rule class.
    renderer_violations: t.Optional[t.Dict[type[Rule], t.Any]]
OptimizedQueryCacheEntry( optimized_rendered_query: Optional[sqlglot.expressions.query.Query], renderer_violations: Optional[Dict[type[sqlmesh.core.linter.rule.Rule], Any]])
renderer_violations: Optional[Dict[type[sqlmesh.core.linter.rule.Rule], Any]]
class
OptimizedQueryCache:
class OptimizedQueryCache:
    """File-based cache implementation for optimized model queries.

    Args:
        path: The path to the cache folder.
    """

    def __init__(self, path: Path):
        self.path = path
        self._file_cache: FileCache[OptimizedQueryCacheEntry] = FileCache(
            path, prefix="optimized_query"
        )

    def with_optimized_query(self, model: Model, name: t.Optional[str] = None) -> bool:
        """Adds an optimized query to the model's in-memory cache.

        If there is no cached entry, or the cached entry cannot be applied, the query is
        rendered and a fresh entry is written to the file cache instead.

        Args:
            model: The model to add the optimized query to.
            name: The cache entry name of the model. Computed from the model if omitted.

        Returns:
            True if a cached entry was applied to the model; False otherwise, including
            for models that are not SQL models.
        """
        if not isinstance(model, SqlModel):
            return False

        name = self._entry_name(model) if name is None else name
        cache_entry = self._file_cache.get(name)
        if cache_entry is not None:
            try:
                # If the optimized rendered query is None, then there are likely adapter calls in the query
                # that prevent us from rendering it at load time. This means that we can safely set the
                # unoptimized cache to None as well to prevent attempts to render it downstream.
                optimized = cache_entry.optimized_rendered_query is not None
                model._query_renderer.update_cache(
                    cache_entry.optimized_rendered_query,
                    cache_entry.renderer_violations,
                    optimized=optimized,
                )
                return True
            except Exception as ex:
                logger.warning("Failed to load a cache entry '%s': %s", name, ex)

        self._put(name, model)
        return False

    def put(self, model: Model) -> t.Optional[str]:
        """Ensures the file cache holds an entry for the model's optimized query.

        Args:
            model: The model whose query should be cached.

        Returns:
            The cache entry name, or None if the model is not a SQL model.
        """
        if not isinstance(model, SqlModel):
            return None

        name = self._entry_name(model)

        # The entry name hashes the model's schema, query, macros and Python env, so an
        # existing entry under this name can be reused as is.
        if self._file_cache.exists(name):
            return name

        self._put(name, model)
        return name

    def _put(self, name: str, model: SqlModel) -> None:
        """Renders the model's query and writes the result to the file cache under ``name``."""
        optimized_query = model.render_query()

        new_entry = OptimizedQueryCacheEntry(
            optimized_rendered_query=optimized_query,
            renderer_violations=model.violated_rules_for_query,
        )
        self._file_cache.put(name, value=new_entry)

    @staticmethod
    def _entry_name(model: SqlModel) -> str:
        """Builds the entry name: the model name plus a CRC32 of its mapping schema, query,
        macro definitions, Python env and Jinja macros."""
        hash_data = _mapping_schema_hash_data(model.mapping_schema)
        hash_data.append(gen(model.query, comments=True))
        hash_data.append(str([gen(d) for d in model.macro_definitions]))
        hash_data.append(str([(k, v) for k, v in model.sorted_python_env]))
        hash_data.extend(model.jinja_macros.data_hash_values)
        return f"{model.name}_{crc32(hash_data)}"
File-based cache implementation for optimized model queries.
Arguments:
- path: The path to the cache folder.
def
with_optimized_query( self, model: Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel], name: Optional[str] = None) -> bool:
def with_optimized_query(self, model: Model, name: t.Optional[str] = None) -> bool:
    """Adds an optimized query to the model's in-memory cache.

    If there is no cached entry, or the cached entry cannot be applied, the query is
    rendered and a fresh entry is written to the file cache instead.

    Args:
        model: The model to add the optimized query to.
        name: The cache entry name of the model. Computed from the model if omitted.

    Returns:
        True if a cached entry was applied to the model; False otherwise, including
        for models that are not SQL models.
    """
    if not isinstance(model, SqlModel):
        return False

    name = self._entry_name(model) if name is None else name
    cache_entry = self._file_cache.get(name)
    if cache_entry is not None:
        try:
            # If the optimized rendered query is None, then there are likely adapter calls in the query
            # that prevent us from rendering it at load time. This means that we can safely set the
            # unoptimized cache to None as well to prevent attempts to render it downstream.
            optimized = cache_entry.optimized_rendered_query is not None
            model._query_renderer.update_cache(
                cache_entry.optimized_rendered_query,
                cache_entry.renderer_violations,
                optimized=optimized,
            )
            return True
        except Exception as ex:
            logger.warning("Failed to load a cache entry '%s': %s", name, ex)

    self._put(name, model)
    return False
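Adds an optimized query to the model's in-memory cache. Returns True if a cached entry was applied. Otherwise (including for non-SQL models) it returns False; for SQL models it also renders the query and writes a fresh entry to the file cache.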
Arguments:
- model: The model to add the optimized query to.
- name: The cache entry name of the model. If omitted, it is computed from the model.
def
put( self, model: Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]) -> Optional[str]:
def
optimized_query_cache_pool( optimized_query_cache: OptimizedQueryCache) -> Union[sqlmesh.utils.process.SynchronousPoolExecutor, concurrent.futures.process.ProcessPoolExecutor]:
def
load_optimized_query( model_snapshot_id: Tuple[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel], sqlmesh.core.snapshot.definition.SnapshotId]) -> Tuple[sqlmesh.core.snapshot.definition.SnapshotId, Optional[str]]:
def load_optimized_query(
    model_snapshot_id: t.Tuple[Model, SnapshotId],
) -> t.Tuple[SnapshotId, t.Optional[str]]:
    """Ensures the optimized query of a model is cached. Meant to run in the cache pool.

    Args:
        model_snapshot_id: A ``(model, snapshot_id)`` pair.

    Returns:
        The snapshot id and the cache entry name. The entry name is None if the model is
        not a SQL model or its query could not be cached.
    """
    assert _optimized_query_cache
    model, snapshot_id = model_snapshot_id

    entry_name = None

    if isinstance(model, SqlModel):
        try:
            entry_name = _optimized_query_cache.put(model)
        except Exception:
            # this can happen if there is a query rendering error.
            # for example, the model query references some python library or function that was available
            # at the time the model was created but has since been removed locally.
            # `Exception` (not a bare except) so KeyboardInterrupt/SystemExit still stop the worker.
            logger.exception("Failed to cache optimized query for model '%s'", model.name)

    return snapshot_id, entry_name
def
load_optimized_query_and_mapping( model: Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel], mapping: Dict) -> Tuple[str, Optional[str], str, str, Dict]:
def load_optimized_query_and_mapping(
    model: Model, mapping: t.Dict
) -> t.Tuple[str, t.Optional[str], str, str, t.Dict]:
    """Applies a parent schema mapping to a model and loads its optimized query.

    Meant to run in the cache pool, where the module-level cache has been initialized.

    Args:
        model: The model to update.
        mapping: Parent table name -> columns-to-types mapping used to build the schema.

    Returns:
        A tuple of the model's fqn, its cache entry name (None for non-SQL models),
        data hash, metadata hash and mapping schema.
    """
    assert _optimized_query_cache

    parent_schema = MappingSchema(normalize=False)
    for table_name, column_types in mapping.items():
        parent_schema.add_table(table_name, column_types, dialect=model.dialect)
    model.update_schema(parent_schema)

    entry_name: t.Optional[str] = None
    if isinstance(model, SqlModel):
        entry_name = _optimized_query_cache._entry_name(model)
        _optimized_query_cache.with_optimized_query(model, entry_name)

    return model.fqn, entry_name, model.data_hash, model.metadata_hash, model.mapping_schema