sqlmesh.dbt.manifest
# ruff: noqa: E402
from __future__ import annotations

import json
import logging
import os
import re
import typing as t
from argparse import Namespace
from collections import defaultdict
from functools import cached_property
from pathlib import Path

from dbt import flags

from sqlmesh.dbt.util import DBT_VERSION
from sqlmesh.utils.conversions import make_serializable

# Override the file name to prevent dbt commands from invalidating the cache.

if DBT_VERSION >= (1, 6, 0):
    from dbt import constants as dbt_constants

    dbt_constants.PARTIAL_PARSE_FILE_NAME = "sqlmesh_partial_parse.msgpack"  # type: ignore
else:
    from dbt.parser import manifest as dbt_manifest  # type: ignore

    dbt_manifest.PARTIAL_PARSE_FILE_NAME = "sqlmesh_partial_parse.msgpack"  # type: ignore

import jinja2
from dbt.adapters.factory import register_adapter, reset_adapters
from dbt.config import Profile, Project, RuntimeConfig
from dbt.config.profile import read_profile
from dbt.config.renderer import DbtProjectYamlRenderer, ProfileRenderer
from dbt.parser.manifest import ManifestLoader

try:
    from dbt.parser.sources import merge_freshness  # type: ignore[attr-defined]
except ImportError:
    # merge_freshness was renamed to merge_source_freshness in dbt 1.10
    # ref: https://github.com/dbt-labs/dbt-core/commit/14fc39a76ff4830cdf2fcbe73f57ca27db500018#diff-1f09db95588f46879a83378c2a86d6b16b7cdfcaddbfe46afc5d919ee5e9a4d9R430
    from dbt.parser.sources import merge_source_freshness as merge_freshness  # type: ignore[no-redef,attr-defined]

from dbt.tracking import do_not_track

from sqlmesh.core import constants as c
from sqlmesh.utils.errors import SQLMeshError
from sqlmesh.core.config import ModelDefaultsConfig
from sqlmesh.dbt.builtin import BUILTIN_FILTERS, BUILTIN_GLOBALS, OVERRIDDEN_MACROS
from sqlmesh.dbt.common import Dependencies
from sqlmesh.dbt.model import ModelConfig
from sqlmesh.dbt.package import HookConfig, MacroConfig, MaterializationConfig
from sqlmesh.dbt.seed import SeedConfig
from sqlmesh.dbt.source import SourceConfig
from sqlmesh.dbt.target import TargetConfig
from sqlmesh.dbt.test import TestConfig
from sqlmesh.dbt.util import DBT_VERSION
from sqlmesh.utils.cache import FileCache
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.jinja import (
    MacroInfo,
    MacroReference,
    extract_call_names,
    jinja_call_arg_name,
)
from sqlglot.helper import ensure_list

if t.TYPE_CHECKING:
    from dbt.contracts.graph.manifest import Macro, Manifest
    from dbt.contracts.graph.nodes import ManifestNode, SourceDefinition
    from sqlmesh.utils.jinja import CallNames

logger = logging.getLogger(__name__)

TestConfigs = t.Dict[str, TestConfig]
ModelConfigs = t.Dict[str, ModelConfig]
SeedConfigs = t.Dict[str, SeedConfig]
SourceConfigs = t.Dict[str, SourceConfig]
MacroConfigs = t.Dict[str, MacroConfig]
HookConfigs = t.Dict[str, HookConfig]
MaterializationConfigs = t.Dict[str, MaterializationConfig]


# Packages whose models/seeds/snapshots are never converted.
IGNORED_PACKAGES = {"elementary"}
# Jinja globals/filters provided by SQLMesh's dbt builtins; calls to these are not macro dependencies.
BUILTIN_CALLS = {*BUILTIN_GLOBALS, *BUILTIN_FILTERS}

# Patch SemanticManifest to skip validation and avoid Pydantic v1 errors on dbt 1.6.
# The patch is also applied on 1.7+ since we don't care about semantic models.
if DBT_VERSION >= (1, 6, 0):
    from dbt.contracts.graph.semantic_manifest import SemanticManifest  # type: ignore

    SemanticManifest.validate = lambda _: True  # type: ignore


