Edit on GitHub

sqlmesh.core.config.loader

  1from __future__ import annotations
  2
  3import glob
  4import os
  5import typing as t
  6from pathlib import Path
  7
  8from pydantic import ValidationError
  9from dotenv import load_dotenv
 10from sqlglot.helper import ensure_list
 11
 12from sqlmesh.core import constants as c
 13from sqlmesh.core.config.common import (
 14    ALL_CONFIG_FILENAMES,
 15    YAML_CONFIG_FILENAMES,
 16    DBT_PROJECT_FILENAME,
 17)
 18from sqlmesh.core.config.model import ModelDefaultsConfig
 19from sqlmesh.core.config.root import Config
 20from sqlmesh.utils import env_vars, merge_dicts, sys_path
 21from sqlmesh.utils.errors import ConfigError
 22from sqlmesh.utils.metaprogramming import import_python_file
 23from sqlmesh.utils.pydantic import validation_error_message
 24from sqlmesh.utils.yaml import load as yaml_load
 25
# Type variable bound to `Config`; the loaders below use it so that the config they
# return has the same type as the `config_type` they were given.
C = t.TypeVar("C", bound=Config)
 27
 28
 29def load_configs(
 30    config: t.Optional[t.Union[str, C]],
 31    config_type: t.Type[C],
 32    paths: t.Union[str | Path, t.Iterable[str | Path]],
 33    sqlmesh_path: t.Optional[Path] = None,
 34    dotenv_path: t.Optional[Path] = None,
 35    **kwargs: t.Any,
 36) -> t.Dict[Path, C]:
 37    sqlmesh_path = sqlmesh_path or c.SQLMESH_PATH
 38    config = config or "config"
 39
 40    absolute_paths = [
 41        Path(t.cast(t.Union[str, Path], p)).absolute()
 42        for path in ensure_list(paths)
 43        for p in (glob.glob(str(path)) or [str(path)])
 44    ]
 45
 46    if dotenv_path and dotenv_path.exists() and dotenv_path.is_file():
 47        load_dotenv(dotenv_path=dotenv_path, override=True)
 48    else:
 49        for path in absolute_paths:
 50            env_file = path / ".env"
 51            if env_file.exists() and env_file.is_file():
 52                load_dotenv(dotenv_path=env_file, override=True)
 53
 54    if not isinstance(config, str):
 55        if type(config) != config_type:
 56            config = convert_config_type(config, config_type)
 57        return {path: config for path in absolute_paths}
 58
 59    config_env_vars = None
 60    personal_paths = [sqlmesh_path / name for name in YAML_CONFIG_FILENAMES]
 61    for path in personal_paths:
 62        if path.exists():
 63            config_env_vars = load_config_from_yaml(path).get("env_vars")
 64            if config_env_vars:
 65                break
 66
 67    with env_vars(config_env_vars if config_env_vars else {}):
 68        return {
 69            path: load_config_from_paths(
 70                config_type,
 71                project_paths=[path / name for name in ALL_CONFIG_FILENAMES],
 72                personal_paths=personal_paths,
 73                config_name=config,
 74                **kwargs,
 75            )
 76            for path in absolute_paths
 77        }
 78
 79
 80def load_config_from_paths(
 81    config_type: t.Type[C],
 82    project_paths: t.Optional[t.List[Path]] = None,
 83    personal_paths: t.Optional[t.List[Path]] = None,
 84    config_name: str = "config",
 85    load_from_env: bool = True,
 86    variables: t.Optional[t.Dict[str, t.Any]] = None,
 87    **kwargs: t.Any,
 88) -> C:
 89    project_paths = project_paths or []
 90    personal_paths = personal_paths or []
 91    visited_folders: t.Set[Path] = set()
 92    python_config: t.Optional[C] = None
 93    non_python_configs = []
 94
 95    if not project_paths or not any(path.exists() for path in project_paths):
 96        raise ConfigError(
 97            "SQLMesh project config could not be found. Point the cli to the project path with `sqlmesh -p`. If you haven't set up the SQLMesh project, run `sqlmesh init`."
 98        )
 99
