sqlmesh.core.config.loader
from __future__ import annotations

import os
import typing as t
from pathlib import Path

from sqlglot.helper import ensure_list

from sqlmesh.core import constants as c
from sqlmesh.core.config.model import ModelDefaultsConfig
from sqlmesh.core.config.root import Config
from sqlmesh.utils import env_vars, merge_dicts, sys_path
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.metaprogramming import import_python_file
from sqlmesh.utils.yaml import load as yaml_load

C = t.TypeVar("C", bound=Config)


def load_configs(
    config: t.Optional[t.Union[str, C]],
    config_type: t.Type[C],
    paths: t.Union[str | Path, t.Iterable[str | Path]],
    sqlmesh_path: t.Optional[Path] = None,
) -> t.Dict[Path, C]:
    """Load one config per project path.

    Args:
        config: Either a ready-made config object or the name of the config
            variable to look up; a falsy value falls back to ``"config"``.
        config_type: The exact config class every returned value must have.
        paths: A single project path or an iterable of them.
        sqlmesh_path: Folder holding the personal ``config.yml`` / ``config.yaml``;
            defaults to ``c.SQLMESH_PATH``.

    Returns:
        A mapping from each absolute project path to its config.
    """
    sqlmesh_path = sqlmesh_path or c.SQLMESH_PATH
    config = config or "config"

    absolute_paths = [
        Path(t.cast(t.Union[str, Path], path)).absolute() for path in ensure_list(paths)
    ]

    if not isinstance(config, str):
        # An exact type comparison (not isinstance): subclasses and parent
        # classes are both converted to `config_type`.
        if type(config) != config_type:
            config = convert_config_type(config, config_type)
        return {path: config for path in absolute_paths}

    config_env_vars = None
    personal_paths = [
        sqlmesh_path / "config.yml",
        sqlmesh_path / "config.yaml",
    ]
    # The first personal config with a non-empty `env_vars` entry wins.
    for path in personal_paths:
        if path.exists():
            config_env_vars = load_config_from_yaml(path).get("env_vars")
            if config_env_vars:
                break

    with env_vars(config_env_vars if config_env_vars else {}):
        return {
            path: load_config_from_paths(
                config_type,
                project_paths=[path / "config.py", path / "config.yml", path / "config.yaml"],
                personal_paths=personal_paths,
                config_name=config,
            )
            for path in absolute_paths
        }


def load_config_from_paths(
    config_type: t.Type[C],
    project_paths: t.Optional[t.List[Path]] = None,
    personal_paths: t.Optional[t.List[Path]] = None,
    config_name: str = "config",
    load_from_env: bool = True,
) -> C:
    """Build one config from project files, personal files and the environment.

    Existing files in ``project_paths`` followed by ``personal_paths`` are read
    in order. YAML files and (optionally) ``SQLMESH__*`` environment variables
    are merged into one dictionary with ``merge_dicts``; a Python config, when
    present, is combined with that result through ``update_with``.

    Args:
        config_type: The config class to produce.
        project_paths: Candidate config files of the project.
        personal_paths: Candidate config files of the user.
        config_name: Name of the config variable in a Python config module.
        load_from_env: Whether to include environment-variable overrides.

    Returns:
        The resulting config of type ``config_type``.

    Raises:
        ConfigError: If no project config exists, a path is not a file, a
            folder holds more than one config file, a file has an unsupported
            extension, a named config is requested from YAML only, an unknown
            ``model_defaults`` key is used, or no default dialect is set.
    """
    project_paths = project_paths or []
    personal_paths = personal_paths or []
    visited_folders: t.Set[Path] = set()
    python_config: t.Optional[C] = None
    non_python_configs = []

    if not project_paths or not any(path.exists() for path in project_paths):
        raise ConfigError(
            "SQLMesh project config could not be found. Point the cli to the project path with `sqlmesh -p`. If you haven't set up the SQLMesh project, run `sqlmesh init`."
        )

    for path in [*project_paths, *personal_paths]:
        if not path.exists():
            continue

        if not path.is_file():
            raise ConfigError(f"Path '{path}' must be a file.")

        parent_path = path.parent
        if parent_path in visited_folders:
            raise ConfigError(f"Multiple configuration files found in folder '{parent_path}'.")
        visited_folders.add(parent_path)

        extension = path.name.split(".")[-1].lower()
        if extension in ("yml", "yaml"):
            # A non-default config name only makes sense once a Python config
            # has been loaded, since YAML holds a single config.
            if config_name != "config" and not python_config:
                raise ConfigError(
                    "YAML configs do not support multiple configs. Use Python instead."
                )
            non_python_configs.append(load_config_from_yaml(path))
        elif extension == "py":
            python_config = load_config_from_python_module(
                config_type, path, config_name=config_name
            )
        else:
            raise ConfigError(
                f"Unsupported config file extension '{extension}' in config file '{path}'."
            )

    if load_from_env:
        env_config = load_config_from_env()
        if env_config:
            # Reuse the dictionary that was just read instead of scanning the
            # environment a second time.
            non_python_configs.append(env_config)

    if not non_python_configs and not python_config:
        raise ConfigError(
            "SQLMesh config could not be found. Point the cli to the right path with `sqlmesh -p`. If you haven't set up SQLMesh, run `sqlmesh init`."
        )

    non_python_config_dict = merge_dicts(*non_python_configs)

    supported_model_defaults = ModelDefaultsConfig.all_fields()
    for default in non_python_config_dict.get("model_defaults", {}):
        if default not in supported_model_defaults:
            raise ConfigError(
                f"'{default}' is not a valid model default configuration key. Please remove it from the `model_defaults` specification in your config file."
            )

    non_python_config = config_type.parse_obj(non_python_config_dict)

    no_dialect_err_msg = "Default model SQL dialect is a required configuration parameter. Set it in the `model_defaults` `dialect` key in your config file."
    if python_config:
        # The dialect is checked on the Python config itself, before the
        # non-Python values are applied to it.
        if python_config.model_defaults.dialect is None:
            raise ConfigError(no_dialect_err_msg)
        return python_config.update_with(non_python_config)

    if non_python_config.model_defaults.dialect is None:
        raise ConfigError(no_dialect_err_msg)
    return non_python_config