class ManifestHelper:
    """Load a dbt project's manifest and expose its contents as SQLMesh dbt configs.

    The manifest is parsed lazily on first access. The per-package configs (tests, models,
    seeds, sources, macros, hooks and materializations) are built once. Accessors that take
    ``package_name`` fall back to the root project's package when it is omitted.
    """

    def __init__(
        self,
        project_path: Path,
        profiles_path: Path,
        profile_name: str,
        target: TargetConfig,
        variable_overrides: t.Optional[t.Dict[str, t.Any]] = None,
        cache_dir: t.Optional[str] = None,
        model_defaults: t.Optional[ModelDefaultsConfig] = None,
    ):
        """
        Args:
            project_path: Root directory of the dbt project.
            profiles_path: Directory passed to dbt as ``profiles_dir``.
            profile_name: Name of the dbt profile to load.
            target: Target config; its ``name`` selects the profile target.
            variable_overrides: Variables passed to dbt as CLI ``vars``.
            cache_dir: Directory for the jinja call-name cache. Relative paths are resolved
                against ``project_path``; defaults to ``project_path / c.CACHE``.
            model_defaults: SQLMesh model defaults. A ``start`` here satisfies the start-date
                requirement checked in ``_load_manifest``.
        """
        self.project_path = project_path
        self.profiles_path = profiles_path
        self.profile_name = profile_name
        self.target = target
        self.variable_overrides = variable_overrides or {}
        self.model_defaults = model_defaults or ModelDefaultsConfig()

        self.__manifest: t.Optional[Manifest] = None
        self._project_name: str = ""

        self._is_loaded: bool = False
        self._tests_per_package: t.Dict[str, TestConfigs] = defaultdict(dict)
        self._models_per_package: t.Dict[str, ModelConfigs] = defaultdict(dict)
        self._seeds_per_package: t.Dict[str, SeedConfigs] = defaultdict(dict)
        self._sources_per_package: t.Dict[str, SourceConfigs] = defaultdict(dict)
        self._macros_per_package: t.Dict[str, MacroConfigs] = defaultdict(dict)

        # Memoized transitive (flattened) dependencies per package and macro name.
        self._macro_flatten_dependencies: t.Dict[str, t.Dict[str, Dependencies]] = defaultdict(dict)

        self._tests_by_owner: t.Dict[str, t.List[TestConfig]] = defaultdict(list)
        # Lazily populated by `_load_disabled`.
        self._disabled_refs: t.Optional[t.Set[str]] = None
        self._disabled_sources: t.Optional[t.Set[str]] = None

        if cache_dir is not None:
            cache_path = Path(cache_dir)
            if not cache_path.is_absolute():
                cache_path = self.project_path / cache_path
        else:
            cache_path = self.project_path / c.CACHE

        self._call_cache: FileCache[t.Dict[str, t.List[CallNames]]] = FileCache(
            cache_path, "jinja_calls"
        )

        self._on_run_start_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
        self._on_run_end_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
        self._materializations: MaterializationConfigs = {}

    def tests(self, package_name: t.Optional[str] = None) -> TestConfigs:
        """Return test configs keyed by dbt unique id for a package (root project by default)."""
        self._load_all()
        return self._tests_per_package[package_name or self._project_name]

    def models(self, package_name: t.Optional[str] = None) -> ModelConfigs:
        """Return model/snapshot configs for a package (root project by default)."""
        self._load_all()
        return self._models_per_package[package_name or self._project_name]

    def seeds(self, package_name: t.Optional[str] = None) -> SeedConfigs:
        """Return seed configs for a package (root project by default)."""
        self._load_all()
        return self._seeds_per_package[package_name or self._project_name]

    def sources(self, package_name: t.Optional[str] = None) -> SourceConfigs:
        """Return source configs for a package (root project by default)."""
        self._load_all()
        return self._sources_per_package[package_name or self._project_name]

    def macros(self, package_name: t.Optional[str] = None) -> MacroConfigs:
        """Return macro configs for a package (root project by default)."""
        self._load_all()
        return self._macros_per_package[package_name or self._project_name]

    def on_run_start(self, package_name: t.Optional[str] = None) -> HookConfigs:
        """Return on-run-start hook configs for a package (root project by default)."""
        self._load_all()
        return self._on_run_start_per_package[package_name or self._project_name]

    def on_run_end(self, package_name: t.Optional[str] = None) -> HookConfigs:
        """Return on-run-end hook configs for a package (root project by default)."""
        self._load_all()
        return self._on_run_end_per_package[package_name or self._project_name]

    def materializations(self) -> MaterializationConfigs:
        """Return custom materializations keyed by ``"<name>_<adapter>"``."""
        self._load_all()
        return self._materializations

    @property
    def all_macros(self) -> t.Dict[str, t.Dict[str, MacroInfo]]:
        """Return every macro's ``MacroInfo``, grouped by package name."""
        self._load_all()
        result: t.Dict[str, t.Dict[str, MacroInfo]] = defaultdict(dict)
        for package_name, macro_configs in self._macros_per_package.items():
            for macro_name, macro_config in macro_configs.items():
                result[package_name][macro_name] = macro_config.info
        return result

    @cached_property
    def flat_graph(self) -> t.Dict[str, t.Any]:
        """Return a serializable view of the manifest's resources, grouped by resource kind.

        Optional resource collections (absent on older dbt versions) default to empty.
        """
        manifest = self._manifest

        def serialize(resources: t.Any) -> t.Dict[str, t.Any]:
            return {k: make_serializable(v.to_dict(omit_none=False)) for k, v in resources.items()}

        return {
            "exposures": serialize(getattr(manifest, "exposures", {})),
            "groups": serialize(getattr(manifest, "groups", {})),
            "metrics": serialize(getattr(manifest, "metrics", {})),
            "nodes": serialize(manifest.nodes),
            "sources": serialize(manifest.sources),
            "semantic_models": serialize(getattr(manifest, "semantic_models", {})),
            "saved_queries": serialize(getattr(manifest, "saved_queries", {})),
        }

    def _load_all(self) -> None:
        """Build all per-package configs once, reusing and then persisting the jinja call cache."""
        if self._is_loaded:
            return

        # Each cached entry starts as "unused"; presumably `extract_call_names` flips the flag on
        # a cache hit (TODO confirm), so only entries used by this load are written back below.
        self._calls = {k: (v, False) for k, v in (self._call_cache.get("") or {}).items()}

        # Order matters: macros must exist before dependencies are flattened, and tests must be
        # indexed by owner before models/seeds attach them.
        self._load_macros()
        self._load_materializations()
        self._load_sources()
        self._load_tests()
        self._load_models_and_seeds()
        self._load_on_run_start_end()
        self._is_loaded = True

        self._call_cache.put("", value={k: v for k, (v, used) in self._calls.items() if used})

    def _load_sources(self) -> None:
        """Convert every manifest source into a ``SourceConfig``."""
        for source in self._manifest.sources.values():
            # starting in dbt-core 1.9.5, freshness can be set in both source and source config
            source_dict = source.to_dict()
            source_dict.pop("freshness", None)

            source_config_dict = _config(source)
            source_config_dict.pop("freshness", None)

            source_config_freshness = getattr(source.config, "freshness", None)
            freshness = (
                merge_freshness(source.freshness, source_config_freshness)
                if source_config_freshness
                else source.freshness
            )

            source_config = SourceConfig(
                **{
                    **source_dict,
                    **source_config_dict,
                    "freshness": freshness.to_dict() if freshness else None,
                }
            )
            self._sources_per_package[source.package_name][source_config.config_name] = (
                source_config
            )

    def _load_macros(self) -> None:
        """Convert manifest macros (except materializations) into ``MacroConfig`` objects."""
        for macro in self._manifest.macros.values():
            if macro.name.startswith("materialization_"):
                continue

            if macro.name.startswith("test_"):
                macro.macro_sql = _convert_jinja_test_to_macro(macro.macro_sql)

            dependencies = Dependencies(macros=_macro_references(self._manifest, macro))
            if not macro.name.startswith("test_"):
                dependencies = dependencies.union(
                    self._extra_dependencies(macro.macro_sql, macro.package_name)
                )

            self._macros_per_package[macro.package_name][macro.name] = MacroConfig(
                info=MacroInfo(
                    definition=macro.macro_sql,
                    depends_on=dependencies.macros,
                ),
                dependencies=dependencies,
                path=Path(macro.original_file_path),
            )

        # This is a workaround for dbt adapter macros (e.g. "spark__dateadd") which are expected to be
        # available in the global scope regardless of the package they came from.
        adapter_macro_names = {
            name[name.find("__") + 2 :]
            for name in self._macros_per_package.get("dbt", {})
            if "__" in name
        }
        for macros in self._macros_per_package.values():
            for name, macro_config in macros.items():
                pos = name.find("__")
                if pos > 0 and name[pos + 2 :] in adapter_macro_names:
                    macro_config.info.is_top_level = True

    def _load_materializations(self) -> None:
        """Convert ``materialization_*`` macros into ``MaterializationConfig`` objects."""
        for macro in self._manifest.macros.values():
            if macro.name.startswith("materialization_"):
                # Extract name and adapter ( "materialization_{name}_{adapter}" or "materialization_{name}_default")
                name_parts = macro.name.split("_")
                if len(name_parts) >= 3:
                    mat_name = "_".join(name_parts[1:-1])
                    adapter = name_parts[-1]

                    dependencies = Dependencies(macros=_macro_references(self._manifest, macro))
                    macro.macro_sql = _strip_jinja_materialization_tags(macro.macro_sql)
                    dependencies = dependencies.union(
                        self._extra_dependencies(macro.macro_sql, macro.package_name)
                    )

                    materialization_config = MaterializationConfig(
                        name=mat_name,
                        adapter=adapter,
                        definition=macro.macro_sql,
                        dependencies=dependencies,
                        path=Path(macro.original_file_path),
                    )

                    key = f"{mat_name}_{adapter}"
                    self._materializations[key] = materialization_config

    def _load_tests(self) -> None:
        """Convert test nodes into ``TestConfig`` objects and index them by owning model."""
        for node in self._manifest.nodes.values():
            if node.resource_type != "test":
                continue

            skip_test = False
            refs = _refs(node)
            for ref in refs:
                if self._is_disabled_ref(ref):
                    logger.info(
                        "Skipping test '%s' which references a disabled model '%s'",
                        node.name,
                        ref,
                    )
                    skip_test = True
                    break

            if skip_test:
                continue

            dependencies = Dependencies(
                macros=_macro_references(self._manifest, node),
                refs=refs,
                sources=_sources(node),
            )
            # Implicit dependencies for model test arg
            dependencies.macros.append(MacroReference(package="dbt", name="get_where_subquery"))
            dependencies.macros.append(MacroReference(package="dbt", name="should_store_failures"))

            sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
            dependencies = dependencies.union(self._extra_dependencies(sql, node.package_name))
            dependencies = dependencies.union(
                self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
            )

            test_model = _test_model(node)
            node_config = _node_base_config(node)
            node_config["name"] = _build_test_name(node, dependencies)

            test = TestConfig(
                sql=sql,
                model_name=test_model,
                test_kwargs=node.test_metadata.kwargs if hasattr(node, "test_metadata") else {},
                dependencies=dependencies,
                **node_config,
            )
            self._tests_per_package[node.package_name][node.unique_id] = test
            if test_model:
                self._tests_by_owner[test_model].append(test)

    def _load_models_and_seeds(self) -> None:
        """Convert model/snapshot nodes into ``ModelConfig`` and seed nodes into ``SeedConfig``."""
        for node in self._manifest.nodes.values():
            if (
                node.resource_type not in ("model", "seed", "snapshot")
                or node.package_name in IGNORED_PACKAGES
            ):
                continue

            macro_references = _macro_references(self._manifest, node)
            all_tests = (
                self._tests_by_owner[node.name]
                + self._tests_by_owner[f"{node.package_name}.{node.name}"]
            )
            # Only include non-standalone tests (tests that don't reference other models)
            tests = [test for test in all_tests if not test.is_standalone]
            node_config = _node_base_config(node)

            node_name = node.name
            node_version = getattr(node, "version", None)
            if node_version:
                node_name = f"{node_name}_v{node_version}"

            if node.resource_type in {"model", "snapshot"}:
                sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
                dependencies = Dependencies(
                    macros=macro_references, refs=_refs(node), sources=_sources(node)
                )
                dependencies = dependencies.union(
                    self._extra_dependencies(sql, node.package_name, track_all_model_attrs=True)
                )
                for hook in [*node_config.get("pre-hook", []), *node_config.get("post-hook", [])]:
                    dependencies = dependencies.union(
                        self._extra_dependencies(
                            hook["sql"], node.package_name, track_all_model_attrs=True
                        )
                    )
                dependencies = dependencies.union(
                    self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
                )

                self._models_per_package[node.package_name][node_name] = ModelConfig(
                    **dict(
                        node_config,
                        sql=sql,
                        dependencies=dependencies,
                        tests=tests,
                    )
                )
            else:
                self._seeds_per_package[node.package_name][node_name] = SeedConfig(
                    **dict(
                        node_config,
                        dependencies=Dependencies(macros=macro_references),
                        tests=tests,
                    )
                )

    def _load_on_run_start_end(self) -> None:
        """Convert ``operation`` nodes tagged on-run-start/on-run-end into ``HookConfig`` objects.

        A node tagged with both is registered as an on-run-start hook only.
        """
        for node in self._manifest.nodes.values():
            if node.resource_type == "operation" and (
                set(node.tags) & {"on-run-start", "on-run-end"}
            ):
                sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore

                dependencies = Dependencies(
                    macros=_macro_references(self._manifest, node),
                    refs=_refs(node),
                    sources=_sources(node),
                )
                dependencies = dependencies.union(self._extra_dependencies(sql, node.package_name))
                dependencies = dependencies.union(
                    self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
                )

                hooks_per_package = (
                    self._on_run_start_per_package
                    if "on-run-start" in node.tags
                    else self._on_run_end_per_package
                )
                hooks_per_package[node.package_name][node.name] = HookConfig(
                    sql=sql,
                    index=getattr(node, "index", None) or 0,
                    path=Path(node.original_file_path),
                    dependencies=dependencies,
                )

    @property
    def _manifest(self) -> Manifest:
        """Return the parsed dbt manifest, loading it on first access.

        Raises:
            SQLMeshError: wrapping any exception raised while loading the manifest.
        """
        if self.__manifest is None:
            try:
                self.__manifest = self._load_manifest()
            except Exception as ex:
                raise SQLMeshError(f"Failed to load dbt manifest: {ex}") from ex
        return self.__manifest

    def _load_manifest(self) -> Manifest:
        """Parse the dbt project via ``ManifestLoader`` and return the full manifest.

        Raises:
            ConfigError: if neither the ``models:`` block of dbt_project.yml nor
                ``model_defaults`` define a start date.
        """
        do_not_track()

        # dbt < 1.5 expects vars as a serialized string rather than a dict.
        variables = (
            self.variable_overrides
            if DBT_VERSION >= (1, 5, 0)
            else json.dumps(self.variable_overrides)
        )

        args: Namespace = Namespace(
            vars=variables,
            profile=self.profile_name,
            project_dir=str(self.project_path),
            profiles_dir=str(self.profiles_path),
            target=self.target.name,
            macro_debugging=False,
            REQUIRE_RESOURCE_NAMES_WITHOUT_SPACES=True,
        )
        flags.set_from_args(args, None)

        if DBT_VERSION >= (1, 8, 0):
            from dbt_common.context import set_invocation_context  # type: ignore

            set_invocation_context(os.environ)

        profile = self._load_profile()
        project = self._load_project(profile)

        if (
            not any(k in project.models for k in ("start", "+start"))
            and not self.model_defaults.start
        ):
            raise ConfigError(
                "SQLMesh requires a start date in order to have a finite range of backfilling data. Add start to the 'models:' block in dbt_project.yml. https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/#setting-model-backfill-start-dates"
            )

        runtime_config = RuntimeConfig.from_parts(project, profile, args)

        self._project_name = project.project_name

        if DBT_VERSION >= (1, 8, 0):
            from dbt.mp_context import get_mp_context  # type: ignore

            register_adapter(runtime_config, get_mp_context())  # type: ignore
        else:
            register_adapter(runtime_config)  # type: ignore

        try:
            manifest = ManifestLoader.get_full_manifest(runtime_config)
        finally:
            # Unregister the adapter even when parsing fails so no global adapter state leaks.
            reset_adapters()
        # This adapter doesn't care about semantic models so we clear them out to avoid issues
        manifest.semantic_models = {}
        return manifest

    def _load_project(self, profile: Profile) -> Project:
        """Load dbt_project.yml from ``project_path``, rendering it with the CLI vars."""
        project_renderer = DbtProjectYamlRenderer(profile, cli_vars=self.variable_overrides)
        return Project.from_project_root(str(self.project_path), project_renderer)

    def _load_profile(self) -> Profile:
        """Load the configured profile and target from ``profiles_path``."""
        profile_renderer = ProfileRenderer(cli_vars=self.variable_overrides)
        raw_profiles = read_profile(str(self.profiles_path))
        return Profile.from_raw_profiles(
            raw_profiles=raw_profiles,
            profile_name=self.profile_name,
            renderer=profile_renderer,
            target_override=self.target.name,
        )

    def _is_disabled_ref(self, ref: str) -> bool:
        """Return True if ``ref`` (``"name"`` or ``"package.name"``) points at a disabled node."""
        if self._disabled_refs is None:
            self._load_disabled()

        return ref in self._disabled_refs  # type: ignore

    def _is_disabled_source(self, source: str) -> bool:
        """Return True if ``source`` (``"source_name.table_name"``) points at a disabled source."""
        if self._disabled_sources is None:
            self._load_disabled()

        return source in self._disabled_sources  # type: ignore

    def _load_disabled(self) -> None:
        """Collect ref and source keys that resolve only to disabled manifest nodes."""
        self._disabled_refs = set()
        self._disabled_sources = set()
        for nodes in self._manifest.disabled.values():
            for node in nodes:
                if node.resource_type in ("model", "snapshot", "seed"):
                    self._disabled_refs.add(f"{node.package_name}.{node.name}")
                    self._disabled_refs.add(node.name)
                elif node.resource_type == "source":
                    # Source references in this module are "<source_name>.<table_name>" (see
                    # `_sources` and the `source(...)` handling in `_extra_dependencies`).
                    self._disabled_sources.add(_source_ref_name(node))

        # An enabled node with the same unqualified name shadows a disabled one.
        for node in self._manifest.nodes.values():
            if node.resource_type in ("model", "snapshot", "seed"):
                self._disabled_refs.discard(node.name)
        # Enabled sources live in `manifest.sources`, not `manifest.nodes`.
        for source in self._manifest.sources.values():
            self._disabled_sources.discard(_source_ref_name(source))

    def _flatten_dependencies_from_macros(
        self,
        macros: t.List[MacroReference],
        default_package: str,
        visited: t.Optional[t.Set[t.Tuple[str, str]]] = None,
    ) -> Dependencies:
        """Return the union of the transitive non-macro dependencies of ``macros``.

        Results are memoized per macro in ``_macro_flatten_dependencies`` with an empty
        ``macros`` list. The jinja macro registry resolves macro-to-macro calls itself.

        Args:
            macros: Macro references to flatten.
            default_package: Package used for references that carry no package.
            visited: Macros already being flattened in this traversal; guards against cycles.
        """
        if visited is None:
            visited = set()

        dependencies = Dependencies()
        for macro in macros:
            macro_package = macro.package or default_package

            # Consult the memo before the visited set. A macro that was already flattened
            # (e.g. through a sibling call) must still contribute its dependencies. Otherwise
            # the result memoized for the macro being flattened would silently miss them.
            macro_dependencies = self._macro_flatten_dependencies.get(macro_package, {}).get(
                macro.name
            )
            if macro_dependencies is None:
                if (macro_package, macro.name) in visited:
                    # Still being flattened further up the stack (recursive macros).
                    continue
                visited.add((macro_package, macro.name))

                # Use .get so unknown packages aren't inserted into the defaultdict.
                macro_config = self._macros_per_package.get(macro_package, {}).get(macro.name)
                if not macro_config:
                    continue

                macro_dependencies = macro_config.dependencies.union(
                    self._flatten_dependencies_from_macros(
                        macro_config.dependencies.macros, macro_package, visited=visited
                    )
                )
                # We don't need flatten macro dependencies. The jinja macro registry takes care of recursive
                # dependencies for us.
                macro_dependencies.macros = []
                self._macro_flatten_dependencies[macro_package][macro.name] = macro_dependencies
            dependencies = dependencies.union(macro_dependencies)
        return dependencies

    def _extra_dependencies(
        self,
        target: str,
        package: str,
        track_all_model_attrs: bool = False,
    ) -> Dependencies:
        """
        We sometimes observe that the manifest doesn't capture all macros, refs, and sources within a macro.
        This behavior has been observed with macros like dbt.current_timestamp(), dbt_utils.slugify(), and source().
        Here we apply our custom extractor to make a best effort to supplement references captured in the manifest.

        Args:
            target: Jinja source to scan for calls.
            package: Package that ``target`` belongs to.
            track_all_model_attrs: If True, a call that passes ``model`` as a bare positional
                argument marks all ``model`` attributes as dependencies.
        """
        dependencies = Dependencies()

        # Whether all `model` attributes (e.g., `model.config`) should be included in the dependencies
        all_model_attrs = False

        for call_name, node in extract_call_names(target, cache=self._calls):
            if call_name[0] == "config":
                continue

            if (
                track_all_model_attrs
                and not all_model_attrs
                and isinstance(node, jinja2.nodes.Call)
                and any(isinstance(a, jinja2.nodes.Name) and a.name == "model" for a in node.args)
            ):
                all_model_attrs = True

            if isinstance(node, jinja2.nodes.Getattr):
                if call_name[0] == "model":
                    dependencies.model_attrs.attrs.add(call_name[1])
            elif call_name[0] == "source":
                args = [jinja_call_arg_name(arg) for arg in node.args]
                if args and all(args):
                    source = ".".join(args)
                    if not self._is_disabled_source(source):
                        dependencies.sources.add(source)
                dependencies.macros.append(MacroReference(name="source"))
            elif call_name[0] == "ref":
                args = [jinja_call_arg_name(arg) for arg in node.args]
                if args and all(args):
                    ref = ".".join(args)
                    if not self._is_disabled_ref(ref):
                        dependencies.refs.add(ref)
                dependencies.macros.append(MacroReference(name="ref"))
            elif call_name[0] == "var":
                args = [jinja_call_arg_name(arg) for arg in node.args]
                if args and args[0]:
                    dependencies.variables.add(args[0])
                else:
                    # We couldn't determine the var name statically
                    dependencies.has_dynamic_var_names = True
                dependencies.macros.append(MacroReference(name="var"))
            elif len(call_name) == 1:
                macro_name = call_name[0]
                if macro_name in BUILTIN_CALLS:
                    continue
                if (
                    f"macro.{package}.{macro_name}" not in self._manifest.macros
                    and f"macro.dbt.{macro_name}" in self._manifest.macros
                ):
                    package_name: t.Optional[str] = "dbt"
                else:
                    # dbt doesn't include the package name for project macros
                    package_name = package if package != self._project_name else None
                _macro_reference_if_not_overridden(
                    package_name, macro_name, dependencies.macros.append
                )
            else:
                if call_name[0] != "adapter":
                    _macro_reference_if_not_overridden(
                        call_name[0], call_name[1], dependencies.macros.append
                    )

        # When `model` is referenced as-is, e.g. it's passed as an argument to a macro call like
        # `{{ foo(model) }}`, we can't easily track the attributes that are actually used, because
        # it may be aliased and hence tracking actual uses of `model` requires a proper data flow
        # analysis. We conservatively deal with this by including all of its supported attributes
        # if a standalone reference is found.
        if all_model_attrs:
            dependencies.model_attrs.all_attrs = True

        return dependencies


