sqlmesh.dbt.project
from __future__ import annotations

import typing as t
import logging
from pathlib import Path

from sqlmesh.core.console import get_console
from sqlmesh.dbt.common import PROJECT_FILENAME, load_yaml
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.manifest import ManifestHelper
from sqlmesh.dbt.package import Package, PackageLoader
from sqlmesh.dbt.profile import Profile
from sqlmesh.utils.errors import ConfigError

logger = logging.getLogger(__name__)


class Project:
    """A loaded dbt project: its context, its profile and its packages."""

    def __init__(
        self,
        context: DbtContext,
        profile: Profile,
        packages: t.Dict[str, Package],
    ):
        """
        Args:
            context: The dbt context this project was loaded with.
            profile: The profile that belongs to this project.
            packages: Every package of the project keyed by package name. The
                project itself is expected to be present under its own name.
        """
        self.context = context
        self.profile = profile
        self.packages = packages

    @classmethod
    def load(
        cls,
        context: DbtContext,
        variables: t.Optional[t.Dict[str, t.Any]] = None,
    ) -> Project:
        """
        Reads the dbt project file under ``context.project_root`` and builds a Project.

        All context attributes set here are set on ``context.copy()``, not on the
        object that was passed in.

        Args:
            context: dbt context describing the project to load.
            variables: dbt variable overrides; ``None`` is treated as no overrides.

        Returns:
            The Project assembled from the project file, its profile and its packages.

        Raises:
            ConfigError: If the project file does not exist or the rendered
                project name is empty.
        """
        context = context.copy()

        yaml_path = Path(context.project_root, PROJECT_FILENAME)
        logger.debug("Processing project file '%s'.", yaml_path)
        if not yaml_path.exists():
            raise ConfigError(f"Could not find {PROJECT_FILENAME} in {context.project_root}")
        raw_project = load_yaml(yaml_path)

        context.project_name = context.render(raw_project.get("name", ""))
        if not context.project_name:
            raise ConfigError(f"{yaml_path.stem} must include project name.")

        # The profile name falls back to the project name when it renders empty.
        resolved_profile = context.render(raw_project.get("profile", "")) or context.project_name
        context.profile_name = resolved_profile

        profile = Profile.load(context, context.target_name)
        context.target = profile.target

        variable_overrides = variables or {}
        context.manifest = ManifestHelper(
            yaml_path.parent,
            profile.path.parent,
            resolved_profile,
            target=profile.target,
            variable_overrides=variable_overrides,
            cache_dir=context.sqlmesh_config.cache_dir,
            model_defaults=context.sqlmesh_config.model_defaults,
        )

        unsupported_fields = profile.target.extra
        if unsupported_fields:
            quoted_fields = ",".join(f"'{field}'" for field in unsupported_fields)
            get_console().log_warning(
                f"{profile.target.type} adapter does not currently support {quoted_fields}."
            )

        packages = {}
        loader = PackageLoader(context)

        # The root project is registered as a package under its own name.
        packages[context.project_name] = loader.load(context.project_root)

        install_dir = Path(
            context.render(raw_project.get("packages-install-path", "dbt_packages"))
        )
        if not install_dir.is_absolute():
            install_dir = Path(context.project_root, install_dir)

        # Each direct sub-directory holding a project file is loaded as a package.
        for nested_project_file in install_dir.glob(f"*/{PROJECT_FILENAME}"):
            dependency = loader.load(nested_project_file.parent)
            packages[dependency.name] = dependency

        # Variable resolution precedence (highest first):
        #   1. Variable overrides
        #   2. Package-scoped variables in the root project's dbt_project.yml
        #   3. Global project variables in the root project's dbt_project.yml
        #   4. Variables in the package's own dbt_project.yml
        merged_vars = {**(raw_project.get("vars") or {}), **variable_overrides}
        for package_name, package in packages.items():
            scoped_vars = merged_vars.get(package_name)
            if isinstance(scoped_vars, dict):
                # A dict stored under the package's name is that package's scoped
                # variables: apply everything else first, then the scoped values.
                unscoped_vars = {
                    key: value for key, value in merged_vars.items() if key != package_name
                }
                package.variables.update(unscoped_vars)
                package.variables.update(scoped_vars)
            else:
                package.variables.update(merged_vars)
            # Re-apply the overrides last so that scoped values cannot shadow them.
            if variable_overrides:
                package.variables.update(variable_overrides)

        return Project(context, profile, packages)

    @property
    def project_files(self) -> t.Set[Path]:
        """The profile's path together with the files of every package."""
        return {self.profile.path}.union(
            *(package.files for package in self.packages.values())
        )
