Edit on GitHub

sqlmesh.core.loader

  1from __future__ import annotations
  2
  3import abc
  4import glob
  5import itertools
  6import linecache
  7import os
  8import re
  9import typing as t
 10from collections import Counter, defaultdict
 11from dataclasses import dataclass
 12from pathlib import Path
 13from pydantic import ValidationError
 14import concurrent.futures
 15
 16from sqlglot.errors import SqlglotError
 17from sqlglot import exp
 18from sqlglot.helper import subclasses
 19
 20from sqlmesh.core import constants as c
 21from sqlmesh.core.audit import Audit, ModelAudit, StandaloneAudit, load_multiple_audits
 22from sqlmesh.core.console import Console
 23from sqlmesh.core.dialect import parse
 24from sqlmesh.core.environment import EnvironmentStatements
 25from sqlmesh.core.linter.rule import Rule
 26from sqlmesh.core.linter.definition import RuleSet
 27from sqlmesh.core.macros import MacroRegistry, macro
 28from sqlmesh.core.metric import Metric, MetricMeta, expand_metrics, load_metric_ddl
 29from sqlmesh.core.model import (
 30    Model,
 31    ModelCache,
 32    create_external_model,
 33    load_sql_based_models,
 34)
 35from sqlmesh.core.model import model as model_registry
 36from sqlmesh.core.model.common import make_python_env
 37from sqlmesh.core.signal import signal
 38from sqlmesh.core.test import ModelTestMetadata
 39from sqlmesh.utils import UniqueKeyDict, sys_path
 40from sqlmesh.utils.errors import ConfigError
 41from sqlmesh.utils.jinja import JinjaMacroRegistry, MacroExtractor
 42from sqlmesh.utils.metaprogramming import import_python_file
 43from sqlmesh.utils.pydantic import validation_error_message
 44from sqlmesh.utils.process import create_process_pool_executor
 45from sqlmesh.utils.yaml import YAML, load as yaml_load
 46
 47
 48if t.TYPE_CHECKING:
 49    from sqlmesh.core.context import GenericContext
 50
 51
 52GATEWAY_PATTERN = re.compile(r"gateway:\s*([^\s]+)")
 53
 54
 55@dataclass
 56class LoadedProject:
 57    macros: MacroRegistry
 58    jinja_macros: JinjaMacroRegistry
 59    models: UniqueKeyDict[str, Model]
 60    standalone_audits: UniqueKeyDict[str, StandaloneAudit]
 61    audits: UniqueKeyDict[str, ModelAudit]
 62    metrics: UniqueKeyDict[str, Metric]
 63    requirements: t.Dict[str, str]
 64    excluded_requirements: t.Set[str]
 65    environment_statements: t.List[EnvironmentStatements]
 66    user_rules: RuleSet
 67    model_test_metadata: t.List[ModelTestMetadata]
 68
 69
 70class CacheBase(abc.ABC):
 71    @abc.abstractmethod
 72    def get_or_load_models(
 73        self, target_path: Path, loader: t.Callable[[], t.List[Model]]
 74    ) -> t.List[Model]:
 75        """Get or load all models from cache."""
 76        pass
 77
 78    @abc.abstractmethod
 79    def put(self, models: t.List[Model], path: Path) -> bool:
 80        """Store models in the cache associated with the given path.
 81
 82        Args:
 83            models: List of models to cache
 84            path: File path to associate with the cached models
 85
 86        Returns:
 87            True if the models were successfully cached,
 88            False otherwise (empty list, not a list, unsupported model types)
 89        """
 90        pass
 91
 92    @abc.abstractmethod
 93    def get(self, path: Path) -> t.List[Model]:
 94        """Retrieve models from the cache for a given path.
 95
 96        Args:
 97            path: File path to look up in the cache
 98
 99        Returns:
100            List of cached models associated with the path, an empty list if no cache entry exists
101        """
102        pass
103
104
# Module-level loader state for the current process. `_init_model_defaults`
# assigns all four: `Loader.__init__` calls it in the current process, and
# `SqlMeshLoader._load_sql_models` installs it as the initializer of its
# process pool so each worker gets its own copy.
_defaults: t.Optional[t.Dict[str, t.Any]] = None  # kwargs for `load_sql_based_models`
_cache: t.Optional[CacheBase] = None  # where workers store loaded models
_config_essentials: t.Optional[t.Dict[str, t.Any]] = None  # read by `get_variables`
_selected_gateway: t.Optional[str] = None  # fallback gateway for `get_variables`
109
110
def _init_model_defaults(
    config_essentials: t.Dict[str, t.Any],
    selected_gateway: t.Optional[str],
    model_loading_defaults: t.Optional[t.Dict[str, t.Any]] = None,
    cache: t.Optional[CacheBase] = None,
    console: t.Optional[Console] = None,
) -> None:
    """Populate this process's module-level loader state.

    Used both directly and as a process-pool initializer, so that worker processes
    running `load_sql_models` / `get_variables` see the same configuration.

    Args:
        config_essentials: Config subset read by `get_variables` ("project",
            "variables", "gateways").
        selected_gateway: Gateway used by `get_variables` when none is passed.
        model_loading_defaults: Keyword arguments for `load_sql_based_models`.
        cache: Cache that `load_sql_models` stores loaded models in.
        console: If given, installed as this process's console.
    """
    global _config_essentials, _selected_gateway, _defaults, _cache
    _config_essentials, _selected_gateway = config_essentials, selected_gateway
    _defaults, _cache = model_loading_defaults, cache

    if console is None:
        return

    # Install the console handed over by the parent process.
    from sqlmesh.core.console import set_console

    set_console(console)
129
130
def load_sql_models(path: Path) -> t.List[Model]:
    """Parse one SQL file into models using the process-level loading defaults.

    Intended to run inside a worker set up by `_init_model_defaults`.

    Args:
        path: The SQL file to load.

    Returns:
        An empty list if the models were stored in the module-level cache (the caller
        is then expected to read them from there); otherwise the loaded models.
    """
    assert _defaults
    assert _cache

    with open(path, "r", encoding="utf-8") as handle:
        source = handle.read()
    parsed = parse(source, default_dialect=_defaults["dialect"])
    loaded = load_sql_based_models(parsed, path=Path(path).absolute(), **_defaults)

    if _cache.put(loaded, path):
        return []
    return loaded
140
141
def get_variables(gateway_name: t.Optional[str] = None) -> t.Dict[str, t.Any]:
    """Return the variables visible to models for a gateway.

    Project-level variables are overlaid with the gateway's own variables, and the
    resolved gateway name is stored under ``c.GATEWAY``.

    Args:
        gateway_name: Gateway to resolve; falls back to the process's selected gateway.

    Returns:
        A new dict of variables.
    """
    assert _config_essentials

    name = gateway_name or _selected_gateway

    try:
        gateway = _config_essentials["gateways"].get(name)
    except ConfigError:
        from sqlmesh.core.console import get_console

        get_console().log_warning(
            f"Gateway '{name}' not found in project '{_config_essentials['project']}'."
        )
        gateway = None

    resolved = dict(_config_essentials["variables"])
    resolved.update(gateway.variables if gateway else {})
    resolved[c.GATEWAY] = name
    return resolved
