Edit on GitHub

sqlmesh.core.schema_loader

  1from __future__ import annotations
  2
  3import typing as t
  4from concurrent.futures import ThreadPoolExecutor
  5from pathlib import Path
  6
  7from sqlglot import exp
  8from sqlglot.dialects.dialect import DialectType
  9
 10from sqlmesh.core.console import get_console
 11from sqlmesh.core.engine_adapter import EngineAdapter
 12from sqlmesh.core.model.definition import Model
 13from sqlmesh.core.state_sync import StateReader
 14from sqlmesh.utils import UniqueKeyDict, yaml
 15from sqlmesh.utils.errors import SQLMeshError
 16
 17
 18def create_external_models_file(
 19    path: Path,
 20    models: UniqueKeyDict[str, Model],
 21    adapter: EngineAdapter,
 22    state_reader: StateReader,
 23    dialect: DialectType,
 24    gateway: t.Optional[str] = None,
 25    max_workers: int = 1,
 26    strict: bool = False,
 27) -> None:
 28    """Create or replace a YAML file with column and types of all columns in all external models.
 29
 30    Args:
 31        path: The path to store the YAML file.
 32        models: FQN to model
 33        adapter: The engine adapter.
 34        state_reader: The state reader.
 35        dialect: The dialect to serialize the schema as.
 36        gateway: If the model should be associated with a specific gateway; the gateway key
 37        max_workers: The max concurrent workers to fetch columns.
 38        strict: If True, raise an error if the external model is missing in the database.
 39    """
 40    external_model_fqns = set()
 41
 42    for fqn, model in models.items():
 43        if model.kind.is_external:
 44            external_model_fqns.add(fqn)
 45        for dep in model.depends_on:
 46            if dep not in models:
 47                external_model_fqns.add(dep)
 48
 49    # Make sure we don't convert internal models into external ones.
 50    existing_model_fqns = state_reader.nodes_exist(external_model_fqns, exclude_external=True)
 51    if existing_model_fqns:
 52        existing_model_fqns_str = ", ".join(existing_model_fqns)
 53        get_console().log_warning(
 54            f"The following models already exist and can't be converted to external: {existing_model_fqns_str}. "
 55            "Perhaps these models have been removed, while downstream models that reference them weren't updated accordingly."
 56        )
 57        external_model_fqns -= existing_model_fqns
 58
 59    with ThreadPoolExecutor(max_workers=max_workers) as pool:
 60        gateway_part = {"gateway": gateway} if gateway else {}
 61
 62        schemas = [
 63            {
 64                "name": exp.to_table(table).sql(dialect=dialect),
 65                "columns": columns,
 66                **gateway_part,
 67            }
 68            for table, columns in sorted(
 69                pool.map(
 70                    lambda table: (table, get_columns(adapter, dialect, table, strict)),
 71                    external_model_fqns,
 72                )
 73            )
 74            if columns
 75        ]
 76
 77        # dont clobber existing entries from other gateways
 78        entries_to_keep = (
 79            [e for e in yaml.load(path) if e.get("gateway", None) != gateway]
 80            if path.exists()
 81            else []
 82        )
 83
 84        with open(path, "w", encoding="utf-8") as file:
 85            yaml.dump(entries_to_keep + schemas, file)
 86
 87
 88def get_columns(
 89    adapter: EngineAdapter, dialect: DialectType, table: str, strict: bool
 90) -> t.Optional[t.Dict[str, t.Any]]:
 91    """
 92    Return the column and their types in a dictionary
 93    """
 94    try:
 95        columns = adapter.columns(table, include_pseudo_columns=True)
 96        return {c: dtype.sql(dialect=dialect) for c, dtype in columns.items()}
 97    except Exception as e:
 98        msg = f"Unable to get schema for '{table}': '{e}'."
 99        if strict:
100            raise SQLMeshError(msg) from e
101        get_console().log_warning(msg)
102        return None
def create_external_models_file( path: pathlib.Path, models: sqlmesh.utils.UniqueKeyDict[str, typing.Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]], adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, state_reader: sqlmesh.core.state_sync.base.StateReader, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], gateway: Optional[str] = None, max_workers: int = 1, strict: bool = False) -> None:
def create_external_models_file(
    path: Path,
    models: UniqueKeyDict[str, Model],
    adapter: EngineAdapter,
    state_reader: StateReader,
    dialect: DialectType,
    gateway: t.Optional[str] = None,
    max_workers: int = 1,
    strict: bool = False,
) -> None:
    """Create or replace a YAML file with the names and types of all columns in all external models.

    A model is treated as external when its kind is external, or when it is referenced
    as a dependency by one of `models` without being present in `models` itself.
    Entries already stored in the file for *other* gateways are preserved; entries for
    `gateway` are replaced by the freshly fetched ones.

    Args:
        path: The path to store the YAML file.
        models: FQN to model.
        adapter: The engine adapter.
        state_reader: The state reader.
        dialect: The dialect to serialize the schema as.
        gateway: The key of the gateway the models should be associated with, if any.
            An empty string is treated the same as None.
        max_workers: The max concurrent workers to fetch columns.
        strict: If True, raise an error if the columns of an external model can't be
            fetched from the database (see `get_columns`).
    """
    # Normalise "" to None: an empty gateway never gets written as a "gateway" key, so the
    # keep-filter below must compare against None, otherwise previously written
    # gateway-less entries would be kept and duplicated on every run.
    gateway = gateway or None

    external_model_fqns = set()

    for fqn, model in models.items():
        if model.kind.is_external:
            external_model_fqns.add(fqn)
        # A dependency that is not one of the known models is considered external.
        external_model_fqns.update(dep for dep in model.depends_on if dep not in models)

    # Make sure we don't convert internal models into external ones.
    existing_model_fqns = state_reader.nodes_exist(external_model_fqns, exclude_external=True)
    if existing_model_fqns:
        # Sorted so the warning text is deterministic.
        existing_model_fqns_str = ", ".join(sorted(existing_model_fqns))
        get_console().log_warning(
            f"The following models already exist and can't be converted to external: {existing_model_fqns_str}. "
            "Perhaps these models have been removed, while downstream models that reference them weren't updated accordingly."
        )
        external_model_fqns -= existing_model_fqns

    def _fetch(table: str) -> t.Tuple[str, t.Optional[t.Dict[str, t.Any]]]:
        return table, get_columns(adapter, dialect, table, strict)

    # Only the column lookups need the pool; drain the results before leaving the
    # context so that any error raised by a worker surfaces here.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        fetched = sorted(pool.map(_fetch, external_model_fqns), key=lambda item: item[0])

    gateway_part = {"gateway": gateway} if gateway else {}

    # Tables whose columns could not be fetched (None) or that have no columns are skipped.
    schemas = [
        {
            "name": exp.to_table(table).sql(dialect=dialect),
            "columns": columns,
            **gateway_part,
        }
        for table, columns in fetched
        if columns
    ]

    # Don't clobber existing entries from other gateways.
    # NOTE(review): `or []` only guards a loader that yields None/empty for a blank
    # file - confirm what `yaml.load` does with an empty existing file.
    entries_to_keep = (
        [e for e in (yaml.load(path) or []) if e.get("gateway", None) != gateway]
        if path.exists()
        else []
    )

    with open(path, "w", encoding="utf-8") as file:
        yaml.dump(entries_to_keep + schemas, file)

Create or replace a YAML file with the names and types of all columns in all external models.

Arguments:
  • path: The path to store the YAML file.
  • models: FQN to model
  • adapter: The engine adapter.
  • state_reader: The state reader.
  • dialect: The dialect to serialize the schema as.
  • gateway: The key of the gateway the models should be associated with, if any.
  • max_workers: The max concurrent workers to fetch columns.
  • strict: If True, raise an error if the external model is missing in the database.
def get_columns( adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], table: str, strict: bool) -> Optional[Dict[str, Any]]:
 89def get_columns(
 90    adapter: EngineAdapter, dialect: DialectType, table: str, strict: bool
 91) -> t.Optional[t.Dict[str, t.Any]]:
 92    """
 93    Return the column and their types in a dictionary
 94    """
 95    try:
 96        columns = adapter.columns(table, include_pseudo_columns=True)
 97        return {c: dtype.sql(dialect=dialect) for c, dtype in columns.items()}
 98    except Exception as e:
 99        msg = f"Unable to get schema for '{table}': '{e}'."
100        if strict:
101            raise SQLMeshError(msg) from e
102        get_console().log_warning(msg)
103        return None

Return the columns of a table and their types as a dictionary.