def _macro_reference_if_not_overridden(
    package: t.Optional[str], name: str, if_not_overridden: t.Callable[[MacroReference], None]
) -> None:
    """Pass ``MacroReference(package, name)`` to ``if_not_overridden`` unless SQLMesh overrides it."""
    reference = MacroReference(package=package, name=name)
    if reference not in OVERRIDDEN_MACROS:
        if_not_overridden(reference)


def _config(node: t.Union[ManifestNode, SourceDefinition]) -> t.Dict[str, t.Any]:
    """Return the node's config as a plain dict."""
    return node.config.to_dict()


def _macro_references(
    manifest: Manifest, node: t.Union[ManifestNode, Macro]
) -> t.Set[MacroReference]:
    """Return references to the macros ``node`` depends on according to the manifest.

    Macros from the node's own package are referenced without a package name.
    """
    result: t.Set[MacroReference] = set()
    if not hasattr(node, "depends_on"):
        return result

    for macro_node_id in node.depends_on.macros:
        if not macro_node_id or macro_node_id == "None":
            continue

        macro_node = manifest.macros[macro_node_id]
        macro_name = macro_node.name
        macro_package = (
            macro_node.package_name if macro_node.package_name != node.package_name else None
        )
        _macro_reference_if_not_overridden(macro_package, macro_name, result.add)
    return result


