Edit on GitHub

sqlmesh.dbt.manifest

  1# ruff: noqa: E402
  2from __future__ import annotations
  3
  4import json
  5import logging
  6import os
  7import re
  8import typing as t
  9from argparse import Namespace
 10from collections import defaultdict
 11from functools import cached_property
 12from pathlib import Path
 13
 14from dbt import flags
 15
 16from sqlmesh.dbt.util import DBT_VERSION
 17from sqlmesh.utils.conversions import make_serializable
 18
# Override the partial-parse file name to prevent dbt commands from invalidating the cache.
# The constant is patched on `dbt.constants` for dbt >= 1.6.0 and on `dbt.parser.manifest`
# for older versions. This has to run before the remaining dbt imports below, which is why
# this module disables the "import not at top of file" lint rule (E402).

if DBT_VERSION >= (1, 6, 0):
    from dbt import constants as dbt_constants

    dbt_constants.PARTIAL_PARSE_FILE_NAME = "sqlmesh_partial_parse.msgpack"  # type: ignore
else:
    from dbt.parser import manifest as dbt_manifest  # type: ignore

    dbt_manifest.PARTIAL_PARSE_FILE_NAME = "sqlmesh_partial_parse.msgpack"  # type: ignore
 29
 30import jinja2
 31from dbt.adapters.factory import register_adapter, reset_adapters
 32from dbt.config import Profile, Project, RuntimeConfig
 33from dbt.config.profile import read_profile
 34from dbt.config.renderer import DbtProjectYamlRenderer, ProfileRenderer
 35from dbt.parser.manifest import ManifestLoader
 36
 37try:
 38    from dbt.parser.sources import merge_freshness  # type: ignore[attr-defined]
 39except ImportError:
 40    # merge_freshness was renamed to merge_source_freshness in dbt 1.10
 41    # ref: https://github.com/dbt-labs/dbt-core/commit/14fc39a76ff4830cdf2fcbe73f57ca27db500018#diff-1f09db95588f46879a83378c2a86d6b16b7cdfcaddbfe46afc5d919ee5e9a4d9R430
 42    from dbt.parser.sources import merge_source_freshness as merge_freshness  # type: ignore[no-redef,attr-defined]
 43
 44from dbt.tracking import do_not_track
 45
 46from sqlmesh.core import constants as c
 47from sqlmesh.utils.errors import SQLMeshError
 48from sqlmesh.core.config import ModelDefaultsConfig
 49from sqlmesh.dbt.builtin import BUILTIN_FILTERS, BUILTIN_GLOBALS, OVERRIDDEN_MACROS
 50from sqlmesh.dbt.common import Dependencies
 51from sqlmesh.dbt.model import ModelConfig
 52from sqlmesh.dbt.package import HookConfig, MacroConfig, MaterializationConfig
 53from sqlmesh.dbt.seed import SeedConfig
 54from sqlmesh.dbt.source import SourceConfig
 55from sqlmesh.dbt.target import TargetConfig
 56from sqlmesh.dbt.test import TestConfig
 57from sqlmesh.dbt.util import DBT_VERSION
 58from sqlmesh.utils.cache import FileCache
 59from sqlmesh.utils.errors import ConfigError
 60from sqlmesh.utils.jinja import (
 61    MacroInfo,
 62    MacroReference,
 63    extract_call_names,
 64    jinja_call_arg_name,
 65)
 66from sqlglot.helper import ensure_list
 67
 68if t.TYPE_CHECKING:
 69    from dbt.contracts.graph.manifest import Macro, Manifest
 70    from dbt.contracts.graph.nodes import ManifestNode, SourceDefinition
 71    from sqlmesh.utils.jinja import CallNames
 72
logger = logging.getLogger(__name__)

# Aliases for the name -> config mappings that ManifestHelper keeps for each package.
TestConfigs = t.Dict[str, TestConfig]
ModelConfigs = t.Dict[str, ModelConfig]
SeedConfigs = t.Dict[str, SeedConfig]
SourceConfigs = t.Dict[str, SourceConfig]
MacroConfigs = t.Dict[str, MacroConfig]
HookConfigs = t.Dict[str, HookConfig]
MaterializationConfigs = t.Dict[str, MaterializationConfig]


# Packages whose models, seeds and snapshots are skipped when loading the manifest.
IGNORED_PACKAGES = {"elementary"}
# Names of builtin jinja globals and filters; single-name calls to these are not
# recorded as macro dependencies.
BUILTIN_CALLS = {*BUILTIN_GLOBALS, *BUILTIN_FILTERS}