logger =
<Logger sqlmesh.dbt.project (WARNING)>
class
Project:
class Project:
    """A loaded dbt project: its context, its profile and its packages."""

    def __init__(
        self,
        context: DbtContext,
        profile: Profile,
        packages: t.Dict[str, Package],
    ):
        """
        Args:
            context: The dbt context this project was loaded with.
            profile: The profile that belongs to this project.
            packages: Every package of the project keyed by package name. The
                project itself is expected to be present under its own name.
        """
        self.context = context
        self.profile = profile
        self.packages = packages

    @classmethod
    def load(
        cls,
        context: DbtContext,
        variables: t.Optional[t.Dict[str, t.Any]] = None,
    ) -> Project:
        """
        Reads the dbt project file under ``context.project_root`` and builds a Project.

        All context attributes set here are set on ``context.copy()``, not on the
        object that was passed in.

        Args:
            context: dbt context describing the project to load.
            variables: dbt variable overrides; ``None`` is treated as no overrides.

        Returns:
            The Project assembled from the project file, its profile and its packages.

        Raises:
            ConfigError: If the project file does not exist or the rendered
                project name is empty.
        """
        context = context.copy()

        yaml_path = Path(context.project_root, PROJECT_FILENAME)
        logger.debug("Processing project file '%s'.", yaml_path)
        if not yaml_path.exists():
            raise ConfigError(f"Could not find {PROJECT_FILENAME} in {context.project_root}")
        raw_project = load_yaml(yaml_path)

        context.project_name = context.render(raw_project.get("name", ""))
        if not context.project_name:
            raise ConfigError(f"{yaml_path.stem} must include project name.")

        # The profile name falls back to the project name when it renders empty.
        resolved_profile = context.render(raw_project.get("profile", "")) or context.project_name
        context.profile_name = resolved_profile

        profile = Profile.load(context, context.target_name)
        context.target = profile.target

        variable_overrides = variables or {}
        context.manifest = ManifestHelper(
            yaml_path.parent,
            profile.path.parent,
            resolved_profile,
            target=profile.target,
            variable_overrides=variable_overrides,
            cache_dir=context.sqlmesh_config.cache_dir,
            model_defaults=context.sqlmesh_config.model_defaults,
        )

        unsupported_fields = profile.target.extra
        if unsupported_fields:
            quoted_fields = ",".join(f"'{field}'" for field in unsupported_fields)
            get_console().log_warning(
                f"{profile.target.type} adapter does not currently support {quoted_fields}."
            )

        packages = {}
        loader = PackageLoader(context)

        # The root project is registered as a package under its own name.
        packages[context.project_name] = loader.load(context.project_root)

        install_dir = Path(
            context.render(raw_project.get("packages-install-path", "dbt_packages"))
        )
        if not install_dir.is_absolute():
            install_dir = Path(context.project_root, install_dir)

        # Each direct sub-directory holding a project file is loaded as a package.
        for nested_project_file in install_dir.glob(f"*/{PROJECT_FILENAME}"):
            dependency = loader.load(nested_project_file.parent)
            packages[dependency.name] = dependency

        # Variable resolution precedence (highest first):
        #   1. Variable overrides
        #   2. Package-scoped variables in the root project's dbt_project.yml
        #   3. Global project variables in the root project's dbt_project.yml
        #   4. Variables in the package's own dbt_project.yml
        merged_vars = {**(raw_project.get("vars") or {}), **variable_overrides}
        for package_name, package in packages.items():
            scoped_vars = merged_vars.get(package_name)
            if isinstance(scoped_vars, dict):
                # A dict stored under the package's name is that package's scoped
                # variables: apply everything else first, then the scoped values.
                unscoped_vars = {
                    key: value for key, value in merged_vars.items() if key != package_name
                }
                package.variables.update(unscoped_vars)
                package.variables.update(scoped_vars)
            else:
                package.variables.update(merged_vars)
            # Re-apply the overrides last so that scoped values cannot shadow them.
            if variable_overrides:
                package.variables.update(variable_overrides)

        return Project(context, profile, packages)

    @property
    def project_files(self) -> t.Set[Path]:
        """The profile's path together with the files of every package."""
        return {self.profile.path}.union(
            *(package.files for package in self.packages.values())
        )
