
sqlmesh.core.schema_loader

from __future__ import annotations

import logging
import typing as t
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from sqlglot import exp
from sqlglot.dialects.dialect import DialectType

from sqlmesh.core.engine_adapter import EngineAdapter
from sqlmesh.core.model.definition import Model
from sqlmesh.core.state_sync import StateReader
from sqlmesh.utils import UniqueKeyDict, yaml

logger = logging.getLogger(__name__)


def create_schema_file(
    path: Path,
    models: UniqueKeyDict[str, Model],
    adapter: EngineAdapter,
    state_reader: StateReader,
    dialect: DialectType,
    max_workers: int = 1,
) -> None:
    """Create or replace a YAML file with model schemas.

    Args:
        path: The path to store the YAML file.
        models: A mapping of model FQNs to models.
        adapter: The engine adapter.
        state_reader: The state reader.
        dialect: The dialect to serialize the schema as.
        max_workers: The max concurrent workers to fetch columns.
    """
    external_model_fqns = set()

    # Collect FQNs of explicit external models, plus any dependency that
    # isn't defined in the project and must therefore be external.
    for fqn, model in models.items():
        if model.kind.is_external:
            external_model_fqns.add(fqn)
        for dep in model.depends_on:
            if dep not in models:
                external_model_fqns.add(dep)

    # Make sure we don't convert internal models into external ones.
    existing_model_fqns = state_reader.nodes_exist(external_model_fqns, exclude_external=True)
    if existing_model_fqns:
        logger.warning(
            "The following models already exist and can't be converted to external: %s. "
            "Perhaps these models have been removed, while downstream models that reference them weren't updated accordingly.",
            ", ".join(existing_model_fqns),
        )
        external_model_fqns -= existing_model_fqns

    with ThreadPoolExecutor(max_workers=max_workers) as pool:

        def _get_columns(table: str) -> t.Optional[t.Dict[str, t.Any]]:
            try:
                return adapter.columns(table, include_pseudo_columns=True)
            except Exception as e:
                logger.warning(f"Unable to get schema for '{table}': '{e}'.")
                return None

        # Fetch columns concurrently, then serialize each table's schema,
        # skipping any table whose columns couldn't be fetched.
        schemas = [
            {
                "name": exp.to_table(table).sql(dialect=dialect),
                "columns": {c: dtype.sql(dialect=dialect) for c, dtype in columns.items()},
            }
            for table, columns in sorted(
                pool.map(
                    lambda table: (table, _get_columns(table)),
                    external_model_fqns,
                )
            )
            if columns
        ]

        with open(path, "w", encoding="utf-8") as file:
            yaml.dump(schemas, file)
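
For orientation, the file this module writes is a YAML list with one entry per external table: the table name rendered in the target dialect, plus a mapping of column names to data types. A minimal sketch of the output shape, using hypothetical table and column names:

from sqlmesh.utils import yaml

# Mirrors the `schemas` list built in create_schema_file; the table and
# column names here are invented for illustration.
schemas = [
    {
        "name": "raw.demographics",
        "columns": {"customer_id": "INT", "zip": "TEXT"},
    },
]
with open("schema.yaml", "w", encoding="utf-8") as file:
    yaml.dump(schemas, file)

# schema.yaml then contains, roughly:
# - name: raw.demographics
#   columns:
#     customer_id: INT
#     zip: TEXT
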
def create_schema_file(
    path: pathlib.Path,
    models: sqlmesh.utils.UniqueKeyDict[str, typing.Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]],
    adapter: sqlmesh.core.engine_adapter.base.EngineAdapter,
    state_reader: sqlmesh.core.state_sync.base.StateReader,
    dialect: typing.Union[str, sqlglot.dialects.dialect.Dialect, typing.Type[sqlglot.dialects.dialect.Dialect], None],
    max_workers: int = 1,
) -> None

Create or replace a YAML file with model schemas.

Arguments:
  • path: The path to store the YAML file.
  • models: A mapping of model FQNs to models.
  • adapter: The engine adapter.
  • state_reader: The state reader.
  • dialect: The dialect to serialize the schema as.
  • max_workers: The max concurrent workers to fetch columns.
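
A hedged usage sketch follows. In a real project this function is driven by SQLMesh itself; the Context attributes used below to supply the models, adapter, state reader, and dialect are assumptions made for illustration, as is the project path.

from pathlib import Path

from sqlmesh import Context
from sqlmesh.core.schema_loader import create_schema_file

context = Context(paths=["examples/sushi"])  # hypothetical project path

create_schema_file(
    path=Path("schema.yaml"),
    models=context.models,              # FQN -> Model mapping (assumed attribute)
    adapter=context.engine_adapter,     # used to fetch columns (assumed attribute)
    state_reader=context.state_reader,  # guards against converting internal models (assumed attribute)
    dialect=context.default_dialect,    # dialect to serialize types as (assumed attribute)
    max_workers=4,                      # fetch columns with up to 4 threads
)

Raising max_workers above the default of 1 only helps when many external tables must be introspected, since each worker issues its own metadata query through the engine adapter.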