# Patch SemanticManifest so that validation is skipped (it always reports success), which
# avoids Pydantic v1 errors on dbt 1.6. The patch is applied to every dbt version >= 1.6.0,
# including 1.7+, because semantic models are not used here.
if DBT_VERSION >= (1, 6, 0):
    from dbt.contracts.graph.semantic_manifest import SemanticManifest  # type: ignore

    SemanticManifest.validate = lambda _: True  # type: ignore
 93
 94
 95class ManifestHelper:
 96    def __init__(
 97        self,
 98        project_path: Path,
 99        profiles_path: Path,
100        profile_name: str,
101        target: TargetConfig,
102        variable_overrides: t.Optional[t.Dict[str, t.Any]] = None,
103        cache_dir: t.Optional[str] = None,
104        model_defaults: t.Optional[ModelDefaultsConfig] = None,
105    ):
106        self.project_path = project_path
107        self.profiles_path = profiles_path
108        self.profile_name = profile_name
109        self.target = target
110        self.variable_overrides = variable_overrides or {}
111        self.model_defaults = model_defaults or ModelDefaultsConfig()
112
113        self.__manifest: t.Optional[Manifest] = None
114        self._project_name: str = ""
115
116        self._is_loaded: bool = False
117        self._tests_per_package: t.Dict[str, TestConfigs] = defaultdict(dict)
118        self._models_per_package: t.Dict[str, ModelConfigs] = defaultdict(dict)
119        self._seeds_per_package: t.Dict[str, SeedConfigs] = defaultdict(dict)
120        self._sources_per_package: t.Dict[str, SourceConfigs] = defaultdict(dict)
121        self._macros_per_package: t.Dict[str, MacroConfigs] = defaultdict(dict)
122
123        self._macro_flatten_dependencies: t.Dict[str, t.Dict[str, Dependencies]] = defaultdict(dict)
124
125        self._tests_by_owner: t.Dict[str, t.List[TestConfig]] = defaultdict(list)
126        self._disabled_refs: t.Optional[t.Set[str]] = None
127        self._disabled_sources: t.Optional[t.Set[str]] = None
128
129        if cache_dir is not None:
130            cache_path = Path(cache_dir)
131            if not cache_path.is_absolute():
132                cache_path = self.project_path / cache_path
133        else:
134            cache_path = self.project_path / c.CACHE
135
136        self._call_cache: FileCache[t.Dict[str, t.List[CallNames]]] = FileCache(
137            cache_path, "jinja_calls"
138        )
139
140        self._on_run_start_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
141        self._on_run_end_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
142        self._materializations: MaterializationConfigs = {}
143
144    def tests(self, package_name: t.Optional[str] = None) -> TestConfigs:
145        self._load_all()
146        return self._tests_per_package[package_name or self._project_name]
147
148    def models(self, package_name: t.Optional[str] = None) -> ModelConfigs:
149        self._load_all()
150        return self._models_per_package[package_name or self._project_name]
151
152    def seeds(self, package_name: t.Optional[str] = None) -> SeedConfigs:
153        self._load_all()
154        return self._seeds_per_package[package_name or self._project_name]
155
156    def sources(self, package_name: t.Optional[str] = None) -> SourceConfigs:
157        self._load_all()
158        return self._sources_per_package[package_name or self._project_name]
159
160    def macros(self, package_name: t.Optional[str] = None) -> MacroConfigs:
161        self._load_all()
162        return self._macros_per_package[package_name or self._project_name]
163
164    def on_run_start(self, package_name: t.Optional[str] = None) -> HookConfigs:
165        self._load_all()
166        return self._on_run_start_per_package[package_name or self._project_name]
167
168    def on_run_end(self, package_name: t.Optional[str] = None) -> HookConfigs:
169        self._load_all()
170        return self._on_run_end_per_package[package_name or self._project_name]
171
172    def materializations(self) -> MaterializationConfigs:
173        self._load_all()
174        return self._materializations
175
176    @property
177    def all_macros(self) -> t.Dict[str, t.Dict[str, MacroInfo]]:
178        self._load_all()
179        result: t.Dict[str, t.Dict[str, MacroInfo]] = defaultdict(dict)
180        for package_name, macro_configs in self._macros_per_package.items():
181            for macro_name, macro_config in macro_configs.items():
182                result[package_name][macro_name] = macro_config.info
183        return result
184
185    @cached_property
186    def flat_graph(self) -> t.Dict[str, t.Any]:
187        return {
188            "exposures": {
189                k: make_serializable(v.to_dict(omit_none=False))
190                for k, v in getattr(self._manifest, "exposures", {}).items()
191            },
192            "groups": {
193                k: make_serializable(v.to_dict(omit_none=False))
194                for k, v in getattr(self._manifest, "groups", {}).items()
195            },
196            "metrics": {
197                k: make_serializable(v.to_dict(omit_none=False))
198                for k, v in getattr(self._manifest, "metrics", {}).items()
199            },
200            "nodes": {
201                k: make_serializable(v.to_dict(omit_none=False))
202                for k, v in self._manifest.nodes.items()
203            },
204            "sources": {
205                k: make_serializable(v.to_dict(omit_none=False))
206                for k, v in self._manifest.sources.items()
207            },
208            "semantic_models": {
209                k: make_serializable(v.to_dict(omit_none=False))
210                for k, v in getattr(self._manifest, "semantic_models", {}).items()
211            },
212            "saved_queries": {
213                k: make_serializable(v.to_dict(omit_none=False))
214                for k, v in getattr(self._manifest, "saved_queries", {}).items()
215            },
216        }
217
218    def _load_all(self) -> None:
219        if self._is_loaded:
220            return
221
222        self._calls = {k: (v, False) for k, v in (self._call_cache.get("") or {}).items()}
223
224        self._load_macros()
225        self._load_materializations()
226        self._load_sources()
227        self._load_tests()
228        self._load_models_and_seeds()
229        self._load_on_run_start_end()
230        self._is_loaded = True
231
232        self._call_cache.put("", value={k: v for k, (v, used) in self._calls.items() if used})
233
234    def _load_sources(self) -> None:
235        for source in self._manifest.sources.values():
236            # starting in dbt-core 1.9.5, freshness can be set in both source and source config
237            source_dict = source.to_dict()
238            source_dict.pop("freshness", None)
239
240            source_config_dict = _config(source)
241            source_config_dict.pop("freshness", None)
242
243            source_config_freshness = getattr(source.config, "freshness", None)
244            freshness = (
245                merge_freshness(source.freshness, source_config_freshness)
246                if source_config_freshness
247                else source.freshness
248            )
249
250            source_config = SourceConfig(
251                **{
252                    **source_dict,
253                    **source_config_dict,
254                    "freshness": freshness.to_dict() if freshness else None,
255                }
256            )
257            self._sources_per_package[source.package_name][source_config.config_name] = (
258                source_config
259            )
260
261    def _load_macros(self) -> None:
262        for macro in self._manifest.macros.values():
263            if macro.name.startswith("materialization_"):
264                continue
265
266            if macro.name.startswith("test_"):
267                macro.macro_sql = _convert_jinja_test_to_macro(macro.macro_sql)
268
269            dependencies = Dependencies(macros=_macro_references(self._manifest, macro))
270            if not macro.name.startswith("test_"):
271                dependencies = dependencies.union(
272                    self._extra_dependencies(macro.macro_sql, macro.package_name)
273                )
274
275            self._macros_per_package[macro.package_name][macro.name] = MacroConfig(
276                info=MacroInfo(
277                    definition=macro.macro_sql,
278                    depends_on=dependencies.macros,
279                ),
280                dependencies=dependencies,
281                path=Path(macro.original_file_path),
282            )
283
284        # This is a workaround for dbt adapter macros (eg. "spark__dateadd") whcih are expected to be
285        # available in the global scope regardless of the package they came from.
286        adapter_macro_names = {
287            name[name.find("__") + 2 :]
288            for name in self._macros_per_package.get("dbt", {})
289            if "__" in name
290        }
291        for macros in self._macros_per_package.values():
292            for name, macro_config in macros.items():
293                pos = name.find("__")
294                if pos > 0 and name[pos + 2 :] in adapter_macro_names:
295                    macro_config.info.is_top_level = True
296
297    def _load_materializations(self) -> None:
298        for macro in self._manifest.macros.values():
299            if macro.name.startswith("materialization_"):
300                # Extract name and adapter ( "materialization_{name}_{adapter}" or "materialization_{name}_default")
301                name_parts = macro.name.split("_")
302                if len(name_parts) >= 3:
303                    mat_name = "_".join(name_parts[1:-1])
304                    adapter = name_parts[-1]
305
306                    dependencies = Dependencies(macros=_macro_references(self._manifest, macro))
307                    macro.macro_sql = _strip_jinja_materialization_tags(macro.macro_sql)
308                    dependencies = dependencies.union(
309                        self._extra_dependencies(macro.macro_sql, macro.package_name)
310                    )
311
312                    materialization_config = MaterializationConfig(
313                        name=mat_name,
314                        adapter=adapter,
315                        definition=macro.macro_sql,
316                        dependencies=dependencies,
317                        path=Path(macro.original_file_path),
318                    )
319
320                    key = f"{mat_name}_{adapter}"
321                    self._materializations[key] = materialization_config
322
323    def _load_tests(self) -> None:
324        for node in self._manifest.nodes.values():
325            if node.resource_type != "test":
326                continue
327
328            skip_test = False
329            refs = _refs(node)
330            for ref in refs:
331                if self._is_disabled_ref(ref):
332                    logger.info(
333                        "Skipping test '%s' which references a disabled model '%s'",
334                        node.name,
335                        ref,
336                    )
337                    skip_test = True
338                    break
339
340            if skip_test:
341                continue
342
343            dependencies = Dependencies(
344                macros=_macro_references(self._manifest, node),
345                refs=refs,
346                sources=_sources(node),
347            )
348            # Implicit dependencies for model test arg
349            dependencies.macros.append(MacroReference(package="dbt", name="get_where_subquery"))
350            dependencies.macros.append(MacroReference(package="dbt", name="should_store_failures"))
351
352            sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
353            dependencies = dependencies.union(self._extra_dependencies(sql, node.package_name))
354            dependencies = dependencies.union(
355                self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
356            )
357
358            test_model = _test_model(node)
359            node_config = _node_base_config(node)
360            node_config["name"] = _build_test_name(node, dependencies)
361
362            test = TestConfig(
363                sql=sql,
364                model_name=test_model,
365                test_kwargs=node.test_metadata.kwargs if hasattr(node, "test_metadata") else {},
366                dependencies=dependencies,
367                **node_config,
368            )
369            self._tests_per_package[node.package_name][node.unique_id] = test
370            if test_model:
371                self._tests_by_owner[test_model].append(test)
372
373    def _load_models_and_seeds(self) -> None:
374        for node in self._manifest.nodes.values():
375            if (
376                node.resource_type not in ("model", "seed", "snapshot")
377                or node.package_name in IGNORED_PACKAGES
378            ):
379                continue
380
381            macro_references = _macro_references(self._manifest, node)
382            all_tests = (
383                self._tests_by_owner[node.name]
384                + self._tests_by_owner[f"{node.package_name}.{node.name}"]
385            )
386            # Only include non-standalone tests (tests that don't reference other models)
387            tests = [test for test in all_tests if not test.is_standalone]
388            node_config = _node_base_config(node)
389
390            node_name = node.name
391            node_version = getattr(node, "version", None)
392            if node_version:
393                node_name = f"{node_name}_v{node_version}"
394
395            if node.resource_type in {"model", "snapshot"}:
396                sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
397                dependencies = Dependencies(
398                    macros=macro_references, refs=_refs(node), sources=_sources(node)
399                )
400                dependencies = dependencies.union(
401                    self._extra_dependencies(sql, node.package_name, track_all_model_attrs=True)
402                )
403                for hook in [*node_config.get("pre-hook", []), *node_config.get("post-hook", [])]:
404                    dependencies = dependencies.union(
405                        self._extra_dependencies(
406                            hook["sql"], node.package_name, track_all_model_attrs=True
407                        )
408                    )
409                dependencies = dependencies.union(
410                    self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
411                )
412
413                self._models_per_package[node.package_name][node_name] = ModelConfig(
414                    **dict(
415                        node_config,
416                        sql=sql,
417                        dependencies=dependencies,
418                        tests=tests,
419                    )
420                )
421            else:
422                self._seeds_per_package[node.package_name][node_name] = SeedConfig(
423                    **dict(
424                        node_config,
425                        dependencies=Dependencies(macros=macro_references),
426                        tests=tests,
427                    )
428                )
429
430    def _load_on_run_start_end(self) -> None:
431        for node in self._manifest.nodes.values():
432            if node.resource_type == "operation" and (
433                set(node.tags) & {"on-run-start", "on-run-end"}
434            ):
435                sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
436                node_name = node.name
437                node_path = Path(node.original_file_path)
438
439                dependencies = Dependencies(
440                    macros=_macro_references(self._manifest, node),
441                    refs=_refs(node),
442                    sources=_sources(node),
443                )
444                dependencies = dependencies.union(self._extra_dependencies(sql, node.package_name))
445                dependencies = dependencies.union(
446                    self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
447                )
448
449                if "on-run-start" in node.tags:
450                    self._on_run_start_per_package[node.package_name][node_name] = HookConfig(
451                        sql=sql,
452                        index=getattr(node, "index", None) or 0,
453                        path=node_path,
454                        dependencies=dependencies,
455                    )
456                else:
457                    self._on_run_end_per_package[node.package_name][node_name] = HookConfig(
458                        sql=sql,
459                        index=getattr(node, "index", None) or 0,
460                        path=node_path,
461                        dependencies=dependencies,
462                    )
463
464    @property
465    def _manifest(self) -> Manifest:
466        if not self.__manifest:
467            try:
468                self.__manifest = self._load_manifest()
469            except Exception as ex:
470                raise SQLMeshError(f"Failed to load dbt manifest: {ex}") from ex
471        return self.__manifest
472
473    def _load_manifest(self) -> Manifest:
474        do_not_track()
475
476        variables = (
477            self.variable_overrides
478            if DBT_VERSION >= (1, 5, 0)
479            else json.dumps(self.variable_overrides)
480        )
481
482        args: Namespace = Namespace(
483            vars=variables,
484            profile=self.profile_name,
485            project_dir=str(self.project_path),
486            profiles_dir=str(self.profiles_path),
487            target=self.target.name,
488            macro_debugging=False,
489            REQUIRE_RESOURCE_NAMES_WITHOUT_SPACES=True,
490        )
491        flags.set_from_args(args, None)
492
493        if DBT_VERSION >= (1, 8, 0):
494            from dbt_common.context import set_invocation_context  # type: ignore
495
496            set_invocation_context(os.environ)
497
498        profile = self._load_profile()
499        project = self._load_project(profile)
500
501        if (
502            not any(k in project.models for k in ("start", "+start"))
503            and not self.model_defaults.start
504        ):
505            raise ConfigError(
506                "SQLMesh requires a start date in order to have a finite range of backfilling data. Add start to the 'models:' block in dbt_project.yml. https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/#setting-model-backfill-start-dates"
507            )
508
509        runtime_config = RuntimeConfig.from_parts(project, profile, args)
510
511        self._project_name = project.project_name
512
513        if DBT_VERSION >= (1, 8, 0):
514            from dbt.mp_context import get_mp_context  # type: ignore
515
516            register_adapter(runtime_config, get_mp_context())  # type: ignore
517        else:
518            register_adapter(runtime_config)  # type: ignore
519
520        manifest = ManifestLoader.get_full_manifest(runtime_config)
521        # This adapter doesn't care about semantic models so we clear them out to avoid issues
522        manifest.semantic_models = {}
523        reset_adapters()
524        return manifest
525
526    def _load_project(self, profile: Profile) -> Project:
527        project_renderer = DbtProjectYamlRenderer(profile, cli_vars=self.variable_overrides)
528        return Project.from_project_root(str(self.project_path), project_renderer)
529
530    def _load_profile(self) -> Profile:
531        profile_renderer = ProfileRenderer(cli_vars=self.variable_overrides)
532        raw_profiles = read_profile(str(self.profiles_path))
533        return Profile.from_raw_profiles(
534            raw_profiles=raw_profiles,
535            profile_name=self.profile_name,
536            renderer=profile_renderer,
537            target_override=self.target.name,
538        )
539
540    def _is_disabled_ref(self, ref: str) -> bool:
541        if self._disabled_refs is None:
542            self._load_disabled()
543
544        return ref in self._disabled_refs  # type: ignore
545
546    def _is_disabled_source(self, source: str) -> bool:
547        if self._disabled_sources is None:
548            self._load_disabled()
549
550        return source in self._disabled_sources  # type: ignore
551
552    def _load_disabled(self) -> None:
553        self._disabled_refs = set()
554        self._disabled_sources = set()
555        for nodes in self._manifest.disabled.values():
556            for node in nodes:
557                if node.resource_type in ("model", "snapshot", "seed"):
558                    self._disabled_refs.add(f"{node.package_name}.{node.name}")
559                    self._disabled_refs.add(node.name)
560                elif node.resource_type == "source":
561                    self._disabled_sources.add(f"{node.package_name}.{node.name}")
562
563        for node in self._manifest.nodes.values():
564            if node.resource_type in ("model", "snapshot", "seed"):
565                self._disabled_refs.discard(node.name)
566            elif node.resource_type == "source":
567                self._disabled_sources.discard(node.name)
568
569    def _flatten_dependencies_from_macros(
570        self,
571        macros: t.List[MacroReference],
572        default_package: str,
573        visited: t.Optional[t.Set[t.Tuple[str, str]]] = None,
574    ) -> Dependencies:
575        if visited is None:
576            visited = set()
577
578        dependencies = Dependencies()
579        for macro in macros:
580            macro_package = macro.package or default_package
581
582            if (macro_package, macro.name) in visited:
583                continue
584            visited.add((macro_package, macro.name))
585
586            macro_dependencies = self._macro_flatten_dependencies.get(macro_package, {}).get(
587                macro.name
588            )
589            if not macro_dependencies:
590                macro_config = self._macros_per_package[macro_package].get(macro.name)
591                if not macro_config:
592                    continue
593
594                macro_dependencies = macro_config.dependencies.union(
595                    self._flatten_dependencies_from_macros(
596                        macro_config.dependencies.macros, macro_package, visited=visited
597                    )
598                )
599                # We don't need flatten macro dependencies. The jinja macro registry takes care of recursive
600                # dependencies for us.
601                macro_dependencies.macros = []
602                self._macro_flatten_dependencies[macro_package][macro.name] = macro_dependencies
603            dependencies = dependencies.union(macro_dependencies)
604        return dependencies
605
606    def _extra_dependencies(
607        self,
608        target: str,
609        package: str,
610        track_all_model_attrs: bool = False,
611    ) -> Dependencies:
612        """
613        We sometimes observe that the manifest doesn't capture all macros, refs, and sources within a macro.
614        This behavior has been observed with macros like dbt.current_timestamp(), dbt_utils.slugify(), and source().
615        Here we apply our custom extractor to make a best effort to supplement references captured in the manifest.
616        """
617        dependencies = Dependencies()
618
619        # Whether all `model` attributes (e.g., `model.config`) should be included in the dependencies
620        all_model_attrs = False
621
622        for call_name, node in extract_call_names(target, cache=self._calls):
623            if call_name[0] == "config":
624                continue
625
626            if (
627                track_all_model_attrs
628                and not all_model_attrs
629                and isinstance(node, jinja2.nodes.Call)
630                and any(isinstance(a, jinja2.nodes.Name) and a.name == "model" for a in node.args)
631            ):
632                all_model_attrs = True
633
634            if isinstance(node, jinja2.nodes.Getattr):
635                if call_name[0] == "model":
636                    dependencies.model_attrs.attrs.add(call_name[1])
637            elif call_name[0] == "source":
638                args = [jinja_call_arg_name(arg) for arg in node.args]
639                if args and all(arg for arg in args):
640                    source = ".".join(args)
641                    if not self._is_disabled_source(source):
642                        dependencies.sources.add(source)
643                dependencies.macros.append(MacroReference(name="source"))
644            elif call_name[0] == "ref":
645                args = [jinja_call_arg_name(arg) for arg in node.args]
646                if args and all(arg for arg in args):
647                    ref = ".".join(args)
648                    if not self._is_disabled_ref(ref):
649                        dependencies.refs.add(ref)
650                dependencies.macros.append(MacroReference(name="ref"))
651            elif call_name[0] == "var":
652                args = [jinja_call_arg_name(arg) for arg in node.args]
653                if args and args[0]:
654                    dependencies.variables.add(args[0])
655                else:
656                    # We couldn't determine the var name statically
657                    dependencies.has_dynamic_var_names = True
658                dependencies.macros.append(MacroReference(name="var"))
659            elif len(call_name) == 1:
660                macro_name = call_name[0]
661                if macro_name in BUILTIN_CALLS:
662                    continue
663                if (
664                    f"macro.{package}.{macro_name}" not in self._manifest.macros
665                    and f"macro.dbt.{macro_name}" in self._manifest.macros
666                ):
667                    package_name: t.Optional[str] = "dbt"
668                else:
669                    # dbt doesn't include the package name for project macros
670                    package_name = package if package != self._project_name else None
671                _macro_reference_if_not_overridden(
672                    package_name, macro_name, dependencies.macros.append
673                )
674            else:
675                if call_name[0] != "adapter":
676                    _macro_reference_if_not_overridden(
677                        call_name[0], call_name[1], dependencies.macros.append
678                    )
679
680        # When `model` is referenced as-is, e.g. it's passed as an argument to a macro call like
681        # `{{ foo(model) }}`, we can't easily track the attributes that are actually used, because
682        # it may be aliased and hence tracking actual uses of `model` requires a proper data flow
683        # analysis. We conservatively deal with this by including all of its supported attributes
684        # if a standalone reference is found.
685        if all_model_attrs:
686            dependencies.model_attrs.all_attrs = True
687
688        return dependencies
689
690
def _macro_reference_if_not_overridden(
    package: t.Optional[str], name: str, if_not_overridden: t.Callable[[MacroReference], None]
) -> None:
    """Invoke `if_not_overridden` with the macro reference unless it is in OVERRIDDEN_MACROS.

    Args:
        package: Package of the macro, or None for an unqualified reference.
        name: Name of the macro.
        if_not_overridden: Callback that receives the reference, e.g. a list's `append`.
    """
    candidate = MacroReference(package=package, name=name)
    if candidate in OVERRIDDEN_MACROS:
        return
    if_not_overridden(candidate)