162
163
class Loader(abc.ABC):
    """Abstract base class to load macros and models for a context.

    A loader reads one project directory (``config_path``). ``load`` drives the whole
    process and delegates the format-specific pieces (scripts/macros, models and
    audits) to subclasses. Each file read through ``_track_file`` is recorded with its
    modification time so that ``reload_needed`` can report on-disk changes.
    """

    def __init__(self, context: GenericContext, path: Path) -> None:
        """
        Args:
            context: The owning context; its ``configs`` mapping is indexed by ``path``.
            path: The project directory to load from.
        """
        # This ensures pandas is imported before any model loading happens in the forked process
        # to avoid macOS fork() safety issues, see https://stackoverflow.com/a/52230415. Without
        # it, the following error was observed in a macOS 15.5 system:
        #
        # "+[NSMutableString initialize] may have been in progress in another thread when fork() was called."
        import pandas as pd  # noqa

        from sqlmesh.core.console import get_console

        self._path_mtimes: t.Dict[Path, float] = {}
        self.context = context
        self.config_path = path
        self.config = self.context.configs[self.config_path]
        self._variables_by_gateway: t.Dict[str, t.Dict[str, t.Any]] = {}
        self._console = get_console()

        # The subset of the config that `get_variables` reads. It is installed as
        # module-level state for this process here; subclasses may also pass it to
        # worker processes.
        self.config_essentials = {
            "project": self.config.project,
            "variables": self.config.variables,
            "gateways": self.config.gateways,
        }
        _init_model_defaults(self.config_essentials, self.context.selected_gateway)

    def load(self) -> LoadedProject:
        """
        Loads all macros and models in the context's path.

        Returns:
            A loaded project object.
        """
        with sys_path(self.config_path):
            # python files are cached by the system
            # need to manually clear here so we can reload macros
            linecache.clearcache()
            self._path_mtimes.clear()

            self._load_materializations()
            signals = self._load_signals()

            # Newest config-file mtime per config location (project dir and SQLMESH_PATH).
            config_mtimes: t.Dict[Path, t.List[float]] = defaultdict(list)

            for config_file in self.config_path.glob("config.*"):
                self._track_file(config_file)
                config_mtimes[self.config_path].append(self._path_mtimes[config_file])

            for config_file in c.SQLMESH_PATH.glob("config.*"):
                self._track_file(config_file)
                config_mtimes[c.SQLMESH_PATH].append(self._path_mtimes[config_file])

            self._config_mtimes = {path: max(mtimes) for path, mtimes in config_mtimes.items()}

            macros, jinja_macros = self._load_scripts()
            audits: UniqueKeyDict[str, ModelAudit] = UniqueKeyDict("audits")
            standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
                "standalone_audits"
            )

            for name, audit in self._load_audits(macros=macros, jinja_macros=jinja_macros).items():
                if isinstance(audit, ModelAudit):
                    audits[name] = audit
                else:
                    standalone_audits[name] = audit

            models = self._load_models(
                macros,
                jinja_macros,
                self.context.selected_gateway,
                audits,
                signals,
            )

            metrics = self._load_metrics()

            requirements, excluded_requirements = self._load_requirements()

            environment_statements = self._load_environment_statements(macros=macros)

            user_rules = self._load_linting_rules()

            model_test_metadata = self.load_model_tests()

            project = LoadedProject(
                macros=macros,
                jinja_macros=jinja_macros,
                models=models,
                audits=audits,
                standalone_audits=standalone_audits,
                metrics=expand_metrics(metrics),
                requirements=requirements,
                excluded_requirements=excluded_requirements,
                environment_statements=environment_statements,
                user_rules=user_rules,
                model_test_metadata=model_test_metadata,
            )
            return project

    def reload_needed(self) -> bool:
        """
        Checks for any modifications to the files the macros and models depend on
        since the last load.

        Only files recorded via ``_track_file`` are checked; a tracked file that no
        longer exists also counts as a modification.

        Returns:
            True if a modification is found; False otherwise
        """
        return any(
            not path.exists() or path.stat().st_mtime > initial_mtime
            for path, initial_mtime in self._path_mtimes.copy().items()
        )

    @abc.abstractmethod
    def _load_scripts(self) -> t.Tuple[MacroRegistry, JinjaMacroRegistry]:
        """Loads all user defined macros."""

    @abc.abstractmethod
    def _load_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        gateway: t.Optional[str],
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
    ) -> UniqueKeyDict[str, Model]:
        """Loads all models."""

    @abc.abstractmethod
    def _load_audits(
        self, macros: MacroRegistry, jinja_macros: JinjaMacroRegistry
    ) -> UniqueKeyDict[str, Audit]:
        """Loads all audits."""

    def _load_environment_statements(self, macros: MacroRegistry) -> t.List[EnvironmentStatements]:
        """Loads environment statements. The base implementation returns none."""
        return []

    def load_materializations(self) -> None:
        """Loads custom materializations."""

    def _load_materializations(self) -> None:
        pass

    def _load_signals(self) -> UniqueKeyDict[str, signal]:
        return UniqueKeyDict("signals")

    def _load_metrics(self) -> UniqueKeyDict[str, MetricMeta]:
        return UniqueKeyDict("metrics")

    def _load_external_models(
        self,
        audits: UniqueKeyDict[str, ModelAudit],
        cache: CacheBase,
        gateway: t.Optional[str] = None,
    ) -> UniqueKeyDict[str, Model]:
        """Loads external models declared in YAML files.

        The project's external models YAML file is read if present (otherwise the
        deprecated file name is tried), together with every ``.yaml`` file under the
        external models directory. Models without an explicit gateway form the base
        set. When ``gateway`` is given, models whose gateway equals its lower-cased
        form replace base models with the same fully-qualified name.

        Args:
            audits: Audit definitions available to the external models.
            cache: Consulted via ``get_or_load_models`` for each YAML file.
            gateway: The selected gateway, if any.

        Returns:
            External models keyed by fully-qualified name.

        Raises:
            ConfigError: If a file cannot be loaded, or a name is declared twice in the
                base set or twice for the selected gateway.
        """
        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        external_models_yaml = Path(self.config_path / c.EXTERNAL_MODELS_YAML)
        deprecated_yaml = Path(self.config_path / c.EXTERNAL_MODELS_DEPRECATED_YAML)
        external_models_path = self.config_path / c.EXTERNAL_MODELS

        paths_to_load = []
        if external_models_yaml.exists():
            paths_to_load.append(external_models_yaml)
        elif deprecated_yaml.exists():
            paths_to_load.append(deprecated_yaml)

        if external_models_path.exists() and external_models_path.is_dir():
            paths_to_load.extend(self._glob_paths(external_models_path, extension=".yaml"))

        def _load(path: Path) -> t.List[Model]:
            try:
                with open(path, "r", encoding="utf-8") as file:
                    yaml = YAML().load(file)
                    # Allow empty YAML files to return an empty list
                    if yaml is None:
                        return []
                    return [
                        create_external_model(
                            defaults=self.config.model_defaults.dict(),
                            path=path,
                            project=self.config.project,
                            audit_definitions=audits,
                            **{
                                "dialect": self.config.model_defaults.dialect,
                                "default_catalog": self.context.default_catalog,
                                **row,
                            },
                        )
                        for row in yaml
                    ]
            except Exception as ex:
                raise ConfigError(self._failed_to_load_model_error(path, ex), path)

        # Model gateways are compared against the lower-cased gateway name; normalize
        # once here rather than re-lowering (and reassigning the argument) per file.
        normalized_gateway = gateway.lower() if gateway else None

        for path in paths_to_load:
            self._track_file(path)

            external_models = cache.get_or_load_models(path, lambda: _load(path))

            # external models with no explicit gateway defined form the base set
            for model in external_models:
                if model.gateway is None:
                    if model.fqn in models:
                        raise ConfigError(
                            self._failed_to_load_model_error(
                                path, f"Duplicate external model name: '{model.name}'."
                            ),
                            path,
                        )
                    models[model.fqn] = model

            # however, if there is a gateway defined, gateway-specific models take precedence
            if normalized_gateway:
                for model in external_models:
                    if model.gateway == normalized_gateway:
                        if (
                            model.fqn in models
                            and models[model.fqn].gateway == normalized_gateway
                        ):
                            raise ConfigError(
                                self._failed_to_load_model_error(
                                    path, f"Duplicate external model name: '{model.name}'."
                                ),
                                path,
                            )
                        models.update({model.fqn: model})

        return models

    def _load_requirements(self) -> t.Tuple[t.Dict[str, str], t.Set[str]]:
        """Loads Python dependencies from the lock file.

        Each non-blank line is either ``dep==ver`` (a pinned requirement) or ``^dep``
        (a dependency to exclude). Surrounding whitespace is ignored, and blank lines
        are skipped.

        Returns:
            A tuple of requirements and excluded requirements.

        Raises:
            ConfigError: If a line is malformed or a dependency is pinned to two
                different versions.
        """
        requirements: t.Dict[str, str] = {}
        excluded_requirements: t.Set[str] = set()

        requirements_path = self.config_path / c.REQUIREMENTS
        if requirements_path.is_file():
            with open(requirements_path, "r", encoding="utf-8") as file:
                for raw_line in file:
                    line = raw_line.strip()
                    if not line:
                        # Blank lines carry no requirement; don't reject them as
                        # malformed 'dep==ver' entries.
                        continue

                    if line.startswith("^"):
                        excluded_requirements.add(line[1:])
                        continue

                    args = [k.strip() for k in line.split("==")]
                    if len(args) != 2:
                        raise ConfigError(
                            f"Invalid lock file entry '{line}'. Only 'dep==ver' is supported",
                            requirements_path,
                        )
                    dep, ver = args
                    other_ver = requirements.get(dep, ver)
                    if ver != other_ver:
                        raise ConfigError(
                            f"Conflicting requirement {dep}: {ver} != {other_ver}. Fix your {c.REQUIREMENTS} file.",
                            requirements_path,
                        )
                    requirements[dep] = ver

        return requirements, excluded_requirements

    def _load_linting_rules(self) -> RuleSet:
        """Loads user linting rules"""
        return RuleSet()

    def load_model_tests(self) -> t.List[ModelTestMetadata]:
        """Loads YAML-based model tests"""
        return []

    def _glob_paths(
        self,
        path: Path,
        ignore_patterns: t.Optional[t.List[str]] = None,
        extension: t.Optional[str] = None,
    ) -> t.Generator[Path, None, None]:
        """
        Globs the provided path for the file extension but also removes any filepaths that match an ignore
        pattern either set in constants or provided in config

        Args:
            path: The filepath to glob
            ignore_patterns: A list of patterns for glob to ignore
            extension: The extension to check for in that path (checks recursively in zero or more subdirectories)

        Returns:
            Matched paths that are not ignored
        """
        ignore_patterns = ignore_patterns or []
        extension = extension or ""

        # We try to match both ignore_pattern itself and every file returned by glob,
        # so that we will always ignore file names that do not appear in the latter.
        ignored_filepaths = set(ignore_patterns) | {
            ignored_path
            for ignore_pattern in ignore_patterns
            for ignored_path in glob.glob(str(self.config_path / ignore_pattern), recursive=True)
        }
        for filepath in path.glob(f"**/*{extension}"):
            if any(filepath.match(ignored_filepath) for ignored_filepath in ignored_filepaths):
                continue

            yield filepath

    def _track_file(self, path: Path) -> None:
        """Record a project file's current mtime so ``reload_needed`` can detect changes."""
        self._path_mtimes[path] = path.stat().st_mtime

    def _failed_to_load_model_error(self, path: Path, error: t.Union[str, Exception]) -> str:
        """Format a model-loading failure for ``path``.

        Pydantic validation errors use ``validation_error_message``; any other error
        is stringified with every line indented under the base message.
        """
        base_message = f"Failed to load model from file '{path}':"
        if isinstance(error, ValidationError):
            return validation_error_message(error, base_message)
        # indent all lines of error message
        error_message = str(error).replace("\n", "\n  ")
        return f"{base_message}\n\n  {error_message}"
