sqlmesh.core.loader
from __future__ import annotations

import abc
import glob
import itertools
import linecache
import os
import re
import typing as t
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from pydantic import ValidationError
import concurrent.futures

from sqlglot.errors import SqlglotError
from sqlglot import exp
from sqlglot.helper import subclasses

from sqlmesh.core import constants as c
from sqlmesh.core.audit import Audit, ModelAudit, StandaloneAudit, load_multiple_audits
from sqlmesh.core.console import Console
from sqlmesh.core.dialect import parse
from sqlmesh.core.environment import EnvironmentStatements
from sqlmesh.core.linter.rule import Rule
from sqlmesh.core.linter.definition import RuleSet
from sqlmesh.core.macros import MacroRegistry, macro
from sqlmesh.core.metric import Metric, MetricMeta, expand_metrics, load_metric_ddl
from sqlmesh.core.model import (
    Model,
    ModelCache,
    create_external_model,
    load_sql_based_models,
)
from sqlmesh.core.model import model as model_registry
from sqlmesh.core.model.common import make_python_env
from sqlmesh.core.signal import signal
from sqlmesh.core.test import ModelTestMetadata
from sqlmesh.utils import UniqueKeyDict, sys_path
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.jinja import JinjaMacroRegistry, MacroExtractor
from sqlmesh.utils.metaprogramming import import_python_file
from sqlmesh.utils.pydantic import validation_error_message
from sqlmesh.utils.process import create_process_pool_executor
from sqlmesh.utils.yaml import YAML, load as yaml_load


if t.TYPE_CHECKING:
    from sqlmesh.core.context import GenericContext


# Finds the first `gateway: <name>` entry in a model test file (see `_load_model_test_file`).
GATEWAY_PATTERN = re.compile(r"gateway:\s*([^\s]+)")


@dataclass
class LoadedProject:
    """All artifacts produced by `Loader.load` for a single project config path."""

    macros: MacroRegistry
    jinja_macros: JinjaMacroRegistry
    models: UniqueKeyDict[str, Model]
    standalone_audits: UniqueKeyDict[str, StandaloneAudit]
    audits: UniqueKeyDict[str, ModelAudit]
    metrics: UniqueKeyDict[str, Metric]
    requirements: t.Dict[str, str]
    excluded_requirements: t.Set[str]
    environment_statements: t.List[EnvironmentStatements]
    user_rules: RuleSet
    model_test_metadata: t.List[ModelTestMetadata]


class CacheBase(abc.ABC):
    """Interface for caches of models loaded from project files, keyed by file path."""

    @abc.abstractmethod
    def get_or_load_models(
        self, target_path: Path, loader: t.Callable[[], t.List[Model]]
    ) -> t.List[Model]:
        """Get or load all models from cache."""
        pass

    @abc.abstractmethod
    def put(self, models: t.List[Model], path: Path) -> bool:
        """Store models in the cache associated with the given path.

        Args:
            models: List of models to cache
            path: File path to associate with the cached models

        Returns:
            True if the models were successfully cached,
            False otherwise (empty list, not a list, unsupported model types)
        """
        pass

    @abc.abstractmethod
    def get(self, path: Path) -> t.List[Model]:
        """Retrieve models from the cache for a given path.

        Args:
            path: File path to look up in the cache

        Returns:
            List of cached models associated with the path, an empty list if no cache entry exists
        """
        pass


# Module-level state read by `load_sql_models` and `get_variables`. It is populated by
# `_init_model_defaults`, which is also passed as the initializer of the worker pool
# created in `SqlMeshLoader._load_sql_models`, so each worker process gets its own copy.
_defaults: t.Optional[t.Dict[str, t.Any]] = None
_cache: t.Optional[CacheBase] = None
_config_essentials: t.Optional[t.Dict[str, t.Any]] = None
_selected_gateway: t.Optional[str] = None


def _init_model_defaults(
    config_essentials: t.Dict[str, t.Any],
    selected_gateway: t.Optional[str],
    model_loading_defaults: t.Optional[t.Dict[str, t.Any]] = None,
    cache: t.Optional[CacheBase] = None,
    console: t.Optional[Console] = None,
) -> None:
    """Populate the module-level loading state.

    Args:
        config_essentials: Mapping with the project's "project", "variables" and "gateways" entries.
        selected_gateway: Gateway `get_variables` falls back to when no name is passed.
        model_loading_defaults: Keyword arguments forwarded to `load_sql_based_models`.
        cache: Cache that `load_sql_models` stores freshly loaded models in.
        console: If given, installed as the active console (the parent's console when this
            runs as a worker-process initializer).
    """
    global _defaults, _cache, _config_essentials, _selected_gateway
    _defaults = model_loading_defaults
    _cache = cache
    _config_essentials = config_essentials
    _selected_gateway = selected_gateway

    # Set the console passed from the parent process
    if console is not None:
        from sqlmesh.core.console import set_console

        set_console(console)


def load_sql_models(path: Path) -> t.List[Model]:
    """Parse one SQL file and load the models it defines.

    Requires `_init_model_defaults` to have been called with loading defaults and a cache.

    Args:
        path: The SQL file to load.

    Returns:
        An empty list if the models were stored in the cache (the caller reads them back with
        `CacheBase.get`); otherwise the loaded models themselves.
    """
    assert _defaults
    assert _cache

    with open(path, "r", encoding="utf-8") as file:
        expressions = parse(file.read(), default_dialect=_defaults["dialect"])
        models = load_sql_based_models(expressions, path=Path(path).absolute(), **_defaults)

    return [] if _cache.put(models, path) else models


def get_variables(gateway_name: t.Optional[str] = None) -> t.Dict[str, t.Any]:
    """Build the variables visible to models for a gateway.

    Project-level variables are overridden by the gateway's own variables, and the resolved
    gateway name is stored under `c.GATEWAY`. An unknown gateway only logs a warning.

    Args:
        gateway_name: Gateway to resolve; falls back to the selected gateway when falsy.
    """
    assert _config_essentials

    gateway_name = gateway_name or _selected_gateway

    try:
        gateway = _config_essentials["gateways"].get(gateway_name)
    except ConfigError:
        from sqlmesh.core.console import get_console

        get_console().log_warning(
            f"Gateway '{gateway_name}' not found in project '{_config_essentials['project']}'."
        )
        gateway = None

    return {
        **_config_essentials["variables"],
        **(gateway.variables if gateway else {}),
        c.GATEWAY: gateway_name,
    }


def _latest_mtime(current: t.Optional[float], new: float) -> float:
    """Return the later of two mtimes; `current=None` means no mtime has been seen yet."""
    # Compare against None explicitly: a truthiness check would treat a 0.0 mtime as "unset".
    return new if current is None else max(current, new)


