sqlmesh.dbt.context
1from __future__ import annotations 2 3import logging 4import typing as t 5from dataclasses import dataclass, field, replace 6from pathlib import Path 7 8from dbt.adapters.base import BaseRelation 9 10from sqlmesh.core.config import Config as SQLMeshConfig 11from sqlmesh.dbt.builtin import _relation_info_to_relation 12from sqlmesh.dbt.common import Dependencies 13from sqlmesh.dbt.manifest import ManifestHelper 14from sqlmesh.dbt.target import TargetConfig 15from sqlmesh.utils import AttributeDict 16from sqlmesh.utils.errors import ConfigError, SQLMeshError, MissingModelError, MissingSourceError 17from sqlmesh.utils.jinja import ( 18 JinjaGlobalAttribute, 19 JinjaMacroRegistry, 20 MacroInfo, 21 MacroReference, 22) 23 24if t.TYPE_CHECKING: 25 from jinja2 import Environment 26 27 from sqlmesh.dbt.model import ModelConfig 28 from sqlmesh.dbt.relation import Policy 29 from sqlmesh.dbt.seed import SeedConfig 30 from sqlmesh.dbt.source import SourceConfig 31 32logger = logging.getLogger(__name__) 33 34 35@dataclass 36class DbtContext: 37 """Context for DBT environment""" 38 39 project_root: Path = Path() 40 profiles_dir: t.Optional[Path] = None 41 """Optional override to specify the directory where profiles.yml is located, if not at the :project_root""" 42 target_name: t.Optional[str] = None 43 profile_name: t.Optional[str] = None 44 project_schema: t.Optional[str] = None 45 jinja_macros: JinjaMacroRegistry = field( 46 default_factory=lambda: JinjaMacroRegistry( 47 create_builtins_module=SQLMESH_DBT_PACKAGE, top_level_packages=["dbt"] 48 ) 49 ) 50 51 sqlmesh_config: SQLMeshConfig = field(default_factory=SQLMeshConfig) 52 53 _project_name: t.Optional[str] = None 54 _variables: t.Dict[str, t.Any] = field(default_factory=dict) 55 _models: t.Dict[str, ModelConfig] = field(default_factory=dict) 56 _model_fqns: t.Set[str] = field(default_factory=set) 57 _seeds: t.Dict[str, SeedConfig] = field(default_factory=dict) 58 _sources: t.Dict[str, SourceConfig] = 
field(default_factory=dict) 59 _refs: t.Dict[str, t.Union[ModelConfig, SeedConfig]] = field(default_factory=dict) 60 61 _target: t.Optional[TargetConfig] = None 62 63 _jinja_environment: t.Optional[Environment] = None 64 65 _manifest: t.Optional[ManifestHelper] = None 66 67 @property 68 def default_dialect(self) -> str: 69 if self.sqlmesh_config.dialect: 70 return self.sqlmesh_config.dialect 71 if not self.target: 72 raise SQLMeshError( 73 "Target must be configured before calling the default_dialect property." 74 ) 75 return self.target.dialect 76 77 @property 78 def project_name(self) -> t.Optional[str]: 79 return self._project_name 80 81 @project_name.setter 82 def project_name(self, project_name: str) -> None: 83 self._project_name = project_name 84 self.jinja_macros.root_package_name = project_name 85 86 @property 87 def manifest(self) -> ManifestHelper: 88 if self._manifest is None: 89 raise SQLMeshError("Manifest is not set in the context.") 90 return self._manifest 91 92 @manifest.setter 93 def manifest(self, mainfest: ManifestHelper) -> None: 94 self._manifest = mainfest 95 96 @property 97 def variables(self) -> t.Dict[str, t.Any]: 98 return self._variables 99 100 @variables.setter 101 def variables(self, variables: t.Dict[str, t.Any]) -> None: 102 self._variables = {} 103 self.add_variables(variables) 104 105 def add_variables(self, variables: t.Dict[str, t.Any]) -> None: 106 self._variables.update(variables) 107 self._jinja_environment = None 108 109 def set_and_render_variables(self, variables: t.Dict[str, t.Any], package: str) -> None: 110 package_macros = self.jinja_macros.copy( 111 update={"top_level_packages": [*self.jinja_macros.top_level_packages, package]} 112 ) 113 jinja_environment = package_macros.build_environment(**self.jinja_globals) 114 115 def _render_var(value: t.Any) -> t.Any: 116 if isinstance(value, str): 117 return jinja_environment.from_string(value).render() 118 if isinstance(value, list): 119 return [_render_var(v) for v in value] 
120 if isinstance(value, dict): 121 return {k: _render_var(v) for k, v in value.items()} 122 return value 123 124 def _var(name: str, default: t.Optional[t.Any] = None) -> t.Any: 125 return _render_var(variables.get(name, default)) 126 127 jinja_environment.globals["var"] = _var 128 129 rendered_variables = {} 130 for k, v in variables.items(): 131 try: 132 rendered_variables[k] = _render_var(v) 133 except Exception as ex: 134 logger.warning(f"Failed to render variable '{k}', value '{v}': {ex}") 135 136 self.variables = rendered_variables 137 138 def add_macros(self, macros: t.Dict[str, MacroInfo], package: str) -> None: 139 self.jinja_macros.add_macros(macros, package=package) 140 self._jinja_environment = None 141 142 @property 143 def models(self) -> t.Dict[str, ModelConfig]: 144 return self._models 145 146 @models.setter 147 def models(self, models: t.Dict[str, ModelConfig]) -> None: 148 self._models = {} 149 self._refs = {} 150 self._model_fqns = set() 151 self.add_models(models) 152 153 def add_models(self, models: t.Dict[str, ModelConfig]) -> None: 154 self._refs = {} 155 self._models.update(models) 156 self._jinja_environment = None 157 158 @property 159 def model_fqns(self) -> t.Set[str]: 160 if not self._model_fqns: 161 self._model_fqns = {model.fqn for model in self._models.values()} 162 return self._model_fqns 163 164 @property 165 def seeds(self) -> t.Dict[str, SeedConfig]: 166 return self._seeds 167 168 @seeds.setter 169 def seeds(self, seeds: t.Dict[str, SeedConfig]) -> None: 170 self._seeds = {} 171 self._refs = {} 172 self.add_seeds(seeds) 173 174 def add_seeds(self, seeds: t.Dict[str, SeedConfig]) -> None: 175 self._refs = {} 176 self._seeds.update(seeds) 177 self._jinja_environment = None 178 179 @property 180 def sources(self) -> t.Dict[str, SourceConfig]: 181 return self._sources 182 183 @sources.setter 184 def sources(self, sources: t.Dict[str, SourceConfig]) -> None: 185 self._sources = {} 186 self.add_sources(sources) 187 188 def 
add_sources(self, sources: t.Dict[str, SourceConfig]) -> None: 189 self._sources.update(sources) 190 self._jinja_environment = None 191 192 @property 193 def refs(self) -> t.Dict[str, t.Union[ModelConfig, SeedConfig]]: 194 from sqlmesh.dbt.model import ModelConfig 195 from sqlmesh.dbt.seed import SeedConfig 196 197 if not self._refs: 198 # Refs can be called with or without package name. 199 for model in t.cast( 200 t.Dict[str, t.Union[ModelConfig, SeedConfig]], {**self._seeds, **self._models} 201 ).values(): 202 name = model.name 203 config_name = model.config_name 204 if model.version == model.latest_version: 205 self._refs[name] = model 206 self._refs[config_name] = model 207 if model.version: 208 self._refs[f"{name}_v{model.version}"] = model 209 self._refs[f"{config_name}_v{model.version}"] = model 210 return self._refs 211 212 @property 213 def target(self) -> TargetConfig: 214 if not self._target: 215 raise SQLMeshError("Target has not been set in the context.") 216 return self._target 217 218 @target.setter 219 def target(self, value: TargetConfig) -> None: 220 if not self.project_name: 221 raise ConfigError("Project name must be set in the context in order to use a target.") 222 223 self._target = value 224 self._jinja_environment = None 225 226 def render(self, source: str, **kwargs: t.Any) -> str: 227 return self.jinja_environment.from_string(source).render(**kwargs) 228 229 def get_callable_macro( 230 self, name: str, package: t.Optional[str] = None 231 ) -> t.Optional[t.Callable]: 232 return self.jinja_macros.build_macro( 233 MacroReference(name=name, package=package), **self.jinja_globals 234 ) 235 236 def copy(self) -> DbtContext: 237 return replace(self) 238 239 @property 240 def jinja_environment(self) -> Environment: 241 if self._jinja_environment is None: 242 self._jinja_environment = self.jinja_macros.build_environment(**self.jinja_globals) 243 return self._jinja_environment 244 245 @property 246 def jinja_globals(self) -> t.Dict[str, 
JinjaGlobalAttribute]: 247 output: t.Dict[str, JinjaGlobalAttribute] = { 248 "vars": AttributeDict(self.variables), 249 "refs": AttributeDict({k: v.relation_info for k, v in self.refs.items()}), 250 "sources": AttributeDict({k: v.relation_info for k, v in self.sources.items()}), 251 } 252 if self.project_name is not None: 253 output["project_name"] = self.project_name 254 if self._target is not None: 255 output["target"] = self._target.attribute_dict() 256 # pass user-specified default dialect if we have already loaded the config 257 if self.sqlmesh_config.dialect: 258 output["dialect"] = self.sqlmesh_config.dialect 259 # Pass flat graph structure like dbt 260 if self._manifest is not None: 261 output["flat_graph"] = AttributeDict(self.manifest.flat_graph) 262 return output 263 264 def context_for_dependencies(self, dependencies: Dependencies) -> DbtContext: 265 from sqlmesh.dbt.model import ModelConfig 266 from sqlmesh.dbt.seed import SeedConfig 267 268 dependency_context = self.copy() 269 270 models = {} 271 seeds = {} 272 sources = {} 273 274 for ref in dependencies.refs: 275 model = self.refs.get(ref) 276 if model: 277 if isinstance(model, SeedConfig): 278 seeds[ref] = t.cast(SeedConfig, model) 279 else: 280 models[ref] = t.cast(ModelConfig, model) 281 else: 282 raise MissingModelError(ref) 283 284 for source in dependencies.sources: 285 if source in self.sources: 286 sources[source] = self.sources[source] 287 else: 288 raise MissingSourceError(source) 289 290 variables = {k: v for k, v in self.variables.items() if k in dependencies.variables} 291 292 dependency_context.sources = sources 293 dependency_context.seeds = seeds 294 dependency_context.models = models 295 dependency_context.variables = variables 296 dependency_context._refs = {**dependency_context._seeds, **dependency_context._models} # type: ignore 297 298 return dependency_context 299 300 def create_relation( 301 self, relation_info: AttributeDict[str, t.Any], quote_policy: t.Optional[Policy] = 
None 302 ) -> BaseRelation: 303 if not self.target: 304 raise SQLMeshError("Target must be configured before calling create_relation.") 305 return _relation_info_to_relation( 306 relation_info, self.target.relation_class, quote_policy or self.target.quote_policy 307 ) 308 309 310SQLMESH_DBT_PACKAGE = "sqlmesh.dbt"
logger =
<Logger sqlmesh.dbt.context (WARNING)>
@dataclass
class
DbtContext:
@dataclass
class DbtContext:
    """Context for DBT environment.

    Holds the project settings, the registered variables, models, seeds and
    sources, the active target, and a lazily built Jinja environment. The
    cached environment is discarded whenever variables, macros, models, seeds,
    sources or the target change.
    """

    project_root: Path = Path()
    profiles_dir: t.Optional[Path] = None
    """Optional override for the directory containing profiles.yml, if it is not located at `project_root`."""
    target_name: t.Optional[str] = None
    profile_name: t.Optional[str] = None
    project_schema: t.Optional[str] = None
    jinja_macros: JinjaMacroRegistry = field(
        default_factory=lambda: JinjaMacroRegistry(
            create_builtins_module=SQLMESH_DBT_PACKAGE, top_level_packages=["dbt"]
        )
    )

    sqlmesh_config: SQLMeshConfig = field(default_factory=SQLMeshConfig)

    _project_name: t.Optional[str] = None
    _variables: t.Dict[str, t.Any] = field(default_factory=dict)
    _models: t.Dict[str, ModelConfig] = field(default_factory=dict)
    _model_fqns: t.Set[str] = field(default_factory=set)
    _seeds: t.Dict[str, SeedConfig] = field(default_factory=dict)
    _sources: t.Dict[str, SourceConfig] = field(default_factory=dict)
    _refs: t.Dict[str, t.Union[ModelConfig, SeedConfig]] = field(default_factory=dict)

    _target: t.Optional[TargetConfig] = None

    _jinja_environment: t.Optional[Environment] = None

    _manifest: t.Optional[ManifestHelper] = None

    @property
    def default_dialect(self) -> str:
        """Return the dialect from the SQLMesh config, else the target's dialect.

        Raises:
            SQLMeshError: If the config has no dialect and no target is set.
        """
        if self.sqlmesh_config.dialect:
            return self.sqlmesh_config.dialect
        # Inspect the backing field: the `target` property raises on its own,
        # which previously made this more specific message unreachable.
        if not self._target:
            raise SQLMeshError(
                "Target must be configured before calling the default_dialect property."
            )
        return self._target.dialect

    @property
    def project_name(self) -> t.Optional[str]:
        """Name of the project, or None if it has not been set."""
        return self._project_name

    @project_name.setter
    def project_name(self, project_name: str) -> None:
        """Store the project name and mirror it as the macro registry's root package."""
        self._project_name = project_name
        self.jinja_macros.root_package_name = project_name

    @property
    def manifest(self) -> ManifestHelper:
        """Return the manifest helper.

        Raises:
            SQLMeshError: If no manifest has been set.
        """
        if self._manifest is None:
            raise SQLMeshError("Manifest is not set in the context.")
        return self._manifest

    @manifest.setter
    def manifest(self, manifest: ManifestHelper) -> None:
        self._manifest = manifest

    @property
    def variables(self) -> t.Dict[str, t.Any]:
        """Variables currently registered in the context."""
        return self._variables

    @variables.setter
    def variables(self, variables: t.Dict[str, t.Any]) -> None:
        """Replace all variables with the given mapping."""
        self._variables = {}
        self.add_variables(variables)

    def add_variables(self, variables: t.Dict[str, t.Any]) -> None:
        """Merge variables into the context and drop the cached Jinja environment."""
        self._variables.update(variables)
        self._jinja_environment = None

    def set_and_render_variables(self, variables: t.Dict[str, t.Any], package: str) -> None:
        """Render the given variables through Jinja and store the results.

        Strings are rendered as templates; lists and dicts are rendered
        recursively; any other value is kept as is. While rendering, a `var`
        global resolves names against the `variables` argument. A variable
        whose rendering raises is logged as a warning and left out.

        Args:
            variables: Raw variable values keyed by name.
            package: Package added to the top-level packages of the macro
                registry copy used for rendering.
        """
        package_macros = self.jinja_macros.copy(
            update={"top_level_packages": [*self.jinja_macros.top_level_packages, package]}
        )
        jinja_environment = package_macros.build_environment(**self.jinja_globals)

        def _render_var(value: t.Any) -> t.Any:
            if isinstance(value, str):
                return jinja_environment.from_string(value).render()
            if isinstance(value, list):
                return [_render_var(v) for v in value]
            if isinstance(value, dict):
                return {k: _render_var(v) for k, v in value.items()}
            return value

        def _var(name: str, default: t.Optional[t.Any] = None) -> t.Any:
            return _render_var(variables.get(name, default))

        jinja_environment.globals["var"] = _var

        rendered_variables = {}
        for k, v in variables.items():
            try:
                rendered_variables[k] = _render_var(v)
            except Exception as ex:
                # Best effort: a variable that cannot be rendered is skipped.
                logger.warning(f"Failed to render variable '{k}', value '{v}': {ex}")

        self.variables = rendered_variables

    def add_macros(self, macros: t.Dict[str, MacroInfo], package: str) -> None:
        """Register macros for a package and drop the cached Jinja environment."""
        self.jinja_macros.add_macros(macros, package=package)
        self._jinja_environment = None

    @property
    def models(self) -> t.Dict[str, ModelConfig]:
        """Models currently registered in the context."""
        return self._models

    @models.setter
    def models(self, models: t.Dict[str, ModelConfig]) -> None:
        """Replace all models with the given mapping."""
        self._models = {}
        self.add_models(models)

    def add_models(self, models: t.Dict[str, ModelConfig]) -> None:
        """Merge models into the context and invalidate every derived cache."""
        self._refs = {}
        # The fqn set is derived from `_models`; without this reset a set that
        # was computed earlier would miss the models added here.
        self._model_fqns = set()
        self._models.update(models)
        self._jinja_environment = None

    @property
    def model_fqns(self) -> t.Set[str]:
        """Set of `fqn` values of the registered models, computed lazily."""
        if not self._model_fqns:
            self._model_fqns = {model.fqn for model in self._models.values()}
        return self._model_fqns

    @property
    def seeds(self) -> t.Dict[str, SeedConfig]:
        """Seeds currently registered in the context."""
        return self._seeds

    @seeds.setter
    def seeds(self, seeds: t.Dict[str, SeedConfig]) -> None:
        """Replace all seeds with the given mapping."""
        self._seeds = {}
        self.add_seeds(seeds)

    def add_seeds(self, seeds: t.Dict[str, SeedConfig]) -> None:
        """Merge seeds into the context and invalidate the ref and Jinja caches."""
        self._refs = {}
        self._seeds.update(seeds)
        self._jinja_environment = None

    @property
    def sources(self) -> t.Dict[str, SourceConfig]:
        """Sources currently registered in the context."""
        return self._sources

    @sources.setter
    def sources(self, sources: t.Dict[str, SourceConfig]) -> None:
        """Replace all sources with the given mapping."""
        self._sources = {}
        self.add_sources(sources)

    def add_sources(self, sources: t.Dict[str, SourceConfig]) -> None:
        """Merge sources into the context and drop the cached Jinja environment."""
        self._sources.update(sources)
        self._jinja_environment = None

    @property
    def refs(self) -> t.Dict[str, t.Union[ModelConfig, SeedConfig]]:
        """Lookup of seeds and models by every name they can be referenced with.

        The lookup is built on first access and reused until it is reset.
        """
        if not self._refs:
            # A string forward reference keeps the cast purely static, so no
            # runtime import of the config classes is needed here.
            nodes = t.cast(
                "t.Dict[str, t.Union[ModelConfig, SeedConfig]]",
                {**self._seeds, **self._models},
            )
            # Refs can be called with or without package name.
            for node in nodes.values():
                name = node.name
                config_name = node.config_name
                # Unversioned names resolve to the latest version only.
                if node.version == node.latest_version:
                    self._refs[name] = node
                    self._refs[config_name] = node
                if node.version:
                    self._refs[f"{name}_v{node.version}"] = node
                    self._refs[f"{config_name}_v{node.version}"] = node
        return self._refs

    @property
    def target(self) -> TargetConfig:
        """Return the configured target.

        Raises:
            SQLMeshError: If no target has been set.
        """
        if not self._target:
            raise SQLMeshError("Target has not been set in the context.")
        return self._target

    @target.setter
    def target(self, value: TargetConfig) -> None:
        """Set the target and drop the cached Jinja environment.

        Raises:
            ConfigError: If the project name has not been set yet.
        """
        if not self.project_name:
            raise ConfigError("Project name must be set in the context in order to use a target.")

        self._target = value
        self._jinja_environment = None

    def render(self, source: str, **kwargs: t.Any) -> str:
        """Render a Jinja template string with the context's environment."""
        return self.jinja_environment.from_string(source).render(**kwargs)

    def get_callable_macro(
        self, name: str, package: t.Optional[str] = None
    ) -> t.Optional[t.Callable]:
        """Build the named macro from the registry using the current Jinja globals."""
        return self.jinja_macros.build_macro(
            MacroReference(name=name, package=package), **self.jinja_globals
        )

    def copy(self) -> DbtContext:
        """Return a shallow copy; field values are shared until reassigned on the copy."""
        return replace(self)

    @property
    def jinja_environment(self) -> Environment:
        """Jinja environment built from the macro registry, created on first use."""
        if self._jinja_environment is None:
            self._jinja_environment = self.jinja_macros.build_environment(**self.jinja_globals)
        return self._jinja_environment

    @property
    def jinja_globals(self) -> t.Dict[str, JinjaGlobalAttribute]:
        """Globals passed to Jinja: vars, refs, sources and, when set, the optional entries."""
        output: t.Dict[str, JinjaGlobalAttribute] = {
            "vars": AttributeDict(self.variables),
            "refs": AttributeDict({k: v.relation_info for k, v in self.refs.items()}),
            "sources": AttributeDict({k: v.relation_info for k, v in self.sources.items()}),
        }
        if self.project_name is not None:
            output["project_name"] = self.project_name
        if self._target is not None:
            output["target"] = self._target.attribute_dict()
        # pass user-specified default dialect if we have already loaded the config
        if self.sqlmesh_config.dialect:
            output["dialect"] = self.sqlmesh_config.dialect
        # Pass flat graph structure like dbt
        if self._manifest is not None:
            output["flat_graph"] = AttributeDict(self.manifest.flat_graph)
        return output

    def context_for_dependencies(self, dependencies: Dependencies) -> DbtContext:
        """Return a copy of this context restricted to the given dependencies.

        Raises:
            MissingModelError: If a referenced model or seed is not registered.
            MissingSourceError: If a referenced source is not registered.
        """
        from sqlmesh.dbt.model import ModelConfig
        from sqlmesh.dbt.seed import SeedConfig

        dependency_context = self.copy()

        models = {}
        seeds = {}
        sources = {}

        for ref in dependencies.refs:
            model = self.refs.get(ref)
            if not model:
                raise MissingModelError(ref)
            if isinstance(model, SeedConfig):
                seeds[ref] = t.cast(SeedConfig, model)
            else:
                models[ref] = t.cast(ModelConfig, model)

        for source in dependencies.sources:
            if source not in self.sources:
                raise MissingSourceError(source)
            sources[source] = self.sources[source]

        variables = {k: v for k, v in self.variables.items() if k in dependencies.variables}

        dependency_context.sources = sources
        dependency_context.seeds = seeds
        dependency_context.models = models
        dependency_context.variables = variables
        # The selected entries are already keyed by ref name, so they are
        # installed directly as the ref lookup of the new context.
        dependency_context._refs = {**dependency_context._seeds, **dependency_context._models}  # type: ignore

        return dependency_context

    def create_relation(
        self, relation_info: AttributeDict[str, t.Any], quote_policy: t.Optional[Policy] = None
    ) -> BaseRelation:
        """Create a relation of the target's relation class from relation info.

        Args:
            relation_info: Attributes describing the relation.
            quote_policy: Quote policy to apply; the target's policy is used
                when this is not provided.

        Raises:
            SQLMeshError: If no target has been set.
        """
        # Inspect the backing field so this specific message is reachable.
        target = self._target
        if not target:
            raise SQLMeshError("Target must be configured before calling create_relation.")
        return _relation_info_to_relation(
            relation_info, target.relation_class, quote_policy or target.quote_policy
        )