def _refs(node: ManifestNode) -> t.Set[str]:
    """Return the node's refs as ``"[package.]name[_v<version>]"`` strings."""
    if DBT_VERSION >= (1, 5, 0):
        result: t.Set[str] = set()
        if not hasattr(node, "refs"):
            return result
        for r in node.refs:
            ref_name = f"{r.package}.{r.name}" if r.package else r.name  # type: ignore
            if getattr(r, "version", None):
                ref_name = f"{ref_name}_v{r.version}"  # type: ignore
            result.add(ref_name)
        return result
    return {".".join(r) for r in node.refs}  # type: ignore


def _sources(node: ManifestNode) -> t.Set[str]:
    """Return the node's sources as dot-joined strings (``"source_name.table_name"``)."""
    return {".".join(s) for s in getattr(node, "sources", [])}


def _source_ref_name(node: t.Any) -> str:
    """Return the ``"<source_name>.<table_name>"`` key for a source definition node.

    Falls back to the package name if the node has no ``source_name`` attribute.
    """
    source_name = getattr(node, "source_name", None) or node.package_name
    return f"{source_name}.{node.name}"


def _model_node_id(model_name: str, package: str) -> str:
    """Return the dbt unique id of a model."""
    return f"model.{package}.{model_name}"


def _test_model(node: ManifestNode) -> t.Optional[str]:
    """Return the name of the model/seed a test is attached to, or None."""
    attached_node = getattr(node, "attached_node", None)
    if attached_node:
        pieces = attached_node.split(".")
        if pieces[0] in ["model", "seed"]:
            # versioned models have format "model.package.model_name.v1" (4 parts)
            if len(pieces) == 4:
                return f"{pieces[2]}_{pieces[3]}"
            return pieces[-1]
        return None

    key_name = getattr(node, "file_key_name", None)
    if key_name:
        pieces = key_name.split(".")
        return pieces[-1] if pieces[0] in ["models", "seeds"] else None

    return None


def _node_base_config(node: ManifestNode) -> t.Dict[str, t.Any]:
    """Merge the node's config and node attributes (node attributes win) and add its path."""
    return {
        **_config(node),
        **node.to_dict(),
        "path": Path(node.original_file_path),
    }


def _convert_jinja_test_to_macro(test_jinja: str) -> str:
    """Rewrite a leading ``{% test name(...) %}`` block as ``{% macro test_name(...) %}``.

    ``endtest`` tags become ``endmacro``. Input without a leading test tag is returned unchanged.
    """
    TEST_TAG_REGEX = r"\s*{%-?\s*test\s+"
    ENDTEST_REGEX = r"{%-?\s*endtest\s*-?%}"

    match = re.match(TEST_TAG_REGEX, test_jinja)
    if not match:
        # already a macro
        return test_jinja

    test_tag = test_jinja[: match.end()]

    macro_tag = re.sub(r"({%-?\s*)test\s+", r"\1macro test_", test_tag)
    macro = macro_tag + test_jinja[match.end() :]

    return re.sub(ENDTEST_REGEX, lambda m: m.group(0).replace("endtest", "endmacro"), macro)
def _strip_jinja_materialization_tags(materialization_jinja: str) -> str:
    """Remove ``{% materialization ... %}`` and ``{% endmaterialization %}`` tags.

    Input that does not begin (after optional whitespace) with a materialization tag is
    returned unchanged. Otherwise every matching tag is removed and the remaining body is
    returned with surrounding whitespace stripped.
    """
    MATERIALIZATION_TAG_REGEX = r"\s*{%-?\s*materialization\s+[^%]*%}\s*\n?"
    ENDMATERIALIZATION_REGEX = r"{%-?\s*endmaterialization\s*-?%}\s*\n?"

    if not re.match(MATERIALIZATION_TAG_REGEX, materialization_jinja):
        return materialization_jinja

    materialization_jinja = re.sub(
        MATERIALIZATION_TAG_REGEX,
        "",
        materialization_jinja,
        flags=re.IGNORECASE,
    )

    materialization_jinja = re.sub(
        ENDMATERIALIZATION_REGEX,
        "",
        materialization_jinja,
        flags=re.IGNORECASE,
    )

    return materialization_jinja.strip()


