Edit on GitHub

sqlmesh.core.config.loader

  1from __future__ import annotations
  2
  3import os
  4import typing as t
  5from pathlib import Path
  6
  7from sqlglot.helper import ensure_list
  8
  9from sqlmesh.core import constants as c
 10from sqlmesh.core.config.model import ModelDefaultsConfig
 11from sqlmesh.core.config.root import Config
 12from sqlmesh.utils import env_vars, merge_dicts, sys_path
 13from sqlmesh.utils.errors import ConfigError
 14from sqlmesh.utils.metaprogramming import import_python_file
 15from sqlmesh.utils.yaml import load as yaml_load
 16
 17C = t.TypeVar("C", bound=Config)
 18
 19
 20def load_configs(
 21    config: t.Optional[t.Union[str, C]],
 22    config_type: t.Type[C],
 23    paths: t.Union[str | Path, t.Iterable[str | Path]],
 24    sqlmesh_path: t.Optional[Path] = None,
 25) -> t.Dict[Path, C]:
 26    sqlmesh_path = sqlmesh_path or c.SQLMESH_PATH
 27    config = config or "config"
 28
 29    absolute_paths = [
 30        Path(t.cast(t.Union[str, Path], path)).absolute() for path in ensure_list(paths)
 31    ]
 32
 33    if not isinstance(config, str):
 34        if type(config) != config_type:
 35            config = convert_config_type(config, config_type)
 36        return {path: config for path in absolute_paths}
 37
 38    config_env_vars = None
 39    personal_paths = [
 40        sqlmesh_path / "config.yml",
 41        sqlmesh_path / "config.yaml",
 42    ]
 43    for path in personal_paths:
 44        if path.exists():
 45            config_env_vars = load_config_from_yaml(path).get("env_vars")
 46            if config_env_vars:
 47                break
 48
 49    with env_vars(config_env_vars if config_env_vars else {}):
 50        return {
 51            path: load_config_from_paths(
 52                config_type,
 53                project_paths=[path / "config.py", path / "config.yml", path / "config.yaml"],
 54                personal_paths=personal_paths,
 55                config_name=config,
 56            )
 57            for path in absolute_paths
 58        }
 59
 60
 61def load_config_from_paths(
 62    config_type: t.Type[C],
 63    project_paths: t.Optional[t.List[Path]] = None,
 64    personal_paths: t.Optional[t.List[Path]] = None,
 65    config_name: str = "config",
 66    load_from_env: bool = True,
 67) -> C:
 68    project_paths = project_paths or []
 69    personal_paths = personal_paths or []
 70    visited_folders: t.Set[Path] = set()
 71    python_config: t.Optional[C] = None
 72    non_python_configs = []
 73
 74    if not project_paths or not any(path.exists() for path in project_paths):
 75        raise ConfigError(
 76            "SQLMesh project config could not be found. Point the cli to the project path with `sqlmesh -p`. If you haven't set up the SQLMesh project, run `sqlmesh init`."
 77        )
 78
 79    for path in [*project_paths, *personal_paths]:
 80        if not path.exists():
 81            continue
 82
 83        if not path.is_file():
 84            raise ConfigError(f"Path '{path}' must be a file.")
 85
 86        parent_path = path.parent
 87        if parent_path in visited_folders:
 88            raise ConfigError(f"Multiple configuration files found in folder '{parent_path}'.")
 89        visited_folders.add(parent_path)
 90
 91        extension = path.name.split(".")[-1].lower()
 92        if extension in ("yml", "yaml"):
 93            if config_name != "config" and not python_config:
 94                raise ConfigError(
 95                    "YAML configs do not support multiple configs. Use Python instead."
 96                )
 97            non_python_configs.append(load_config_from_yaml(path))
 98        elif extension == "py":
 99            python_config = load_config_from_python_module(
100                config_type, path, config_name=config_name
101            )
102        else:
103            raise ConfigError(
104                f"Unsupported config file extension '{extension}' in config file '{path}'."
105            )
106
107    if load_from_env:
108        env_config = load_config_from_env()
109        if env_config:
110            non_python_configs.append(load_config_from_env())
111
112    if not non_python_configs and not python_config:
113        raise ConfigError(
114            "SQLMesh config could not be found. Point the cli to the right path with `sqlmesh -p`. If you haven't set up SQLMesh, run `sqlmesh init`."
115        )
116
117    non_python_config_dict = merge_dicts(*non_python_configs)
118
119    supported_model_defaults = ModelDefaultsConfig.all_fields()
120    for default in non_python_config_dict.get("model_defaults", {}):
121        if default not in supported_model_defaults:
122            raise ConfigError(
123                f"'{default}' is not a valid model default configuration key. Please remove it from the `model_defaults` specification in your config file."
124            )
125
126    non_python_config = config_type.parse_obj(non_python_config_dict)
127
128    no_dialect_err_msg = "Default model SQL dialect is a required configuration parameter. Set it in the `model_defaults` `dialect` key in your config file."
129    if python_config:
130        model_defaults = python_config.model_defaults
131        if model_defaults.dialect is None:
132            raise ConfigError(no_dialect_err_msg)
133        return python_config.update_with(non_python_config)
134
135    model_defaults = non_python_config.model_defaults
136    if model_defaults.dialect is None:
137        raise ConfigError(no_dialect_err_msg)
138    return non_python_config
139
140
def load_config_from_yaml(path: Path) -> t.Dict[str, t.Any]:
    """Load the YAML config file at `path` and return the result of `yaml_load`."""
    contents = yaml_load(path)
    return contents