697
698
699def _config(node: t.Union[ManifestNode, SourceDefinition]) -> t.Dict[str, t.Any]:
700    return node.config.to_dict()
701
702
def _macro_references(
    manifest: Manifest, node: t.Union[ManifestNode, Macro]
) -> t.Set[MacroReference]:
    """Collect references to the macros the node depends on according to the manifest.

    Macros from the node's own package are referenced without a package. Overridden macros
    are left out. A node without a `depends_on` attribute yields an empty set.
    """
    references: t.Set[MacroReference] = set()
    if hasattr(node, "depends_on"):
        for macro_id in node.depends_on.macros:
            # Skip empty ids as well as the literal string "None".
            if not macro_id or macro_id == "None":
                continue

            macro_node = manifest.macros[macro_id]
            if macro_node.package_name != node.package_name:
                macro_package = macro_node.package_name
            else:
                macro_package = None
            _macro_reference_if_not_overridden(macro_package, macro_node.name, references.add)
    return references
721
722
def _refs(node: ManifestNode) -> t.Set[str]:
    """Return the names of the models referenced by a node.

    For dbt >= 1.5.0 each ref is an object; its name is prefixed with the
    package when one is set and suffixed with `_v<version>` when a version is
    set. For older dbt versions each ref is a sequence of strings joined
    with ".".
    """
    if DBT_VERSION < (1, 5, 0):
        return {".".join(parts) for parts in node.refs}  # type: ignore

    names: t.Set[str] = set()
    for ref in getattr(node, "refs", ()):
        qualified = f"{ref.package}.{ref.name}" if ref.package else ref.name  # type: ignore
        version = getattr(ref, "version", None)
        if version:
            qualified = f"{qualified}_v{version}"
        names.add(qualified)
    return names