479
480
481class SqlMeshLoader(Loader):
482    """Loads macros and models for a context using the SQLMesh file formats"""
483
484    def _load_scripts(self) -> t.Tuple[MacroRegistry, JinjaMacroRegistry]:
485        """Loads all user defined macros."""
486        # Store a copy of the macro registry
487        standard_macros = macro.get_registry()
488        jinja_macros = JinjaMacroRegistry()
489        extractor = MacroExtractor()
490
491        macros_max_mtime: t.Optional[float] = None
492
493        for path in self._glob_paths(
494            self.config_path / c.MACROS,
495            ignore_patterns=self.config.ignore_patterns,
496            extension=".py",
497        ):
498            if import_python_file(path, self.config_path):
499                self._track_file(path)
500                macro_file_mtime = self._path_mtimes[path]
501                macros_max_mtime = (
502                    max(macros_max_mtime, macro_file_mtime)
503                    if macros_max_mtime
504                    else macro_file_mtime
505                )
506
507        for path in self._glob_paths(
508            self.config_path / c.MACROS,
509            ignore_patterns=self.config.ignore_patterns,
510            extension=".sql",
511        ):
512            self._track_file(path)
513            macro_file_mtime = self._path_mtimes[path]
514            macros_max_mtime = (
515                max(macros_max_mtime, macro_file_mtime) if macros_max_mtime else macro_file_mtime
516            )
517            with open(path, "r", encoding="utf-8") as file:
518                jinja_macros.add_macros(
519                    extractor.extract(file.read(), dialect=self.config.model_defaults.dialect)
520                )
521
522        self._macros_max_mtime = macros_max_mtime
523
524        macros = macro.get_registry()
525        macro.set_registry(standard_macros)
526
527        return macros, jinja_macros
528
529    def _load_models(
530        self,
531        macros: MacroRegistry,
532        jinja_macros: JinjaMacroRegistry,
533        gateway: t.Optional[str],
534        audits: UniqueKeyDict[str, ModelAudit],
535        signals: UniqueKeyDict[str, signal],
536    ) -> UniqueKeyDict[str, Model]:
537        """
538        Loads all of the models within the model directory with their associated
539        audits into a Dict and creates the dag
540        """
541        cache = SqlMeshLoader._Cache(self, self.config_path)
542
543        sql_models = self._load_sql_models(macros, jinja_macros, audits, signals, cache, gateway)
544        external_models = self._load_external_models(audits, cache, gateway)
545        python_models = self._load_python_models(macros, jinja_macros, audits, signals)
546
547        all_model_names = list(sql_models) + list(external_models) + list(python_models)
548        duplicates = [name for name, count in Counter(all_model_names).items() if count > 1]
549        if duplicates:
550            raise ConfigError(f"Duplicate model name(s) found: {', '.join(duplicates)}.")
551
552        return UniqueKeyDict("models", **sql_models, **external_models, **python_models)
553
554    def _load_sql_models(
555        self,
556        macros: MacroRegistry,
557        jinja_macros: JinjaMacroRegistry,
558        audits: UniqueKeyDict[str, ModelAudit],
559        signals: UniqueKeyDict[str, signal],
560        cache: CacheBase,
561        gateway: t.Optional[str],
562    ) -> UniqueKeyDict[str, Model]:
563        """Loads the sql models into a Dict"""
564        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
565        paths: t.Set[Path] = set()
566        cached_paths: UniqueKeyDict[Path, t.List[Model]] = UniqueKeyDict("cached_paths")
567
568        for path in self._glob_paths(
569            self.config_path / c.MODELS,
570            ignore_patterns=self.config.ignore_patterns,
571            extension=".sql",
572        ):
573            if not os.path.getsize(path):
574                continue
575
576            self._track_file(path)
577            paths.add(path)
578            if cached_models := cache.get(path):
579                cached_paths[path] = cached_models
580
581        for path, cached_models in cached_paths.items():
582            paths.remove(path)
583            for model in cached_models:
584                if model.enabled:
585                    models[model.fqn] = model
586
587        if paths:
588            model_loading_defaults = dict(
589                get_variables=get_variables,
590                defaults=self.config.model_defaults.dict(),
591                macros=macros,
592                jinja_macros=jinja_macros,
593                audit_definitions=audits,
594                module_path=self.config_path,
595                dialect=self.config.model_defaults.dialect,
596                time_column_format=self.config.time_column_format,
597                physical_schema_mapping=self.config.physical_schema_mapping,
598                project=self.config.project,
599                default_catalog=self.context.default_catalog,
600                infer_names=self.config.model_naming.infer_names,
601                signal_definitions=signals,
602                default_catalog_per_gateway=self.context.default_catalog_per_gateway,
603                virtual_environment_mode=self.config.virtual_environment_mode,
604            )
605
606            with create_process_pool_executor(
607                initializer=_init_model_defaults,
608                initargs=(
609                    self.config_essentials,
610                    gateway,
611                    model_loading_defaults,
612                    cache,
613                    self._console,
614                ),
615                max_workers=c.MAX_FORK_WORKERS,
616            ) as pool:
617                futures_to_paths = {pool.submit(load_sql_models, path): path for path in paths}
618                for future in concurrent.futures.as_completed(futures_to_paths):
619                    path = futures_to_paths[future]
620                    try:
621                        loaded = future.result()
622                        for model in loaded or cache.get(path):
623                            if model.fqn in models:
624                                raise ConfigError(
625                                    self._failed_to_load_model_error(
626                                        path, f"Duplicate SQL model name: '{model.name}'."
627                                    ),
628                                    path,
629                                )
630                            elif model.enabled:
631                                model._path = path
632                                models[model.fqn] = model
633                    except Exception as ex:
634                        raise ConfigError(self._failed_to_load_model_error(path, ex), path)
635
636        return models
637
638    def _load_python_models(
639        self,
640        macros: MacroRegistry,
641        jinja_macros: JinjaMacroRegistry,
642        audits: UniqueKeyDict[str, ModelAudit],
643        signals: UniqueKeyDict[str, signal],
644    ) -> UniqueKeyDict[str, Model]:
645        """Loads the python models into a Dict"""
646        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
647        registry = model_registry.registry()
648        registry.clear()
649        registered: t.Set[str] = set()
650
651        model_registry._dialect = self.config.model_defaults.dialect
652        try:
653            for path in self._glob_paths(
654                self.config_path / c.MODELS,
655                ignore_patterns=self.config.ignore_patterns,
656                extension=".py",
657            ):
658                if not os.path.getsize(path):
659                    continue
660
661                self._track_file(path)
662                try:
663                    import_python_file(path, self.config_path)
664                    new = registry.keys() - registered
665                    registered |= new
666                    for name in new:
667                        for model in registry[name].models(
668                            get_variables,
669                            path=path,
670                            module_path=self.config_path,
671                            defaults=self.config.model_defaults.dict(),
672                            macros=macros,
673                            jinja_macros=jinja_macros,
674                            dialect=self.config.model_defaults.dialect,
675                            time_column_format=self.config.time_column_format,
676                            physical_schema_mapping=self.config.physical_schema_mapping,
677                            project=self.config.project,
678                            default_catalog=self.context.default_catalog,
679                            infer_names=self.config.model_naming.infer_names,
680                            audit_definitions=audits,
681                            signal_definitions=signals,
682                            default_catalog_per_gateway=self.context.default_catalog_per_gateway,
683                            virtual_environment_mode=self.config.virtual_environment_mode,
684                        ):
685                            if model.enabled:
686                                models[model.fqn] = model
687                except Exception as ex:
688                    raise ConfigError(self._failed_to_load_model_error(path, ex), path)
689
690        finally:
691            model_registry._dialect = None
692
693        return models
694
695    def load_materializations(self) -> None:
696        with sys_path(self.config_path):
697            self._load_materializations()
698
699    def _load_materializations(self) -> None:
700        for path in self._glob_paths(
701            self.config_path / c.MATERIALIZATIONS,
702            ignore_patterns=self.config.ignore_patterns,
703            extension=".py",
704        ):
705            if os.path.getsize(path):
706                import_python_file(path, self.config_path)
707
708    def _load_signals(self) -> UniqueKeyDict[str, signal]:
709        """Loads signals for the built-in scheduler."""
710
711        base_signals = signal.get_registry()
712
713        signals_max_mtime: t.Optional[float] = None
714
715        for path in self._glob_paths(
716            self.config_path / c.SIGNALS,
717            ignore_patterns=self.config.ignore_patterns,
718            extension=".py",
719        ):
720            if os.path.getsize(path):
721                self._track_file(path)
722                signal_file_mtime = self._path_mtimes[path]
723                signals_max_mtime = (
724                    max(signals_max_mtime, signal_file_mtime)
725                    if signals_max_mtime
726                    else signal_file_mtime
727                )
728                import_python_file(path, self.config_path)
729
730        self._signals_max_mtime = signals_max_mtime
731
732        signals = signal.get_registry()
733        signal.set_registry(base_signals)
734
735        return signals
736
737    def _load_audits(
738        self, macros: MacroRegistry, jinja_macros: JinjaMacroRegistry
739    ) -> UniqueKeyDict[str, Audit]:
740        """Loads all the model audits."""
741        audits_by_name: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
742        audits_max_mtime: t.Optional[float] = None
743        variables = get_variables()
744
745        for path in self._glob_paths(
746            self.config_path / c.AUDITS,
747            ignore_patterns=self.config.ignore_patterns,
748            extension=".sql",
749        ):
750            self._track_file(path)
751            with open(path, "r", encoding="utf-8") as file:
752                audits_file_mtime = self._path_mtimes[path]
753                audits_max_mtime = (
754                    max(audits_max_mtime, audits_file_mtime)
755                    if audits_max_mtime
756                    else audits_file_mtime
757                )
758                expressions = parse(file.read(), default_dialect=self.config.model_defaults.dialect)
759                audits = load_multiple_audits(
760                    expressions=expressions,
761                    path=path,
762                    module_path=self.config_path,
763                    macros=macros,
764                    jinja_macros=jinja_macros,
765                    dialect=self.config.model_defaults.dialect,
766                    default_catalog=self.context.default_catalog,
767                    variables=variables,
768                    project=self.config.project,
769                )
770                for audit in audits:
771                    audits_by_name[audit.name] = audit
772
773        self._audits_max_mtime = audits_max_mtime
774
775        return audits_by_name
776
777    def _load_metrics(self) -> UniqueKeyDict[str, MetricMeta]:
778        """Loads all metrics."""
779        metrics: UniqueKeyDict[str, MetricMeta] = UniqueKeyDict("metrics")
780
781        for path in self._glob_paths(
782            self.config_path / c.METRICS,
783            ignore_patterns=self.config.ignore_patterns,
784            extension=".sql",
785        ):
786            if not os.path.getsize(path):
787                continue
788            self._track_file(path)
789
790            with open(path, "r", encoding="utf-8") as file:
791                dialect = self.config.model_defaults.dialect
792                try:
793                    for expression in parse(file.read(), default_dialect=dialect):
794                        metric = load_metric_ddl(expression, path=path, dialect=dialect)
795                        metrics[metric.name] = metric
796                except SqlglotError as ex:
797                    raise ConfigError(
798                        f"Failed to parse metric definitions at '{path}': {ex}.", path
799                    )
800
801        return metrics
802
803    def _load_environment_statements(self, macros: MacroRegistry) -> t.List[EnvironmentStatements]:
804        """Loads environment statements."""
805
806        if self.config.before_all or self.config.after_all:
807            statements = {
808                "before_all": self.config.before_all or [],
809                "after_all": self.config.after_all or [],
810            }
811            dialect = self.config.model_defaults.dialect
812            python_env = make_python_env(
813                [
814                    exp.maybe_parse(stmt, dialect=dialect)
815                    for stmts in statements.values()
816                    for stmt in stmts
817                ],
818                module_path=self.config_path,
819                jinja_macro_references=None,
820                macros=macros,
821                variables=get_variables(),
822                path=self.config_path,
823            )
824
825            return [
826                EnvironmentStatements(
827                    **statements, python_env=python_env, project=self.config.project or None
828                )
829            ]
830        return []
831
832    def _load_linting_rules(self) -> RuleSet:
833        user_rules: UniqueKeyDict[str, type[Rule]] = UniqueKeyDict("rules")
834
835        for path in self._glob_paths(
836            self.config_path / c.LINTER,
837            ignore_patterns=self.config.ignore_patterns,
838            extension=".py",
839        ):
840            if os.path.getsize(path):
841                self._track_file(path)
842                module = import_python_file(path, self.config_path)
843                module_rules = subclasses(module.__name__, Rule, exclude={Rule})
844                for user_rule in module_rules:
845                    user_rules[user_rule.name] = user_rule
846
847        return RuleSet(user_rules.values())
848
849    def _load_model_test_file(self, path: Path) -> dict[str, ModelTestMetadata]:
850        """Load a single model test file."""
851        model_test_metadata = {}
852
853        with open(path, "r", encoding="utf-8") as file:
854            source = file.read()
855            # If the user has specified a quoted/escaped gateway (e.g. "gateway: 'ma\tin'"), we need to
856            # parse it as YAML to match the gateway name stored in the config
857            gateway_line = GATEWAY_PATTERN.search(source)
858            gateway = YAML().load(gateway_line.group(0))["gateway"] if gateway_line else None
859
860        contents = yaml_load(source, variables=get_variables(gateway))
861
862        for test_name, value in contents.items():
863            model_test_metadata[test_name] = ModelTestMetadata(
864                path=path, test_name=test_name, body=value
865            )
866
867        return model_test_metadata
868
869    def load_model_tests(self) -> t.List[ModelTestMetadata]:
870        """Loads YAML-based model tests"""
871        test_meta_list: t.List[ModelTestMetadata] = []
872
873        search_path = Path(self.config_path) / c.TESTS
874
875        for yaml_file in itertools.chain(
876            search_path.glob("**/test*.yaml"),
877            search_path.glob("**/test*.yml"),
878        ):
879            if any(
880                yaml_file.match(ignore_pattern)
881                for ignore_pattern in self.config.ignore_patterns or []
882            ):
883                continue
884
885            test_meta_list.extend(self._load_model_test_file(yaml_file).values())
886
887        return test_meta_list
888
889    class _Cache(CacheBase):
890        def __init__(self, loader: SqlMeshLoader, config_path: Path):
891            self._loader = loader
892            self.config_path = config_path
893            self._model_cache = ModelCache(self._loader.context.cache_dir)
894
895        def get_or_load_models(
896            self, target_path: Path, loader: t.Callable[[], t.List[Model]]
897        ) -> t.List[Model]:
898            models = self._model_cache.get_or_load(
899                self._cache_entry_name(target_path),
900                self._model_cache_entry_id(target_path),
901                loader=loader,
902            )
903
904            for model in models:
905                model._path = target_path
906
907            return models
908
909        def put(self, models: t.List[Model], path: Path) -> bool:
910            return self._model_cache.put(
911                models,
912                self._cache_entry_name(path),
913                self._model_cache_entry_id(path),
914            )
915
916        def get(self, path: Path) -> t.List[Model]:
917            models = self._model_cache.get(
918                self._cache_entry_name(path),
919                self._model_cache_entry_id(path),
920            )
921
922            for model in models:
923                model._path = path
924
925            return models
926
927        def _cache_entry_name(self, target_path: Path) -> str:
928            return "__".join(target_path.relative_to(self.config_path).parts).replace(
929                target_path.suffix, ""
930            )
931
932        def _model_cache_entry_id(self, model_path: Path) -> str:
933            mtimes = [
934                self._loader._path_mtimes[model_path],
935                self._loader._macros_max_mtime,
936                self._loader._signals_max_mtime,
937                self._loader._audits_max_mtime,
938                self._loader._config_mtimes.get(self.config_path),
939                self._loader._config_mtimes.get(c.SQLMESH_PATH),
940            ]
941            return "__".join(
942                [
943                    str(max(m for m in mtimes if m is not None)),
944                    self._loader.config.fingerprint,
945                    # default catalog can change outside sqlmesh (e.g., DB user's
946                    # default catalog), and it is retained in cached model's fully
947                    # qualified name
948                    self._loader.context.default_catalog or "",
949                    # gateway is configurable, and it is retained in a cached
950                    # model's python environment if the @gateway macro variable is
951                    # used in the model
952                    self._loader.context.gateway or self._loader.config.default_gateway_name,
953                ]
954            )
GATEWAY_PATTERN = re.compile('gateway:\\s*([^\\s]+)')
@dataclass
class LoadedProject:
56@dataclass
57class LoadedProject:
58    macros: MacroRegistry
59    jinja_macros: JinjaMacroRegistry
60    models: UniqueKeyDict[str, Model]
61    standalone_audits: UniqueKeyDict[str, StandaloneAudit]
62    audits: UniqueKeyDict[str, ModelAudit]
63    metrics: UniqueKeyDict[str, Metric]
64    requirements: t.Dict[str, str]
65    excluded_requirements: t.Set[str]
66    environment_statements: t.List[EnvironmentStatements]
67    user_rules: RuleSet
68    model_test_metadata: t.List[ModelTestMetadata]
requirements: Dict[str, str]
excluded_requirements: Set[str]
environment_statements: List[sqlmesh.core.environment.EnvironmentStatements]
class CacheBase(abc.ABC):
 71class CacheBase(abc.ABC):
 72    @abc.abstractmethod
 73    def get_or_load_models(
 74        self, target_path: Path, loader: t.Callable[[], t.List[Model]]
 75    ) -> t.List[Model]:
 76        """Get or load all models from cache."""
 77        pass
 78
 79    @abc.abstractmethod
 80    def put(self, models: t.List[Model], path: Path) -> bool:
 81        """Store models in the cache associated with the given path.
 82
 83        Args:
 84            models: List of models to cache
 85            path: File path to associate with the cached models
 86
 87        Returns:
 88            True if the models were successfully cached,
 89            False otherwise (empty list, not a list, unsupported model types)
 90        """
 91        pass
 92
 93    @abc.abstractmethod
 94    def get(self, path: Path) -> t.List[Model]:
 95        """Retrieve models from the cache for a given path.
 96
 97        Args:
 98            path: File path to look up in the cache
 99
100        Returns:
101            List of cached models associated with the path, an empty list if no cache entry exists
102        """
103        pass