143
144
def load_config_from_python_module(
    config_type: t.Type[C],
    module_path: Path,
    config_name: str = "config",
) -> C:
    """Load the config object named `config_name` from a Python config file.

    Args:
        config_type: The config class to return. An object of a different exact
            type is converted with `convert_config_type`.
        module_path: Path to the Python config file.
        config_name: Name of the module-level variable holding the config.

    Returns:
        The config object, as an instance of `config_type`.

    Raises:
        ConfigError: If the module has no attribute `config_name`, or if that
            attribute is not a `Config` instance.
    """
    # The import runs inside the `sys_path` context for the file's folder
    # (presumably so the config file can import its sibling modules; confirm
    # against `sqlmesh.utils.sys_path`).
    with sys_path(module_path.parent):
        config_module = import_python_file(module_path, module_path.parent)

    try:
        config_obj = getattr(config_module, config_name)
    except AttributeError as ex:
        # Chain explicitly so the traceback marks the lookup failure as the cause.
        raise ConfigError(f"Config '{config_name}' was not found.") from ex

    # `isinstance` already rejects None, so no separate None check is needed.
    if not isinstance(config_obj, Config):
        raise ConfigError(
            f"Config needs to be a valid object of type sqlmesh.core.config.Config. Found `{config_obj}` instead at '{module_path}'."
        )

    # Exact type comparison on purpose: instances of other Config classes
    # (including subclasses of `config_type`) are converted.
    if type(config_obj) is config_type:
        return config_obj
    return convert_config_type(config_obj, config_type)
168
169
def load_config_from_env() -> t.Dict[str, t.Any]:
    """Collect config values from environment variables into a nested dict.

    Variable names are lowercased. Only names starting with `c.SQLMESH` followed
    by a double underscore are considered. The remainder of the name is split on
    double underscores: every segment but the last names a nested dict and the
    last segment is the key that receives the variable's value.

    Returns:
        The nested config dict; empty when no matching variable is set.

    Raises:
        ConfigError: If a variable name ends with an empty segment, or if a
            variable nests under a key that another variable already set to a
            plain value.
    """
    prefix = f"{c.SQLMESH}__"
    config_dict: t.Dict[str, t.Any] = {}

    for key, value in os.environ.items():
        key = key.lower()
        if not key.startswith(prefix):
            continue

        segments = key.split("__")[1:]
        if not segments or not segments[-1]:
            raise ConfigError(f"Invalid SQLMesh configuration variable '{key}'.")

        target_dict = config_dict
        for config_key in segments[:-1]:
            target_dict = target_dict.setdefault(config_key, {})
            if not isinstance(target_dict, dict):
                # Another variable already stored a plain value under this key.
                # Descending into it would otherwise fail with a TypeError.
                raise ConfigError(
                    f"Invalid SQLMesh configuration variable '{key}': '{config_key}' is already set to a value by another variable."
                )
        target_dict[segments[-1]] = value

    return config_dict
188
189
def convert_config_type(
    config_obj: Config,
    config_type: t.Type[C],
) -> C:
    """Re-create `config_obj` as a `config_type` by parsing its `dict()` output."""
    raw_fields = config_obj.dict()
    return config_type.parse_obj(raw_fields)