735
736
737def _sources(node: ManifestNode) -> t.Set[str]:
738    return {".".join(s) for s in getattr(node, "sources", [])}
739
740
741def _model_node_id(model_name: str, package: str) -> str:
742    return f"model.{package}.{model_name}"
743
744
745def _test_model(node: ManifestNode) -> t.Optional[str]:
746    attached_node = getattr(node, "attached_node", None)
747    if attached_node:
748        pieces = attached_node.split(".")
749        if pieces[0] in ["model", "seed"]:
750            # versioned models have format "model.package.model_name.v1" (4 parts)
751            if len(pieces) == 4:
752                return f"{pieces[2]}_{pieces[3]}"
753            return pieces[-1]
754        return None
755
756    key_name = getattr(node, "file_key_name", None)
757    if key_name:
758        pieces = key_name.split(".")
759        return pieces[-1] if pieces[0] in ["models", "seeds"] else None
760
761    return None
762
763
def _node_base_config(node: ManifestNode) -> t.Dict[str, t.Any]:
    """Merge a node's config and its own fields into one dictionary.

    Node fields take precedence over config entries with the same key, and
    "path" is always set to the node's original file path.
    """
    merged = dict(_config(node))
    merged.update(node.to_dict())
    merged["path"] = Path(node.original_file_path)
    return merged