def _build_test_name(node: ManifestNode, dependencies: Dependencies) -> str:
    """
    Build a user-friendly test name that includes the test's model/source, column,
    and args for tests with custom user names. Needed because dbt only generates these
    names for tests that do not specify the "name" field in their YAML definition.

    Name structure
    - Model test: [namespace]_[test name]_[model name]_[column name]__[arg values]
    - Source test: [namespace]_source_[test name]_[source name]_[table name]_[column name]__[arg values]

    Standalone (singular) tests without ``test_metadata`` keep their dbt name. If the name
    dbt generated is identical to the auto-generated form, it is returned unchanged.
    """
    # standalone test
    if not hasattr(node, "test_metadata"):
        return node.name

    model_name = _test_model(node)
    source_name = None
    if not model_name and dependencies.sources:
        # extract source and table names. `dependencies.sources` is unordered, so take the
        # smallest entry rather than an arbitrary one; this keeps the generated name stable
        # across processes when a test touches more than one source.
        source_parts = min(dependencies.sources).split(".")
        source_name = "_".join(source_parts) if len(source_parts) == 2 else source_parts[-1]
    entity_name = model_name or source_name or ""
    entity_name = f"_{entity_name}" if entity_name else ""

    name_prefix = ""
    if namespace := getattr(node.test_metadata, "namespace", None):
        name_prefix += f"{namespace}_"
    if source_name and not model_name:
        name_prefix += "source_"

    metadata_kwargs = node.test_metadata.kwargs
    arg_val_parts = []
    # Sorted by argument name so the suffix does not depend on kwarg order.
    for arg, val in sorted(metadata_kwargs.items()):
        if arg == "model":
            continue
        if isinstance(val, dict):
            val = list(val.values())
        # Replace every run of non-identifier characters with a single underscore.
        val = [re.sub("[^0-9a-zA-Z_]+", "_", str(v)) for v in ensure_list(val)]
        arg_val_parts.extend(val)
    unique_args = "__".join(arg_val_parts) if arg_val_parts else ""
    unique_args = f"_{unique_args}" if unique_args else ""

    auto_name = f"{name_prefix}{node.test_metadata.name}{entity_name}{unique_args}"

    if node.name == auto_name:
        return node.name

    custom_prefix = name_prefix if source_name and not model_name else ""
    return f"{custom_prefix}{node.name}{entity_name}{unique_args}"