def load_configs( config: Union[str, ~C, NoneType], config_type: Type[~C], paths: 't.Union[str | Path, t.Iterable[str | Path]]', sqlmesh_path: Union[pathlib.Path, NoneType] = None) -> Dict[pathlib.Path, ~C]:
def load_configs(
    config: t.Optional[t.Union[str, C]],
    config_type: t.Type[C],
    paths: t.Union[str | Path, t.Iterable[str | Path]],
    sqlmesh_path: t.Optional[Path] = None,
) -> t.Dict[Path, C]:
    """Load one config for every project path.

    Args:
        config: An already constructed config object, the name of the config
            variable to load, or None to use the default name "config".
        config_type: The config class that configs are loaded as / converted to.
        paths: A single project path or an iterable of project paths.
        sqlmesh_path: Folder containing the personal `config.yml` / `config.yaml`
            files. Falls back to `c.SQLMESH_PATH` when not provided.

    Returns:
        A mapping from each absolute project path to its config.
    """
    personal_dir = sqlmesh_path or c.SQLMESH_PATH
    config_or_name = config or "config"

    project_roots = [
        Path(t.cast(t.Union[str, Path], raw)).absolute() for raw in ensure_list(paths)
    ]

    if not isinstance(config_or_name, str):
        # A ready-made config object is shared by all project paths; it is only
        # converted when its exact type differs from the requested one.
        shared = config_or_name
        if type(shared) != config_type:
            shared = convert_config_type(shared, config_type)
        return dict.fromkeys(project_roots, shared)

    personal_paths = [personal_dir / "config.yml", personal_dir / "config.yaml"]

    # Take `env_vars` from the first existing personal config that defines a
    # non-empty section.
    config_env_vars = None
    for candidate in personal_paths:
        if not candidate.exists():
            continue
        config_env_vars = load_config_from_yaml(candidate).get("env_vars")
        if config_env_vars:
            break

    loaded: t.Dict[Path, C] = {}
    # The project configs are loaded while the `env_vars` context is active.
    with env_vars(config_env_vars or {}):
        for root in project_roots:
            loaded[root] = load_config_from_paths(
                config_type,
                project_paths=[root / "config.py", root / "config.yml", root / "config.yaml"],
                personal_paths=personal_paths,
                config_name=config_or_name,
            )
    return loaded
def load_config_from_paths( config_type: Type[~C], project_paths: Union[List[pathlib.Path], NoneType] = None, personal_paths: Union[List[pathlib.Path], NoneType] = None, config_name: str = 'config', load_from_env: bool = True) -> ~C:
 62def load_config_from_paths(
 63    config_type: t.Type[C],
 64    project_paths: t.Optional[t.List[Path]] = None,
 65    personal_paths: t.Optional[t.List[Path]] = None,
 66    config_name: str = "config",
 67    load_from_env: bool = True,
 68) -> C:
 69    project_paths = project_paths or []
 70    personal_paths = personal_paths or []
 71    visited_folders: t.Set[Path] = set()
 72    python_config: t.Optional[C] = None
 73    non_python_configs = []
 74
 75    if not project_paths or not any(path.exists() for path in project_paths):
 76        raise ConfigError(
 77            "SQLMesh project config could not be found. Point the cli to the project path with `sqlmesh -p`. If you haven't set up the SQLMesh project, run `sqlmesh init`."
 78        )
 79
 80    for path in [*project_paths, *personal_paths]:
 81        if not path.exists():
 82            continue
 83
 84        if not path.is_file():
 85            raise ConfigError(f"Path '{path}' must be a file.")
 86
 87        parent_path = path.parent
 88        if parent_path in visited_folders:
 89            raise ConfigError(f"Multiple configuration files found in folder '{parent_path}'.")
 90        visited_folders.add(parent_path)
 91
 92        extension = path.name.split(".")[-1].lower()
 93        if extension in ("yml", "yaml"):
 94            if config_name != "config" and not python_config:
 95                raise ConfigError(
 96                    "YAML configs do not support multiple configs. Use Python instead."
 97                )
 98            non_python_configs.append(load_config_from_yaml(path))
 99        elif extension == "py":