100    yaml_config_path: t.Optional[Path] = None
101    for path in [*project_paths, *personal_paths]:
102        if not path.exists():
103            continue
104
105        if not path.is_file():
106            raise ConfigError(f"Path '{path}' must be a file.")
107
108        parent_path = path.parent
109        if parent_path in visited_folders:
110            raise ConfigError(f"Multiple configuration files found in folder '{parent_path}'.")
111        visited_folders.add(parent_path)
112
113        extension = path.name.split(".")[-1].lower()
114        if extension in ("yml", "yaml"):
115            if config_name != "config" and not python_config:
116                raise ConfigError(
117                    "YAML configs do not support multiple configs. Use Python instead.",
118                )
119            yaml_config_path = path.resolve()
120            non_python_configs.append(load_config_from_yaml(path, variables))
121        elif extension == "py":
122            try:
123                python_config = load_config_from_python_module(
124                    config_type, path, config_name=config_name
125                )
126            except ValidationError as e:
127                raise ConfigError(
128                    validation_error_message(e, f"Invalid project config '{config_name}':")
129                    + "\n\nVerify your config.py."
130                )
131        else:
132            raise ConfigError(
133                f"Unsupported config file extension '{extension}' in config file '{path}'."
134            )
135
136    if load_from_env:
137        env_config = load_config_from_env()
138        if env_config:
139            non_python_configs.append(load_config_from_env())
140
141    if not non_python_configs and not python_config:
142        raise ConfigError(
143            "SQLMesh config could not be found. Point the cli to the right path with `sqlmesh -p`. If you haven't set up SQLMesh, run `sqlmesh init`."
144        )
145
146    non_python_config_dict = merge_dicts(*non_python_configs)
147
148    supported_model_defaults = ModelDefaultsConfig.all_fields()
149    for default in non_python_config_dict.get("model_defaults", {}):
150        if default not in supported_model_defaults:
151            raise ConfigError(
152                f"'{default}' is not a valid model default configuration key. Please remove it from the `model_defaults` specification in your config file."
153            )
154
155    try:
156        non_python_config = config_type.parse_obj(non_python_config_dict)
157    except ValidationError as e:
158        raise ConfigError(
159            validation_error_message(e, "Invalid project config:")
160            + "\n\nVerify your config.yaml and environment variables.",
161            location=yaml_config_path,
162        )
163
164    no_dialect_err_msg = "Default model SQL dialect is a required configuration parameter. Set it in the `model_defaults` `dialect` key in your config file."
165
166    # if "dbt_project.yml" is present *and there was no python config already defined*,
167    # create a basic one to ensure we are using the DBT loader.
168    # any config within yaml files will get overlayed on top of it.
169    if not python_config:
170        potential_project_files = [f / DBT_PROJECT_FILENAME for f in visited_folders]
171        dbt_project_file = next((f for f in potential_project_files if f.exists()), None)
172        if dbt_project_file:
173            from sqlmesh.dbt.loader import sqlmesh_config
174
175            infer_state_schema_name = False
176            if dbt := non_python_config.dbt:
177                infer_state_schema_name = dbt.infer_state_schema_name
178
179            dbt_python_config = sqlmesh_config(
180                project_root=dbt_project_file.parent,
181                profiles_dir=kwargs.pop("profiles_dir", None),
182                dbt_profile_name=kwargs.pop("profile", None),
183                dbt_target_name=kwargs.pop("target", None),
184                variables=variables,
185                threads=kwargs.pop("threads", None),
186                infer_state_schema_name=infer_state_schema_name,
187            )
188            if type(dbt_python_config) != config_type:
189                dbt_python_config = convert_config_type(dbt_python_config, config_type)
190
191            python_config = dbt_python_config
192
193    if python_config:
194        model_defaults = python_config.model_defaults
195        if model_defaults.dialect is None:
196            raise ConfigError(no_dialect_err_msg)
197        return python_config.update_with(non_python_config)
198
199    model_defaults = non_python_config.model_defaults
200    if model_defaults.dialect is None:
201        raise ConfigError(no_dialect_err_msg)
202
203    return non_python_config
204
205
def load_config_from_yaml(
    path: Path, variables: t.Optional[t.Dict[str, t.Any]] = None
) -> t.Dict[str, t.Any]:
    """Load a YAML config file and return its top-level mapping.

    Args:
        path: The YAML file to read.
        variables: Passed through to the YAML loader.

    Returns:
        The loaded content, which must be a dictionary.

    Raises:
        ConfigError: If the loaded content is not a dictionary.
    """
    loaded = yaml_load(path, variables=variables)
    if isinstance(loaded, dict):
        return loaded

    raise ConfigError(
        f"Invalid YAML configuration: expected a dictionary but got {type(loaded).__name__}. "
        f"Please check the YAML syntax in your config file.",
        location=path,
    )