logger =
<Logger sqlmesh.dbt.manifest (WARNING)>
TestConfigs =
typing.Dict[str, sqlmesh.dbt.test.TestConfig]
ModelConfigs =
typing.Dict[str, sqlmesh.dbt.model.ModelConfig]
SeedConfigs =
typing.Dict[str, sqlmesh.dbt.seed.SeedConfig]
SourceConfigs =
typing.Dict[str, sqlmesh.dbt.source.SourceConfig]
MacroConfigs =
typing.Dict[str, sqlmesh.dbt.package.MacroConfig]
HookConfigs =
typing.Dict[str, sqlmesh.dbt.package.HookConfig]
MaterializationConfigs =
typing.Dict[str, sqlmesh.dbt.package.MaterializationConfig]
IGNORED_PACKAGES =
{'elementary'}
BUILTIN_CALLS =
{'set_strict', 'sqlmesh_incremental', 'as_native', 'print', 'zip', 'env_var', 'sqlmesh', 'as_text', 'fromyaml', 'log', 'set', 'exceptions', 'fromjson', 'tojson', 'try_or_compiler_error', 'as_number', 'return', 'dbt_version', 'zip_strict', 'as_bool', 'toyaml', 'modules'}
class
ManifestHelper:
class ManifestHelper:
    """Load a dbt project's manifest and expose its resources as SQLMesh configs.

    The manifest is parsed lazily on first access (see the ``_manifest`` property). Resources
    are grouped by the dbt package they come from; accessors taking an optional
    ``package_name`` default to the root project's package name.
    """

    def __init__(
        self,
        project_path: Path,
        profiles_path: Path,
        profile_name: str,
        target: TargetConfig,
        variable_overrides: t.Optional[t.Dict[str, t.Any]] = None,
        cache_dir: t.Optional[str] = None,
        model_defaults: t.Optional[ModelDefaultsConfig] = None,
    ):
        """Record project settings; nothing is parsed until a resource is requested.

        Args:
            project_path: Root directory of the dbt project.
            profiles_path: Directory containing the dbt profiles file.
            profile_name: Name of the dbt profile to load.
            target: Target whose ``name`` selects the profile target.
            variable_overrides: dbt vars passed as CLI-style overrides (defaults to none).
            cache_dir: Directory for the Jinja call-name cache; relative paths are resolved
                against ``project_path``. Defaults to ``project_path / c.CACHE``.
            model_defaults: SQLMesh model defaults (a fresh ``ModelDefaultsConfig`` if omitted).
        """
        self.project_path = project_path
        self.profiles_path = profiles_path
        self.profile_name = profile_name
        self.target = target
        self.variable_overrides = variable_overrides or {}
        self.model_defaults = model_defaults or ModelDefaultsConfig()

        # Populated lazily by the `_manifest` property.
        self.__manifest: t.Optional[Manifest] = None
        # Set from the dbt project while the manifest is being loaded.
        self._project_name: str = ""

        # Per-package resource configs, filled in once by `_load_all`.
        self._is_loaded: bool = False
        self._tests_per_package: t.Dict[str, TestConfigs] = defaultdict(dict)
        self._models_per_package: t.Dict[str, ModelConfigs] = defaultdict(dict)
        self._seeds_per_package: t.Dict[str, SeedConfigs] = defaultdict(dict)
        self._sources_per_package: t.Dict[str, SourceConfigs] = defaultdict(dict)
        self._macros_per_package: t.Dict[str, MacroConfigs] = defaultdict(dict)

        # Memoized results of `_flatten_dependencies_from_macros`, keyed by package then macro.
        self._macro_flatten_dependencies: t.Dict[str, t.Dict[str, Dependencies]] = defaultdict(dict)

        # Tests grouped by the name of the model they test.
        self._tests_by_owner: t.Dict[str, t.List[TestConfig]] = defaultdict(list)
        # Computed on first use by `_load_disabled`.
        self._disabled_refs: t.Optional[t.Set[str]] = None
        self._disabled_sources: t.Optional[t.Set[str]] = None

        if cache_dir is None:
            call_cache_path = self.project_path / c.CACHE
        else:
            call_cache_path = Path(cache_dir)
            if not call_cache_path.is_absolute():
                # Relative cache directories live under the project root.
                call_cache_path = self.project_path / call_cache_path

        self._call_cache: FileCache[t.Dict[str, t.List[CallNames]]] = FileCache(
            call_cache_path, "jinja_calls"
        )

        self._on_run_start_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
        self._on_run_end_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
        self._materializations: MaterializationConfigs = {}

    def tests(self, package_name: t.Optional[str] = None) -> TestConfigs:
        """Return test configs for ``package_name`` (default: the root project)."""
        self._load_all()
        return self._tests_per_package[package_name or self._project_name]

    def models(self, package_name: t.Optional[str] = None) -> ModelConfigs:
        """Return model configs for ``package_name`` (default: the root project)."""
        self._load_all()
        return self._models_per_package[package_name or self._project_name]

    def seeds(self, package_name: t.Optional[str] = None) -> SeedConfigs:
        """Return seed configs for ``package_name`` (default: the root project)."""
        self._load_all()
        return self._seeds_per_package[package_name or self._project_name]

    def sources(self, package_name: t.Optional[str] = None) -> SourceConfigs:
        """Return source configs for ``package_name`` (default: the root project)."""
        self._load_all()
        return self._sources_per_package[package_name or self._project_name]

    def macros(self, package_name: t.Optional[str] = None) -> MacroConfigs:
        """Return macro configs for ``package_name`` (default: the root project)."""
        self._load_all()
        return self._macros_per_package[package_name or self._project_name]

    def on_run_start(self, package_name: t.Optional[str] = None) -> HookConfigs:
        """Return ``on-run-start`` hooks for ``package_name`` (default: the root project)."""
        self._load_all()
        return self._on_run_start_per_package[package_name or self._project_name]

    def on_run_end(self, package_name: t.Optional[str] = None) -> HookConfigs:
        """Return ``on-run-end`` hooks for ``package_name`` (default: the root project)."""
        self._load_all()
        return self._on_run_end_per_package[package_name or self._project_name]

    def materializations(self) -> MaterializationConfigs:
        """Return custom materializations keyed by ``"{name}_{adapter}"``."""
        self._load_all()
        return self._materializations

    @property
    def all_macros(self) -> t.Dict[str, t.Dict[str, MacroInfo]]:
        """Return each package's macro infos, keyed by package name then macro name.

        Packages without any macros are omitted; because the result is a ``defaultdict``,
        looking up a missing package yields an empty dict.
        """
        self._load_all()
        macro_infos: t.Dict[str, t.Dict[str, MacroInfo]] = defaultdict(dict)
        for package_name, macro_configs in self._macros_per_package.items():
            if not macro_configs:
                continue
            macro_infos[package_name] = {
                macro_name: macro_config.info for macro_name, macro_config in macro_configs.items()
            }
        return macro_infos

    @cached_property
    def flat_graph(self) -> t.Dict[str, t.Any]:
        """Return the manifest's resources grouped by resource type, serialized per resource.

        Each resource is converted with ``to_dict(omit_none=False)`` and passed through
        ``make_serializable``. Resource types the loaded manifest lacks map to empty dicts.
        """
        manifest = self._manifest
        # Ordered table of section name -> mapping of unique id to resource.
        resources_by_section = {
            "exposures": getattr(manifest, "exposures", {}),
            "groups": getattr(manifest, "groups", {}),
            "metrics": getattr(manifest, "metrics", {}),
            "nodes": manifest.nodes,
            "sources": manifest.sources,
            "semantic_models": getattr(manifest, "semantic_models", {}),
            "saved_queries": getattr(manifest, "saved_queries", {}),
        }
        return {
            section: {
                unique_id: make_serializable(resource.to_dict(omit_none=False))
                for unique_id, resource in resources.items()
            }
            for section, resources in resources_by_section.items()
        }

    def _load_all(self) -> None:
        """Populate every per-package config collection exactly once."""
        if self._is_loaded:
            return

        # Each cached entry is paired with a "used" flag; only used entries are written back.
        self._calls = {k: (v, False) for k, v in (self._call_cache.get("") or {}).items()}

        # Order matters: macros are needed to flatten dependencies, and tests must be loaded
        # before models so they can be attached to their owners.
        self._load_macros()
        self._load_materializations()
        self._load_sources()
        self._load_tests()
        self._load_models_and_seeds()
        self._load_on_run_start_end()
        self._is_loaded = True

        self._call_cache.put("", value={k: v for k, (v, used) in self._calls.items() if used})

    def _load_sources(self) -> None:
        for source in self._manifest.sources.values():
            # starting in dbt-core 1.9.5, freshness can be set in both source and source config
            source_dict = source.to_dict()
            source_dict.pop("freshness", None)

            source_config_dict = _config(source)
            source_config_dict.pop("freshness", None)

            source_config_freshness = getattr(source.config, "freshness", None)
            freshness = (
                merge_freshness(source.freshness, source_config_freshness)
                if source_config_freshness
                else source.freshness
            )

            source_config = SourceConfig(
                **{
                    **source_dict,
                    **source_config_dict,
                    "freshness": freshness.to_dict() if freshness else None,
                }
            )
            self._sources_per_package[source.package_name][source_config.config_name] = (
                source_config
            )

    def _load_macros(self) -> None:
        for macro in self._manifest.macros.values():
            if macro.name.startswith("materialization_"):
                continue

            if macro.name.startswith("test_"):
                macro.macro_sql = _convert_jinja_test_to_macro(macro.macro_sql)

            dependencies = Dependencies(macros=_macro_references(self._manifest, macro))
            if not macro.name.startswith("test_"):
                dependencies = dependencies.union(
                    self._extra_dependencies(macro.macro_sql, macro.package_name)
                )

            self._macros_per_package[macro.package_name][macro.name] = MacroConfig(
                info=MacroInfo(
                    definition=macro.macro_sql,
                    depends_on=dependencies.macros,
                ),
                dependencies=dependencies,
                path=Path(macro.original_file_path),
            )

        # This is a workaround for dbt adapter macros (eg. "spark__dateadd") which are expected to be
        # available in the global scope regardless of the package they came from.
        adapter_macro_names = {
            name[name.find("__") + 2 :]
            for name in self._macros_per_package.get("dbt", {})
            if "__" in name
        }
        for macros in self._macros_per_package.values():
            for name, macro_config in macros.items():
                pos = name.find("__")
                if pos > 0 and name[pos + 2 :] in adapter_macro_names:
                    macro_config.info.is_top_level = True

    def _load_materializations(self) -> None:
        for macro in self._manifest.macros.values():
            if macro.name.startswith("materialization_"):
                # Extract name and adapter ( "materialization_{name}_{adapter}" or "materialization_{name}_default")
                name_parts = macro.name.split("_")
                if len(name_parts) >= 3:
                    mat_name = "_".join(name_parts[1:-1])
                    adapter = name_parts[-1]

                    dependencies = Dependencies(macros=_macro_references(self._manifest, macro))
                    macro.macro_sql = _strip_jinja_materialization_tags(macro.macro_sql)
                    dependencies = dependencies.union(
                        self._extra_dependencies(macro.macro_sql, macro.package_name)
                    )

                    materialization_config = MaterializationConfig(
                        name=mat_name,
                        adapter=adapter,
                        definition=macro.macro_sql,
                        dependencies=dependencies,
                        path=Path(macro.original_file_path),
                    )

                    key = f"{mat_name}_{adapter}"
                    self._materializations[key] = materialization_config

    def _load_tests(self) -> None:
        for node in self._manifest.nodes.values():
            if node.resource_type != "test":
                continue

            skip_test = False
            refs = _refs(node)
            for ref in refs:
                if self._is_disabled_ref(ref):
                    logger.info(
                        "Skipping test '%s' which references a disabled model '%s'",
                        node.name,
                        ref,
                    )
                    skip_test = True
                    break

            if skip_test:
                continue

            dependencies = Dependencies(
                macros=_macro_references(self._manifest, node),
                refs=refs,
                sources=_sources(node),
            )
            # Implicit dependencies for model test arg
            dependencies.macros.append(MacroReference(package="dbt", name="get_where_subquery"))
            dependencies.macros.append(MacroReference(package="dbt", name="should_store_failures"))

            sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
            dependencies = dependencies.union(self._extra_dependencies(sql, node.package_name))
            dependencies = dependencies.union(
                self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
            )

            test_model = _test_model(node)
            node_config = _node_base_config(node)
            node_config["name"] = _build_test_name(node, dependencies)

            test = TestConfig(
                sql=sql,
                model_name=test_model,
                test_kwargs=node.test_metadata.kwargs if hasattr(node, "test_metadata") else {},
                dependencies=dependencies,
                **node_config,
            )
            self._tests_per_package[node.package_name][node.unique_id] = test
            if test_model:
                self._tests_by_owner[test_model].append(test)

    def _load_models_and_seeds(self) -> None:
        for node in self._manifest.nodes.values():
            if (
                node.resource_type not in ("model", "seed", "snapshot")
                or node.package_name in IGNORED_PACKAGES
            ):
                continue

            macro_references = _macro_references(self._manifest, node)
            all_tests = (
                self._tests_by_owner[node.name]
                + self._tests_by_owner[f"{node.package_name}.{node.name}"]
            )
            # Only include non-standalone tests (tests that don't reference other models)
            tests = [test for test in all_tests if not test.is_standalone]
            node_config = _node_base_config(node)

            node_name = node.name
            node_version = getattr(node, "version", None)
            if node_version:
                node_name = f"{node_name}_v{node_version}"

            if node.resource_type in {"model", "snapshot"}:
                sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
                dependencies = Dependencies(
                    macros=macro_references, refs=_refs(node), sources=_sources(node)
                )
                dependencies = dependencies.union(
                    self._extra_dependencies(sql, node.package_name, track_all_model_attrs=True)
                )
                for hook in [*node_config.get("pre-hook", []), *node_config.get("post-hook", [])]:
                    dependencies = dependencies.union(
                        self._extra_dependencies(
                            hook["sql"], node.package_name, track_all_model_attrs=True
                        )
                    )
                dependencies = dependencies.union(
                    self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
                )

                self._models_per_package[node.package_name][node_name] = ModelConfig(
                    **dict(
                        node_config,
                        sql=sql,
                        dependencies=dependencies,
                        tests=tests,
                    )
                )
            else:
                self._seeds_per_package[node.package_name][node_name] = SeedConfig(
                    **dict(
                        node_config,
                        dependencies=Dependencies(macros=macro_references),
                        tests=tests,
                    )
                )

    def _load_on_run_start_end(self) -> None:
        """Collect ``operation`` nodes tagged ``on-run-start``/``on-run-end`` as hook configs."""
        for node in self._manifest.nodes.values():
            if node.resource_type == "operation" and (
                set(node.tags) & {"on-run-start", "on-run-end"}
            ):
                sql = node.raw_code if DBT_VERSION >= (1, 3, 0) else node.raw_sql  # type: ignore
                node_name = node.name
                node_path = Path(node.original_file_path)

                dependencies = Dependencies(
                    macros=_macro_references(self._manifest, node),
                    refs=_refs(node),
                    sources=_sources(node),
                )
                dependencies = dependencies.union(self._extra_dependencies(sql, node.package_name))
                dependencies = dependencies.union(
                    self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
                )

                # A node tagged with both is registered as on-run-start only.
                hooks_per_package = (
                    self._on_run_start_per_package
                    if "on-run-start" in node.tags
                    else self._on_run_end_per_package
                )
                hooks_per_package[node.package_name][node_name] = HookConfig(
                    sql=sql,
                    index=getattr(node, "index", None) or 0,
                    path=node_path,
                    dependencies=dependencies,
                )

    @property
    def _manifest(self) -> Manifest:
        """Return the parsed dbt manifest, loading it on first access.

        Raises:
            SQLMeshError: If dbt fails to load the manifest.
        """
        if self.__manifest is None:
            try:
                self.__manifest = self._load_manifest()
            except Exception as ex:
                raise SQLMeshError(f"Failed to load dbt manifest: {ex}") from ex
        return self.__manifest

    def _load_manifest(self) -> Manifest:
        do_not_track()

        variables = (
            self.variable_overrides
            if DBT_VERSION >= (1, 5, 0)
            else json.dumps(self.variable_overrides)
        )

        args: Namespace = Namespace(
            vars=variables,
            profile=self.profile_name,
            project_dir=str(self.project_path),
            profiles_dir=str(self.profiles_path),
            target=self.target.name,
            macro_debugging=False,
            REQUIRE_RESOURCE_NAMES_WITHOUT_SPACES=True,
        )
        flags.set_from_args(args, None)

        if DBT_VERSION >= (1, 8, 0):
            from dbt_common.context import set_invocation_context  # type: ignore

            set_invocation_context(os.environ)

        profile = self._load_profile()
        project = self._load_project(profile)

        if (
            not any(k in project.models for k in ("start", "+start"))
            and not self.model_defaults.start
        ):
            raise ConfigError(
                "SQLMesh requires a start date in order to have a finite range of backfilling data. Add start to the 'models:' block in dbt_project.yml. https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/#setting-model-backfill-start-dates"
            )

        runtime_config = RuntimeConfig.from_parts(project, profile, args)

        self._project_name = project.project_name

        if DBT_VERSION >= (1, 8, 0):
            from dbt.mp_context import get_mp_context  # type: ignore

            register_adapter(runtime_config, get_mp_context())  # type: ignore
        else:
            register_adapter(runtime_config)  # type: ignore

        manifest = ManifestLoader.get_full_manifest(runtime_config)
        # This adapter doesn't care about semantic models so we clear them out to avoid issues
        manifest.semantic_models = {}
        reset_adapters()
        return manifest

    def _load_project(self, profile: Profile) -> Project:
        project_renderer = DbtProjectYamlRenderer(profile, cli_vars=self.variable_overrides)
        return Project.from_project_root(str(self.project_path), project_renderer)

    def _load_profile(self) -> Profile:
        profile_renderer = ProfileRenderer(cli_vars=self.variable_overrides)
        raw_profiles = read_profile(str(self.profiles_path))
        return Profile.from_raw_profiles(
            raw_profiles=raw_profiles,
            profile_name=self.profile_name,
            renderer=profile_renderer,
            target_override=self.target.name,
        )

    def _is_disabled_ref(self, ref: str) -> bool:
        """Return True if ``ref`` (``"name"`` or ``"package.name"``) names only disabled nodes."""
        if self._disabled_refs is None:
            self._load_disabled()

        return ref in self._disabled_refs  # type: ignore

    def _is_disabled_source(self, source: str) -> bool:
        """Return True if ``source`` (``"source_name.table_name"``) names a disabled source."""
        if self._disabled_sources is None:
            self._load_disabled()

        return source in self._disabled_sources  # type: ignore

    @staticmethod
    def _disabled_source_key(node: t.Any) -> str:
        """Return the ``"<source_name>.<table_name>"`` key that ``source()`` references use.

        Falls back to the package name for node objects that don't expose ``source_name``.
        """
        return f"{getattr(node, 'source_name', node.package_name)}.{node.name}"

    def _load_disabled(self) -> None:
        """Compute the sets of ref and source names that resolve only to disabled resources."""
        self._disabled_refs = set()
        self._disabled_sources = set()
        for nodes in self._manifest.disabled.values():
            for node in nodes:
                if node.resource_type in ("model", "snapshot", "seed"):
                    self._disabled_refs.add(f"{node.package_name}.{node.name}")
                    self._disabled_refs.add(node.name)
                elif node.resource_type == "source":
                    # Key sources the same way `source(...)` calls are keyed in
                    # `_extra_dependencies`, i.e. by source name and table name.
                    self._disabled_sources.add(self._disabled_source_key(node))

        # A name may be disabled in one place but enabled in another (e.g. a disabled duplicate or
        # an older model version). Anything that still resolves to an enabled resource is kept.
        for node in self._manifest.nodes.values():
            if node.resource_type in ("model", "snapshot", "seed"):
                self._disabled_refs.discard(f"{node.package_name}.{node.name}")
                self._disabled_refs.discard(node.name)

        # Enabled sources live in `manifest.sources`, not `manifest.nodes`.
        for source in self._manifest.sources.values():
            self._disabled_sources.discard(self._disabled_source_key(source))

    def _flatten_dependencies_from_macros(
        self,
        macros: t.List[MacroReference],
        default_package: str,
        visited: t.Optional[t.Set[t.Tuple[str, str]]] = None,
    ) -> Dependencies:
        """Collect the dependencies of ``macros`` and of every macro they transitively call.

        Per-macro results are memoized in ``self._macro_flatten_dependencies``.

        Args:
            macros: Macro references to expand.
            default_package: Package assumed for references without an explicit package.
            visited: ``(package, macro_name)`` pairs already expanded in the current traversal;
                used to break cycles between macros.

        Returns:
            The union of the flattened dependencies of all resolvable macros in ``macros``.
        """
        if visited is None:
            visited = set()

        dependencies = Dependencies()
        for macro in macros:
            macro_package = macro.package or default_package

            # Consult the memo before the cycle guard. A macro already expanded earlier in this
            # traversal (e.g. a helper shared by two sibling macros) must still contribute its
            # dependencies, otherwise the caller's memoized entry would silently miss them.
            macro_dependencies = self._macro_flatten_dependencies.get(macro_package, {}).get(
                macro.name
            )
            if macro_dependencies is None:
                if (macro_package, macro.name) in visited:
                    continue
                visited.add((macro_package, macro.name))

                macro_config = self._macros_per_package[macro_package].get(macro.name)
                if not macro_config:
                    continue

                macro_dependencies = macro_config.dependencies.union(
                    self._flatten_dependencies_from_macros(
                        macro_config.dependencies.macros, macro_package, visited=visited
                    )
                )
                # We don't need flatten macro dependencies. The jinja macro registry takes care of recursive
                # dependencies for us.
                macro_dependencies.macros = []
                self._macro_flatten_dependencies[macro_package][macro.name] = macro_dependencies
            dependencies = dependencies.union(macro_dependencies)
        return dependencies

    def _extra_dependencies(
        self,
        target: str,
        package: str,
        track_all_model_attrs: bool = False,
    ) -> Dependencies:
        """
        We sometimes observe that the manifest doesn't capture all macros, refs, and sources within a macro.
        This behavior has been observed with macros like dbt.current_timestamp(), dbt_utils.slugify(), and source().
        Here we apply our custom extractor to make a best effort to supplement references captured in the manifest.
        """
        dependencies = Dependencies()

        # Whether all `model` attributes (e.g., `model.config`) should be included in the dependencies
        all_model_attrs = False

        for call_name, node in extract_call_names(target, cache=self._calls):
            if call_name[0] == "config":
                continue

            if (
                track_all_model_attrs
                and not all_model_attrs
                and isinstance(node, jinja2.nodes.Call)
                and any(isinstance(a, jinja2.nodes.Name) and a.name == "model" for a in node.args)
            ):
                all_model_attrs = True

            if isinstance(node, jinja2.nodes.Getattr):
                if call_name[0] == "model":
                    dependencies.model_attrs.attrs.add(call_name[1])
            elif call_name[0] == "source":
                args = [jinja_call_arg_name(arg) for arg in node.args]
                if args and all(args):
                    source = ".".join(args)
                    if not self._is_disabled_source(source):
                        dependencies.sources.add(source)
                dependencies.macros.append(MacroReference(name="source"))
            elif call_name[0] == "ref":
                args = [jinja_call_arg_name(arg) for arg in node.args]
                if args and all(args):
                    ref = ".".join(args)
                    if not self._is_disabled_ref(ref):
                        dependencies.refs.add(ref)
                dependencies.macros.append(MacroReference(name="ref"))
            elif call_name[0] == "var":
                args = [jinja_call_arg_name(arg) for arg in node.args]
                if args and args[0]:
                    dependencies.variables.add(args[0])
                else:
                    # We couldn't determine the var name statically
                    dependencies.has_dynamic_var_names = True
                dependencies.macros.append(MacroReference(name="var"))
            elif len(call_name) == 1:
                macro_name = call_name[0]
                if macro_name in BUILTIN_CALLS:
                    continue
                if (
                    f"macro.{package}.{macro_name}" not in self._manifest.macros
                    and f"macro.dbt.{macro_name}" in self._manifest.macros
                ):
                    package_name: t.Optional[str] = "dbt"
                else:
                    # dbt doesn't include the package name for project macros
                    package_name = package if package != self._project_name else None
                _macro_reference_if_not_overridden(
                    package_name, macro_name, dependencies.macros.append
                )
            else:
                if call_name[0] != "adapter":
                    _macro_reference_if_not_overridden(
                        call_name[0], call_name[1], dependencies.macros.append
                    )

        # When `model` is referenced as-is, e.g. it's passed as an argument to a macro call like
        # `{{ foo(model) }}`, we can't easily track the attributes that are actually used, because
        # it may be aliased and hence tracking actual uses of `model` requires a proper data flow
        # analysis. We conservatively deal with this by including all of its supported attributes
        # if a standalone reference is found.
        if all_model_attrs:
            dependencies.model_attrs.all_attrs = True

        return dependencies