100            python_config = load_config_from_python_module(
101                config_type, path, config_name=config_name
102            )
103        else:
104            raise ConfigError(
105                f"Unsupported config file extension '{extension}' in config file '{path}'."
106            )
107
108    if load_from_env:
109        env_config = load_config_from_env()
110        if env_config:
111            non_python_configs.append(load_config_from_env())
112
113    if not non_python_configs and not python_config:
114        raise ConfigError(
115            "SQLMesh config could not be found. Point the cli to the right path with `sqlmesh -p`. If you haven't set up SQLMesh, run `sqlmesh init`."
116        )
117
118    non_python_config_dict = merge_dicts(*non_python_configs)
119
120    supported_model_defaults = ModelDefaultsConfig.all_fields()
121    for default in non_python_config_dict.get("model_defaults", {}):
122        if default not in supported_model_defaults:
123            raise ConfigError(
124                f"'{default}' is not a valid model default configuration key. Please remove it from the `model_defaults` specification in your config file."
125            )
126
127    non_python_config = config_type.parse_obj(non_python_config_dict)
128
129    no_dialect_err_msg = "Default model SQL dialect is a required configuration parameter. Set it in the `model_defaults` `dialect` key in your config file."
130    if python_config:
131        model_defaults = python_config.model_defaults
132        if model_defaults.dialect is None:
133            raise ConfigError(no_dialect_err_msg)
134        return python_config.update_with(non_python_config)
135
136    model_defaults = non_python_config.model_defaults
137    if model_defaults.dialect is None:
138        raise ConfigError(no_dialect_err_msg)
139    return non_python_config
def load_config_from_yaml(path: pathlib.Path) -> Dict[str, Any]:
def load_config_from_yaml(path: Path) -> t.Dict[str, t.Any]:
    """Load the YAML config file at `path` and return the result of `yaml_load`."""
    contents = yaml_load(path)
    return contents
def load_config_from_python_module( config_type: Type[~C], module_path: pathlib.Path, config_name: str = 'config') -> ~C:
def load_config_from_python_module(
    config_type: t.Type[C],
    module_path: Path,
    config_name: str = "config",
) -> C:
    """Load the config object named `config_name` from a Python config file.

    Args:
        config_type: The config class to return. An object of a different exact
            type is converted with `convert_config_type`.
        module_path: Path to the Python config file.
        config_name: Name of the module-level variable holding the config.

    Returns:
        The config object, as an instance of `config_type`.

    Raises:
        ConfigError: If the module has no attribute `config_name`, or if that
            attribute is not a `Config` instance.
    """
    # The import runs inside the `sys_path` context for the file's folder
    # (presumably so the config file can import its sibling modules; confirm
    # against `sqlmesh.utils.sys_path`).
    with sys_path(module_path.parent):
        config_module = import_python_file(module_path, module_path.parent)

    try:
        config_obj = getattr(config_module, config_name)
    except AttributeError as ex:
        # Chain explicitly so the traceback marks the lookup failure as the cause.
        raise ConfigError(f"Config '{config_name}' was not found.") from ex

    # `isinstance` already rejects None, so no separate None check is needed.
    if not isinstance(config_obj, Config):
        raise ConfigError(
            f"Config needs to be a valid object of type sqlmesh.core.config.Config. Found `{config_obj}` instead at '{module_path}'."
        )

    # Exact type comparison on purpose: instances of other Config classes
    # (including subclasses of `config_type`) are converted.
    if type(config_obj) is config_type:
        return config_obj
    return convert_config_type(config_obj, config_type)
def load_config_from_env() -> Dict[str, Any]:
def load_config_from_env() -> t.Dict[str, t.Any]:
    """Collect config values from environment variables into a nested dict.

    Variable names are lowercased. Only names starting with `c.SQLMESH` followed
    by a double underscore are considered. The remainder of the name is split on
    double underscores: every segment but the last names a nested dict and the
    last segment is the key that receives the variable's value.

    Returns:
        The nested config dict; empty when no matching variable is set.

    Raises:
        ConfigError: If a variable name ends with an empty segment, or if a
            variable nests under a key that another variable already set to a
            plain value.
    """
    prefix = f"{c.SQLMESH}__"
    config_dict: t.Dict[str, t.Any] = {}

    for key, value in os.environ.items():
        key = key.lower()
        if not key.startswith(prefix):
            continue

        segments = key.split("__")[1:]
        if not segments or not segments[-1]:
            raise ConfigError(f"Invalid SQLMesh configuration variable '{key}'.")

        target_dict = config_dict
        for config_key in segments[:-1]:
            target_dict = target_dict.setdefault(config_key, {})
            if not isinstance(target_dict, dict):
                # Another variable already stored a plain value under this key.
                # Descending into it would otherwise fail with a TypeError.
                raise ConfigError(
                    f"Invalid SQLMesh configuration variable '{key}': '{config_key}' is already set to a value by another variable."
                )
        target_dict[segments[-1]] = value

    return config_dict
def convert_config_type(config_obj: sqlmesh.core.config.root.Config, config_type: Type[~C]) -> ~C:
def convert_config_type(
    config_obj: Config,
    config_type: t.Type[C],
) -> C:
    """Re-create `config_obj` as a `config_type` by parsing its `dict()` output."""
    raw_fields = config_obj.dict()
    return config_type.parse_obj(raw_fields)