Abstract base class for a model cache: implementations store, retrieve, or lazily load the list of models associated with a project file path.

72    @abc.abstractmethod
73    def get_or_load_models(
74        self, target_path: Path, loader: t.Callable[[], t.List[Model]]
75    ) -> t.List[Model]:
76        """Get or load all models from cache."""
77        pass

Get or load all models from cache.

79    @abc.abstractmethod
80    def put(self, models: t.List[Model], path: Path) -> bool:
81        """Store models in the cache associated with the given path.
82
83        Args:
84            models: List of models to cache
85            path: File path to associate with the cached models
86
87        Returns:
88            True if the models were successfully cached,
89            False otherwise (empty list, not a list, unsupported model types)
90        """
91        pass

Store models in the cache associated with the given path.

Arguments:
  • models: List of models to cache
  • path: File path to associate with the cached models
Returns:

True if the models were successfully cached, False otherwise (empty list, not a list, unsupported model types)

 93    @abc.abstractmethod
 94    def get(self, path: Path) -> t.List[Model]:
 95        """Retrieve models from the cache for a given path.
 96
 97        Args:
 98            path: File path to look up in the cache
 99
100        Returns:
101            List of cached models associated with the path, an empty list if no cache entry exists
102        """
103        pass

Retrieve models from the cache for a given path.