217
218
def load_config_from_python_module(
    config_type: t.Type[C],
    module_path: Path,
    config_name: str = "config",
) -> C:
    """Import a Python config file and return the config object named `config_name`.

    Args:
        config_type: The config class the result must be an exact instance of.
        module_path: The Python file to import; its folder is passed to `sys_path`
            for the duration of the import.
        config_name: Name of the module attribute holding the config.

    Returns:
        The config object, converted with `convert_config_type` when its type is
        not exactly `config_type`.

    Raises:
        ConfigError: If the import fails, the attribute is missing, or the
            attribute is not a `Config` instance.
    """
    module_dir = module_path.parent
    try:
        with sys_path(module_dir):
            config_module = import_python_file(module_path, module_dir)
    except Exception as e:
        raise ConfigError(
            f"Failed to load config file: {e}",
            location=module_path,
        )

    try:
        config_obj = getattr(config_module, config_name)
    except AttributeError:
        raise ConfigError(f"Config '{config_name}' was not found.")

    # `isinstance` also rejects None, so no separate None check is needed.
    if not isinstance(config_obj, Config):
        raise ConfigError(
            f"Config needs to be a valid object of type sqlmesh.core.config.Config. Found `{config_obj}` instead at '{module_path}'.",
            module_path,
        )

    if type(config_obj) == config_type:
        return config_obj
    return convert_config_type(config_obj, config_type)
249
250
def load_config_from_env() -> t.Dict[str, t.Any]:
    """Collect configuration from `SQLMESH__`-style environment variables.

    Variable names are lower-cased. Names that start with `c.SQLMESH` followed by a
    double underscore are split on double underscores; the parts after the prefix
    form the path of nested dictionary keys under which the value is stored. The
    variable named by `c.DISABLE_SQLMESH_STATE_MIGRATION` is skipped. Values are
    kept as the strings found in the environment.

    Returns:
        The nested configuration dictionary (empty when no variable matches).

    Raises:
        ConfigError: If a variable name ends in an empty segment, or if a variable
            needs to nest under a key that another variable already set to a
            non-dictionary value.
    """
    # Loop invariants, computed once.
    prefix = f"{c.SQLMESH}__"
    excluded_key = (c.DISABLE_SQLMESH_STATE_MIGRATION).lower()

    config_dict: t.Dict[str, t.Any] = {}

    for raw_key, value in os.environ.items():
        key = raw_key.lower()
        if not key.startswith(prefix) or key == excluded_key:
            continue

        segments = key.split("__")[1:]
        if not segments or not segments[-1]:
            raise ConfigError(f"Invalid SQLMesh configuration variable '{key}'.")

        target_dict = config_dict
        for config_key in segments[:-1]:
            nested = target_dict.setdefault(config_key, {})
            if not isinstance(nested, dict):
                # e.g. both SQLMESH__A and SQLMESH__A__B are set: report it as a
                # config problem instead of failing with a TypeError on the string.
                raise ConfigError(
                    f"SQLMesh configuration variable '{key}' conflicts with another "
                    f"variable that already assigns a value to '{config_key}'."
                )
            target_dict = nested
        # NOTE: a value stored here replaces whatever was previously at this key.
        target_dict[segments[-1]] = value

    return config_dict
269
270
def convert_config_type(
    config_obj: Config,
    config_type: t.Type[C],
) -> C:
    """Rebuild `config_obj` as an instance of `config_type`.

    The object is dumped with its `dict()` method and the result is parsed with
    `config_type.parse_obj`.
    """
    raw_fields = config_obj.dict()
    return config_type.parse_obj(raw_fields)