Context for DBT environment
DbtContext( project_root: pathlib.Path = PosixPath('.'), profiles_dir: Optional[pathlib.Path] = None, target_name: Optional[str] = None, profile_name: Optional[str] = None, project_schema: Optional[str] = None, jinja_macros: sqlmesh.utils.jinja.JinjaMacroRegistry = <factory>, sqlmesh_config: sqlmesh.core.config.root.Config = <factory>, _project_name: Optional[str] = None, _variables: Dict[str, Any] = <factory>, _models: Dict[str, sqlmesh.dbt.model.ModelConfig] = <factory>, _model_fqns: Set[str] = <factory>, _seeds: Dict[str, sqlmesh.dbt.seed.SeedConfig] = <factory>, _sources: Dict[str, sqlmesh.dbt.source.SourceConfig] = <factory>, _refs: Dict[str, Union[sqlmesh.dbt.model.ModelConfig, sqlmesh.dbt.seed.SeedConfig]] = <factory>, _target: Optional[sqlmesh.dbt.target.TargetConfig] = None, _jinja_environment: Optional[jinja2.environment.Environment] = None, _manifest: Optional[sqlmesh.dbt.manifest.ManifestHelper] = None)
profiles_dir: Optional[pathlib.Path] =
None
Optional override for the directory containing profiles.yml, used when it is not located at the project root (project_root).
jinja_macros: sqlmesh.utils.jinja.JinjaMacroRegistry
sqlmesh_config: sqlmesh.core.config.root.Config
manifest: sqlmesh.dbt.manifest.ManifestHelper
def
set_and_render_variables(self, variables: Dict[str, Any], package: str) -> None:
def set_and_render_variables(self, variables: t.Dict[str, t.Any], package: str) -> None:
    """Render the given variables through Jinja and store them on the context.

    String values are rendered as templates, lists and dicts are rendered
    element by element, and every other value is passed through untouched.
    During rendering a `var` global looks names up in the `variables`
    argument. A variable whose rendering raises is logged as a warning and
    omitted from the stored result.

    Args:
        variables: Raw variable values keyed by name.
        package: Package appended to the top-level packages of the macro
            registry copy that is used for rendering.
    """
    packages = [*self.jinja_macros.top_level_packages, package]
    scoped_macros = self.jinja_macros.copy(update={"top_level_packages": packages})
    env = scoped_macros.build_environment(**self.jinja_globals)

    def _render_var(value: t.Any) -> t.Any:
        if isinstance(value, str):
            template = env.from_string(value)
            return template.render()
        if isinstance(value, list):
            return [_render_var(item) for item in value]
        if isinstance(value, dict):
            return {key: _render_var(item) for key, item in value.items()}
        return value

    def _var(name: str, default: t.Optional[t.Any] = None) -> t.Any:
        raw = variables.get(name, default)
        return _render_var(raw)

    # Expose the lookup to templates so variables can refer to each other.
    env.globals["var"] = _var

    rendered: t.Dict[str, t.Any] = {}
    for k, v in variables.items():
        try:
            rendered[k] = _render_var(v)
        except Exception as ex:
            logger.warning(f"Failed to render variable '{k}', value '{v}': {ex}")

    self.variables = rendered