Arguments:
  • path: File path to look up in the cache
Returns:

List of cached models associated with the path, an empty list if no cache entry exists

132def load_sql_models(path: Path) -> t.List[Model]:
133    assert _defaults
134    assert _cache
135
136    with open(path, "r", encoding="utf-8") as file:
137        expressions = parse(file.read(), default_dialect=_defaults["dialect"])
138    models = load_sql_based_models(expressions, path=Path(path).absolute(), **_defaults)
139
140    return [] if _cache.put(models, path) else models
def get_variables(gateway_name: Optional[str] = None) -> Dict[str, Any]:
143def get_variables(gateway_name: t.Optional[str] = None) -> t.Dict[str, t.Any]:
144    assert _config_essentials
145
146    gateway_name = gateway_name or _selected_gateway
147
148    try:
149        gateway = _config_essentials["gateways"].get(gateway_name)
150    except ConfigError:
151        from sqlmesh.core.console import get_console
152
153        get_console().log_warning(
154            f"Gateway '{gateway_name}' not found in project '{_config_essentials['project']}'."
155        )
156        gateway = None
157
158    return {
159        **_config_essentials["variables"],
160        **(gateway.variables if gateway else {}),
161        c.GATEWAY: gateway_name,
162    }
class Loader(abc.ABC):
165class Loader(abc.ABC):
166    """Abstract base class to load macros and models for a context"""
167
168    def __init__(self, context: GenericContext, path: Path) -> None:
169        # This ensures pandas is imported before any model loading happens in the forked process
170        # to avoid macOS fork() safety issues, see https://stackoverflow.com/a/52230415. Without
171        # it, the following error was observed on a macOS 15.5 system:
172        #
173        # "+[NSMutableString initialize] may have been in progress in another thread when fork() was called."
174        import pandas as pd  # noqa
175
176        from sqlmesh.core.console import get_console
177
178        self._path_mtimes: t.Dict[Path, float] = {}
179        self.context = context
180        self.config_path = path
181        self.config = self.context.configs[self.config_path]
182        self._variables_by_gateway: t.Dict[str, t.Dict[str, t.Any]] = {}
183        self._console = get_console()
184
185        self.config_essentials = {
186            "project": self.config.project,
187            "variables": self.config.variables,
188            "gateways": self.config.gateways,
189        }
190        _init_model_defaults(self.config_essentials, self.context.selected_gateway)
191
192    def load(self) -> LoadedProject:
193        """
194        Loads all macros and models in the context's path.
195
196        Returns:
197            A loaded project object.
198        """
199        with sys_path(self.config_path):
200            # python files are cached by the system
201            # need to manually clear here so we can reload macros
202            linecache.clearcache()
203            self._path_mtimes.clear()
204
205            self._load_materializations()
206            signals = self._load_signals()
207
208            config_mtimes: t.Dict[Path, t.List[float]] = defaultdict(list)
209
210            for config_file in self.config_path.glob("config.*"):
211                self._track_file(config_file)
212                config_mtimes[self.config_path].append(self._path_mtimes[config_file])
213
214            for config_file in c.SQLMESH_PATH.glob("config.*"):
215                self._track_file(config_file)
216                config_mtimes[c.SQLMESH_PATH].append(self._path_mtimes[config_file])
217
218            self._config_mtimes = {path: max(mtimes) for path, mtimes in config_mtimes.items()}
219
220            macros, jinja_macros = self._load_scripts()
221            audits: UniqueKeyDict[str, ModelAudit] = UniqueKeyDict("audits")
222            standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
223                "standalone_audits"
224            )
225
226            for name, audit in self._load_audits(macros=macros, jinja_macros=jinja_macros).items():
227                if isinstance(audit, ModelAudit):
228                    audits[name] = audit
229                else:
230                    standalone_audits[name] = audit
231
232            models = self._load_models(
233                macros,
234                jinja_macros,
235                self.context.selected_gateway,
236                audits,
237                signals,
238            )
239
240            metrics = self._load_metrics()
241
242            requirements, excluded_requirements = self._load_requirements()
243
244            environment_statements = self._load_environment_statements(macros=macros)
245
246            user_rules = self._load_linting_rules()
247
248            model_test_metadata = self.load_model_tests()
249
250            project = LoadedProject(
251                macros=macros,
252                jinja_macros=jinja_macros,
253                models=models,
254                audits=audits,
255                standalone_audits=standalone_audits,
256                metrics=expand_metrics(metrics),
257                requirements=requirements,
258                excluded_requirements=excluded_requirements,
259                environment_statements=environment_statements,
260                user_rules=user_rules,
261                model_test_metadata=model_test_metadata,
262            )
263            return project
264
265    def reload_needed(self) -> bool:
266        """
267        Checks for any modifications to the files the macros and models depend on
268        since the last load.
269
270        Returns:
271            True if a modification is found; False otherwise
272        """
273        return any(
274            not path.exists() or path.stat().st_mtime > initial_mtime
275            for path, initial_mtime in self._path_mtimes.copy().items()
276        )
277
278    @abc.abstractmethod
279    def _load_scripts(self) -> t.Tuple[MacroRegistry, JinjaMacroRegistry]:
280        """Loads all user defined macros."""
281
282    @abc.abstractmethod
283    def _load_models(
284        self,
285        macros: MacroRegistry,
286        jinja_macros: JinjaMacroRegistry,
287        gateway: t.Optional[str],
288        audits: UniqueKeyDict[str, ModelAudit],
289        signals: UniqueKeyDict[str, signal],
290    ) -> UniqueKeyDict[str, Model]:
291        """Loads all models."""
292
293    @abc.abstractmethod
294    def _load_audits(
295        self, macros: MacroRegistry, jinja_macros: JinjaMacroRegistry
296    ) -> UniqueKeyDict[str, Audit]:
297        """Loads all audits."""
298
299    def _load_environment_statements(self, macros: MacroRegistry) -> t.List[EnvironmentStatements]:
300        """Loads environment statements."""
301        return []
302
303    def load_materializations(self) -> None:
304        """Loads custom materializations."""
305
306    def _load_materializations(self) -> None:
307        pass
308
309    def _load_signals(self) -> UniqueKeyDict[str, signal]:
310        return UniqueKeyDict("signals")
311
312    def _load_metrics(self) -> UniqueKeyDict[str, MetricMeta]:
313        return UniqueKeyDict("metrics")
314
315    def _load_external_models(
316        self,
317        audits: UniqueKeyDict[str, ModelAudit],
318        cache: CacheBase,
319        gateway: t.Optional[str] = None,
320    ) -> UniqueKeyDict[str, Model]:
321        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
322        external_models_yaml = Path(self.config_path / c.EXTERNAL_MODELS_YAML)
323        deprecated_yaml = Path(self.config_path / c.EXTERNAL_MODELS_DEPRECATED_YAML)
324        external_models_path = self.config_path / c.EXTERNAL_MODELS
325
326        paths_to_load = []
327        if external_models_yaml.exists():
328            paths_to_load.append(external_models_yaml)
329        elif deprecated_yaml.exists():
330            paths_to_load.append(deprecated_yaml)
331
332        if external_models_path.exists() and external_models_path.is_dir():
333            paths_to_load.extend(self._glob_paths(external_models_path, extension=".yaml"))
334
335        def _load(path: Path) -> t.List[Model]:
336            try:
337                with open(path, "r", encoding="utf-8") as file:
338                    yaml = YAML().load(file)
339                    # Allow empty YAML files to return an empty list
340                    if yaml is None:
341                        return []
342                    return [
343                        create_external_model(
344                            defaults=self.config.model_defaults.dict(),
345                            path=path,
346                            project=self.config.project,
347                            audit_definitions=audits,
348                            **{
349                                "dialect": self.config.model_defaults.dialect,
350                                "default_catalog": self.context.default_catalog,
351                                **row,
352                            },
353                        )
354                        for row in yaml
355                    ]
356            except Exception as ex:
357                raise ConfigError(self._failed_to_load_model_error(path, ex), path)
358
359        for path in paths_to_load:
360            self._track_file(path)
361
362            external_models = cache.get_or_load_models(path, lambda: _load(path))
363
364            # external models with no explicit gateway defined form the base set
365            for model in external_models:
366                if model.gateway is None:
367                    if model.fqn in models:
368                        raise ConfigError(
369                            self._failed_to_load_model_error(
370                                path, f"Duplicate external model name: '{model.name}'."
371                            ),
372                            path,
373                        )
374                    models[model.fqn] = model
375
376            # however, if there is a gateway defined, gateway-specific models take precedence
377            if gateway:
378                gateway = gateway.lower()
379                for model in external_models:
380                    if model.gateway == gateway:
381                        if model.fqn in models and models[model.fqn].gateway == gateway:
382                            raise ConfigError(
383                                self._failed_to_load_model_error(
384                                    path, f"Duplicate external model name: '{model.name}'."
385                                ),
386                                path,
387                            )
388                        models.update({model.fqn: model})
389
390        return models
391
392    def _load_requirements(self) -> t.Tuple[t.Dict[str, str], t.Set[str]]:
393        """Loads Python dependencies from the lock file.
394
395        Returns:
396            A tuple of requirements and excluded requirements.
397        """
398        requirements: t.Dict[str, str] = {}
399        excluded_requirements: t.Set[str] = set()
400
401        requirements_path = self.config_path / c.REQUIREMENTS
402        if requirements_path.is_file():
403            with open(requirements_path, "r", encoding="utf-8") as file:
404                for line in file:
405                    line = line.strip()
406                    if line.startswith("^"):
407                        excluded_requirements.add(line[1:])
408                        continue
409
410                    args = [k.strip() for k in line.split("==")]
411                    if len(args) != 2:
412                        raise ConfigError(
413                            f"Invalid lock file entry '{line.strip()}'. Only 'dep==ver' is supported",
414                            requirements_path,
415                        )
416                    dep, ver = args
417                    other_ver = requirements.get(dep, ver)
418                    if ver != other_ver:
419                        raise ConfigError(
420                            f"Conflicting requirement {dep}: {ver} != {other_ver}. Fix your {c.REQUIREMENTS} file.",
421                            requirements_path,
422                        )
423                    requirements[dep] = ver
424
425        return requirements, excluded_requirements
426
427    def _load_linting_rules(self) -> RuleSet:
428        """Loads user linting rules"""
429        return RuleSet()
430
431    def load_model_tests(self) -> t.List[ModelTestMetadata]:
432        """Loads YAML-based model tests"""
433        return []
434
435    def _glob_paths(
436        self,
437        path: Path,
438        ignore_patterns: t.Optional[t.List[str]] = None,
439        extension: t.Optional[str] = None,
440    ) -> t.Generator[Path, None, None]:
441        """
442        Globs the provided path for the file extension but also removes any filepaths that match an ignore
443        pattern either set in constants or provided in config
444
445        Args:
446            path: The filepath to glob
447            ignore_patterns: A list of patterns for glob to ignore
448            extension: The extension to check for in that path (checks recursively in zero or more subdirectories)
449
450        Returns:
451            Matched paths that are not ignored
452        """
453        ignore_patterns = ignore_patterns or []
454        extension = extension or ""
455
456        # We try to match both ignore_pattern itself and every file returned by glob,
457        # so that we will always ignore file names that do not appear in the latter.
458        ignored_filepaths = set(ignore_patterns) | {
459            ignored_path
460            for ignore_pattern in ignore_patterns
461            for ignored_path in glob.glob(str(self.config_path / ignore_pattern), recursive=True)
462        }
463        for filepath in path.glob(f"**/*{extension}"):
464            if any(filepath.match(ignored_filepath) for ignored_filepath in ignored_filepaths):
465                continue
466
467            yield filepath
468
469    def _track_file(self, path: Path) -> None:
470        """Project file to track for modifications"""
471        self._path_mtimes[path] = path.stat().st_mtime
472
473    def _failed_to_load_model_error(self, path: Path, error: t.Union[str, Exception]) -> str:
474        base_message = f"Failed to load model from file '{path}':"
475        if isinstance(error, ValidationError):
476            return validation_error_message(error, base_message)
477        # indent all lines of error message
478        error_message = str(error).replace("\n", "\n  ")
479        return f"{base_message}\n\n  {error_message}"