class Loader(abc.ABC):
    """Abstract base class to load macros and models for a context"""

    def __init__(self, context: GenericContext, path: Path) -> None:
        # This ensures pandas is imported before any model loading happens in the forked process
        # to avoid macOS fork() safety issues, see https://stackoverflow.com/a/52230415. Without
        # it, the following error was observed in a macOS 15.5 system:
        #
        # "+[NSMutableString initialize] may have been in progress in another thread when fork() was called."
        import pandas as pd  # noqa

        from sqlmesh.core.console import get_console

        self._path_mtimes: t.Dict[Path, float] = {}
        self.context = context
        self.config_path = path
        self.config = self.context.configs[self.config_path]
        self._variables_by_gateway: t.Dict[str, t.Dict[str, t.Any]] = {}
        self._console = get_console()

        self.config_essentials = {
            "project": self.config.project,
            "variables": self.config.variables,
            "gateways": self.config.gateways,
        }
        _init_model_defaults(self.config_essentials, self.context.selected_gateway)

    def load(self) -> LoadedProject:
        """
        Loads all macros and models in the context's path.

        Returns:
            A loaded project object.
        """
        with sys_path(self.config_path):
            # python files are cached by the system
            # need to manually clear here so we can reload macros
            linecache.clearcache()
            self._path_mtimes.clear()

            self._load_materializations()
            signals = self._load_signals()

            config_mtimes: t.Dict[Path, t.List[float]] = defaultdict(list)

            for config_file in self.config_path.glob("config.*"):
                self._track_file(config_file)
                config_mtimes[self.config_path].append(self._path_mtimes[config_file])

            for config_file in c.SQLMESH_PATH.glob("config.*"):
                self._track_file(config_file)
                config_mtimes[c.SQLMESH_PATH].append(self._path_mtimes[config_file])

            self._config_mtimes = {path: max(mtimes) for path, mtimes in config_mtimes.items()}

            macros, jinja_macros = self._load_scripts()
            audits: UniqueKeyDict[str, ModelAudit] = UniqueKeyDict("audits")
            standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
                "standalone_audits"
            )

            for name, audit in self._load_audits(macros=macros, jinja_macros=jinja_macros).items():
                if isinstance(audit, ModelAudit):
                    audits[name] = audit
                else:
                    standalone_audits[name] = audit

            models = self._load_models(
                macros,
                jinja_macros,
                self.context.selected_gateway,
                audits,
                signals,
            )

            metrics = self._load_metrics()

            requirements, excluded_requirements = self._load_requirements()

            environment_statements = self._load_environment_statements(macros=macros)

            user_rules = self._load_linting_rules()

            model_test_metadata = self.load_model_tests()

            project = LoadedProject(
                macros=macros,
                jinja_macros=jinja_macros,
                models=models,
                audits=audits,
                standalone_audits=standalone_audits,
                metrics=expand_metrics(metrics),
                requirements=requirements,
                excluded_requirements=excluded_requirements,
                environment_statements=environment_statements,
                user_rules=user_rules,
                model_test_metadata=model_test_metadata,
            )
            return project

    def reload_needed(self) -> bool:
        """
        Checks for any modifications to the files the macros and models depend on
        since the last load.

        Returns:
            True if a modification is found; False otherwise
        """
        return any(
            not path.exists() or path.stat().st_mtime > initial_mtime
            for path, initial_mtime in self._path_mtimes.copy().items()
        )

    @abc.abstractmethod
    def _load_scripts(self) -> t.Tuple[MacroRegistry, JinjaMacroRegistry]:
        """Loads all user defined macros."""

    @abc.abstractmethod
    def _load_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        gateway: t.Optional[str],
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
    ) -> UniqueKeyDict[str, Model]:
        """Loads all models."""

    @abc.abstractmethod
    def _load_audits(
        self, macros: MacroRegistry, jinja_macros: JinjaMacroRegistry
    ) -> UniqueKeyDict[str, Audit]:
        """Loads all audits."""

    def _load_environment_statements(self, macros: MacroRegistry) -> t.List[EnvironmentStatements]:
        """Loads environment statements."""
        return []

    def load_materializations(self) -> None:
        """Loads custom materializations."""

    def _load_materializations(self) -> None:
        pass

    def _load_signals(self) -> UniqueKeyDict[str, signal]:
        return UniqueKeyDict("signals")

    def _load_metrics(self) -> UniqueKeyDict[str, MetricMeta]:
        return UniqueKeyDict("metrics")

    def _load_external_models(
        self,
        audits: UniqueKeyDict[str, ModelAudit],
        cache: CacheBase,
        gateway: t.Optional[str] = None,
    ) -> UniqueKeyDict[str, Model]:
        """Load external models from the external-models YAML file(s).

        The current YAML file takes precedence over the deprecated one. Every `.yaml` file
        under the external models directory is also loaded. Models without a gateway form
        the base set; models whose gateway matches `gateway` (case-insensitively) override
        them.

        Raises:
            ConfigError: On unreadable files or duplicate external model names.
        """
        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        external_models_yaml = Path(self.config_path / c.EXTERNAL_MODELS_YAML)
        deprecated_yaml = Path(self.config_path / c.EXTERNAL_MODELS_DEPRECATED_YAML)
        external_models_path = self.config_path / c.EXTERNAL_MODELS

        paths_to_load = []
        if external_models_yaml.exists():
            paths_to_load.append(external_models_yaml)
        elif deprecated_yaml.exists():
            paths_to_load.append(deprecated_yaml)

        if external_models_path.exists() and external_models_path.is_dir():
            paths_to_load.extend(self._glob_paths(external_models_path, extension=".yaml"))

        def _load(path: Path) -> t.List[Model]:
            try:
                with open(path, "r", encoding="utf-8") as file:
                    yaml = YAML().load(file)
                    # Allow empty YAML files to return an empty list
                    if yaml is None:
                        return []
                    return [
                        create_external_model(
                            defaults=self.config.model_defaults.dict(),
                            path=path,
                            project=self.config.project,
                            audit_definitions=audits,
                            **{
                                "dialect": self.config.model_defaults.dialect,
                                "default_catalog": self.context.default_catalog,
                                **row,
                            },
                        )
                        for row in yaml
                    ]
            except Exception as ex:
                raise ConfigError(self._failed_to_load_model_error(path, ex), path)

        # Gateway names are compared in lowercase; do it once rather than per file.
        if gateway:
            gateway = gateway.lower()

        for path in paths_to_load:
            self._track_file(path)

            external_models = cache.get_or_load_models(path, lambda: _load(path))

            # external models with no explicit gateway defined form the base set
            for model in external_models:
                if model.gateway is None:
                    if model.fqn in models:
                        raise ConfigError(
                            self._failed_to_load_model_error(
                                path, f"Duplicate external model name: '{model.name}'."
                            ),
                            path,
                        )
                    models[model.fqn] = model

            # however, if there is a gateway defined, gateway-specific models take precedence
            if gateway:
                for model in external_models:
                    if model.gateway == gateway:
                        if model.fqn in models and models[model.fqn].gateway == gateway:
                            raise ConfigError(
                                self._failed_to_load_model_error(
                                    path, f"Duplicate external model name: '{model.name}'."
                                ),
                                path,
                            )
                        models.update({model.fqn: model})

        return models

    def _load_requirements(self) -> t.Tuple[t.Dict[str, str], t.Set[str]]:
        """Loads Python dependencies from the lock file.

        Each non-blank line is either `dep==ver` or `^dep` (an excluded dependency).

        Returns:
            A tuple of requirements and excluded requirements.

        Raises:
            ConfigError: On a malformed entry or conflicting versions of the same dependency.
        """
        requirements: t.Dict[str, str] = {}
        excluded_requirements: t.Set[str] = set()

        requirements_path = self.config_path / c.REQUIREMENTS
        if requirements_path.is_file():
            with open(requirements_path, "r", encoding="utf-8") as file:
                for line in file:
                    line = line.strip()
                    # Blank lines carry no requirement; previously they were rejected as
                    # "Invalid lock file entry ''".
                    if not line:
                        continue
                    if line.startswith("^"):
                        excluded_requirements.add(line[1:])
                        continue

                    args = [k.strip() for k in line.split("==")]
                    if len(args) != 2:
                        raise ConfigError(
                            f"Invalid lock file entry '{line.strip()}'. Only 'dep==ver' is supported",
                            requirements_path,
                        )
                    dep, ver = args
                    other_ver = requirements.get(dep, ver)
                    if ver != other_ver:
                        raise ConfigError(
                            f"Conflicting requirement {dep}: {ver} != {other_ver}. Fix your {c.REQUIREMENTS} file.",
                            requirements_path,
                        )
                    requirements[dep] = ver

        return requirements, excluded_requirements

    def _load_linting_rules(self) -> RuleSet:
        """Loads user linting rules"""
        return RuleSet()

    def load_model_tests(self) -> t.List[ModelTestMetadata]:
        """Loads YAML-based model tests"""
        return []

    def _glob_paths(
        self,
        path: Path,
        ignore_patterns: t.Optional[t.List[str]] = None,
        extension: t.Optional[str] = None,
    ) -> t.Generator[Path, None, None]:
        """
        Globs the provided path for the file extension but also removes any filepaths that match an ignore
        pattern either set in constants or provided in config

        Args:
            path: The filepath to glob
            ignore_patterns: A list of patterns for glob to ignore
            extension: The extension to check for in that path (checks recursively in zero or more subdirectories)

        Returns:
            Matched paths that are not ignored
        """
        ignore_patterns = ignore_patterns or []
        extension = extension or ""

        # We try to match both ignore_pattern itself and every file returned by glob,
        # so that we will always ignore file names that do not appear in the latter.
        ignored_filepaths = set(ignore_patterns) | {
            ignored_path
            for ignore_pattern in ignore_patterns
            for ignored_path in glob.glob(str(self.config_path / ignore_pattern), recursive=True)
        }
        for filepath in path.glob(f"**/*{extension}"):
            if any(filepath.match(ignored_filepath) for ignored_filepath in ignored_filepaths):
                continue

            yield filepath

    def _track_file(self, path: Path) -> None:
        """Project file to track for modifications"""
        self._path_mtimes[path] = path.stat().st_mtime

    def _failed_to_load_model_error(self, path: Path, error: t.Union[str, Exception]) -> str:
        """Format a "failed to load model" message for `path`, indenting the error text."""
        base_message = f"Failed to load model from file '{path}':"
        if isinstance(error, ValidationError):
            return validation_error_message(error, base_message)
        # indent all lines of error message
        error_message = str(error).replace("\n", "\n  ")
        return f"{base_message}\n\n  {error_message}"