770
771
772def _convert_jinja_test_to_macro(test_jinja: str) -> str:
773    TEST_TAG_REGEX = r"\s*{%-?\s*test\s+"
774    ENDTEST_REGEX = r"{%-?\s*endtest\s*-?%}"
775
776    match = re.match(TEST_TAG_REGEX, test_jinja)
777    if not match:
778        # already a macro
779        return test_jinja
780
781    test_tag = test_jinja[: match.span()[-1]]
782
783    macro_tag = re.sub(r"({%-?\s*)test\s+", r"\1macro test_", test_tag)
784    macro = macro_tag + test_jinja[match.span()[-1] :]
785
786    return re.sub(ENDTEST_REGEX, lambda m: m.group(0).replace("endtest", "endmacro"), macro)
787
788
789def _strip_jinja_materialization_tags(materialization_jinja: str) -> str:
790    MATERIALIZATION_TAG_REGEX = r"\s*{%-?\s*materialization\s+[^%]*%}\s*\n?"
791    ENDMATERIALIZATION_REGEX = r"{%-?\s*endmaterialization\s*-?%}\s*\n?"
792
793    if not re.match(MATERIALIZATION_TAG_REGEX, materialization_jinja):
794        return materialization_jinja
795
796    materialization_jinja = re.sub(
797        MATERIALIZATION_TAG_REGEX,
798        "",
799        materialization_jinja,
800        flags=re.IGNORECASE,
801    )
802
803    materialization_jinja = re.sub(
804        ENDMATERIALIZATION_REGEX,
805        "",
806        materialization_jinja,
807        flags=re.IGNORECASE,
808    )
809
810    return materialization_jinja.strip()
811
812
813def _build_test_name(node: ManifestNode, dependencies: Dependencies) -> str:
814    """
815    Build a user-friendly test name that includes the test's model/source, column,
816    and args for tests with custom user names. Needed because dbt only generates these
817    names for tests that do not specify the "name" field in their YAML definition.
818
819    Name structure
820    - Model test:  [namespace]_[test name]_[model name]_[column name]__[arg values]
821    - Source test: [namespace]_source_[test name]_[source name]_[table name]_[column name]__[arg values]
822    """
823    # standalone test
824    if not hasattr(node, "test_metadata"):
825        return node.name
826
827    model_name = _test_model(node)
828    source_name = None
829    if not model_name and dependencies.sources:
830        # extract source and table names
831        source_parts = list(dependencies.sources)[0].split(".")
832        source_name = "_".join(source_parts) if len(source_parts) == 2 else source_parts[-1]
833    entity_name = model_name or source_name or ""
834    entity_name = f"_{entity_name}" if entity_name else ""
835
836    name_prefix = ""
837    if namespace := getattr(node.test_metadata, "namespace", None):
838        name_prefix += f"{namespace}_"
839    if source_name and not model_name:
840        name_prefix += "source_"
841
842    metadata_kwargs = node.test_metadata.kwargs
843    arg_val_parts = []
844    for arg, val in sorted(metadata_kwargs.items()):
845        if arg == "model":
846            continue
847        if isinstance(val, dict):
848            val = list(val.values())
849        val = [re.sub("[^0-9a-zA-Z_]+", "_", str(v)) for v in ensure_list(val)]
850        arg_val_parts.extend(val)
851    unique_args = "__".join(arg_val_parts) if arg_val_parts else ""
852    unique_args = f"_{unique_args}" if unique_args else ""
853
854    auto_name = f"{name_prefix}{node.test_metadata.name}{entity_name}{unique_args}"
855
856    if node.name == auto_name:
857        return node.name
858
859    custom_prefix = name_prefix if source_name and not model_name else ""
860    return f"{custom_prefix}{node.name}{entity_name}{unique_args}"
logger = <Logger sqlmesh.dbt.manifest (WARNING)>
TestConfigs = typing.Dict[str, sqlmesh.dbt.test.TestConfig]
ModelConfigs = typing.Dict[str, sqlmesh.dbt.model.ModelConfig]
SeedConfigs = typing.Dict[str, sqlmesh.dbt.seed.SeedConfig]
SourceConfigs = typing.Dict[str, sqlmesh.dbt.source.SourceConfig]
MacroConfigs = typing.Dict[str, sqlmesh.dbt.package.MacroConfig]
HookConfigs = typing.Dict[str, sqlmesh.dbt.package.HookConfig]
MaterializationConfigs = typing.Dict[str, sqlmesh.dbt.package.MaterializationConfig]
IGNORED_PACKAGES = {'elementary'}
BUILTIN_CALLS = {'set_strict', 'sqlmesh_incremental', 'as_native', 'print', 'zip', 'env_var', 'sqlmesh', 'as_text', 'fromyaml', 'log', 'set', 'exceptions', 'fromjson', 'tojson', 'try_or_compiler_error', 'as_number', 'return', 'dbt_version', 'zip_strict', 'as_bool', 'toyaml', 'modules'}
class ManifestHelper:
 96class ManifestHelper:
 97    def __init__(
 98        self,
 99        project_path: Path,
100        profiles_path: Path,
101        profile_name: str,
102        target: TargetConfig,
103        variable_overrides: t.Optional[t.Dict[str, t.Any]] = None,
104        cache_dir: t.Optional[str] = None,
105        model_defaults: t.Optional[ModelDefaultsConfig] = None,
106    ):
107        self.project_path = project_path
108        self.profiles_path = profiles_path
109        self.profile_name = profile_name
110        self.target = target
111        self.variable_overrides = variable_overrides or {}
112        self.model_defaults = model_defaults or ModelDefaultsConfig()
113
114        self.__manifest: t.Optional[Manifest] = None
115        self._project_name: str = ""
116
117        self._is_loaded: bool = False
118        self._tests_per_package: t.Dict[str, TestConfigs] = defaultdict(dict)
119        self._models_per_package: t.Dict[str, ModelConfigs] = defaultdict(dict)
120        self._seeds_per_package: t.Dict[str, SeedConfigs] = defaultdict(dict)
121        self._sources_per_package: t.Dict[str, SourceConfigs] = defaultdict(dict)
122        self._macros_per_package: t.Dict[str, MacroConfigs] = defaultdict(dict)
123
124        self._macro_flatten_dependencies: t.Dict[str, t.Dict[str, Dependencies]] = defaultdict(dict)
125
126        self._tests_by_owner: t.Dict[str, t.List[TestConfig]] = defaultdict(list)
127        self._disabled_refs: t.Optional[t.Set[str]] = None
128        self._disabled_sources: t.Optional[t.Set[str]] = None
129
130        if cache_dir is not None:
131            cache_path = Path(cache_dir)
132            if not cache_path.is_absolute():
133                cache_path = self.project_path / cache_path
134        else:
135            cache_path = self.project_path / c.CACHE
136
137        self._call_cache: FileCache[t.Dict[str, t.List[CallNames]]] = FileCache(
138            cache_path, "jinja_calls"
139        )
140
141        self._on_run_start_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
142        self._on_run_end_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
143        self._materializations: MaterializationConfigs = {}
144
145    def tests(self, package_name: t.Optional[str] = None) -> TestConfigs:
146        self._load_all()
147        return self._tests_per_package[package_name or self._project_name]
148
149    def models(self, package_name: t.Optional[str] = None) -> ModelConfigs:
150        self._load_all()
151        return self._models_per_package[package_name or self._project_name]
152
153    def seeds(self, package_name: t.Optional[str] = None) -> SeedConfigs:
154        self._load_all()
155        return self._seeds_per_package[package_name or self._project_name]
156
157    def sources(self, package_name: t.Optional[str] = None) -> SourceConfigs:
158        self._load_all()
159        return self._sources_per_package[package_name or self._project_name]
160
161    def macros(self, package_name: t.Optional[str] = None) -> MacroConfigs:
162        self._load_all()
163        return self._macros_per_package[package_name or self._project_name]
164
165    def on_run_start(self, package_name: t.Optional[str] = None) -> HookConfigs:
166        self._load_all()
167        return self._on_run_start_per_package[package_name or self._project_name]
168
169    def on_run_end(self, package_name: t.Optional[str] = None) -> HookConfigs:
170        self._load_all()
171        return self._on_run_end_per_package[package_name or self._project_name]
172
173    def materializations(self) -> MaterializationConfigs:
174        self._load_all()
175        return self._materializations
176
177    @property
178    def all_macros(self) -> t.Dict[str, t.Dict[str, MacroInfo]]:
179        self._load_all()
180        result: t.Dict[str, t.Dict[str, MacroInfo]] = defaultdict(dict)
181        for package_name, macro_configs in self._macros_per_package.items():
182            for macro_name, macro_config in macro_configs.items():
183                result[package_name][macro_name] = macro_config.info
184        return result
185
186    @cached_property
187    def flat_graph(self) -> t.Dict[str, t.Any]:
188        return {
189            "exposures": {
190                k: make_serializable(v.to_dict(omit_none=False))
191                for k, v in getattr(self._manifest, "exposures", {}).items()
192            },
193            "groups": {
194                k: make_serializable(v.to_dict(omit_none=False))
195                for k, v in getattr(self._manifest, "groups", {}).items()
196            },
197            "metrics": {
198                k: make_serializable(v.to_dict(omit_none=False))
199                for k, v in getattr(self._manifest, "metrics", {}).items()
200            },
201            "nodes": {
202                k: make_serializable(v.to_dict(omit_none=False))
203                for k, v in self._manifest.nodes.items()
204            },
205            "sources": {
206                k: make_serializable(v.to_dict(omit_none=False))
207                for k, v in self._manifest.sources.items()
208            },
209            "semantic_models": {
210                k: make_serializable(v.to_dict(omit_none=False))
211                for k, v in getattr(self._manifest, "semantic_models", {}).items()
212            },
213            "saved_queries": {
214                k: make_serializable(v.to_dict(omit_none=False))
215                for k, v in getattr(self._manifest, "saved_queries", {}).items()
216            },
217        }
218
219    def _load_all(self) -> None:
220        if self._is_loaded:
221            return
222
223        self._calls = {k: (v, False) for k, v in (self._call_cache.get("") or {}).items()}
224
225        self._load_macros()
226        self._load_materializations()
227        self._load_sources()
228        self._load_tests()
229        self._load_models_and_seeds()
230        self._load_on_run_start_end()
231        self._is_loaded = True
232
233        self._call_cache.put("", value={k: v for k, (v, used) in self._calls.items() if used})
234
235    def _load_sources(self) -> None:
236        for source in self._manifest.sources.values():
237            # starting in dbt-core 1.9.5, freshness can be set in both source and source config
238            source_dict = source.to_dict()
239            source_dict.pop("freshness", None)
240
241            source_config_dict = _config(source)
242            source_config_dict.pop("freshness", None)
243
244            source_config_freshness = getattr(source.config, "freshness", None)
245            freshness = (
246                merge_freshness(source.freshness, source_config_freshness)
247                if source_config_freshness
248                else source.freshness
249            )
250
251            source_config = SourceConfig(
252                **{
253                    **source_dict,
254                    **source_config_dict,
255                    "freshness": freshness.to_dict() if freshness else None,
256                }
257            )
258            self._sources_per_package[source.package_name][source_config.config_name] = (
259                source_config
260            )
261
262    def _load_macros(self) -> None:
263        for macro in self._manifest.macros.values():
264            if macro.name.startswith("materialization_"):
265                continue
266
267            if macro.name.startswith("test_"):
268                macro.macro_sql = _convert_jinja_test_to_macro(macro.macro_sql)
269
270            dependencies = Dependencies(macros=_macro_references(self._manifest, macro))
271            if not macro.name.startswith("test_"):
272                dependencies = dependencies.union(
273                    self._extra_dependencies(macro.macro_sql, macro.package_name)
274                )
275
276            self._macros_per_package[macro.package_name][macro.name] = MacroConfig(
277                info=MacroInfo(
278                    definition=macro.macro_sql,
279                    depends_on=dependencies.macros,
280                ),
281                dependencies=dependencies,
282                path=Path(macro.original_file_path),
283            )
284
285        # This is a workaround for dbt adapter macros (eg. "spark__dateadd") whcih are expected to be
286        # available in the global scope regardless of the package they came from.
287        adapter_macro_names = {
288            name[name.find("__") + 2 :]
289            for name in self._macros_per_package.get("dbt", {})
290            if "__" in name
291        }
292        for macros in self._macros_per_package.values():
293            for name, macro_config in macros.items():
294                pos = name.find("__")
295                if pos > 0 and name[pos + 2 :] in adapter_macro_names:
296                    macro_config.info.is_top_level = True
297
298    def _load_materializations(self) -> None:
299        for macro in self._manifest.macros.values():
300            if macro.name.startswith("materialization_"):
301                # Extract name and adapter ( "materialization_{name}_{adapter}" or "materialization_{name}_default")
302                name_parts = macro.name.split("_")
303                if len(name_parts) >= 3:
304                    mat_name = "_".join(name_parts[1:-1])
305                    adapter = name_parts[-1]
306
307                    dependencies = Dependencies(macros=_macro_references(self._manifest, macro))
308                    macro.macro_sql = _strip_jinja_materialization_tags(macro.macro_sql)
309                    dependencies = dependencies.union(
310                        self._extra_dependencies(macro.macro_sql, macro.package_name)
311                    )
312
313                    materialization_config = MaterializationConfig(
314                        name=mat_name,
315                        adapter=adapter,
316                        definition=macro.macro_sql,
317                        dependencies=dependencies,
318                        path=Path(macro.original_file_path),
319                    )
320
321                    key = f"{mat_name}_{adapter}"
322                    self._materializations[key] = materialization_config
323
324    def _load_tests(self) -> None:
325        for node in self._manifest.nodes.values():
326            if node.resource_type != "test":
327                continue
328
329            skip_test = False
330            refs = _refs(node)
331            for ref in refs:
332                if self._is_disabled_ref(ref):
333                    logger.info(
334                        "Skipping test '%s' which references a disabled model '%s'",
335                        node.name,
336                        ref,
337                    )
338                    skip_test = True
339                    break
340
341            if skip_test:
342                continue
343
344            dependencies = Dependencies(
345                macros=_macro_references(self._manifest, node),
346                refs=refs,
347                sources=_sources(node),
348            )
349            # Implicit dependencies for model test arg
350            dependencies.macros.append(MacroReference(package="dbt", name="get_where_subquery"))
351            dependencies.macros.append(MacroReference(package="dbt", name="should_store_failures"))
352
353            sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
354            dependencies = dependencies.union(self._extra_dependencies(sql, node.package_name))
355            dependencies = dependencies.union(
356                self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
357            )
358
359            test_model = _test_model(node)
360            node_config = _node_base_config(node)
361            node_config["name"] = _build_test_name(node, dependencies)
362
363            test = TestConfig(
364                sql=sql,
365                model_name=test_model,
366                test_kwargs=node.test_metadata.kwargs if hasattr(node, "test_metadata") else {},
367                dependencies=dependencies,
368                **node_config,
369            )
370            self._tests_per_package[node.package_name][node.unique_id] = test
371            if test_model:
372                self._tests_by_owner[test_model].append(test)
373
374    def _load_models_and_seeds(self) -> None:
375        for node in self._manifest.nodes.values():
376            if (
377                node.resource_type not in ("model", "seed", "snapshot")
378                or node.package_name in IGNORED_PACKAGES
379            ):
380                continue
381
382            macro_references = _macro_references(self._manifest, node)
383            all_tests = (
384                self._tests_by_owner[node.name]
385                + self._tests_by_owner[f"{node.package_name}.{node.name}"]
386            )
387            # Only include non-standalone tests (tests that don't reference other models)
388            tests = [test for test in all_tests if not test.is_standalone]
389            node_config = _node_base_config(node)
390
391            node_name = node.name
392            node_version = getattr(node, "version", None)
393            if node_version:
394                node_name = f"{node_name}_v{node_version}"
395
396            if node.resource_type in {"model", "snapshot"}:
397                sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
398                dependencies = Dependencies(
399                    macros=macro_references, refs=_refs(node), sources=_sources(node)
400                )
401                dependencies = dependencies.union(
402                    self._extra_dependencies(sql, node.package_name, track_all_model_attrs=True)
403                )
404                for hook in [*node_config.get("pre-hook", []), *node_config.get("post-hook", [])]:
405                    dependencies = dependencies.union(
406                        self._extra_dependencies(
407                            hook["sql"], node.package_name, track_all_model_attrs=True
408                        )
409                    )
410                dependencies = dependencies.union(
411                    self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
412                )
413
414                self._models_per_package[node.package_name][node_name] = ModelConfig(
415                    **dict(
416                        node_config,
417                        sql=sql,
418                        dependencies=dependencies,
419                        tests=tests,
420                    )
421                )
422            else:
423                self._seeds_per_package[node.package_name][node_name] = SeedConfig(
424                    **dict(
425                        node_config,
426                        dependencies=Dependencies(macros=macro_references),
427                        tests=tests,
428                    )
429                )
430
431    def _load_on_run_start_end(self) -> None:
432        for node in self._manifest.nodes.values():
433            if node.resource_type == "operation" and (
434                set(node.tags) & {"on-run-start", "on-run-end"}
435            ):
436                sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
437                node_name = node.name
438                node_path = Path(node.original_file_path)
439
440                dependencies = Dependencies(
441                    macros=_macro_references(self._manifest, node),
442                    refs=_refs(node),
443                    sources=_sources(node),
444                )
445                dependencies = dependencies.union(self._extra_dependencies(sql, node.package_name))
446                dependencies = dependencies.union(
447                    self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
448                )
449
450                if "on-run-start" in node.tags:
451                    self._on_run_start_per_package[node.package_name][node_name] = HookConfig(
452                        sql=sql,
453                        index=getattr(node, "index", None) or 0,
454                        path=node_path,
455                        dependencies=dependencies,
456                    )
457                else:
458                    self._on_run_end_per_package[node.package_name][node_name] = HookConfig(
459                        sql=sql,
460                        index=getattr(node, "index", None) or 0,
461                        path=node_path,
462                        dependencies=dependencies,
463                    )
464
465    @property
466    def _manifest(self) -> Manifest:
467        if not self.__manifest:
468            try:
469                self.__manifest = self._load_manifest()
470            except Exception as ex:
471                raise SQLMeshError(f"Failed to load dbt manifest: {ex}") from ex
472        return self.__manifest
473
    def _load_manifest(self) -> Manifest:
        """Build the full dbt manifest for the configured project, profile and target.

        Also records the project's name in `self._project_name`.

        Returns:
            The manifest produced by dbt's ManifestLoader, with its semantic models cleared.

        Raises:
            ConfigError: If neither the project's `models` config contains "start" or
                "+start" nor the model defaults define a start.
        """
        do_not_track()

        # dbt >= 1.5.0 is given the variables as a dict; older versions get a JSON string.
        variables = (
            self.variable_overrides
            if DBT_VERSION >= (1, 5, 0)
            else json.dumps(self.variable_overrides)
        )

        # The arguments dbt reads its flags and runtime config from.
        args: Namespace = Namespace(
            vars=variables,
            profile=self.profile_name,
            project_dir=str(self.project_path),
            profiles_dir=str(self.profiles_path),
            target=self.target.name,
            macro_debugging=False,
            REQUIRE_RESOURCE_NAMES_WITHOUT_SPACES=True,
        )
        flags.set_from_args(args, None)

        if DBT_VERSION >= (1, 8, 0):
            from dbt_common.context import set_invocation_context  # type: ignore

            set_invocation_context(os.environ)

        profile = self._load_profile()
        project = self._load_project(profile)

        # A start date must come from either dbt_project.yml or the model defaults.
        if (
            not any(k in project.models for k in ("start", "+start"))
            and not self.model_defaults.start
        ):
            raise ConfigError(
                "SQLMesh requires a start date in order to have a finite range of backfilling data. Add start to the 'models:' block in dbt_project.yml. https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/#setting-model-backfill-start-dates"
            )

        runtime_config = RuntimeConfig.from_parts(project, profile, args)

        self._project_name = project.project_name

        # register_adapter takes an additional multiprocessing context from dbt 1.8.0 on.
        if DBT_VERSION >= (1, 8, 0):
            from dbt.mp_context import get_mp_context  # type: ignore

            register_adapter(runtime_config, get_mp_context())  # type: ignore
        else:
            register_adapter(runtime_config)  # type: ignore

        manifest = ManifestLoader.get_full_manifest(runtime_config)
        # This adapter doesn't care about semantic models so we clear them out to avoid issues
        manifest.semantic_models = {}
        # NOTE(review): reset_adapters() is not reached when get_full_manifest raises — confirm
        # whether the registered adapter should also be reset on failure.
        reset_adapters()
        return manifest