Configuration for a DBT project
Project( context: sqlmesh.dbt.context.DbtContext, profile: sqlmesh.dbt.profile.Profile, packages: Dict[str, sqlmesh.dbt.package.Package])
def __init__(
    self,
    context: DbtContext,
    profile: Profile,
    packages: t.Dict[str, Package],
):
    """
    Args:
        context: The dbt context this project was loaded with.
        profile: The profile that belongs to this project.
        packages: Every package of the project keyed by package name. The
            project itself is expected to be present under its own name.
    """
    # The arguments are stored as given; nothing is copied or validated here.
    self.context = context
    self.profile = profile
    self.packages = packages
Arguments:
- context: DBT context for the project
- profile: The profile associated with the project
- packages: The packages in this project. The project should be included with the project name as the key
@classmethod
def
load( cls, context: sqlmesh.dbt.context.DbtContext, variables: Optional[Dict[str, Any]] = None) -> Project:
@classmethod
def load(
    cls,
    context: DbtContext,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
) -> Project:
    """
    Reads the dbt project file under ``context.project_root`` and builds a Project.

    All context attributes set here are set on ``context.copy()``, not on the
    object that was passed in.

    Args:
        context: dbt context describing the project to load.
        variables: dbt variable overrides; ``None`` is treated as no overrides.

    Returns:
        The Project assembled from the project file, its profile and its packages.

    Raises:
        ConfigError: If the project file does not exist or the rendered
            project name is empty.
    """
    context = context.copy()

    yaml_path = Path(context.project_root, PROJECT_FILENAME)
    logger.debug("Processing project file '%s'.", yaml_path)
    if not yaml_path.exists():
        raise ConfigError(f"Could not find {PROJECT_FILENAME} in {context.project_root}")
    raw_project = load_yaml(yaml_path)

    context.project_name = context.render(raw_project.get("name", ""))
    if not context.project_name:
        raise ConfigError(f"{yaml_path.stem} must include project name.")

    # The profile name falls back to the project name when it renders empty.
    resolved_profile = context.render(raw_project.get("profile", "")) or context.project_name
    context.profile_name = resolved_profile

    profile = Profile.load(context, context.target_name)
    context.target = profile.target

    variable_overrides = variables or {}
    context.manifest = ManifestHelper(
        yaml_path.parent,
        profile.path.parent,
        resolved_profile,
        target=profile.target,
        variable_overrides=variable_overrides,
        cache_dir=context.sqlmesh_config.cache_dir,
        model_defaults=context.sqlmesh_config.model_defaults,
    )

    unsupported_fields = profile.target.extra
    if unsupported_fields:
        quoted_fields = ",".join(f"'{field}'" for field in unsupported_fields)
        get_console().log_warning(
            f"{profile.target.type} adapter does not currently support {quoted_fields}."
        )

    packages = {}
    loader = PackageLoader(context)

    # The root project is registered as a package under its own name.
    packages[context.project_name] = loader.load(context.project_root)

    install_dir = Path(
        context.render(raw_project.get("packages-install-path", "dbt_packages"))
    )
    if not install_dir.is_absolute():
        install_dir = Path(context.project_root, install_dir)

    # Each direct sub-directory holding a project file is loaded as a package.
    for nested_project_file in install_dir.glob(f"*/{PROJECT_FILENAME}"):
        dependency = loader.load(nested_project_file.parent)
        packages[dependency.name] = dependency

    # Variable resolution precedence (highest first):
    #   1. Variable overrides
    #   2. Package-scoped variables in the root project's dbt_project.yml
    #   3. Global project variables in the root project's dbt_project.yml
    #   4. Variables in the package's own dbt_project.yml
    merged_vars = {**(raw_project.get("vars") or {}), **variable_overrides}
    for package_name, package in packages.items():
        scoped_vars = merged_vars.get(package_name)
        if isinstance(scoped_vars, dict):
            # A dict stored under the package's name is that package's scoped
            # variables: apply everything else first, then the scoped values.
            unscoped_vars = {
                key: value for key, value in merged_vars.items() if key != package_name
            }
            package.variables.update(unscoped_vars)
            package.variables.update(scoped_vars)
        else:
            package.variables.update(merged_vars)
        # Re-apply the overrides last so that scoped values cannot shadow them.
        if variable_overrides:
            package.variables.update(variable_overrides)

    return Project(context, profile, packages)
Loads the configuration for the specified DBT project
Arguments:
- context: dbt context for this project
- variables: dbt variable overrides
Returns:
Project instance for the specified DBT project