def load_config_from_yaml(path: Path) -> t.Dict[str, t.Any]:
    """Return the contents of the YAML file at ``path`` as loaded by ``yaml_load``."""
    return yaml_load(path)


def load_config_from_python_module(
    config_type: t.Type[C],
    module_path: Path,
    config_name: str = "config",
) -> C:
    """Import a Python config module and return its ``config_name`` attribute.

    Args:
        config_type: The config class the result must have.
        module_path: Path of the Python file to import.
        config_name: Name of the module attribute holding the config.

    Returns:
        The config object, converted to ``config_type`` when its exact type
        differs.

    Raises:
        ConfigError: If the attribute is missing or is not a ``Config``.
    """
    with sys_path(module_path.parent):
        config_module = import_python_file(module_path, module_path.parent)

    try:
        config_obj = getattr(config_module, config_name)
    except AttributeError:
        raise ConfigError(f"Config '{config_name}' was not found.")

    if config_obj is None or not isinstance(config_obj, Config):
        raise ConfigError(
            f"Config needs to be a valid object of type sqlmesh.core.config.Config. Found `{config_obj}` instead at '{module_path}'."
        )

    return (
        config_obj
        if type(config_obj) == config_type
        else convert_config_type(config_obj, config_type)
    )


def load_config_from_env() -> t.Dict[str, t.Any]:
    """Collect config overrides from ``<c.SQLMESH>__``-prefixed environment variables.

    Variable names are lower-cased and split on ``__``; every segment after the
    prefix but the last names a nested dictionary, and the last one is the key
    that receives the variable's (string) value.

    Returns:
        The nested dictionary of overrides; empty when no variable matches.

    Raises:
        ConfigError: If a variable name ends with an empty segment, or if it
            nests under a key that another variable already set to a plain value.
    """
    config_dict: t.Dict[str, t.Any] = {}
    prefix = f"{c.SQLMESH}__"

    for raw_key, value in os.environ.items():
        key = raw_key.lower()
        if not key.startswith(prefix):
            continue

        segments = key.split("__")[1:]
        if not segments or not segments[-1]:
            raise ConfigError(f"Invalid SQLMesh configuration variable '{key}'.")

        target_dict = config_dict
        for config_key in segments[:-1]:
            nested = target_dict.setdefault(config_key, {})
            if not isinstance(nested, dict):
                # e.g. both X__a=1 and X__a__b=2 are set: without this check
                # the assignment below would fail with a bare TypeError.
                raise ConfigError(
                    f"Invalid SQLMesh configuration variable '{key}': '{config_key}' is already set to a value and cannot also be a section."
                )
            target_dict = nested
        target_dict[segments[-1]] = value

    return config_dict


def convert_config_type(
    config_obj: Config,
    config_type: t.Type[C],
) -> C:
    """Re-parse ``config_obj`` as an instance of ``config_type`` from its ``dict()`` form."""
    return config_type.parse_obj(config_obj.dict())