ManifestHelper( project_path: pathlib.Path, profiles_path: pathlib.Path, profile_name: str, target: sqlmesh.dbt.target.TargetConfig, variable_overrides: Optional[Dict[str, Any]] = None, cache_dir: Optional[str] = None, model_defaults: Optional[sqlmesh.core.config.model.ModelDefaultsConfig] = None)
def __init__(
    self,
    project_path: Path,
    profiles_path: Path,
    profile_name: str,
    target: TargetConfig,
    variable_overrides: t.Optional[t.Dict[str, t.Any]] = None,
    cache_dir: t.Optional[str] = None,
    model_defaults: t.Optional[ModelDefaultsConfig] = None,
):
    """Record project settings; nothing is parsed until a resource is requested.

    Args:
        project_path: Root directory of the dbt project.
        profiles_path: Directory containing the dbt profiles file.
        profile_name: Name of the dbt profile to load.
        target: Target whose ``name`` selects the profile target.
        variable_overrides: dbt vars passed as CLI-style overrides (defaults to none).
        cache_dir: Directory for the Jinja call-name cache; relative paths are resolved
            against ``project_path``. Defaults to ``project_path / c.CACHE``.
        model_defaults: SQLMesh model defaults (a fresh ``ModelDefaultsConfig`` if omitted).
    """
    self.project_path = project_path
    self.profiles_path = profiles_path
    self.profile_name = profile_name
    self.target = target
    self.variable_overrides = variable_overrides or {}
    self.model_defaults = model_defaults or ModelDefaultsConfig()

    # Populated lazily by the `_manifest` property.
    self.__manifest: t.Optional[Manifest] = None
    # Set from the dbt project while the manifest is being loaded.
    self._project_name: str = ""

    # Per-package resource configs, filled in once by `_load_all`.
    self._is_loaded: bool = False
    self._tests_per_package: t.Dict[str, TestConfigs] = defaultdict(dict)
    self._models_per_package: t.Dict[str, ModelConfigs] = defaultdict(dict)
    self._seeds_per_package: t.Dict[str, SeedConfigs] = defaultdict(dict)
    self._sources_per_package: t.Dict[str, SourceConfigs] = defaultdict(dict)
    self._macros_per_package: t.Dict[str, MacroConfigs] = defaultdict(dict)

    # Memoized results of `_flatten_dependencies_from_macros`, keyed by package then macro.
    self._macro_flatten_dependencies: t.Dict[str, t.Dict[str, Dependencies]] = defaultdict(dict)

    # Tests grouped by the name of the model they test.
    self._tests_by_owner: t.Dict[str, t.List[TestConfig]] = defaultdict(list)
    # Computed on first use by `_load_disabled`.
    self._disabled_refs: t.Optional[t.Set[str]] = None
    self._disabled_sources: t.Optional[t.Set[str]] = None

    if cache_dir is None:
        call_cache_path = self.project_path / c.CACHE
    else:
        call_cache_path = Path(cache_dir)
        if not call_cache_path.is_absolute():
            # Relative cache directories live under the project root.
            call_cache_path = self.project_path / call_cache_path

    self._call_cache: FileCache[t.Dict[str, t.List[CallNames]]] = FileCache(
        call_cache_path, "jinja_calls"
    )

    self._on_run_start_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
    self._on_run_end_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
    self._materializations: MaterializationConfigs = {}