models: Dict[str, sqlmesh.dbt.model.ModelConfig]
seeds: Dict[str, sqlmesh.dbt.seed.SeedConfig]
sources: Dict[str, sqlmesh.dbt.source.SourceConfig]
refs: Dict[str, Union[sqlmesh.dbt.model.ModelConfig, sqlmesh.dbt.seed.SeedConfig]]
@property
def refs(self) -> t.Dict[str, t.Union[ModelConfig, SeedConfig]]:
    """Lookup of seeds and models by every name they can be referenced with.

    The lookup is built on first access and reused while it is non-empty.
    Each entry is registered under its name and its config name; versioned
    entries are additionally registered with a `_v<version>` suffix, and the
    unsuffixed names resolve to the entry whose version equals its latest
    version.
    """
    if not self._refs:
        # A string forward reference keeps the cast purely static, so the
        # config classes no longer have to be imported on every access.
        nodes = t.cast(
            "t.Dict[str, t.Union[ModelConfig, SeedConfig]]",
            {**self._seeds, **self._models},
        )
        # Refs can be called with or without package name.
        for node in nodes.values():
            name = node.name
            config_name = node.config_name
            if node.version == node.latest_version:
                self._refs[name] = node
                self._refs[config_name] = node
            if node.version:
                self._refs[f"{name}_v{node.version}"] = node
                self._refs[f"{config_name}_v{node.version}"] = node
    return self._refs