Abstract base class to load macros and models for a context

context
config_path
config
config_essentials
def load(self) -> LoadedProject:
192    def load(self) -> LoadedProject:
193        """
194        Loads all macros and models in the context's path.
195
196        Returns:
197            A loaded project object.
198        """
199        with sys_path(self.config_path):
200            # python files are cached by the system
201            # need to manually clear here so we can reload macros
202            linecache.clearcache()
203            self._path_mtimes.clear()
204
205            self._load_materializations()
206            signals = self._load_signals()
207
208            config_mtimes: t.Dict[Path, t.List[float]] = defaultdict(list)
209
210            for config_file in self.config_path.glob("config.*"):
211                self._track_file(config_file)
212                config_mtimes[self.config_path].append(self._path_mtimes[config_file])
213
214            for config_file in c.SQLMESH_PATH.glob("config.*"):
215                self._track_file(config_file)
216                config_mtimes[c.SQLMESH_PATH].append(self._path_mtimes[config_file])
217
218            self._config_mtimes = {path: max(mtimes) for path, mtimes in config_mtimes.items()}
219
220            macros, jinja_macros = self._load_scripts()
221            audits: UniqueKeyDict[str, ModelAudit] = UniqueKeyDict("audits")
222            standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
223                "standalone_audits"
224            )
225
226            for name, audit in self._load_audits(macros=macros, jinja_macros=jinja_macros).items():
227                if isinstance(audit, ModelAudit):
228                    audits[name] = audit
229                else:
230                    standalone_audits[name] = audit
231
232            models = self._load_models(
233                macros,
234                jinja_macros,
235                self.context.selected_gateway,
236                audits,
237                signals,
238            )
239
240            metrics = self._load_metrics()
241
242            requirements, excluded_requirements = self._load_requirements()
243
244            environment_statements = self._load_environment_statements(macros=macros)
245
246            user_rules = self._load_linting_rules()
247
248            model_test_metadata = self.load_model_tests()
249
250            project = LoadedProject(
251                macros=macros,
252                jinja_macros=jinja_macros,
253                models=models,
254                audits=audits,
255                standalone_audits=standalone_audits,
256                metrics=expand_metrics(metrics),
257                requirements=requirements,
258                excluded_requirements=excluded_requirements,
259                environment_statements=environment_statements,
260                user_rules=user_rules,
261                model_test_metadata=model_test_metadata,
262            )
263            return project

Loads all macros and models in the context's path.

Returns:

A loaded project object.

def reload_needed(self) -> bool:
265    def reload_needed(self) -> bool:
266        """
267        Checks for any modifications to the files the macros and models depend on
268        since the last load.
269
270        Returns:
271            True if a modification is found; False otherwise
272        """
273        return any(
274            not path.exists() or path.stat().st_mtime > initial_mtime
275            for path, initial_mtime in self._path_mtimes.copy().items()
276        )

Checks for any modifications to the files the macros and models depend on since the last load.

Returns:

True if a modification is found; False otherwise

def load_materializations(self) -> None:
303    def load_materializations(self) -> None:
304        """Loads custom materializations."""

Loads custom materializations.

def load_model_tests(self) -> List[sqlmesh.core.test.discovery.ModelTestMetadata]:
431    def load_model_tests(self) -> t.List[ModelTestMetadata]:
432        """Loads YAML-based model tests"""
433        return []

Loads YAML-based model tests