def
sources( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.source.SourceConfig]:
def
macros( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.package.MacroConfig]:
def
on_run_start( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.package.HookConfig]:
def
on_run_end( self, package_name: Optional[str] = None) -> Dict[str, sqlmesh.dbt.package.HookConfig]:
all_macros: Dict[str, Dict[str, sqlmesh.utils.jinja.MacroInfo]]
@property
def all_macros(self) -> t.Dict[str, t.Dict[str, MacroInfo]]:
    """Return each package's macro infos, keyed by package name then macro name.

    Packages without any macros are omitted; because the result is a ``defaultdict``,
    looking up a missing package yields an empty dict.
    """
    self._load_all()
    macro_infos: t.Dict[str, t.Dict[str, MacroInfo]] = defaultdict(dict)
    for package_name, macro_configs in self._macros_per_package.items():
        if not macro_configs:
            continue
        macro_infos[package_name] = {
            macro_name: macro_config.info for macro_name, macro_config in macro_configs.items()
        }
    return macro_infos
flat_graph: Dict[str, Any]
@cached_property
def flat_graph(self) -> t.Dict[str, t.Any]:
    """Return the manifest's resources grouped by resource type, serialized per resource.

    Each resource is converted with ``to_dict(omit_none=False)`` and passed through
    ``make_serializable``. Resource types the loaded manifest lacks map to empty dicts.
    """
    manifest = self._manifest
    # Ordered table of section name -> mapping of unique id to resource.
    resources_by_section = {
        "exposures": getattr(manifest, "exposures", {}),
        "groups": getattr(manifest, "groups", {}),
        "metrics": getattr(manifest, "metrics", {}),
        "nodes": manifest.nodes,
        "sources": manifest.sources,
        "semantic_models": getattr(manifest, "semantic_models", {}),
        "saved_queries": getattr(manifest, "saved_queries", {}),
    }
    return {
        section: {
            unique_id: make_serializable(resource.to_dict(omit_none=False))
            for unique_id, resource in resources.items()
        }
        for section, resources in resources_by_section.items()
    }