def load_configs( config: Union[str, ~C, NoneType], config_type: Type[~C], paths: Union[str, pathlib.Path, Iterable[str | pathlib.Path]], sqlmesh_path: Optional[pathlib.Path] = None, dotenv_path: Optional[pathlib.Path] = None, **kwargs: Any) -> Dict[pathlib.Path, ~C]:
def load_configs(
    config: t.Optional[t.Union[str, C]],
    config_type: t.Type[C],
    paths: t.Union[str | Path, t.Iterable[str | Path]],
    sqlmesh_path: t.Optional[Path] = None,
    dotenv_path: t.Optional[Path] = None,
    **kwargs: t.Any,
) -> t.Dict[Path, C]:
    """Load a config for every project path.

    Args:
        config: A config object to use as-is, or the name of the config to load.
            A falsy value means the name "config".
        config_type: The config class every returned value must be an exact instance of.
        paths: One path or several; each is expanded with `glob`, and a pattern
            without matches is kept literally.
        sqlmesh_path: Folder holding the personal YAML config files. Defaults to
            `c.SQLMESH_PATH`.
        dotenv_path: A dotenv file to load. When it is missing or not a file, a
            `.env` file inside each project path is loaded instead.
        **kwargs: Forwarded to `load_config_from_paths`.

    Returns:
        A mapping from each absolute project path to its config.
    """
    if not sqlmesh_path:
        sqlmesh_path = c.SQLMESH_PATH
    if not config:
        config = "config"

    # Expand every entry as a glob pattern, falling back to the literal path.
    absolute_paths: t.List[Path] = []
    for raw_path in ensure_list(paths):
        pattern = str(raw_path)
        matches = glob.glob(pattern) or [pattern]
        absolute_paths.extend(Path(match).absolute() for match in matches)

    if dotenv_path and dotenv_path.exists() and dotenv_path.is_file():
        load_dotenv(dotenv_path=dotenv_path, override=True)
    else:
        for project_path in absolute_paths:
            candidate = project_path / ".env"
            if candidate.exists() and candidate.is_file():
                load_dotenv(dotenv_path=candidate, override=True)

    # A ready-made config object is shared by all paths (converted first when its
    # type is not exactly `config_type`).
    if not isinstance(config, str):
        if type(config) != config_type:
            config = convert_config_type(config, config_type)
        return dict.fromkeys(absolute_paths, config)

    personal_paths = [sqlmesh_path / name for name in YAML_CONFIG_FILENAMES]

    # Use the `env_vars` entry of the first personal config that has a non-empty one.
    config_env_vars = None
    for personal_path in personal_paths:
        if not personal_path.exists():
            continue
        config_env_vars = load_config_from_yaml(personal_path).get("env_vars")
        if config_env_vars:
            break

    with env_vars(config_env_vars or {}):
        loaded: t.Dict[Path, C] = {}
        for project_path in absolute_paths:
            loaded[project_path] = load_config_from_paths(
                config_type,
                project_paths=[project_path / name for name in ALL_CONFIG_FILENAMES],
                personal_paths=personal_paths,
                config_name=config,
                **kwargs,
            )
        return loaded
def load_config_from_paths( config_type: Type[~C], project_paths: Optional[List[pathlib.Path]] = None, personal_paths: Optional[List[pathlib.Path]] = None, config_name: str = 'config', load_from_env: bool = True, variables: Optional[Dict[str, Any]] = None, **kwargs: Any) -> ~C:
 81def load_config_from_paths(
 82    config_type: t.Type[C],
 83    project_paths: t.Optional[t.List[Path]] = None,
 84    personal_paths: t.Optional[t.List[Path]] = None,
 85    config_name: str = "config",
 86    load_from_env: bool = True,
 87    variables: t.Optional[t.Dict[str, t.Any]] = None,
 88    **kwargs: t.Any,
 89) -> C:
 90    project_paths = project_paths or []
 91    personal_paths = personal_paths or []
 92    visited_folders: t.Set[Path] = set()
 93    python_config: t.Optional[C] = None
 94    non_python_configs = []
 95
 96    if not project_paths or not any(path.exists() for path in project_paths):
 97        raise ConfigError(
 98            "SQLMesh project config could not be found. Point the cli to the project path with `sqlmesh -p`. If you haven't set up the SQLMesh project, run `sqlmesh init`."
 99        )