def
load_configs( config: Union[str, ~C, NoneType], config_type: Type[~C], paths: 't.Union[str | Path, t.Iterable[str | Path]]', sqlmesh_path: Union[pathlib.Path, NoneType] = None) -> Dict[pathlib.Path, ~C]:
def load_configs(
    config: t.Optional[t.Union[str, C]],
    config_type: t.Type[C],
    paths: t.Union[str | Path, t.Iterable[str | Path]],
    sqlmesh_path: t.Optional[Path] = None,
) -> t.Dict[Path, C]:
    """Resolve a config for every given project path.

    Args:
        config: A config object to use as-is, or the name of the config
            variable to load; a falsy value means ``"config"``.
        config_type: The exact class each returned config must have.
        paths: One project path or an iterable of project paths.
        sqlmesh_path: Folder searched for a personal ``config.yml`` /
            ``config.yaml``; falls back to ``c.SQLMESH_PATH``.

    Returns:
        A dictionary keyed by absolute project path.
    """
    sqlmesh_home = sqlmesh_path or c.SQLMESH_PATH
    config = config or "config"

    project_dirs = [
        Path(t.cast(t.Union[str, Path], entry)).absolute() for entry in ensure_list(paths)
    ]

    if not isinstance(config, str):
        # A config object was passed in: every path shares it, converted first
        # when its exact type is not `config_type`.
        if type(config) != config_type:
            config = convert_config_type(config, config_type)
        return dict.fromkeys(project_dirs, config)

    personal_paths = [sqlmesh_home / name for name in ("config.yml", "config.yaml")]

    # Take `env_vars` from the first personal config that defines a non-empty one.
    config_env_vars = None
    for candidate in personal_paths:
        if not candidate.exists():
            continue
        config_env_vars = load_config_from_yaml(candidate).get("env_vars")
        if config_env_vars:
            break

    with env_vars(config_env_vars or {}):
        loaded: t.Dict[Path, C] = {}
        for project_dir in project_dirs:
            loaded[project_dir] = load_config_from_paths(
                config_type,
                project_paths=[
                    project_dir / "config.py",
                    project_dir / "config.yml",
                    project_dir / "config.yaml",
                ],
                personal_paths=personal_paths,
                config_name=config,
            )
        return loaded
def
load_config_from_paths( config_type: Type[~C], project_paths: Union[List[pathlib.Path], NoneType] = None, personal_paths: Union[List[pathlib.Path], NoneType] = None, config_name: str = 'config', load_from_env: bool = True) -> ~C:
def load_config_from_paths(
    config_type: t.Type[C],
    project_paths: t.Optional[t.List[Path]] = None,
    personal_paths: t.Optional[t.List[Path]] = None,
    config_name: str = "config",
    load_from_env: bool = True,
) -> C:
    """Build one config from project files, personal files and the environment.

    Existing files in ``project_paths`` followed by ``personal_paths`` are read
    in order. YAML files and (optionally) ``SQLMESH__*`` environment variables
    are merged into one dictionary with ``merge_dicts``; a Python config, when
    present, is combined with that result through ``update_with``.

    Args:
        config_type: The config class to produce.
        project_paths: Candidate config files of the project.
        personal_paths: Candidate config files of the user.
        config_name: Name of the config variable in a Python config module.
        load_from_env: Whether to include environment-variable overrides.

    Returns:
        The resulting config of type ``config_type``.

    Raises:
        ConfigError: If no project config exists, a path is not a file, a
            folder holds more than one config file, a file has an unsupported
            extension, a named config is requested from YAML only, an unknown
            ``model_defaults`` key is used, or no default dialect is set.
    """
    project_paths = project_paths or []
    personal_paths = personal_paths or []
    visited_folders: t.Set[Path] = set()
    python_config: t.Optional[C] = None
    non_python_configs = []

    if not project_paths or not any(path.exists() for path in project_paths):
        raise ConfigError(
            "SQLMesh project config could not be found. Point the cli to the project path with `sqlmesh -p`. If you haven't set up the SQLMesh project, run `sqlmesh init`."
        )

    for path in [*project_paths, *personal_paths]:
        if not path.exists():
            continue

        if not path.is_file():
            raise ConfigError(f"Path '{path}' must be a file.")

        parent_path = path.parent
        if parent_path in visited_folders:
            raise ConfigError(f"Multiple configuration files found in folder '{parent_path}'.")
        visited_folders.add(parent_path)

        extension = path.name.split(".")[-1].lower()
        if extension in ("yml", "yaml"):
            # A non-default config name only makes sense once a Python config
            # has been loaded, since YAML holds a single config.
            if config_name != "config" and not python_config:
                raise ConfigError(
                    "YAML configs do not support multiple configs. Use Python instead."
                )
            non_python_configs.append(load_config_from_yaml(path))
        elif extension == "py":
            python_config = load_config_from_python_module(
                config_type, path, config_name=config_name
            )
        else:
            raise ConfigError(
                f"Unsupported config file extension '{extension}' in config file '{path}'."
            )

    if load_from_env:
        env_config = load_config_from_env()
        if env_config:
            # Reuse the dictionary that was just read instead of scanning the
            # environment a second time.
            non_python_configs.append(env_config)

    if not non_python_configs and not python_config:
        raise ConfigError(
            "SQLMesh config could not be found. Point the cli to the right path with `sqlmesh -p`. If you haven't set up SQLMesh, run `sqlmesh init`."
        )

    non_python_config_dict = merge_dicts(*non_python_configs)

    supported_model_defaults = ModelDefaultsConfig.all_fields()
    for default in non_python_config_dict.get("model_defaults", {}):
        if default not in supported_model_defaults:
            raise ConfigError(
                f"'{default}' is not a valid model default configuration key. Please remove it from the `model_defaults` specification in your config file."
            )

    non_python_config = config_type.parse_obj(non_python_config_dict)

    no_dialect_err_msg = "Default model SQL dialect is a required configuration parameter. Set it in the `model_defaults` `dialect` key in your config file."
    if python_config:
        # The dialect is checked on the Python config itself, before the
        # non-Python values are applied to it.
        if python_config.model_defaults.dialect is None:
            raise ConfigError(no_dialect_err_msg)
        return python_config.update_with(non_python_config)

    if non_python_config.model_defaults.dialect is None:
        raise ConfigError(no_dialect_err_msg)
    return non_python_config