526
527    def _load_project(self, profile: Profile) -> Project:
528        project_renderer = DbtProjectYamlRenderer(profile, cli_vars=self.variable_overrides)
529        return Project.from_project_root(str(self.project_path), project_renderer)
530
531    def _load_profile(self) -> Profile:
532        profile_renderer = ProfileRenderer(cli_vars=self.variable_overrides)
533        raw_profiles = read_profile(str(self.profiles_path))
534        return Profile.from_raw_profiles(
535            raw_profiles=raw_profiles,
536            profile_name=self.profile_name,
537            renderer=profile_renderer,
538            target_override=self.target.name,
539        )
540
541    def _is_disabled_ref(self, ref: str) -> bool:
542        if self._disabled_refs is None:
543            self._load_disabled()
544
545        return ref in self._disabled_refs  # type: ignore
546
547    def _is_disabled_source(self, source: str) -> bool:
548        if self._disabled_sources is None:
549            self._load_disabled()
550
551        return source in self._disabled_sources  # type: ignore
552
553    def _load_disabled(self) -> None:
554        self._disabled_refs = set()
555        self._disabled_sources = set()
556        for nodes in self._manifest.disabled.values():
557            for node in nodes:
558                if node.resource_type in ("model", "snapshot", "seed"):
559                    self._disabled_refs.add(f"{node.package_name}.{node.name}")
560                    self._disabled_refs.add(node.name)
561                elif node.resource_type == "source":
562                    self._disabled_sources.add(f"{node.package_name}.{node.name}")
563
564        for node in self._manifest.nodes.values():
565            if node.resource_type in ("model", "snapshot", "seed"):
566                self._disabled_refs.discard(node.name)
567            elif node.resource_type == "source":
568                self._disabled_sources.discard(node.name)
569
    def _flatten_dependencies_from_macros(
        self,
        macros: t.List[MacroReference],
        default_package: str,
        visited: t.Optional[t.Set[t.Tuple[str, str]]] = None,
    ) -> Dependencies:
        """Union the dependencies of the given macros and of the macros they reference.

        Results are memoised per (package, macro name) in `self._macro_flatten_dependencies`.

        Args:
            macros: The macro references to expand.
            default_package: Package used for references that carry no package.
            visited: (package, name) pairs already handled in this traversal; shared
                with the recursive calls so each macro is expanded at most once.

        Returns:
            The combined dependencies of every macro that could be resolved.
        """
        if visited is None:
            visited = set()

        dependencies = Dependencies()
        for macro in macros:
            macro_package = macro.package or default_package

            # Skip macros already handled in this traversal; this also stops cycles.
            if (macro_package, macro.name) in visited:
                continue
            visited.add((macro_package, macro.name))

            # `.get` is used for the lookup so that a miss does not create an entry.
            macro_dependencies = self._macro_flatten_dependencies.get(macro_package, {}).get(
                macro.name
            )
            if not macro_dependencies:
                macro_config = self._macros_per_package[macro_package].get(macro.name)
                # Macros that were not loaded contribute nothing.
                if not macro_config:
                    continue

                macro_dependencies = macro_config.dependencies.union(
                    self._flatten_dependencies_from_macros(
                        macro_config.dependencies.macros, macro_package, visited=visited
                    )
                )
                # We don't need to flatten macro dependencies. The jinja macro registry takes
                # care of recursive dependencies for us.
                macro_dependencies.macros = []
                self._macro_flatten_dependencies[macro_package][macro.name] = macro_dependencies
            dependencies = dependencies.union(macro_dependencies)
        return dependencies