target: sqlmesh.dbt.target.TargetConfig
jinja_globals: Dict[str, Union[str, int, float, bool, sqlmesh.utils.AttributeDict]]
@property
def jinja_globals(self) -> t.Dict[str, JinjaGlobalAttribute]:
    """Assemble the globals handed to the Jinja environment.

    `vars`, `refs` and `sources` are always present. `project_name`,
    `target`, `dialect` and `flat_graph` are added only when the
    corresponding value is available on the context.
    """
    variables = AttributeDict(self.variables)
    ref_relations = AttributeDict({key: node.relation_info for key, node in self.refs.items()})
    source_relations = AttributeDict(
        {key: source.relation_info for key, source in self.sources.items()}
    )

    output: t.Dict[str, JinjaGlobalAttribute] = {}
    output["vars"] = variables
    output["refs"] = ref_relations
    output["sources"] = source_relations

    if self.project_name is not None:
        output["project_name"] = self.project_name

    if self._target is not None:
        output["target"] = self._target.attribute_dict()

    # pass user-specified default dialect if we have already loaded the config
    if self.sqlmesh_config.dialect:
        output["dialect"] = self.sqlmesh_config.dialect

    # Pass flat graph structure like dbt
    if self._manifest is not None:
        output["flat_graph"] = AttributeDict(self.manifest.flat_graph)

    return output
