sqlmesh.core.config.loader
1from __future__ import annotations 2 3import glob 4import os 5import typing as t 6from pathlib import Path 7 8from pydantic import ValidationError 9from dotenv import load_dotenv 10from sqlglot.helper import ensure_list 11 12from sqlmesh.core import constants as c 13from sqlmesh.core.config.common import ( 14 ALL_CONFIG_FILENAMES, 15 YAML_CONFIG_FILENAMES, 16 DBT_PROJECT_FILENAME, 17) 18from sqlmesh.core.config.model import ModelDefaultsConfig 19from sqlmesh.core.config.root import Config 20from sqlmesh.utils import env_vars, merge_dicts, sys_path 21from sqlmesh.utils.errors import ConfigError 22from sqlmesh.utils.metaprogramming import import_python_file 23from sqlmesh.utils.pydantic import validation_error_message 24from sqlmesh.utils.yaml import load as yaml_load 25 26C = t.TypeVar("C", bound=Config) 27 28 29def load_configs( 30 config: t.Optional[t.Union[str, C]], 31 config_type: t.Type[C], 32 paths: t.Union[str | Path, t.Iterable[str | Path]], 33 sqlmesh_path: t.Optional[Path] = None, 34 dotenv_path: t.Optional[Path] = None, 35 **kwargs: t.Any, 36) -> t.Dict[Path, C]: 37 sqlmesh_path = sqlmesh_path or c.SQLMESH_PATH 38 config = config or "config" 39 40 absolute_paths = [ 41 Path(t.cast(t.Union[str, Path], p)).absolute() 42 for path in ensure_list(paths) 43 for p in (glob.glob(str(path)) or [str(path)]) 44 ] 45 46 if dotenv_path and dotenv_path.exists() and dotenv_path.is_file(): 47 load_dotenv(dotenv_path=dotenv_path, override=True) 48 else: 49 for path in absolute_paths: 50 env_file = path / ".env" 51 if env_file.exists() and env_file.is_file(): 52 load_dotenv(dotenv_path=env_file, override=True) 53 54 if not isinstance(config, str): 55 if type(config) != config_type: 56 config = convert_config_type(config, config_type) 57 return {path: config for path in absolute_paths} 58 59 config_env_vars = None 60 personal_paths = [sqlmesh_path / name for name in YAML_CONFIG_FILENAMES] 61 for path in personal_paths: 62 if path.exists(): 63 config_env_vars = load_config_from_yaml(path).get("env_vars") 64 if config_env_vars: 65 break 66 67 with env_vars(config_env_vars if config_env_vars else {}): 68 return { 69 path: load_config_from_paths( 70 config_type, 71 project_paths=[path / name for name in ALL_CONFIG_FILENAMES], 72 personal_paths=personal_paths, 73 config_name=config, 74 **kwargs, 75 ) 76 for path in absolute_paths 77 } 78 79 80def load_config_from_paths( 81 config_type: t.Type[C], 82 project_paths: t.Optional[t.List[Path]] = None, 83 personal_paths: t.Optional[t.List[Path]] = None, 84 config_name: str = "config", 85 load_from_env: bool = True, 86 variables: t.Optional[t.Dict[str, t.Any]] = None, 87 **kwargs: t.Any, 88) -> C: 89 project_paths = project_paths or [] 90 personal_paths = personal_paths or [] 91 visited_folders: t.Set[Path] = set() 92 python_config: t.Optional[C] = None 93 non_python_configs = [] 94 95 if not project_paths or not any(path.exists() for path in project_paths): 96 raise ConfigError( 97 "SQLMesh project config could not be found. Point the cli to the project path with `sqlmesh -p`. If you haven't set up the SQLMesh project, run `sqlmesh init`." 98 ) 99 100 yaml_config_path: t.Optional[Path] = None 101 for path in [*project_paths, *personal_paths]: 102 if not path.exists(): 103 continue 104 105 if not path.is_file(): 106 raise ConfigError(f"Path '{path}' must be a file.") 107 108 parent_path = path.parent 109 if parent_path in visited_folders: 110 raise ConfigError(f"Multiple configuration files found in folder '{parent_path}'.") 111 visited_folders.add(parent_path) 112 113 extension = path.name.split(".")[-1].lower() 114 if extension in ("yml", "yaml"): 115 if config_name != "config" and not python_config: 116 raise ConfigError( 117 "YAML configs do not support multiple configs. Use Python instead.", 118 ) 119 yaml_config_path = path.resolve() 120 non_python_configs.append(load_config_from_yaml(path, variables)) 121 elif extension == "py": 122 try: 123 python_config = load_config_from_python_module( 124 config_type, path, config_name=config_name 125 ) 126 except ValidationError as e: 127 raise ConfigError( 128 validation_error_message(e, f"Invalid project config '{config_name}':") 129 + "\n\nVerify your config.py." 130 ) 131 else: 132 raise ConfigError( 133 f"Unsupported config file extension '{extension}' in config file '{path}'." 134 ) 135 136 if load_from_env: 137 env_config = load_config_from_env() 138 if env_config: 139 non_python_configs.append(load_config_from_env()) 140 141 if not non_python_configs and not python_config: 142 raise ConfigError( 143 "SQLMesh config could not be found. Point the cli to the right path with `sqlmesh -p`. If you haven't set up SQLMesh, run `sqlmesh init`." 144 ) 145 146 non_python_config_dict = merge_dicts(*non_python_configs) 147 148 supported_model_defaults = ModelDefaultsConfig.all_fields() 149 for default in non_python_config_dict.get("model_defaults", {}): 150 if default not in supported_model_defaults: 151 raise ConfigError( 152 f"'{default}' is not a valid model default configuration key. Please remove it from the `model_defaults` specification in your config file." 153 ) 154 155 try: 156 non_python_config = config_type.parse_obj(non_python_config_dict) 157 except ValidationError as e: 158 raise ConfigError( 159 validation_error_message(e, "Invalid project config:") 160 + "\n\nVerify your config.yaml and environment variables.", 161 location=yaml_config_path, 162 ) 163 164 no_dialect_err_msg = "Default model SQL dialect is a required configuration parameter. Set it in the `model_defaults` `dialect` key in your config file." 165 166 # if "dbt_project.yml" is present *and there was no python config already defined*, 167 # create a basic one to ensure we are using the DBT loader. 168 # any config within yaml files will get overlayed on top of it. 169 if not python_config: 170 potential_project_files = [f / DBT_PROJECT_FILENAME for f in visited_folders] 171 dbt_project_file = next((f for f in potential_project_files if f.exists()), None) 172 if dbt_project_file: 173 from sqlmesh.dbt.loader import sqlmesh_config 174 175 infer_state_schema_name = False 176 if dbt := non_python_config.dbt: 177 infer_state_schema_name = dbt.infer_state_schema_name 178 179 dbt_python_config = sqlmesh_config( 180 project_root=dbt_project_file.parent, 181 profiles_dir=kwargs.pop("profiles_dir", None), 182 dbt_profile_name=kwargs.pop("profile", None), 183 dbt_target_name=kwargs.pop("target", None), 184 variables=variables, 185 threads=kwargs.pop("threads", None), 186 infer_state_schema_name=infer_state_schema_name, 187 ) 188 if type(dbt_python_config) != config_type: 189 dbt_python_config = convert_config_type(dbt_python_config, config_type) 190 191 python_config = dbt_python_config 192 193 if python_config: 194 model_defaults = python_config.model_defaults 195 if model_defaults.dialect is None: 196 raise ConfigError(no_dialect_err_msg) 197 return python_config.update_with(non_python_config) 198 199 model_defaults = non_python_config.model_defaults 200 if model_defaults.dialect is None: 201 raise ConfigError(no_dialect_err_msg) 202 203 return non_python_config 204 205 206def load_config_from_yaml( 207 path: Path, variables: t.Optional[t.Dict[str, t.Any]] = None 208) -> t.Dict[str, t.Any]: 209 content = yaml_load(path, variables=variables) 210 if not isinstance(content, dict): 211 raise ConfigError( 212 f"Invalid YAML configuration: expected a dictionary but got {type(content).__name__}. " 213 f"Please check the YAML syntax in your config file.", 214 location=path, 215 ) 216 return content 217 218 219def load_config_from_python_module( 220 config_type: t.Type[C], 221 module_path: Path, 222 config_name: str = "config", 223) -> C: 224 try: 225 with sys_path(module_path.parent): 226 config_module = import_python_file(module_path, module_path.parent) 227 except Exception as e: 228 raise ConfigError( 229 f"Failed to load config file: {e}", 230 location=module_path, 231 ) 232 233 try: 234 config_obj = getattr(config_module, config_name) 235 except AttributeError: 236 raise ConfigError(f"Config '{config_name}' was not found.") 237 238 if config_obj is None or not isinstance(config_obj, Config): 239 raise ConfigError( 240 f"Config needs to be a valid object of type sqlmesh.core.config.Config. Found `{config_obj}` instead at '{module_path}'.", 241 module_path, 242 ) 243 244 return ( 245 config_obj 246 if type(config_obj) == config_type 247 else convert_config_type(config_obj, config_type) 248 ) 249 250 251def load_config_from_env() -> t.Dict[str, t.Any]: 252 config_dict: t.Dict[str, t.Any] = {} 253 254 for key, value in os.environ.items(): 255 key = key.lower() 256 if key.startswith(f"{c.SQLMESH}__") and key != (c.DISABLE_SQLMESH_STATE_MIGRATION).lower(): 257 segments = key.split("__")[1:] 258 if not segments or not segments[-1]: 259 raise ConfigError(f"Invalid SQLMesh configuration variable '{key}'.") 260 261 target_dict = config_dict 262 for config_key in segments[:-1]: 263 if config_key not in target_dict: 264 target_dict[config_key] = {} 265 target_dict = target_dict[config_key] 266 target_dict[segments[-1]] = value 267 268 return config_dict 269 270 271def convert_config_type( 272 config_obj: Config, 273 config_type: t.Type[C], 274) -> C: 275 return config_type.parse_obj(config_obj.dict())
def
load_configs( config: Union[str, ~C, NoneType], config_type: Type[~C], paths: Union[str, pathlib.Path, Iterable[str | pathlib.Path]], sqlmesh_path: Optional[pathlib.Path] = None, dotenv_path: Optional[pathlib.Path] = None, **kwargs: Any) -> Dict[pathlib.Path, ~C]:
30def load_configs( 31 config: t.Optional[t.Union[str, C]], 32 config_type: t.Type[C], 33 paths: t.Union[str | Path, t.Iterable[str | Path]], 34 sqlmesh_path: t.Optional[Path] = None, 35 dotenv_path: t.Optional[Path] = None, 36 **kwargs: t.Any, 37) -> t.Dict[Path, C]: 38 sqlmesh_path = sqlmesh_path or c.SQLMESH_PATH 39 config = config or "config" 40 41 absolute_paths = [ 42 Path(t.cast(t.Union[str, Path], p)).absolute() 43 for path in ensure_list(paths) 44 for p in (glob.glob(str(path)) or [str(path)]) 45 ] 46 47 if dotenv_path and dotenv_path.exists() and dotenv_path.is_file(): 48 load_dotenv(dotenv_path=dotenv_path, override=True) 49 else: 50 for path in absolute_paths: 51 env_file = path / ".env" 52 if env_file.exists() and env_file.is_file(): 53 load_dotenv(dotenv_path=env_file, override=True) 54 55 if not isinstance(config, str): 56 if type(config) != config_type: 57 config = convert_config_type(config, config_type) 58 return {path: config for path in absolute_paths} 59 60 config_env_vars = None 61 personal_paths = [sqlmesh_path / name for name in YAML_CONFIG_FILENAMES] 62 for path in personal_paths: 63 if path.exists(): 64 config_env_vars = load_config_from_yaml(path).get("env_vars") 65 if config_env_vars: 66 break 67 68 with env_vars(config_env_vars if config_env_vars else {}): 69 return { 70 path: load_config_from_paths( 71 config_type, 72 project_paths=[path / name for name in ALL_CONFIG_FILENAMES], 73 personal_paths=personal_paths, 74 config_name=config, 75 **kwargs, 76 ) 77 for path in absolute_paths 78 }
def
load_config_from_paths( config_type: Type[~C], project_paths: Optional[List[pathlib.Path]] = None, personal_paths: Optional[List[pathlib.Path]] = None, config_name: str = 'config', load_from_env: bool = True, variables: Optional[Dict[str, Any]] = None, **kwargs: Any) -> ~C:
81def load_config_from_paths( 82 config_type: t.Type[C], 83 project_paths: t.Optional[t.List[Path]] = None, 84 personal_paths: t.Optional[t.List[Path]] = None, 85 config_name: str = "config", 86 load_from_env: bool = True, 87 variables: t.Optional[t.Dict[str, t.Any]] = None, 88 **kwargs: t.Any, 89) -> C: 90 project_paths = project_paths or [] 91 personal_paths = personal_paths or [] 92 visited_folders: t.Set[Path] = set() 93 python_config: t.Optional[C] = None 94 non_python_configs = [] 95 96 if not project_paths or not any(path.exists() for path in project_paths): 97 raise ConfigError( 98 "SQLMesh project config could not be found. Point the cli to the project path with `sqlmesh -p`. If you haven't set up the SQLMesh project, run `sqlmesh init`." 99 ) 100 101 yaml_config_path: t.Optional[Path] = None 102 for path in [*project_paths, *personal_paths]: 103 if not path.exists(): 104 continue 105 106 if not path.is_file(): 107 raise ConfigError(f"Path '{path}' must be a file.") 108 109 parent_path = path.parent 110 if parent_path in visited_folders: 111 raise ConfigError(f"Multiple configuration files found in folder '{parent_path}'.") 112 visited_folders.add(parent_path) 113 114 extension = path.name.split(".")[-1].lower() 115 if extension in ("yml", "yaml"): 116 if config_name != "config" and not python_config: 117 raise ConfigError( 118 "YAML configs do not support multiple configs. Use Python instead.", 119 ) 120 yaml_config_path = path.resolve() 121 non_python_configs.append(load_config_from_yaml(path, variables)) 122 elif extension == "py": 123 try: 124 python_config = load_config_from_python_module( 125 config_type, path, config_name=config_name 126 ) 127 except ValidationError as e: 128 raise ConfigError( 129 validation_error_message(e, f"Invalid project config '{config_name}':") 130 + "\n\nVerify your config.py." 131 ) 132 else: 133 raise ConfigError( 134 f"Unsupported config file extension '{extension}' in config file '{path}'." 135 ) 136 137 if load_from_env: 138 env_config = load_config_from_env() 139 if env_config: 140 non_python_configs.append(load_config_from_env()) 141 142 if not non_python_configs and not python_config: 143 raise ConfigError( 144 "SQLMesh config could not be found. Point the cli to the right path with `sqlmesh -p`. If you haven't set up SQLMesh, run `sqlmesh init`." 145 ) 146 147 non_python_config_dict = merge_dicts(*non_python_configs) 148 149 supported_model_defaults = ModelDefaultsConfig.all_fields() 150 for default in non_python_config_dict.get("model_defaults", {}): 151 if default not in supported_model_defaults: 152 raise ConfigError( 153 f"'{default}' is not a valid model default configuration key. Please remove it from the `model_defaults` specification in your config file." 154 ) 155 156 try: 157 non_python_config = config_type.parse_obj(non_python_config_dict) 158 except ValidationError as e: 159 raise ConfigError( 160 validation_error_message(e, "Invalid project config:") 161 + "\n\nVerify your config.yaml and environment variables.", 162 location=yaml_config_path, 163 ) 164 165 no_dialect_err_msg = "Default model SQL dialect is a required configuration parameter. Set it in the `model_defaults` `dialect` key in your config file." 166 167 # if "dbt_project.yml" is present *and there was no python config already defined*, 168 # create a basic one to ensure we are using the DBT loader. 169 # any config within yaml files will get overlayed on top of it. 170 if not python_config: 171 potential_project_files = [f / DBT_PROJECT_FILENAME for f in visited_folders] 172 dbt_project_file = next((f for f in potential_project_files if f.exists()), None) 173 if dbt_project_file: 174 from sqlmesh.dbt.loader import sqlmesh_config 175 176 infer_state_schema_name = False 177 if dbt := non_python_config.dbt: 178 infer_state_schema_name = dbt.infer_state_schema_name 179 180 dbt_python_config = sqlmesh_config( 181 project_root=dbt_project_file.parent, 182 profiles_dir=kwargs.pop("profiles_dir", None), 183 dbt_profile_name=kwargs.pop("profile", None), 184 dbt_target_name=kwargs.pop("target", None), 185 variables=variables, 186 threads=kwargs.pop("threads", None), 187 infer_state_schema_name=infer_state_schema_name, 188 ) 189 if type(dbt_python_config) != config_type: 190 dbt_python_config = convert_config_type(dbt_python_config, config_type) 191 192 python_config = dbt_python_config 193 194 if python_config: 195 model_defaults = python_config.model_defaults 196 if model_defaults.dialect is None: 197 raise ConfigError(no_dialect_err_msg) 198 return python_config.update_with(non_python_config) 199 200 model_defaults = non_python_config.model_defaults 201 if model_defaults.dialect is None: 202 raise ConfigError(no_dialect_err_msg) 203 204 return non_python_config
def
load_config_from_yaml( path: pathlib.Path, variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
207def load_config_from_yaml( 208 path: Path, variables: t.Optional[t.Dict[str, t.Any]] = None 209) -> t.Dict[str, t.Any]: 210 content = yaml_load(path, variables=variables) 211 if not isinstance(content, dict): 212 raise ConfigError( 213 f"Invalid YAML configuration: expected a dictionary but got {type(content).__name__}. " 214 f"Please check the YAML syntax in your config file.", 215 location=path, 216 ) 217 return content
def
load_config_from_python_module( config_type: Type[~C], module_path: pathlib.Path, config_name: str = 'config') -> ~C:
220def load_config_from_python_module( 221 config_type: t.Type[C], 222 module_path: Path, 223 config_name: str = "config", 224) -> C: 225 try: 226 with sys_path(module_path.parent): 227 config_module = import_python_file(module_path, module_path.parent) 228 except Exception as e: 229 raise ConfigError( 230 f"Failed to load config file: {e}", 231 location=module_path, 232 ) 233 234 try: 235 config_obj = getattr(config_module, config_name) 236 except AttributeError: 237 raise ConfigError(f"Config '{config_name}' was not found.") 238 239 if config_obj is None or not isinstance(config_obj, Config): 240 raise ConfigError( 241 f"Config needs to be a valid object of type sqlmesh.core.config.Config. Found `{config_obj}` instead at '{module_path}'.", 242 module_path, 243 ) 244 245 return ( 246 config_obj 247 if type(config_obj) == config_type 248 else convert_config_type(config_obj, config_type) 249 )
def
load_config_from_env() -> Dict[str, Any]:
252def load_config_from_env() -> t.Dict[str, t.Any]: 253 config_dict: t.Dict[str, t.Any] = {} 254 255 for key, value in os.environ.items(): 256 key = key.lower() 257 if key.startswith(f"{c.SQLMESH}__") and key != (c.DISABLE_SQLMESH_STATE_MIGRATION).lower(): 258 segments = key.split("__")[1:] 259 if not segments or not segments[-1]: 260 raise ConfigError(f"Invalid SQLMesh configuration variable '{key}'.") 261 262 target_dict = config_dict 263 for config_key in segments[:-1]: 264 if config_key not in target_dict: 265 target_dict[config_key] = {} 266 target_dict = target_dict[config_key] 267 target_dict[segments[-1]] = value 268 269 return config_dict