606
    def _extra_dependencies(
        self,
        target: str,
        package: str,
        track_all_model_attrs: bool = False,
    ) -> Dependencies:
        """
        We sometimes observe that the manifest doesn't capture all macros, refs, and sources within a macro.
        This behavior has been observed with macros like dbt.current_timestamp(), dbt_utils.slugify(), and source().
        Here we apply our custom extractor to make a best effort to supplement references captured in the manifest.

        Args:
            target: The jinja text to scan for calls.
            package: The package the text belongs to; used to qualify unqualified macro calls.
            track_all_model_attrs: Whether passing `model` as a call argument should mark all
                of its attributes as used.

        Returns:
            The refs, sources, variables, macros and `model` attributes found in the text.
        """
        dependencies = Dependencies()

        # Whether all `model` attributes (e.g., `model.config`) should be included in the dependencies
        all_model_attrs = False

        for call_name, node in extract_call_names(target, cache=self._calls):
            # `config` is skipped entirely.
            if call_name[0] == "config":
                continue

            # A call that receives `model` itself as a positional argument.
            if (
                track_all_model_attrs
                and not all_model_attrs
                and isinstance(node, jinja2.nodes.Call)
                and any(isinstance(a, jinja2.nodes.Name) and a.name == "model" for a in node.args)
            ):
                all_model_attrs = True

            if isinstance(node, jinja2.nodes.Getattr):
                # Attribute access: only attributes of `model` are recorded.
                if call_name[0] == "model":
                    dependencies.model_attrs.attrs.add(call_name[1])
            elif call_name[0] == "source":
                args = [jinja_call_arg_name(arg) for arg in node.args]
                # The source is only recorded when every argument name could be resolved.
                if args and all(arg for arg in args):
                    source = ".".join(args)
                    if not self._is_disabled_source(source):
                        dependencies.sources.add(source)
                dependencies.macros.append(MacroReference(name="source"))
            elif call_name[0] == "ref":
                args = [jinja_call_arg_name(arg) for arg in node.args]
                # The ref is only recorded when every argument name could be resolved.
                if args and all(arg for arg in args):
                    ref = ".".join(args)
                    if not self._is_disabled_ref(ref):
                        dependencies.refs.add(ref)
                dependencies.macros.append(MacroReference(name="ref"))
            elif call_name[0] == "var":
                args = [jinja_call_arg_name(arg) for arg in node.args]
                if args and args[0]:
                    dependencies.variables.add(args[0])
                else:
                    # We couldn't determine the var name statically
                    dependencies.has_dynamic_var_names = True
                dependencies.macros.append(MacroReference(name="var"))
            elif len(call_name) == 1:
                # Unqualified call, e.g. `{{ my_macro() }}`.
                macro_name = call_name[0]
                if macro_name in BUILTIN_CALLS:
                    continue
                # Use the "dbt" package when the calling package does not define the macro
                # but dbt does.
                if (
                    f"macro.{package}.{macro_name}" not in self._manifest.macros
                    and f"macro.dbt.{macro_name}" in self._manifest.macros
                ):
                    package_name: t.Optional[str] = "dbt"
                else:
                    # dbt doesn't include the package name for project macros
                    package_name = package if package != self._project_name else None
                _macro_reference_if_not_overridden(
                    package_name, macro_name, dependencies.macros.append
                )
            else:
                # Qualified call: the first part is used as the package, the second as the
                # macro name. Calls on `adapter` are not recorded.
                if call_name[0] != "adapter":
                    _macro_reference_if_not_overridden(
                        call_name[0], call_name[1], dependencies.macros.append
                    )

        # When `model` is referenced as-is, e.g. it's passed as an argument to a macro call like
        # `{{ foo(model) }}`, we can't easily track the attributes that are actually used, because
        # it may be aliased and hence tracking actual uses of `model` requires a proper data flow
        # analysis. We conservatively deal with this by including all of its supported attributes
        # if a standalone reference is found.
        if all_model_attrs:
            dependencies.model_attrs.all_attrs = True

        return dependencies