def context_for_dependencies(self, dependencies: Dependencies) -> DbtContext:
    """Build a copy of this context limited to the given dependencies.

    Only the refs, sources and variables named by `dependencies` are carried
    over to the returned context.

    Raises:
        MissingModelError: If a ref is not found in this context's refs.
        MissingSourceError: If a source is not found in this context's sources.
    """
    from sqlmesh.dbt.model import ModelConfig
    from sqlmesh.dbt.seed import SeedConfig

    scoped = self.copy()

    picked_models = {}
    picked_seeds = {}
    picked_sources = {}

    for ref in dependencies.refs:
        node = self.refs.get(ref)
        if not node:
            raise MissingModelError(ref)
        # Seeds and models are stored separately but share one ref namespace.
        if isinstance(node, SeedConfig):
            picked_seeds[ref] = t.cast(SeedConfig, node)
        else:
            picked_models[ref] = t.cast(ModelConfig, node)

    for source in dependencies.sources:
        if source not in self.sources:
            raise MissingSourceError(source)
        picked_sources[source] = self.sources[source]

    picked_variables = {
        key: value for key, value in self.variables.items() if key in dependencies.variables
    }

    scoped.sources = picked_sources
    scoped.seeds = picked_seeds
    scoped.models = picked_models
    scoped.variables = picked_variables
    # Entries are already keyed by ref name, so they become the ref lookup as is.
    scoped._refs = {**scoped._seeds, **scoped._models}  # type: ignore

    return scoped
def
create_relation( self, relation_info: sqlmesh.utils.AttributeDict[str, typing.Any], quote_policy: Optional[dbt.adapters.contracts.relation.Policy] = None) -> dbt.adapters.base.relation.BaseRelation:
def create_relation(
    self, relation_info: AttributeDict[str, t.Any], quote_policy: t.Optional[Policy] = None
) -> BaseRelation:
    """Create a relation of the target's relation class from relation info.

    Args:
        relation_info: Attributes describing the relation.
        quote_policy: Quote policy to apply; the target's quote policy is
            used when this is not provided.

    Raises:
        SQLMeshError: If no target has been set on the context.
    """
    # Read the backing field rather than the `target` property: the property
    # raises its own error first, which left the message below unreachable.
    target = self._target
    if not target:
        raise SQLMeshError("Target must be configured before calling create_relation.")
    return _relation_info_to_relation(
        relation_info, target.relation_class, quote_policy or target.quote_policy
    )
SQLMESH_DBT_PACKAGE =
'sqlmesh.dbt'