def
load_config_from_yaml(path: pathlib.Path) -> Dict[str, Any]:
def
load_config_from_python_module( config_type: Type[~C], module_path: pathlib.Path, config_name: str = 'config') -> ~C:
def load_config_from_python_module(
    config_type: t.Type[C],
    module_path: Path,
    config_name: str = "config",
) -> C:
    """Import the Python file at ``module_path`` and fetch its config object.

    Args:
        config_type: The class the returned config must have.
        module_path: Location of the Python config file.
        config_name: Attribute of the imported module that holds the config.

    Returns:
        The attribute's value, converted to ``config_type`` when its exact
        type is a different one.

    Raises:
        ConfigError: If the module has no such attribute, or the attribute is
            not a ``Config`` instance.
    """
    module_dir = module_path.parent
    with sys_path(module_dir):
        module = import_python_file(module_path, module_dir)

    try:
        found = getattr(module, config_name)
    except AttributeError:
        raise ConfigError(f"Config '{config_name}' was not found.")

    # `None` is rejected here as well: it is not a Config instance.
    if not isinstance(found, Config):
        raise ConfigError(
            f"Config needs to be a valid object of type sqlmesh.core.config.Config. Found `{found}` instead at '{module_path}'."
        )

    if type(found) == config_type:
        return found
    return convert_config_type(found, config_type)
def
load_config_from_env() -> Dict[str, Any]:
def load_config_from_env() -> t.Dict[str, t.Any]:
    """Collect config overrides from ``<c.SQLMESH>__``-prefixed environment variables.

    Variable names are lower-cased and split on ``__``; every segment after the
    prefix but the last names a nested dictionary, and the last one is the key
    that receives the variable's (string) value.

    Returns:
        The nested dictionary of overrides; empty when no variable matches.

    Raises:
        ConfigError: If a variable name ends with an empty segment, or if it
            nests under a key that another variable already set to a plain value.
    """
    config_dict: t.Dict[str, t.Any] = {}
    prefix = f"{c.SQLMESH}__"

    for raw_key, value in os.environ.items():
        key = raw_key.lower()
        if not key.startswith(prefix):
            continue

        segments = key.split("__")[1:]
        if not segments or not segments[-1]:
            raise ConfigError(f"Invalid SQLMesh configuration variable '{key}'.")

        target_dict = config_dict
        for config_key in segments[:-1]:
            nested = target_dict.setdefault(config_key, {})
            if not isinstance(nested, dict):
                # e.g. both X__a=1 and X__a__b=2 are set: without this check
                # the assignment below would fail with a bare TypeError.
                raise ConfigError(
                    f"Invalid SQLMesh configuration variable '{key}': '{config_key}' is already set to a value and cannot also be a section."
                )
            target_dict = nested
        target_dict[segments[-1]] = value

    return config_dict