class SqlMeshLoader(Loader):
class SqlMeshLoader(Loader):
    """Loads macros and models for a context using the SQLMesh file formats"""

    @staticmethod
    def _running_max_mtime(current: t.Optional[float], candidate: float) -> float:
        """Returns the larger of a running maximum mtime and a newly seen file mtime.

        ``None`` means "no file seen yet". The check is explicit rather than
        truthiness-based so that a legitimate mtime of ``0.0`` is not mistaken for unset.
        """
        return candidate if current is None else max(current, candidate)

    def _load_scripts(self) -> t.Tuple[MacroRegistry, JinjaMacroRegistry]:
        """Loads all user defined macros.

        Python files under the macros directory are imported and ``.sql`` files are
        scanned for Jinja macro definitions. The global ``macro`` registry is snapshotted
        before the imports, captured afterwards, and then restored to the snapshot.

        Side effect: records the newest macro-file mtime in ``self._macros_max_mtime``,
        which is one of the inputs to the model cache entry id.

        Returns:
            A tuple of (Python macro registry, Jinja macro registry).
        """
        # Store a copy of the macro registry
        standard_macros = macro.get_registry()
        jinja_macros = JinjaMacroRegistry()
        extractor = MacroExtractor()

        macros_max_mtime: t.Optional[float] = None

        for path in self._glob_paths(
            self.config_path / c.MACROS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            # Only files that import_python_file reports as imported are tracked.
            if import_python_file(path, self.config_path):
                self._track_file(path)
                macros_max_mtime = self._running_max_mtime(
                    macros_max_mtime, self._path_mtimes[path]
                )

        for path in self._glob_paths(
            self.config_path / c.MACROS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            self._track_file(path)
            macros_max_mtime = self._running_max_mtime(macros_max_mtime, self._path_mtimes[path])
            with open(path, "r", encoding="utf-8") as file:
                jinja_macros.add_macros(
                    extractor.extract(file.read(), dialect=self.config.model_defaults.dialect)
                )

        self._macros_max_mtime = macros_max_mtime

        macros = macro.get_registry()
        macro.set_registry(standard_macros)

        return macros, jinja_macros

    def _load_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        gateway: t.Optional[str],
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
    ) -> UniqueKeyDict[str, Model]:
        """
        Loads all SQL, external and Python models, with their associated audits, into a
        single dict keyed by model name.

        Raises:
            ConfigError: If the same model name is produced by more than one of the
                SQL / external / Python sources.
        """
        cache = SqlMeshLoader._Cache(self, self.config_path)

        sql_models = self._load_sql_models(macros, jinja_macros, audits, signals, cache, gateway)
        external_models = self._load_external_models(audits, cache, gateway)
        python_models = self._load_python_models(macros, jinja_macros, audits, signals)

        name_counts = Counter(itertools.chain(sql_models, external_models, python_models))
        duplicates = [name for name, count in name_counts.items() if count > 1]
        if duplicates:
            raise ConfigError(f"Duplicate model name(s) found: {', '.join(duplicates)}.")

        return UniqueKeyDict("models", **sql_models, **external_models, **python_models)

    def _load_sql_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
        cache: CacheBase,
        gateway: t.Optional[str],
    ) -> UniqueKeyDict[str, Model]:
        """Loads the SQL models into a dict keyed by fully qualified name.

        Empty files are skipped. Files with a cache hit are served from the cache; the
        rest are loaded in a process pool. Disabled models are left out of the result.

        Raises:
            ConfigError: If a file fails to load, or if two SQL models share a name.
        """
        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        paths: t.Set[Path] = set()
        cached_paths: UniqueKeyDict[Path, t.List[Model]] = UniqueKeyDict("cached_paths")

        for path in self._glob_paths(
            self.config_path / c.MODELS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            if not os.path.getsize(path):
                continue

            self._track_file(path)
            paths.add(path)
            if cached_models := cache.get(path):
                cached_paths[path] = cached_models

        for path, cached_models in cached_paths.items():
            paths.remove(path)
            for model in cached_models:
                if model.enabled:
                    models[model.fqn] = model

        if paths:
            model_loading_defaults = dict(
                get_variables=get_variables,
                defaults=self.config.model_defaults.dict(),
                macros=macros,
                jinja_macros=jinja_macros,
                audit_definitions=audits,
                module_path=self.config_path,
                dialect=self.config.model_defaults.dialect,
                time_column_format=self.config.time_column_format,
                physical_schema_mapping=self.config.physical_schema_mapping,
                project=self.config.project,
                default_catalog=self.context.default_catalog,
                infer_names=self.config.model_naming.infer_names,
                signal_definitions=signals,
                default_catalog_per_gateway=self.context.default_catalog_per_gateway,
                virtual_environment_mode=self.config.virtual_environment_mode,
            )

            with create_process_pool_executor(
                initializer=_init_model_defaults,
                initargs=(
                    self.config_essentials,
                    gateway,
                    model_loading_defaults,
                    cache,
                    self._console,
                ),
                max_workers=c.MAX_FORK_WORKERS,
            ) as pool:
                futures_to_paths = {pool.submit(load_sql_models, path): path for path in paths}
                for future in concurrent.futures.as_completed(futures_to_paths):
                    path = futures_to_paths[future]
                    try:
                        # An empty worker result falls back to the cache (presumably the
                        # worker stored the models there; TODO confirm in load_sql_models).
                        path_models = future.result() or cache.get(path)
                    except Exception as ex:
                        raise ConfigError(
                            self._failed_to_load_model_error(path, ex), path
                        ) from ex

                    for model in path_models:
                        if model.fqn in models:
                            # Raised outside the try above so the message is not wrapped
                            # by _failed_to_load_model_error a second time.
                            raise ConfigError(
                                self._failed_to_load_model_error(
                                    path, f"Duplicate SQL model name: '{model.name}'."
                                ),
                                path,
                            )
                        if model.enabled:
                            model._path = path
                            models[model.fqn] = model

        return models

    def _load_python_models(
        self,
        macros: MacroRegistry,
        jinja_macros: JinjaMacroRegistry,
        audits: UniqueKeyDict[str, ModelAudit],
        signals: UniqueKeyDict[str, signal],
    ) -> UniqueKeyDict[str, Model]:
        """Loads the Python models into a dict keyed by fully qualified name.

        The ``model`` registry is cleared, then each non-empty ``.py`` file under the
        models directory is imported; registry entries that appear after an import are
        turned into models for that file. Disabled models are left out.

        Raises:
            ConfigError: If importing a file or building its models fails.
        """
        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        registry = model_registry.registry()
        registry.clear()
        registered: t.Set[str] = set()

        # Set for the duration of the imports and always reset in `finally`
        # (presumably read by the model decorator while the files are imported).
        model_registry._dialect = self.config.model_defaults.dialect
        try:
            for path in self._glob_paths(
                self.config_path / c.MODELS,
                ignore_patterns=self.config.ignore_patterns,
                extension=".py",
            ):
                if not os.path.getsize(path):
                    continue

                self._track_file(path)
                try:
                    import_python_file(path, self.config_path)
                    # Registry entries added by this import belong to this file.
                    new = registry.keys() - registered
                    registered |= new
                    for name in new:
                        for model in registry[name].models(
                            get_variables,
                            path=path,
                            module_path=self.config_path,
                            defaults=self.config.model_defaults.dict(),
                            macros=macros,
                            jinja_macros=jinja_macros,
                            dialect=self.config.model_defaults.dialect,
                            time_column_format=self.config.time_column_format,
                            physical_schema_mapping=self.config.physical_schema_mapping,
                            project=self.config.project,
                            default_catalog=self.context.default_catalog,
                            infer_names=self.config.model_naming.infer_names,
                            audit_definitions=audits,
                            signal_definitions=signals,
                            default_catalog_per_gateway=self.context.default_catalog_per_gateway,
                            virtual_environment_mode=self.config.virtual_environment_mode,
                        ):
                            if model.enabled:
                                models[model.fqn] = model
                except Exception as ex:
                    raise ConfigError(self._failed_to_load_model_error(path, ex), path) from ex

        finally:
            model_registry._dialect = None

        return models

    def load_materializations(self) -> None:
        """Loads custom materializations with the project path active via ``sys_path``."""
        with sys_path(self.config_path):
            self._load_materializations()

    def _load_materializations(self) -> None:
        """Imports every non-empty ``.py`` file under the materializations directory."""
        for path in self._glob_paths(
            self.config_path / c.MATERIALIZATIONS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if os.path.getsize(path):
                import_python_file(path, self.config_path)

    def _load_signals(self) -> UniqueKeyDict[str, signal]:
        """Loads signals for the built-in scheduler.

        Non-empty ``.py`` files under the signals directory are imported. The global
        ``signal`` registry is snapshotted before the imports, captured afterwards, and
        then restored to the snapshot.

        Side effect: records the newest signal-file mtime in ``self._signals_max_mtime``.
        """
        base_signals = signal.get_registry()

        signals_max_mtime: t.Optional[float] = None

        for path in self._glob_paths(
            self.config_path / c.SIGNALS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if os.path.getsize(path):
                self._track_file(path)
                signals_max_mtime = self._running_max_mtime(
                    signals_max_mtime, self._path_mtimes[path]
                )
                import_python_file(path, self.config_path)

        self._signals_max_mtime = signals_max_mtime

        signals = signal.get_registry()
        signal.set_registry(base_signals)

        return signals

    def _load_audits(
        self, macros: MacroRegistry, jinja_macros: JinjaMacroRegistry
    ) -> UniqueKeyDict[str, Audit]:
        """Loads all the model audits.

        Every ``.sql`` file under the audits directory is parsed with the project's
        default dialect; a single file may define several audits.

        Side effect: records the newest audit-file mtime in ``self._audits_max_mtime``.

        Returns:
            The audits keyed by audit name.
        """
        audits_by_name: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
        audits_max_mtime: t.Optional[float] = None
        variables = get_variables()
        dialect = self.config.model_defaults.dialect

        for path in self._glob_paths(
            self.config_path / c.AUDITS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            self._track_file(path)
            audits_max_mtime = self._running_max_mtime(audits_max_mtime, self._path_mtimes[path])

            with open(path, "r", encoding="utf-8") as file:
                expressions = parse(file.read(), default_dialect=dialect)

            audits = load_multiple_audits(
                expressions=expressions,
                path=path,
                module_path=self.config_path,
                macros=macros,
                jinja_macros=jinja_macros,
                dialect=dialect,
                default_catalog=self.context.default_catalog,
                variables=variables,
                project=self.config.project,
            )
            for audit in audits:
                audits_by_name[audit.name] = audit

        self._audits_max_mtime = audits_max_mtime

        return audits_by_name

    def _load_metrics(self) -> UniqueKeyDict[str, MetricMeta]:
        """Loads all metrics from the non-empty ``.sql`` files under the metrics directory.

        Raises:
            ConfigError: If a metrics file cannot be parsed.
        """
        metrics: UniqueKeyDict[str, MetricMeta] = UniqueKeyDict("metrics")

        for path in self._glob_paths(
            self.config_path / c.METRICS,
            ignore_patterns=self.config.ignore_patterns,
            extension=".sql",
        ):
            if not os.path.getsize(path):
                continue
            self._track_file(path)

            with open(path, "r", encoding="utf-8") as file:
                dialect = self.config.model_defaults.dialect
                try:
                    for expression in parse(file.read(), default_dialect=dialect):
                        metric = load_metric_ddl(expression, path=path, dialect=dialect)
                        metrics[metric.name] = metric
                except SqlglotError as ex:
                    raise ConfigError(
                        f"Failed to parse metric definitions at '{path}': {ex}.", path
                    ) from ex

        return metrics

    def _load_environment_statements(self, macros: MacroRegistry) -> t.List[EnvironmentStatements]:
        """Loads environment statements.

        Builds a single ``EnvironmentStatements`` from the config's ``before_all`` /
        ``after_all`` statements, with a Python environment derived from those
        statements. Returns an empty list when neither is configured.
        """
        if self.config.before_all or self.config.after_all:
            statements = {
                "before_all": self.config.before_all or [],
                "after_all": self.config.after_all or [],
            }
            dialect = self.config.model_defaults.dialect
            python_env = make_python_env(
                [
                    exp.maybe_parse(stmt, dialect=dialect)
                    for stmts in statements.values()
                    for stmt in stmts
                ],
                module_path=self.config_path,
                jinja_macro_references=None,
                macros=macros,
                variables=get_variables(),
                path=self.config_path,
            )

            return [
                EnvironmentStatements(
                    **statements, python_env=python_env, project=self.config.project or None
                )
            ]
        return []

    def _load_linting_rules(self) -> RuleSet:
        """Loads user-defined linter rules.

        Imports each non-empty ``.py`` file under the linter directory and collects the
        ``Rule`` subclasses defined in it (excluding ``Rule`` itself), keyed by rule name.
        """
        user_rules: UniqueKeyDict[str, type[Rule]] = UniqueKeyDict("rules")

        for path in self._glob_paths(
            self.config_path / c.LINTER,
            ignore_patterns=self.config.ignore_patterns,
            extension=".py",
        ):
            if os.path.getsize(path):
                self._track_file(path)
                module = import_python_file(path, self.config_path)
                module_rules = subclasses(module.__name__, Rule, exclude={Rule})
                for user_rule in module_rules:
                    user_rules[user_rule.name] = user_rule

        return RuleSet(user_rules.values())

    def _load_model_test_file(self, path: Path) -> dict[str, ModelTestMetadata]:
        """Load a single model test file.

        The first ``gateway:`` entry in the file, if any, is resolved first so that the
        variables for that gateway are passed to ``yaml_load`` for the whole file.

        Returns:
            The test metadata keyed by test name.
        """
        with open(path, "r", encoding="utf-8") as file:
            source = file.read()

        # If the user has specified a quoted/escaped gateway (e.g. "gateway: 'ma\tin'"), we need to
        # parse it as YAML to match the gateway name stored in the config
        gateway_line = GATEWAY_PATTERN.search(source)
        gateway = YAML().load(gateway_line.group(0))["gateway"] if gateway_line else None

        contents = yaml_load(source, variables=get_variables(gateway))

        return {
            test_name: ModelTestMetadata(path=path, test_name=test_name, body=value)
            for test_name, value in contents.items()
        }

    def load_model_tests(self) -> t.List[ModelTestMetadata]:
        """Loads YAML-based model tests.

        Searches the tests directory recursively for ``test*.yaml`` and ``test*.yml``
        files, skipping any that match the configured ignore patterns.
        """
        test_meta_list: t.List[ModelTestMetadata] = []

        search_path = Path(self.config_path) / c.TESTS

        for yaml_file in itertools.chain(
            search_path.glob("**/test*.yaml"),
            search_path.glob("**/test*.yml"),
        ):
            if any(
                yaml_file.match(ignore_pattern)
                for ignore_pattern in self.config.ignore_patterns or []
            ):
                continue

            test_meta_list.extend(self._load_model_test_file(yaml_file).values())

        return test_meta_list

    class _Cache(CacheBase):
        """Model cache keyed by model file path, backed by ``ModelCache``."""

        def __init__(self, loader: SqlMeshLoader, config_path: Path):
            self._loader = loader
            self.config_path = config_path
            self._model_cache = ModelCache(self._loader.context.cache_dir)

        def get_or_load_models(
            self, target_path: Path, loader: t.Callable[[], t.List[Model]]
        ) -> t.List[Model]:
            """Returns the cached models for ``target_path``, calling ``loader`` on a miss.

            The returned models have ``_path`` set to ``target_path``.
            """
            models = self._model_cache.get_or_load(
                self._cache_entry_name(target_path),
                self._model_cache_entry_id(target_path),
                loader=loader,
            )

            for model in models:
                model._path = target_path

            return models

        def put(self, models: t.List[Model], path: Path) -> bool:
            """Stores ``models`` under the cache entry for ``path``."""
            return self._model_cache.put(
                models,
                self._cache_entry_name(path),
                self._model_cache_entry_id(path),
            )

        def get(self, path: Path) -> t.List[Model]:
            """Returns the cached models for ``path``, with ``_path`` set to ``path``."""
            models = self._model_cache.get(
                self._cache_entry_name(path),
                self._model_cache_entry_id(path),
            )

            for model in models:
                model._path = path

            return models

        def _cache_entry_name(self, target_path: Path) -> str:
            """Builds the entry name from the path relative to the config path.

            Only the file's own suffix is dropped (``models/sub/a.sql`` ->
            ``models__sub__a``); dots in directory names are preserved.
            """
            relative = target_path.relative_to(self.config_path)
            return "__".join(relative.with_suffix("").parts)

        def _model_cache_entry_id(self, model_path: Path) -> str:
            """Builds the cache entry id for a model file.

            Combines the newest mtime among the model file, macros, signals, audits and
            config files with the config fingerprint, default catalog and gateway, so a
            change to any of them yields a different id.
            """
            mtimes = [
                self._loader._path_mtimes[model_path],
                self._loader._macros_max_mtime,
                self._loader._signals_max_mtime,
                self._loader._audits_max_mtime,
                self._loader._config_mtimes.get(self.config_path),
                self._loader._config_mtimes.get(c.SQLMESH_PATH),
            ]
            return "__".join(
                [
                    str(max(m for m in mtimes if m is not None)),
                    self._loader.config.fingerprint,
                    # default catalog can change outside sqlmesh (e.g., DB user's
                    # default catalog), and it is retained in cached model's fully
                    # qualified name
                    self._loader.context.default_catalog or "",
                    # gateway is configurable, and it is retained in a cached
                    # model's python environment if the @gateway macro variable is
                    # used in the model
                    self._loader.context.gateway or self._loader.config.default_gateway_name,
                ]
            )

Loads macros and models for a context using the SQLMesh file formats.

def load_materializations(self) -> None:
696    def load_materializations(self) -> None:
697        with sys_path(self.config_path):
698            self._load_materializations()

Loads custom materializations.

def load_model_tests(self) -> List[sqlmesh.core.test.discovery.ModelTestMetadata]:
870    def load_model_tests(self) -> t.List[ModelTestMetadata]:
871        """Loads YAML-based model tests"""
872        test_meta_list: t.List[ModelTestMetadata] = []
873
874        search_path = Path(self.config_path) / c.TESTS
875
876        for yaml_file in itertools.chain(
877            search_path.glob("**/test*.yaml"),
878            search_path.glob("**/test*.yml"),
879        ):
880            if any(
881                yaml_file.match(ignore_pattern)
882                for ignore_pattern in self.config.ignore_patterns or []
883            ):
884                continue
885
886            test_meta_list.extend(self._load_model_test_file(yaml_file).values())
887
888        return test_meta_list

Loads YAML-based model tests.