class SqlMeshLoader(Loader):
    """Loads macros and models for a context using the SQLMesh file formats"""

    def _load_scripts(self) -> t.Tuple[MacroRegistry, JinjaMacroRegistry]:
        """Loads all user defined macros."""
        # Store a copy of the macro registry
        standard_macros = macro.get_registry()
        jinja_macros = JinjaMacroRegistry()
        extractor = MacroExtractor()

        macros_max_mtime: t.Optional[float] = None

        for path in self._glob_paths(
            self.config_path / c.MACROS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if import_python_file(path, self.config_path):
                self._track_file(path)
                macros_max_mtime = _latest_mtime(macros_max_mtime, self._path_mtimes[path])

        for path in self._glob_paths(
            self.config_path / c.MACROS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            self._track_file(path)
            macros_max_mtime = _latest_mtime(macros_max_mtime, self._path_mtimes[path])
            with open(path, "r", encoding="utf-8") as file:
                jinja_macros.add_macros(
                    extractor.extract(file.read(), dialect=self.config.model_defaults.dialect)
                )

        self._macros_max_mtime = macros_max_mtime

        macros = macro.get_registry()
        macro.set_registry(standard_macros)

        return macros, jinja_macros

    def _load_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        gateway: t.Optional[str],
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
    ) -> UniqueKeyDict[str, Model]:
        """
        Loads all of the models within the model directory with their associated
        audits into a Dict and creates the dag
        """
        cache = SqlMeshLoader._Cache(self, self.config_path)

        sql_models = self._load_sql_models(macros, jinja_macros, audits, signals, cache, gateway)
        external_models = self._load_external_models(audits, cache, gateway)
        python_models = self._load_python_models(macros, jinja_macros, audits, signals)

        all_model_names = list(sql_models) + list(external_models) + list(python_models)
        duplicates = [name for name, count in Counter(all_model_names).items() if count > 1]
        if duplicates:
            raise ConfigError(f"Duplicate model name(s) found: {', '.join(duplicates)}.")

        return UniqueKeyDict("models", **sql_models, **external_models, **python_models)

    def _load_sql_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
        cache: CacheBase,
        gateway: t.Optional[str],
    ) -> UniqueKeyDict[str, Model]:
        """Loads the sql models into a Dict.

        Non-empty `.sql` files with a cache hit are taken from the cache. The rest are
        parsed in a process pool whose workers are initialized via `_init_model_defaults`.
        Disabled models are skipped.

        Raises:
            ConfigError: If a file fails to load or defines a duplicate SQL model name.
        """
        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        paths: t.Set[Path] = set()
        cached_paths: UniqueKeyDict[Path, t.List[Model]] = UniqueKeyDict("cached_paths")

        for path in self._glob_paths(
            self.config_path / c.MODELS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            if not os.path.getsize(path):
                continue

            self._track_file(path)
            paths.add(path)
            if cached_models := cache.get(path):
                cached_paths[path] = cached_models

        for path, cached_models in cached_paths.items():
            paths.remove(path)
            for model in cached_models:
                if model.enabled:
                    models[model.fqn] = model

        if paths:
            model_loading_defaults = dict(
                get_variables=get_variables,
                defaults=self.config.model_defaults.dict(),
                macros=macros,
                jinja_macros=jinja_macros,
                audit_definitions=audits,
                module_path=self.config_path,
                dialect=self.config.model_defaults.dialect,
                time_column_format=self.config.time_column_format,
                physical_schema_mapping=self.config.physical_schema_mapping,
                project=self.config.project,
                default_catalog=self.context.default_catalog,
                infer_names=self.config.model_naming.infer_names,
                signal_definitions=signals,
                default_catalog_per_gateway=self.context.default_catalog_per_gateway,
                virtual_environment_mode=self.config.virtual_environment_mode,
            )

            with create_process_pool_executor(
                initializer=_init_model_defaults,
                initargs=(
                    self.config_essentials,
                    gateway,
                    model_loading_defaults,
                    cache,
                    self._console,
                ),
                max_workers=c.MAX_FORK_WORKERS,
            ) as pool:
                futures_to_paths = {pool.submit(load_sql_models, path): path for path in paths}
                for future in concurrent.futures.as_completed(futures_to_paths):
                    path = futures_to_paths[future]
                    try:
                        # An empty result means the worker stored the models in the cache.
                        loaded = future.result() or cache.get(path)
                    except Exception as ex:
                        raise ConfigError(self._failed_to_load_model_error(path, ex), path)

                    # Duplicate detection happens outside the try block so its ConfigError
                    # is not re-wrapped with a second "Failed to load model" header.
                    for model in loaded:
                        if model.fqn in models:
                            raise ConfigError(
                                self._failed_to_load_model_error(
                                    path, f"Duplicate SQL model name: '{model.name}'."
                                ),
                                path,
                            )
                        if model.enabled:
                            model._path = path
                            models[model.fqn] = model

        return models

    def _load_python_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
    ) -> UniqueKeyDict[str, Model]:
        """Loads the python models into a Dict"""
        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        registry = model_registry.registry()
        registry.clear()
        registered: t.Set[str] = set()

        model_registry._dialect = self.config.model_defaults.dialect
        try:
            for path in self._glob_paths(
                self.config_path / c.MODELS,
                ignore_patterns=self.config.ignore_patterns,
                extension=".py",
            ):
                if not os.path.getsize(path):
                    continue

                self._track_file(path)
                try:
                    import_python_file(path, self.config_path)
                    # Only models registered by this file are new since the previous import.
                    new = registry.keys() - registered
                    registered |= new
                    for name in new:
                        for model in registry[name].models(
                            get_variables,
                            path=path,
                            module_path=self.config_path,
                            defaults=self.config.model_defaults.dict(),
                            macros=macros,
                            jinja_macros=jinja_macros,
                            dialect=self.config.model_defaults.dialect,
                            time_column_format=self.config.time_column_format,
                            physical_schema_mapping=self.config.physical_schema_mapping,
                            project=self.config.project,
                            default_catalog=self.context.default_catalog,
                            infer_names=self.config.model_naming.infer_names,
                            audit_definitions=audits,
                            signal_definitions=signals,
                            default_catalog_per_gateway=self.context.default_catalog_per_gateway,
                            virtual_environment_mode=self.config.virtual_environment_mode,
                        ):
                            if model.enabled:
                                models[model.fqn] = model
                except Exception as ex:
                    raise ConfigError(self._failed_to_load_model_error(path, ex), path)

        finally:
            model_registry._dialect = None

        return models

    def load_materializations(self) -> None:
        with sys_path(self.config_path):
            self._load_materializations()

    def _load_materializations(self) -> None:
        for path in self._glob_paths(
            self.config_path / c.MATERIALIZATIONS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if os.path.getsize(path):
                import_python_file(path, self.config_path)

    def _load_signals(self) -> UniqueKeyDict[str, signal]:
        """Loads signals for the built-in scheduler."""

        base_signals = signal.get_registry()

        signals_max_mtime: t.Optional[float] = None

        for path in self._glob_paths(
            self.config_path / c.SIGNALS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if os.path.getsize(path):
                self._track_file(path)
                signals_max_mtime = _latest_mtime(signals_max_mtime, self._path_mtimes[path])
                import_python_file(path, self.config_path)

        self._signals_max_mtime = signals_max_mtime

        signals = signal.get_registry()
        signal.set_registry(base_signals)

        return signals

    def _load_audits(
        self, macros: MacroRegistry, jinja_macros: JinjaMacroRegistry
    ) -> UniqueKeyDict[str, Audit]:
        """Loads all the model audits."""
        audits_by_name: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
        audits_max_mtime: t.Optional[float] = None
        variables = get_variables()

        for path in self._glob_paths(
            self.config_path / c.AUDITS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            self._track_file(path)
            with open(path, "r", encoding="utf-8") as file:
                audits_max_mtime = _latest_mtime(audits_max_mtime, self._path_mtimes[path])
                expressions = parse(file.read(), default_dialect=self.config.model_defaults.dialect)
                audits = load_multiple_audits(
                    expressions=expressions,
                    path=path,
                    module_path=self.config_path,
                    macros=macros,
                    jinja_macros=jinja_macros,
                    dialect=self.config.model_defaults.dialect,
                    default_catalog=self.context.default_catalog,
                    variables=variables,
                    project=self.config.project,
                )
                for audit in audits:
                    audits_by_name[audit.name] = audit

        self._audits_max_mtime = audits_max_mtime

        return audits_by_name

    def _load_metrics(self) -> UniqueKeyDict[str, MetricMeta]:
        """Loads all metrics."""
        metrics: UniqueKeyDict[str, MetricMeta] = UniqueKeyDict("metrics")

        for path in self._glob_paths(
            self.config_path / c.METRICS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            if not os.path.getsize(path):
                continue
            self._track_file(path)

            with open(path, "r", encoding="utf-8") as file:
                dialect = self.config.model_defaults.dialect
                try:
                    for expression in parse(file.read(), default_dialect=dialect):
                        metric = load_metric_ddl(expression, path=path, dialect=dialect)
                        metrics[metric.name] = metric
                except SqlglotError as ex:
                    raise ConfigError(
                        f"Failed to parse metric definitions at '{path}': {ex}.", path
                    )

        return metrics

    def _load_environment_statements(self, macros: MacroRegistry) -> t.List[EnvironmentStatements]:
        """Loads environment statements."""

        if self.config.before_all or self.config.after_all:
            statements = {
                "before_all": self.config.before_all or [],
                "after_all": self.config.after_all or [],
            }
            dialect = self.config.model_defaults.dialect
            python_env = make_python_env(
                [
                    exp.maybe_parse(stmt, dialect=dialect)
                    for stmts in statements.values()
                    for stmt in stmts
                ],
                module_path=self.config_path,
                jinja_macro_references=None,
                macros=macros,
                variables=get_variables(),
                path=self.config_path,
            )

            return [
                EnvironmentStatements(
                    **statements, python_env=python_env, project=self.config.project or None
                )
            ]
        return []

    def _load_linting_rules(self) -> RuleSet:
        """Collect `Rule` subclasses defined in non-empty `.py` files under the linter directory."""
        user_rules: UniqueKeyDict[str, type[Rule]] = UniqueKeyDict("rules")

        for path in self._glob_paths(
            self.config_path / c.LINTER,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if os.path.getsize(path):
                self._track_file(path)
                module = import_python_file(path, self.config_path)
                module_rules = subclasses(module.__name__, Rule, exclude={Rule})
                for user_rule in module_rules:
                    user_rules[user_rule.name] = user_rule

        return RuleSet(user_rules.values())

    def _load_model_test_file(self, path: Path) -> dict[str, ModelTestMetadata]:
        """Load a single model test file."""
        model_test_metadata = {}

        with open(path, "r", encoding="utf-8") as file:
            source = file.read()
            # If the user has specified a quoted/escaped gateway (e.g. "gateway: 'ma\tin'"), we need to
            # parse it as YAML to match the gateway name stored in the config
            gateway_line = GATEWAY_PATTERN.search(source)
            gateway = YAML().load(gateway_line.group(0))["gateway"] if gateway_line else None

            contents = yaml_load(source, variables=get_variables(gateway))

            # NOTE(review): assumes a falsy result (e.g. an empty file) means "no tests";
            # iterating `.items()` on None previously raised AttributeError.
            for test_name, value in (contents or {}).items():
                model_test_metadata[test_name] = ModelTestMetadata(
                    path=path, test_name=test_name, body=value
                )

        return model_test_metadata

    def load_model_tests(self) -> t.List[ModelTestMetadata]:
        """Loads YAML-based model tests"""
        test_meta_list: t.List[ModelTestMetadata] = []

        search_path = Path(self.config_path) / c.TESTS

        for yaml_file in itertools.chain(
            search_path.glob("**/test*.yaml"),
            search_path.glob("**/test*.yml"),
        ):
            if any(
                yaml_file.match(ignore_pattern)
                for ignore_pattern in self.config.ignore_patterns or []
            ):
                continue

            test_meta_list.extend(self._load_model_test_file(yaml_file).values())

        return test_meta_list

    class _Cache(CacheBase):
        """`CacheBase` backed by `ModelCache`, invalidated by file mtimes and project settings."""

        def __init__(self, loader: SqlMeshLoader, config_path: Path):
            self._loader = loader
            self.config_path = config_path
            self._model_cache = ModelCache(self._loader.context.cache_dir)

        def get_or_load_models(
            self, target_path: Path, loader: t.Callable[[], t.List[Model]]
        ) -> t.List[Model]:
            models = self._model_cache.get_or_load(
                self._cache_entry_name(target_path),
                self._model_cache_entry_id(target_path),
                loader=loader,
            )

            for model in models:
                model._path = target_path

            return models

        def put(self, models: t.List[Model], path: Path) -> bool:
            return self._model_cache.put(
                models,
                self._cache_entry_name(path),
                self._model_cache_entry_id(path),
            )

        def get(self, path: Path) -> t.List[Model]:
            models = self._model_cache.get(
                self._cache_entry_name(path),
                self._model_cache_entry_id(path),
            )

            for model in models:
                model._path = path

            return models

        def _cache_entry_name(self, target_path: Path) -> str:
            # Drop only the file's own suffix. `str.replace(suffix, "")` removed every
            # occurrence of the suffix text, so e.g. `a.sql/b.sql` and `a/b.sql` collided.
            return "__".join(target_path.relative_to(self.config_path).with_suffix("").parts)

        def _model_cache_entry_id(self, model_path: Path) -> str:
            mtimes = [
                self._loader._path_mtimes[model_path],
                self._loader._macros_max_mtime,
                self._loader._signals_max_mtime,
                self._loader._audits_max_mtime,
                self._loader._config_mtimes.get(self.config_path),
                self._loader._config_mtimes.get(c.SQLMESH_PATH),
            ]
            return "__".join(
                [
                    str(max(m for m in mtimes if m is not None)),
                    self._loader.config.fingerprint,
                    # default catalog can change outside sqlmesh (e.g., DB user's
                    # default catalog), and it is retained in cached model's fully
                    # qualified name
                    self._loader.context.default_catalog or "",
                    # gateway is configurable, and it is retained in a cached
                    # model's python environment if the @gateway macro variable is
                    # used in the model
                    self._loader.context.gateway or self._loader.config.default_gateway_name,
                ]
            )
@dataclass
class LoadedProject:
    """Bundle of every artifact collected while loading one project directory.

    Field order is significant: it defines the positional signature of the
    generated ``__init__``.
    """

    # Python macro registry and Jinja macro registry.
    macros: MacroRegistry
    jinja_macros: JinjaMacroRegistry
    # Loaded models and audits, each held in a UniqueKeyDict.
    models: UniqueKeyDict[str, Model]
    standalone_audits: UniqueKeyDict[str, StandaloneAudit]
    audits: UniqueKeyDict[str, ModelAudit]
    metrics: UniqueKeyDict[str, Metric]
    # Pinned dependencies (name -> version) and dependencies marked as excluded.
    requirements: t.Dict[str, str]
    excluded_requirements: t.Set[str]
    environment_statements: t.List[EnvironmentStatements]
    user_rules: RuleSet
    model_test_metadata: t.List[ModelTestMetadata]
class CacheBase(abc.ABC):
    """Abstract contract for a per-file model cache.

    Concrete subclasses must implement all three methods below before they can
    be instantiated.
    """

    @abc.abstractmethod
    def get_or_load_models(
        self, target_path: Path, loader: t.Callable[[], t.List[Model]]
    ) -> t.List[Model]:
        """Return the models cached for ``target_path``, calling ``loader`` to produce them if needed."""
        ...

    @abc.abstractmethod
    def put(self, models: t.List[Model], path: Path) -> bool:
        """Save ``models`` under the cache entry for ``path``.

        Args:
            models: The models to store.
            path: The source file the models were loaded from.

        Returns:
            ``True`` when the models were stored; ``False`` when they were not
            (empty list, not a list, unsupported model types).
        """
        ...

    @abc.abstractmethod
    def get(self, path: Path) -> t.List[Model]:
        """Look up the cached models for ``path``.

        Args:
            path: The source file whose entry is requested.

        Returns:
            The cached models, or an empty list when no entry exists.
        """
        ...
Helper class that provides a standard way to create an ABC using inheritance.
@abc.abstractmethod
def get_or_load_models(
    self, target_path: Path, loader: t.Callable[[], t.List[Model]]
) -> t.List[Model]:
    """Return the models for ``target_path``, using ``loader`` to produce them when needed.

    Args:
        target_path: File whose models are requested.
        loader: Zero-argument callable that builds the models for ``target_path``.

    Returns:
        The models associated with ``target_path``.
    """
Get the models for the target path from the cache, or produce them with the supplied loader callable.
@abc.abstractmethod
def put(self, models: t.List[Model], path: Path) -> bool:
    """Store ``models`` under ``path``.

    Args:
        models: Models to cache.
        path: File the cached models belong to.

    Returns:
        True when the models were cached; False otherwise (for example an
        empty list, a non-list value, or an unsupported model type).
    """
Store models in the cache associated with the given path.
Arguments:
- models: List of models to cache
- path: File path to associate with the cached models
Returns:
True if the models were successfully cached, False otherwise (empty list, not a list, unsupported model types)
@abc.abstractmethod
def get(self, path: Path) -> t.List[Model]:
    """Fetch the models cached for ``path``.

    Args:
        path: File to look up.

    Returns:
        The cached models, or an empty list when there is no entry for ``path``.
    """
Retrieve models from the cache for a given path.
Arguments:
- path: File path to look up in the cache
Returns:
List of cached models associated with the path, an empty list if no cache entry exists
def load_sql_models(path: Path) -> t.List[Model]:
    """Parse one SQL model file and build the models it defines.

    This function is submitted to the model-loading process pool. It relies on
    the module-level ``_defaults`` and ``_cache`` globals having been populated
    beforehand (presumably by the pool initializer ``_init_model_defaults``).

    Args:
        path: SQL file to parse.

    Returns:
        An empty list when ``_cache.put`` accepted the models (the caller then
        reads them back with ``cache.get``); otherwise the freshly built models.
    """
    assert _defaults
    assert _cache

    with open(path, "r", encoding="utf-8") as sql_file:
        sql_text = sql_file.read()

    parsed = parse(sql_text, default_dialect=_defaults["dialect"])
    built = load_sql_based_models(parsed, path=Path(path).absolute(), **_defaults)

    if _cache.put(built, path):
        # Successfully cached: avoid shipping the models back across the process boundary.
        return []
    return built
def get_variables(gateway_name: t.Optional[str] = None) -> t.Dict[str, t.Any]:
    """Build the variable mapping visible to models for a gateway.

    Project-level variables are overlaid with the gateway's own variables, and
    the resolved gateway name is stored under ``c.GATEWAY``.

    Args:
        gateway_name: Gateway to resolve; falls back to the module-level
            ``_selected_gateway`` when falsy.

    Returns:
        The merged variable dictionary. If looking up the gateway raises
        ``ConfigError``, a warning is logged and only the project variables
        (plus ``c.GATEWAY``) are returned.
    """
    assert _config_essentials

    gateway_name = gateway_name or _selected_gateway

    try:
        gateway = _config_essentials["gateways"].get(gateway_name)
    except ConfigError:
        from sqlmesh.core.console import get_console

        get_console().log_warning(
            f"Gateway '{gateway_name}' not found in project '{_config_essentials['project']}'."
        )
        gateway = None

    gateway_variables: t.Dict[str, t.Any] = {}
    if gateway:
        gateway_variables = gateway.variables

    merged = dict(_config_essentials["variables"])
    merged.update(gateway_variables)
    merged[c.GATEWAY] = gateway_name
    return merged
class Loader(abc.ABC):
    """Abstract base class to load macros and models for a context.

    Subclasses implement discovery of scripts (macros), models and audits.
    This base class:

    - drives a full load;
    - records the modification time of every file it reads, so
      ``reload_needed`` can detect changes;
    - provides shared helpers for external models, the requirements lock
      file and path globbing.
    """

    def __init__(self, context: GenericContext, path: Path) -> None:
        # This ensures pandas is imported before any model loading happens in the forked process
        # to avoid macOS fork() safety issues, see https://stackoverflow.com/a/52230415. Without
        # it, the following error was observed in a macOS 15.5 system:
        #
        # "+[NSMutableString initialize] may have been in progress in another thread when fork() was called."
        import pandas as pd  # noqa

        from sqlmesh.core.console import get_console

        # File path -> mtime recorded when the file was read during the last load.
        self._path_mtimes: t.Dict[Path, float] = {}
        self.context = context
        self.config_path = path
        self.config = self.context.configs[self.config_path]
        self._variables_by_gateway: t.Dict[str, t.Dict[str, t.Any]] = {}
        self._console = get_console()

        self.config_essentials = {
            "project": self.config.project,
            "variables": self.config.variables,
            "gateways": self.config.gateways,
        }
        _init_model_defaults(self.config_essentials, self.context.selected_gateway)

    def load(self) -> LoadedProject:
        """
        Loads all macros and models in the context's path.

        Returns:
            A loaded project object.
        """
        with sys_path(self.config_path):
            # python files are cached by the system
            # need to manually clear here so we can reload macros
            linecache.clearcache()
            self._path_mtimes.clear()

            self._load_materializations()
            signals = self._load_signals()

            config_mtimes: t.Dict[Path, t.List[float]] = defaultdict(list)

            for config_file in self.config_path.glob("config.*"):
                self._track_file(config_file)
                config_mtimes[self.config_path].append(self._path_mtimes[config_file])

            for config_file in c.SQLMESH_PATH.glob("config.*"):
                self._track_file(config_file)
                config_mtimes[c.SQLMESH_PATH].append(self._path_mtimes[config_file])

            # Newest config-file mtime per directory; used when versioning cache entries.
            self._config_mtimes = {path: max(mtimes) for path, mtimes in config_mtimes.items()}

            macros, jinja_macros = self._load_scripts()
            audits: UniqueKeyDict[str, ModelAudit] = UniqueKeyDict("audits")
            standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
                "standalone_audits"
            )

            for name, audit in self._load_audits(macros=macros, jinja_macros=jinja_macros).items():
                if isinstance(audit, ModelAudit):
                    audits[name] = audit
                else:
                    standalone_audits[name] = audit

            models = self._load_models(
                macros,
                jinja_macros,
                self.context.selected_gateway,
                audits,
                signals,
            )

            metrics = self._load_metrics()

            requirements, excluded_requirements = self._load_requirements()

            environment_statements = self._load_environment_statements(macros=macros)

            user_rules = self._load_linting_rules()

            model_test_metadata = self.load_model_tests()

            project = LoadedProject(
                macros=macros,
                jinja_macros=jinja_macros,
                models=models,
                audits=audits,
                standalone_audits=standalone_audits,
                metrics=expand_metrics(metrics),
                requirements=requirements,
                excluded_requirements=excluded_requirements,
                environment_statements=environment_statements,
                user_rules=user_rules,
                model_test_metadata=model_test_metadata,
            )
            return project

    def reload_needed(self) -> bool:
        """
        Checks for any modifications to the files the macros and models depend on
        since the last load.

        Returns:
            True if a modification is found; False otherwise
        """
        return any(
            not path.exists() or path.stat().st_mtime > initial_mtime
            for path, initial_mtime in self._path_mtimes.copy().items()
        )

    @abc.abstractmethod
    def _load_scripts(self) -> t.Tuple[MacroRegistry, JinjaMacroRegistry]:
        """Loads all user defined macros."""

    @abc.abstractmethod
    def _load_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        gateway: t.Optional[str],
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
    ) -> UniqueKeyDict[str, Model]:
        """Loads all models."""

    @abc.abstractmethod
    def _load_audits(
        self, macros: MacroRegistry, jinja_macros: JinjaMacroRegistry
    ) -> UniqueKeyDict[str, Audit]:
        """Loads all audits."""

    def _load_environment_statements(self, macros: MacroRegistry) -> t.List[EnvironmentStatements]:
        """Loads environment statements."""
        return []

    def load_materializations(self) -> None:
        """Loads custom materializations."""

    def _load_materializations(self) -> None:
        pass

    def _load_signals(self) -> UniqueKeyDict[str, signal]:
        return UniqueKeyDict("signals")

    def _load_metrics(self) -> UniqueKeyDict[str, MetricMeta]:
        return UniqueKeyDict("metrics")

    def _load_external_models(
        self,
        audits: UniqueKeyDict[str, ModelAudit],
        cache: CacheBase,
        gateway: t.Optional[str] = None,
    ) -> UniqueKeyDict[str, Model]:
        """Loads external models from the project's external-model YAML files.

        Sources are read in this order:

        - the top-level ``c.EXTERNAL_MODELS_YAML`` file, or, if it is absent,
          the deprecated ``c.EXTERNAL_MODELS_DEPRECATED_YAML`` file;
        - every ``.yaml`` file under the ``c.EXTERNAL_MODELS`` directory.

        Models without an explicit gateway form the base set. When ``gateway``
        is given, it is lower-cased and models whose ``gateway`` equals it
        replace base models with the same fully qualified name.

        Raises:
            ConfigError: If a file fails to load, if two gateway-less models
                share a fully qualified name, or if two models for the selected
                gateway share a fully qualified name.
        """
        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        external_models_yaml = self.config_path / c.EXTERNAL_MODELS_YAML
        deprecated_yaml = self.config_path / c.EXTERNAL_MODELS_DEPRECATED_YAML
        external_models_path = self.config_path / c.EXTERNAL_MODELS

        paths_to_load: t.List[Path] = []
        if external_models_yaml.exists():
            paths_to_load.append(external_models_yaml)
        elif deprecated_yaml.exists():
            paths_to_load.append(deprecated_yaml)

        if external_models_path.exists() and external_models_path.is_dir():
            paths_to_load.extend(self._glob_paths(external_models_path, extension=".yaml"))

        def _load(path: Path) -> t.List[Model]:
            try:
                with open(path, "r", encoding="utf-8") as file:
                    yaml = YAML().load(file)
                    # Allow empty YAML files to return an empty list
                    if yaml is None:
                        return []
                    return [
                        create_external_model(
                            defaults=self.config.model_defaults.dict(),
                            path=path,
                            project=self.config.project,
                            audit_definitions=audits,
                            **{
                                "dialect": self.config.model_defaults.dialect,
                                "default_catalog": self.context.default_catalog,
                                **row,
                            },
                        )
                        for row in yaml
                    ]
            except Exception as ex:
                raise ConfigError(self._failed_to_load_model_error(path, ex), path)

        if gateway:
            # Normalize once; each model's gateway is compared against this value.
            gateway = gateway.lower()

        # Fully qualified names already claimed by a gateway-less (base) model. This is
        # tracked separately from ``models`` so that a gateway-specific override loaded
        # from an earlier file is not mistaken for a duplicate base definition.
        base_fqns: t.Set[str] = set()

        for path in paths_to_load:
            self._track_file(path)

            # The loader is invoked synchronously by the cache, so binding ``path`` late is safe.
            external_models = cache.get_or_load_models(path, lambda: _load(path))

            # external models with no explicit gateway defined form the base set
            for model in external_models:
                if model.gateway is None:
                    if model.fqn in base_fqns:
                        raise ConfigError(
                            self._failed_to_load_model_error(
                                path, f"Duplicate external model name: '{model.name}'."
                            ),
                            path,
                        )
                    base_fqns.add(model.fqn)
                    # An existing entry here can only be a gateway-specific override,
                    # which must keep precedence over the base definition.
                    if model.fqn not in models:
                        models[model.fqn] = model

            # however, if there is a gateway defined, gateway-specific models take precedence
            if gateway:
                for model in external_models:
                    if model.gateway == gateway:
                        if model.fqn in models and models[model.fqn].gateway == gateway:
                            raise ConfigError(
                                self._failed_to_load_model_error(
                                    path, f"Duplicate external model name: '{model.name}'."
                                ),
                                path,
                            )
                        # ``update`` is used so the override may replace a base entry.
                        models.update({model.fqn: model})

        return models

    def _load_requirements(self) -> t.Tuple[t.Dict[str, str], t.Set[str]]:
        """Loads Python dependencies from the lock file.

        Each non-blank line is either ``dep==ver`` (a pinned requirement) or
        ``^dep`` (an excluded requirement). Blank lines are ignored.

        Returns:
            A tuple of requirements and excluded requirements.

        Raises:
            ConfigError: If a line is not of the form ``dep==ver``, or if the
                same dependency is pinned to two different versions.
        """
        requirements: t.Dict[str, str] = {}
        excluded_requirements: t.Set[str] = set()

        requirements_path = self.config_path / c.REQUIREMENTS
        if requirements_path.is_file():
            with open(requirements_path, "r", encoding="utf-8") as file:
                for line in file:
                    line = line.strip()
                    if not line:
                        # Blank lines carry no entry; previously they failed the
                        # 'dep==ver' shape check below.
                        continue

                    if line.startswith("^"):
                        excluded_requirements.add(line[1:])
                        continue

                    args = [k.strip() for k in line.split("==")]
                    if len(args) != 2:
                        raise ConfigError(
                            f"Invalid lock file entry '{line}'. Only 'dep==ver' is supported",
                            requirements_path,
                        )
                    dep, ver = args
                    other_ver = requirements.get(dep, ver)
                    if ver != other_ver:
                        raise ConfigError(
                            f"Conflicting requirement {dep}: {ver} != {other_ver}. Fix your {c.REQUIREMENTS} file.",
                            requirements_path,
                        )
                    requirements[dep] = ver

        return requirements, excluded_requirements

    def _load_linting_rules(self) -> RuleSet:
        """Loads user linting rules"""
        return RuleSet()

    def load_model_tests(self) -> t.List[ModelTestMetadata]:
        """Loads YAML-based model tests"""
        return []

    def _glob_paths(
        self,
        path: Path,
        ignore_patterns: t.Optional[t.List[str]] = None,
        extension: t.Optional[str] = None,
    ) -> t.Generator[Path, None, None]:
        """
        Globs the provided path for the file extension but also removes any filepaths that match an ignore
        pattern either set in constants or provided in config

        Args:
            path: The filepath to glob
            ignore_patterns: A list of patterns for glob to ignore
            extension: The extension to check for in that path (checks recursively in zero or more subdirectories)

        Returns:
            Matched paths that are not ignored
        """
        ignore_patterns = ignore_patterns or []
        extension = extension or ""

        # We try to match both ignore_pattern itself and every file returned by glob,
        # so that we will always ignore file names that do not appear in the latter.
        ignored_filepaths = set(ignore_patterns) | {
            ignored_path
            for ignore_pattern in ignore_patterns
            for ignored_path in glob.glob(str(self.config_path / ignore_pattern), recursive=True)
        }
        for filepath in path.glob(f"**/*{extension}"):
            if any(filepath.match(ignored_filepath) for ignored_filepath in ignored_filepaths):
                continue

            yield filepath

    def _track_file(self, path: Path) -> None:
        """Project file to track for modifications"""
        self._path_mtimes[path] = path.stat().st_mtime

    def _failed_to_load_model_error(self, path: Path, error: t.Union[str, Exception]) -> str:
        """Format a uniform "failed to load model" message for ``path``.

        Pydantic validation errors are rendered with ``validation_error_message``;
        any other error is stringified and indented under the header line.
        """
        base_message = f"Failed to load model from file '{path}':"
        if isinstance(error, ValidationError):
            return validation_error_message(error, base_message)
        # indent all lines of error message
        error_message = str(error).replace("\n", "\n  ")
        return f"{base_message}\n\n  {error_message}"
Abstract base class for loading macros and models for a context.
def load(self) -> LoadedProject:
    """Load every artifact of the project rooted at ``self.config_path``.

    Everything runs inside ``sys_path(self.config_path)``. The linecache and
    the tracked file mtimes are cleared first. Then, in order:

    1. materializations and signals;
    2. config-file mtimes;
    3. macros and audits;
    4. models and metrics;
    5. requirements, environment statements, linting rules and model tests.

    Returns:
        A ``LoadedProject`` bundling everything that was loaded.
    """
    with sys_path(self.config_path):
        # linecache keeps Python source around; drop it so edited macros are re-read.
        linecache.clearcache()
        self._path_mtimes.clear()

        self._load_materializations()
        signals = self._load_signals()

        # Track config.* files of the project directory first, then of SQLMESH_PATH.
        mtimes_by_dir: t.Dict[Path, t.List[float]] = defaultdict(list)
        for config_dir in (self.config_path, c.SQLMESH_PATH):
            for config_file in config_dir.glob("config.*"):
                self._track_file(config_file)
                mtimes_by_dir[config_dir].append(self._path_mtimes[config_file])
        self._config_mtimes = {
            config_dir: max(dir_mtimes) for config_dir, dir_mtimes in mtimes_by_dir.items()
        }

        macros, jinja_macros = self._load_scripts()

        model_audits: UniqueKeyDict[str, ModelAudit] = UniqueKeyDict("audits")
        standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
            "standalone_audits"
        )
        all_audits = self._load_audits(macros=macros, jinja_macros=jinja_macros)
        for audit_name, audit in all_audits.items():
            bucket = model_audits if isinstance(audit, ModelAudit) else standalone_audits
            bucket[audit_name] = audit

        models = self._load_models(
            macros, jinja_macros, self.context.selected_gateway, model_audits, signals
        )
        metrics = self._load_metrics()
        requirements, excluded_requirements = self._load_requirements()
        environment_statements = self._load_environment_statements(macros=macros)
        user_rules = self._load_linting_rules()
        model_test_metadata = self.load_model_tests()

        return LoadedProject(
            macros=macros,
            jinja_macros=jinja_macros,
            models=models,
            audits=model_audits,
            standalone_audits=standalone_audits,
            metrics=expand_metrics(metrics),
            requirements=requirements,
            excluded_requirements=excluded_requirements,
            environment_statements=environment_statements,
            user_rules=user_rules,
            model_test_metadata=model_test_metadata,
        )
Loads all macros and models in the context's path.
Returns:
A loaded project object.
def reload_needed(self) -> bool:
    """Report whether any tracked file changed since the last load.

    A file counts as changed if it no longer exists or its current mtime is
    newer than the one recorded when it was read. Iteration runs over a
    snapshot of ``self._path_mtimes`` and stops at the first change.

    Returns:
        True if a modification is found; False otherwise.
    """
    snapshot = dict(self._path_mtimes)
    for tracked_path, recorded_mtime in snapshot.items():
        if not tracked_path.exists():
            return True
        if tracked_path.stat().st_mtime > recorded_mtime:
            return True
    return False
Checks for any modifications to the files the macros and models depend on since the last load.
Returns:
True if a modification is found; False otherwise
class SqlMeshLoader(Loader):
    """Loads macros and models for a context using the SQLMesh file formats"""

    def _load_scripts(self) -> t.Tuple[MacroRegistry, JinjaMacroRegistry]:
        """Loads all user defined macros.

        Python macro modules under ``c.MACROS`` are imported, and Jinja macros
        are extracted from the ``.sql`` files there. The newest mtime among the
        tracked macro files is stored in ``self._macros_max_mtime`` (None when
        there are none).

        Returns:
            The macro registry populated by the imports, and the Jinja macro registry.
        """
        # Store a copy of the macro registry
        standard_macros = macro.get_registry()
        jinja_macros = JinjaMacroRegistry()
        extractor = MacroExtractor()

        macros_max_mtime: t.Optional[float] = None

        for path in self._glob_paths(
            self.config_path / c.MACROS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if import_python_file(path, self.config_path):
                self._track_file(path)
                macro_file_mtime = self._path_mtimes[path]
                macros_max_mtime = (
                    macro_file_mtime
                    if macros_max_mtime is None
                    else max(macros_max_mtime, macro_file_mtime)
                )

        for path in self._glob_paths(
            self.config_path / c.MACROS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            self._track_file(path)
            macro_file_mtime = self._path_mtimes[path]
            macros_max_mtime = (
                macro_file_mtime
                if macros_max_mtime is None
                else max(macros_max_mtime, macro_file_mtime)
            )
            with open(path, "r", encoding="utf-8") as file:
                jinja_macros.add_macros(
                    extractor.extract(file.read(), dialect=self.config.model_defaults.dialect)
                )

        self._macros_max_mtime = macros_max_mtime

        # Capture what the project's imports registered, then restore the standard registry.
        macros = macro.get_registry()
        macro.set_registry(standard_macros)

        return macros, jinja_macros

    def _load_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        gateway: t.Optional[str],
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
    ) -> UniqueKeyDict[str, Model]:
        """
        Loads SQL, external and Python models into a single dict keyed by
        fully qualified name.

        Raises:
            ConfigError: If the same name is produced by more than one model kind.
        """
        cache = SqlMeshLoader._Cache(self, self.config_path)

        sql_models = self._load_sql_models(macros, jinja_macros, audits, signals, cache, gateway)
        external_models = self._load_external_models(audits, cache, gateway)
        python_models = self._load_python_models(macros, jinja_macros, audits, signals)

        all_model_names = list(sql_models) + list(external_models) + list(python_models)
        duplicates = [name for name, count in Counter(all_model_names).items() if count > 1]
        if duplicates:
            raise ConfigError(f"Duplicate model name(s) found: {', '.join(duplicates)}.")

        return UniqueKeyDict("models", **sql_models, **external_models, **python_models)

    def _load_sql_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
        cache: CacheBase,
        gateway: t.Optional[str],
    ) -> UniqueKeyDict[str, Model]:
        """Loads the sql models into a Dict.

        Non-empty ``.sql`` files under ``c.MODELS`` are served from ``cache``
        when possible. The remaining files are parsed in a process pool via
        ``load_sql_models``. Disabled models are skipped.

        Raises:
            ConfigError: If a file fails to load, or if two SQL models share a
                fully qualified name.
        """
        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        paths: t.Set[Path] = set()
        cached_paths: UniqueKeyDict[Path, t.List[Model]] = UniqueKeyDict("cached_paths")

        for path in self._glob_paths(
            self.config_path / c.MODELS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            if not os.path.getsize(path):
                continue

            self._track_file(path)
            paths.add(path)
            if cached_models := cache.get(path):
                cached_paths[path] = cached_models

        for path, cached_models in cached_paths.items():
            paths.remove(path)
            for model in cached_models:
                if model.enabled:
                    models[model.fqn] = model

        if paths:
            model_loading_defaults = dict(
                get_variables=get_variables,
                defaults=self.config.model_defaults.dict(),
                macros=macros,
                jinja_macros=jinja_macros,
                audit_definitions=audits,
                module_path=self.config_path,
                dialect=self.config.model_defaults.dialect,
                time_column_format=self.config.time_column_format,
                physical_schema_mapping=self.config.physical_schema_mapping,
                project=self.config.project,
                default_catalog=self.context.default_catalog,
                infer_names=self.config.model_naming.infer_names,
                signal_definitions=signals,
                default_catalog_per_gateway=self.context.default_catalog_per_gateway,
                virtual_environment_mode=self.config.virtual_environment_mode,
            )

            with create_process_pool_executor(
                initializer=_init_model_defaults,
                initargs=(
                    self.config_essentials,
                    gateway,
                    model_loading_defaults,
                    cache,
                    self._console,
                ),
                max_workers=c.MAX_FORK_WORKERS,
            ) as pool:
                futures_to_paths = {pool.submit(load_sql_models, path): path for path in paths}
                for future in concurrent.futures.as_completed(futures_to_paths):
                    path = futures_to_paths[future]
                    try:
                        # An empty result means the worker stored the models in the
                        # cache instead of returning them, so read them back from there.
                        loaded = future.result() or cache.get(path)
                    except Exception as ex:
                        raise ConfigError(self._failed_to_load_model_error(path, ex), path) from ex

                    # Kept outside the try above so a duplicate-name error is not wrapped a
                    # second time in another "Failed to load model from file" prefix.
                    for model in loaded:
                        if model.fqn in models:
                            raise ConfigError(
                                self._failed_to_load_model_error(
                                    path, f"Duplicate SQL model name: '{model.name}'."
                                ),
                                path,
                            )
                        if model.enabled:
                            model._path = path
                            models[model.fqn] = model

        return models

    def _load_python_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
    ) -> UniqueKeyDict[str, Model]:
        """Loads the python models into a Dict.

        Each non-empty ``.py`` file under ``c.MODELS`` is imported. Only the
        model registry entries newly added by that import are materialized for
        it. ``model_registry._dialect`` is set for the duration and always
        reset to None.
        """
        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        registry = model_registry.registry()
        registry.clear()
        registered: t.Set[str] = set()

        model_registry._dialect = self.config.model_defaults.dialect
        try:
            for path in self._glob_paths(
                self.config_path / c.MODELS,
                ignore_patterns=self.config.ignore_patterns,
                extension=".py",
            ):
                if not os.path.getsize(path):
                    continue

                self._track_file(path)
                try:
                    import_python_file(path, self.config_path)
                    new = registry.keys() - registered
                    registered |= new
                    for name in new:
                        for model in registry[name].models(
                            get_variables,
                            path=path,
                            module_path=self.config_path,
                            defaults=self.config.model_defaults.dict(),
                            macros=macros,
                            jinja_macros=jinja_macros,
                            dialect=self.config.model_defaults.dialect,
                            time_column_format=self.config.time_column_format,
                            physical_schema_mapping=self.config.physical_schema_mapping,
                            project=self.config.project,
                            default_catalog=self.context.default_catalog,
                            infer_names=self.config.model_naming.infer_names,
                            audit_definitions=audits,
                            signal_definitions=signals,
                            default_catalog_per_gateway=self.context.default_catalog_per_gateway,
                            virtual_environment_mode=self.config.virtual_environment_mode,
                        ):
                            if model.enabled:
                                models[model.fqn] = model
                except Exception as ex:
                    raise ConfigError(self._failed_to_load_model_error(path, ex), path)

        finally:
            model_registry._dialect = None

        return models

    def load_materializations(self) -> None:
        """Loads custom materializations inside ``sys_path(self.config_path)``."""
        with sys_path(self.config_path):
            self._load_materializations()

    def _load_materializations(self) -> None:
        """Imports every non-empty ``.py`` file under ``c.MATERIALIZATIONS``."""
        for path in self._glob_paths(
            self.config_path / c.MATERIALIZATIONS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if os.path.getsize(path):
                import_python_file(path, self.config_path)

    def _load_signals(self) -> UniqueKeyDict[str, signal]:
        """Loads signals for the built-in scheduler.

        Non-empty ``.py`` files under ``c.SIGNALS`` are imported. Their newest
        mtime is stored in ``self._signals_max_mtime``, and the signal registry
        is restored to its prior state afterwards.
        """

        base_signals = signal.get_registry()

        signals_max_mtime: t.Optional[float] = None

        for path in self._glob_paths(
            self.config_path / c.SIGNALS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if os.path.getsize(path):
                self._track_file(path)
                signal_file_mtime = self._path_mtimes[path]
                signals_max_mtime = (
                    signal_file_mtime
                    if signals_max_mtime is None
                    else max(signals_max_mtime, signal_file_mtime)
                )
                import_python_file(path, self.config_path)

        self._signals_max_mtime = signals_max_mtime

        signals = signal.get_registry()
        signal.set_registry(base_signals)

        return signals

    def _load_audits(
        self, macros: MacroRegistry, jinja_macros: JinjaMacroRegistry
    ) -> UniqueKeyDict[str, Audit]:
        """Loads all the model audits.

        Every ``.sql`` file under ``c.AUDITS`` is parsed with
        ``load_multiple_audits``. The newest audit-file mtime is stored in
        ``self._audits_max_mtime``.
        """
        audits_by_name: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
        audits_max_mtime: t.Optional[float] = None
        variables = get_variables()

        for path in self._glob_paths(
            self.config_path / c.AUDITS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            self._track_file(path)
            audits_file_mtime = self._path_mtimes[path]
            audits_max_mtime = (
                audits_file_mtime
                if audits_max_mtime is None
                else max(audits_max_mtime, audits_file_mtime)
            )
            with open(path, "r", encoding="utf-8") as file:
                expressions = parse(file.read(), default_dialect=self.config.model_defaults.dialect)
                audits = load_multiple_audits(
                    expressions=expressions,
                    path=path,
                    module_path=self.config_path,
                    macros=macros,
                    jinja_macros=jinja_macros,
                    dialect=self.config.model_defaults.dialect,
                    default_catalog=self.context.default_catalog,
                    variables=variables,
                    project=self.config.project,
                )
                for audit in audits:
                    audits_by_name[audit.name] = audit

        self._audits_max_mtime = audits_max_mtime

        return audits_by_name

    def _load_metrics(self) -> UniqueKeyDict[str, MetricMeta]:
        """Loads all metrics.

        Raises:
            ConfigError: If a metric file cannot be parsed.
        """
        metrics: UniqueKeyDict[str, MetricMeta] = UniqueKeyDict("metrics")

        for path in self._glob_paths(
            self.config_path / c.METRICS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            if not os.path.getsize(path):
                continue
            self._track_file(path)

            with open(path, "r", encoding="utf-8") as file:
                dialect = self.config.model_defaults.dialect
                try:
                    for expression in parse(file.read(), default_dialect=dialect):
                        metric = load_metric_ddl(expression, path=path, dialect=dialect)
                        metrics[metric.name] = metric
                except SqlglotError as ex:
                    raise ConfigError(
                        f"Failed to parse metric definitions at '{path}': {ex}.", path
                    )

        return metrics

    def _load_environment_statements(self, macros: MacroRegistry) -> t.List[EnvironmentStatements]:
        """Loads environment statements.

        Returns a single ``EnvironmentStatements`` built from the config's
        ``before_all`` / ``after_all`` lists, or an empty list when both are unset.
        """

        if self.config.before_all or self.config.after_all:
            statements = {
                "before_all": self.config.before_all or [],
                "after_all": self.config.after_all or [],
            }
            dialect = self.config.model_defaults.dialect
            python_env = make_python_env(
                [
                    exp.maybe_parse(stmt, dialect=dialect)
                    for stmts in statements.values()
                    for stmt in stmts
                ],
                module_path=self.config_path,
                jinja_macro_references=None,
                macros=macros,
                variables=get_variables(),
                path=self.config_path,
            )

            return [
                EnvironmentStatements(
                    **statements, python_env=python_env, project=self.config.project or None
                )
            ]
        return []

    def _load_linting_rules(self) -> RuleSet:
        """Collects ``Rule`` subclasses from non-empty ``.py`` files under ``c.LINTER``."""
        user_rules: UniqueKeyDict[str, type[Rule]] = UniqueKeyDict("rules")

        for path in self._glob_paths(
            self.config_path / c.LINTER,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if os.path.getsize(path):
                self._track_file(path)
                module = import_python_file(path, self.config_path)
                module_rules = subclasses(module.__name__, Rule, exclude={Rule})
                for user_rule in module_rules:
                    user_rules[user_rule.name] = user_rule

        return RuleSet(user_rules.values())

    def _load_model_test_file(self, path: Path) -> dict[str, ModelTestMetadata]:
        """Load a single model test file."""
        model_test_metadata = {}

        with open(path, "r", encoding="utf-8") as file:
            source = file.read()
            # If the user has specified a quoted/escaped gateway (e.g. "gateway: 'ma\tin'"), we need to
            # parse it as YAML to match the gateway name stored in the config
            gateway_line = GATEWAY_PATTERN.search(source)
            gateway = YAML().load(gateway_line.group(0))["gateway"] if gateway_line else None

            contents = yaml_load(source, variables=get_variables(gateway))

            for test_name, value in contents.items():
                model_test_metadata[test_name] = ModelTestMetadata(
                    path=path, test_name=test_name, body=value
                )

        return model_test_metadata

    def load_model_tests(self) -> t.List[ModelTestMetadata]:
        """Loads YAML-based model tests"""
        test_meta_list: t.List[ModelTestMetadata] = []

        search_path = Path(self.config_path) / c.TESTS

        for yaml_file in itertools.chain(
            search_path.glob("**/test*.yaml"),
            search_path.glob("**/test*.yml"),
        ):
            if any(
                yaml_file.match(ignore_pattern)
                for ignore_pattern in self.config.ignore_patterns or []
            ):
                continue

            test_meta_list.extend(self._load_model_test_file(yaml_file).values())

        return test_meta_list

    class _Cache(CacheBase):
        """``CacheBase`` backed by a ``ModelCache`` in the context's cache directory.

        Entries are named after the file's path relative to the project and
        versioned by ``_model_cache_entry_id``.
        """

        def __init__(self, loader: SqlMeshLoader, config_path: Path):
            self._loader = loader
            self.config_path = config_path
            self._model_cache = ModelCache(self._loader.context.cache_dir)

        def get_or_load_models(
            self, target_path: Path, loader: t.Callable[[], t.List[Model]]
        ) -> t.List[Model]:
            models = self._model_cache.get_or_load(
                self._cache_entry_name(target_path),
                self._model_cache_entry_id(target_path),
                loader=loader,
            )

            for model in models:
                model._path = target_path

            return models

        def put(self, models: t.List[Model], path: Path) -> bool:
            return self._model_cache.put(
                models,
                self._cache_entry_name(path),
                self._model_cache_entry_id(path),
            )

        def get(self, path: Path) -> t.List[Model]:
            models = self._model_cache.get(
                self._cache_entry_name(path),
                self._model_cache_entry_id(path),
            )

            for model in models:
                model._path = path

            return models

        def _cache_entry_name(self, target_path: Path) -> str:
            # Strip only the file's own extension. A plain str.replace(suffix, "") would
            # also remove the suffix text wherever it appears elsewhere in the path
            # (e.g. "my.sqlite.sql" -> "myite"), letting distinct files share one entry.
            relative = target_path.relative_to(self.config_path)
            return "__".join(relative.with_suffix("").parts)

        def _model_cache_entry_id(self, model_path: Path) -> str:
            # The newest mtime across the file itself, macros, signals, audits and both
            # config locations: touching any of them yields a new entry id.
            mtimes = [
                self._loader._path_mtimes[model_path],
                self._loader._macros_max_mtime,
                self._loader._signals_max_mtime,
                self._loader._audits_max_mtime,
                self._loader._config_mtimes.get(self.config_path),
                self._loader._config_mtimes.get(c.SQLMESH_PATH),
            ]
            return "__".join(
                [
                    str(max(m for m in mtimes if m is not None)),
                    self._loader.config.fingerprint,
                    # default catalog can change outside sqlmesh (e.g., DB user's
                    # default catalog), and it is retained in cached model's fully
                    # qualified name
                    self._loader.context.default_catalog or "",
                    # gateway is configurable, and it is retained in a cached
                    # model's python environment if the @gateway macro variable is
                    # used in the model
                    self._loader.context.gateway or self._loader.config.default_gateway_name,
                ]
            )
Loads macros and models for a context using the SQLMesh file formats
def load_materializations(self) -> None:
    """Import the project's custom materialization modules.

    Delegates to ``self._load_materializations()`` while inside the
    ``sys_path(self.config_path)`` context (presumably so project-local
    modules are importable — confirm against ``sys_path``).
    """
    project_root = self.config_path
    with sys_path(project_root):
        self._load_materializations()
Loads custom materializations.
def load_model_tests(self) -> t.List[ModelTestMetadata]:
    """Collect metadata for every YAML model test in the project's tests directory.

    Files named ``test*.yaml`` are visited first, then ``test*.yml``, recursively
    under ``c.TESTS``. Files matching any configured ignore pattern are skipped.

    Returns:
        The ``ModelTestMetadata`` entries from every loaded file, in visit order.
    """

    def _ignored(candidate: Path) -> bool:
        return any(
            candidate.match(pattern) for pattern in self.config.ignore_patterns or []
        )

    tests_root = Path(self.config_path) / c.TESTS
    candidates = itertools.chain(
        tests_root.glob("**/test*.yaml"),
        tests_root.glob("**/test*.yml"),
    )

    collected: t.List[ModelTestMetadata] = []
    for test_file in candidates:
        if _ignored(test_file):
            continue
        collected.extend(self._load_model_test_file(test_file).values())
    return collected
Loads YAML-based model tests