ManifestHelper( project_path: pathlib.Path, profiles_path: pathlib.Path, profile_name: str, target: sqlmesh.dbt.target.TargetConfig, variable_overrides: Optional[Dict[str, Any]] = None, cache_dir: Optional[str] = None, model_defaults: Optional[sqlmesh.core.config.model.ModelDefaultsConfig] = None)
 97    def __init__(
 98        self,
 99        project_path: Path,
100        profiles_path: Path,
101        profile_name: str,
102        target: TargetConfig,
103        variable_overrides: t.Optional[t.Dict[str, t.Any]] = None,
104        cache_dir: t.Optional[str] = None,
105        model_defaults: t.Optional[ModelDefaultsConfig] = None,
106    ):
107        self.project_path = project_path
108        self.profiles_path = profiles_path
109        self.profile_name = profile_name
110        self.target = target
111        self.variable_overrides = variable_overrides or {}
112        self.model_defaults = model_defaults or ModelDefaultsConfig()
113
114        self.__manifest: t.Optional[Manifest] = None
115        self._project_name: str = ""
116
117        self._is_loaded: bool = False
118        self._tests_per_package: t.Dict[str, TestConfigs] = defaultdict(dict)
119        self._models_per_package: t.Dict[str, ModelConfigs] = defaultdict(dict)
120        self._seeds_per_package: t.Dict[str, SeedConfigs] = defaultdict(dict)
121        self._sources_per_package: t.Dict[str, SourceConfigs] = defaultdict(dict)
122        self._macros_per_package: t.Dict[str, MacroConfigs] = defaultdict(dict)
123
124        self._macro_flatten_dependencies: t.Dict[str, t.Dict[str, Dependencies]] = defaultdict(dict)
125
126        self._tests_by_owner: t.Dict[str, t.List[TestConfig]] = defaultdict(list)
127        self._disabled_refs: t.Optional[t.Set[str]] = None
128        self._disabled_sources: t.Optional[t.Set[str]] = None
129
130        if cache_dir is not None:
131            cache_path = Path(cache_dir)
132            if not cache_path.is_absolute():
133                cache_path = self.project_path / cache_path
134        else:
135            cache_path = self.project_path / c.CACHE
136
137        self._call_cache: FileCache[t.Dict[str, t.List[CallNames]]] = FileCache(
138            cache_path, "jinja_calls"
139        )
140
141        self._on_run_start_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
142        self._on_run_end_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
143        self._materializations: MaterializationConfigs = {}
project_path
profiles_path
profile_name
target
variable_overrides
model_defaults
def tests( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.test.TestConfig]:
145    def tests(self, package_name: t.Optional[str] = None) -> TestConfigs:
146        self._load_all()
147        return self._tests_per_package[package_name or self._project_name]
def models( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.model.ModelConfig]:
149    def models(self, package_name: t.Optional[str] = None) -> ModelConfigs:
150        self._load_all()
151        return self._models_per_package[package_name or self._project_name]
def seeds( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.seed.SeedConfig]:
153    def seeds(self, package_name: t.Optional[str] = None) -> SeedConfigs:
154        self._load_all()
155        return self._seeds_per_package[package_name or self._project_name]
def sources( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.source.SourceConfig]:
157    def sources(self, package_name: t.Optional[str] = None) -> SourceConfigs:
158        self._load_all()
159        return self._sources_per_package[package_name or self._project_name]
def macros( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.package.MacroConfig]:
161    def macros(self, package_name: t.Optional[str] = None) -> MacroConfigs:
162        self._load_all()
163        return self._macros_per_package[package_name or self._project_name]
def on_run_start( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.package.HookConfig]:
165    def on_run_start(self, package_name: t.Optional[str] = None) -> HookConfigs:
166        self._load_all()
167        return self._on_run_start_per_package[package_name or self._project_name]
def on_run_end( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.package.HookConfig]:
169    def on_run_end(self, package_name: t.Optional[str] = None) -> HookConfigs:
170        self._load_all()
171        return self._on_run_end_per_package[package_name or self._project_name]
def materializations(self) -> Dict[str, sqlmesh.dbt.package.MaterializationConfig]:
173    def materializations(self) -> MaterializationConfigs:
174        self._load_all()
175        return self._materializations
all_macros: Dict[str, Dict[str, sqlmesh.utils.jinja.MacroInfo]]
177    @property
178    def all_macros(self) -> t.Dict[str, t.Dict[str, MacroInfo]]:
179        self._load_all()
180        result: t.Dict[str, t.Dict[str, MacroInfo]] = defaultdict(dict)
181        for package_name, macro_configs in self._macros_per_package.items():
182            for macro_name, macro_config in macro_configs.items():
183                result[package_name][macro_name] = macro_config.info
184        return result
flat_graph: Dict[str, Any]
186    @cached_property
187    def flat_graph(self) -> t.Dict[str, t.Any]:
188        return {
189            "exposures": {
190                k: make_serializable(v.to_dict(omit_none=False))
191                for k, v in getattr(self._manifest, "exposures", {}).items()
192            },
193            "groups": {
194                k: make_serializable(v.to_dict(omit_none=False))
195                for k, v in getattr(self._manifest, "groups", {}).items()
196            },
197            "metrics": {
198                k: make_serializable(v.to_dict(omit_none=False))
199                for k, v in getattr(self._manifest, "metrics", {}).items()
200            },
201            "nodes": {
202                k: make_serializable(v.to_dict(omit_none=False))
203                for k, v in self._manifest.nodes.items()
204            },
205            "sources": {
206                k: make_serializable(v.to_dict(omit_none=False))
207                for k, v in self._manifest.sources.items()
208            },
209            "semantic_models": {
210                k: make_serializable(v.to_dict(omit_none=False))
211                for k, v in getattr(self._manifest, "semantic_models", {}).items()
212            },
213            "saved_queries": {
214                k: make_serializable(v.to_dict(omit_none=False))
215                for k, v in getattr(self._manifest, "saved_queries", {}).items()
216            },
217        }