100
101    yaml_config_path: t.Optional[Path] = None
102    for path in [*project_paths, *personal_paths]:
103        if not path.exists():
104            continue
105
106        if not path.is_file():
107            raise ConfigError(f"Path '{path}' must be a file.")
108
109        parent_path = path.parent
110        if parent_path in visited_folders:
111            raise ConfigError(f"Multiple configuration files found in folder '{parent_path}'.")
112        visited_folders.add(parent_path)
113
114        extension = path.name.split(".")[-1].lower()
115        if extension in ("yml", "yaml"):
116            if config_name != "config" and not python_config:
117                raise ConfigError(
118                    "YAML configs do not support multiple configs. Use Python instead.",
119                )
120            yaml_config_path = path.resolve()
121            non_python_configs.append(load_config_from_yaml(path, variables))
122        elif extension == "py":
123            try:
124                python_config = load_config_from_python_module(
125                    config_type, path, config_name=config_name
126                )
127            except ValidationError as e:
128                raise ConfigError(
129                    validation_error_message(e, f"Invalid project config '{config_name}':")
130                    + "\n\nVerify your config.py."
131                )
132        else:
133            raise ConfigError(
134                f"Unsupported config file extension '{extension}' in config file '{path}'."
135            )
136
137    if load_from_env:
138        env_config = load_config_from_env()
139        if env_config:
140            non_python_configs.append(load_config_from_env())
141
142    if not non_python_configs and not python_config:
143        raise ConfigError(
144            "SQLMesh config could not be found. Point the cli to the right path with `sqlmesh -p`. If you haven't set up SQLMesh, run `sqlmesh init`."
145        )
146
147    non_python_config_dict = merge_dicts(*non_python_configs)
148
149    supported_model_defaults = ModelDefaultsConfig.all_fields()
150    for default in non_python_config_dict.get("model_defaults", {}):
151        if default not in supported_model_defaults:
152            raise ConfigError(
153                f"'{default}' is not a valid model default configuration key. Please remove it from the `model_defaults` specification in your config file."
154            )
155
156    try:
157        non_python_config = config_type.parse_obj(non_python_config_dict)
158    except ValidationError as e:
159        raise ConfigError(
160            validation_error_message(e, "Invalid project config:")
161            + "\n\nVerify your config.yaml and environment variables.",
162            location=yaml_config_path,
163        )
164
165    no_dialect_err_msg = "Default model SQL dialect is a required configuration parameter. Set it in the `model_defaults` `dialect` key in your config file."
166
167    # if "dbt_project.yml" is present *and there was no python config already defined*,
168    # create a basic one to ensure we are using the DBT loader.
169    # any config within yaml files will get overlayed on top of it.
170    if not python_config:
171        potential_project_files = [f / DBT_PROJECT_FILENAME for f in visited_folders]
172        dbt_project_file = next((f for f in potential_project_files if f.exists()), None)
173        if dbt_project_file:
174            from sqlmesh.dbt.loader import sqlmesh_config
175
176            infer_state_schema_name = False
177            if dbt := non_python_config.dbt:
178                infer_state_schema_name = dbt.infer_state_schema_name
179
180            dbt_python_config = sqlmesh_config(
181                project_root=dbt_project_file.parent,
182                profiles_dir=kwargs.pop("profiles_dir", None),
183                dbt_profile_name=kwargs.pop("profile", None),
184                dbt_target_name=kwargs.pop("target", None),
185                variables=variables,
186                threads=kwargs.pop("threads", None),
187                infer_state_schema_name=infer_state_schema_name,
188            )
189            if type(dbt_python_config) != config_type:
190                dbt_python_config = convert_config_type(dbt_python_config, config_type)
191
192            python_config = dbt_python_config
193
194    if python_config:
195        model_defaults = python_config.model_defaults
196        if model_defaults.dialect is None:
197            raise ConfigError(no_dialect_err_msg)
198        return python_config.update_with(non_python_config)
199
200    model_defaults = non_python_config.model_defaults
201    if model_defaults.dialect is None:
202        raise ConfigError(no_dialect_err_msg)
203
204    return non_python_config
def load_config_from_yaml( path: pathlib.Path, variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
def load_config_from_yaml(
    path: Path, variables: t.Optional[t.Dict[str, t.Any]] = None
) -> t.Dict[str, t.Any]:
    """Load a YAML config file and return its top-level mapping.

    Args:
        path: The YAML file to read.
        variables: Passed through to the YAML loader.

    Returns:
        The loaded content, which must be a dictionary.

    Raises:
        ConfigError: If the loaded content is not a dictionary.
    """
    loaded = yaml_load(path, variables=variables)
    if isinstance(loaded, dict):
        return loaded

    raise ConfigError(
        f"Invalid YAML configuration: expected a dictionary but got {type(loaded).__name__}. "
        f"Please check the YAML syntax in your config file.",
        location=path,
    )
def load_config_from_python_module( config_type: Type[~C], module_path: pathlib.Path, config_name: str = 'config') -> ~C:
def load_config_from_python_module(
    config_type: t.Type[C],
    module_path: Path,
    config_name: str = "config",
) -> C:
    """Import a Python config file and return the config object named `config_name`.

    Args:
        config_type: The config class the result must be an exact instance of.
        module_path: The Python file to import; its folder is passed to `sys_path`
            for the duration of the import.
        config_name: Name of the module attribute holding the config.

    Returns:
        The config object, converted with `convert_config_type` when its type is
        not exactly `config_type`.

    Raises:
        ConfigError: If the import fails, the attribute is missing, or the
            attribute is not a `Config` instance.
    """
    module_dir = module_path.parent
    try:
        with sys_path(module_dir):
            config_module = import_python_file(module_path, module_dir)
    except Exception as e:
        raise ConfigError(
            f"Failed to load config file: {e}",
            location=module_path,
        )

    try:
        config_obj = getattr(config_module, config_name)
    except AttributeError:
        raise ConfigError(f"Config '{config_name}' was not found.")

    # `isinstance` also rejects None, so no separate None check is needed.
    if not isinstance(config_obj, Config):
        raise ConfigError(
            f"Config needs to be a valid object of type sqlmesh.core.config.Config. Found `{config_obj}` instead at '{module_path}'.",
            module_path,
        )

    if type(config_obj) == config_type:
        return config_obj
    return convert_config_type(config_obj, config_type)
def load_config_from_env() -> Dict[str, Any]:
def load_config_from_env() -> t.Dict[str, t.Any]:
    """Collect configuration from `SQLMESH__`-style environment variables.

    Variable names are lower-cased. Names that start with `c.SQLMESH` followed by a
    double underscore are split on double underscores; the parts after the prefix
    form the path of nested dictionary keys under which the value is stored. The
    variable named by `c.DISABLE_SQLMESH_STATE_MIGRATION` is skipped. Values are
    kept as the strings found in the environment.

    Returns:
        The nested configuration dictionary (empty when no variable matches).

    Raises:
        ConfigError: If a variable name ends in an empty segment, or if a variable
            needs to nest under a key that another variable already set to a
            non-dictionary value.
    """
    # Loop invariants, computed once.
    prefix = f"{c.SQLMESH}__"
    excluded_key = (c.DISABLE_SQLMESH_STATE_MIGRATION).lower()

    config_dict: t.Dict[str, t.Any] = {}

    for raw_key, value in os.environ.items():
        key = raw_key.lower()
        if not key.startswith(prefix) or key == excluded_key:
            continue

        segments = key.split("__")[1:]
        if not segments or not segments[-1]:
            raise ConfigError(f"Invalid SQLMesh configuration variable '{key}'.")

        target_dict = config_dict
        for config_key in segments[:-1]:
            nested = target_dict.setdefault(config_key, {})
            if not isinstance(nested, dict):
                # e.g. both SQLMESH__A and SQLMESH__A__B are set: report it as a
                # config problem instead of failing with a TypeError on the string.
                raise ConfigError(
                    f"SQLMesh configuration variable '{key}' conflicts with another "
                    f"variable that already assigns a value to '{config_key}'."
                )
            target_dict = nested
        # NOTE: a value stored here replaces whatever was previously at this key.
        target_dict[segments[-1]] = value

    return config_dict
def convert_config_type(config_obj: sqlmesh.core.config.root.Config, config_type: Type[~C]) -> ~C:
def convert_config_type(
    config_obj: Config,
    config_type: t.Type[C],
) -> C:
    """Rebuild `config_obj` as an instance of `config_type`.

    The object is dumped with its `dict()` method and the result is parsed with
    `config_type.parse_obj`.
    """
    raw_fields = config_obj.dict()
    return config_type.parse_obj(raw_fields)