sqlmesh.core.schema_loader
from __future__ import annotations

import logging
import typing as t
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from sqlglot import exp
from sqlglot.dialects.dialect import DialectType

from sqlmesh.core.engine_adapter import EngineAdapter
from sqlmesh.core.model.definition import Model
from sqlmesh.core.state_sync import StateReader
from sqlmesh.utils import UniqueKeyDict, yaml

logger = logging.getLogger(__name__)


def create_schema_file(
    path: Path,
    models: UniqueKeyDict[str, Model],
    adapter: EngineAdapter,
    state_reader: StateReader,
    dialect: DialectType,
    max_workers: int = 1,
) -> None:
    """Create or replace a YAML file with model schemas.

    Args:
        path: The path to store the YAML file.
        models: Mapping of fully qualified model names (FQNs) to models.
        adapter: The engine adapter used to fetch column metadata.
        state_reader: The state reader used to detect models that already exist.
        dialect: The dialect to serialize the schema as.
        max_workers: The max concurrent workers to fetch columns.
    """
    external_model_fqns = set()

    # A model is treated as external if it is declared external explicitly,
    # or if it is referenced as a dependency without having its own definition.
    for fqn, model in models.items():
        if model.kind.is_external:
            external_model_fqns.add(fqn)
        for dep in model.depends_on:
            if dep not in models:
                external_model_fqns.add(dep)

    # Make sure we don't convert internal models into external ones.
    existing_model_fqns = state_reader.nodes_exist(external_model_fqns, exclude_external=True)
    if existing_model_fqns:
        logger.warning(
            "The following models already exist and can't be converted to external: %s. "
            "Perhaps these models have been removed, while downstream models that reference them weren't updated accordingly",
            ", ".join(existing_model_fqns),
        )
        external_model_fqns -= existing_model_fqns

    with ThreadPoolExecutor(max_workers=max_workers) as pool:

        def _get_columns(table: str) -> t.Optional[t.Dict[str, exp.DataType]]:
            """Return column name -> type for ``table``, or None if the lookup fails."""
            try:
                return adapter.columns(table, include_pseudo_columns=True)
            except Exception as e:
                # Best effort: a table whose schema can't be fetched is
                # skipped rather than failing the whole file generation.
                logger.warning("Unable to get schema for '%s': '%s'.", table, e)
                return None

        # Sort by table name so the emitted YAML is deterministic.
        schemas = [
            {
                "name": exp.to_table(table).sql(dialect=dialect),
                "columns": {c: dtype.sql(dialect=dialect) for c, dtype in columns.items()},
            }
            for table, columns in sorted(
                pool.map(
                    lambda table: (table, _get_columns(table)),
                    external_model_fqns,
                )
            )
            if columns
        ]

    with open(path, "w", encoding="utf-8") as file:
        yaml.dump(schemas, file)
def create_schema_file(
    path: Path,
    models: UniqueKeyDict[str, Model],
    adapter: EngineAdapter,
    state_reader: StateReader,
    dialect: DialectType,
    max_workers: int = 1,
) -> None:
    """Write (or overwrite) a YAML file describing external model schemas.

    Args:
        path: The path to store the YAML file.
        models: FQN to model
        adapter: The engine adapter.
        state_reader: The state reader.
        dialect: The dialect to serialize the schema as.
        max_workers: The max concurrent workers to fetch columns.
    """
    # Collect explicit external models plus any dependency that has no
    # definition of its own in the project.
    candidates: t.Set[str] = set()
    for fqn, model in models.items():
        if model.kind.is_external:
            candidates.add(fqn)
        candidates.update(dep for dep in model.depends_on if dep not in models)

    # Make sure we don't convert internal models into external ones.
    already_registered = state_reader.nodes_exist(candidates, exclude_external=True)
    if already_registered:
        logger.warning(
            "The following models already exist and can't be converted to external: %s."
            "Perhaps these models have been removed, while downstream models that reference them weren't updated accordingly",
            ", ".join(already_registered),
        )
        candidates -= already_registered

    def _fetch(table: str) -> t.Tuple[str, t.Optional[t.Dict[str, t.Any]]]:
        # Pair each table with its columns; a failed lookup yields None so
        # the table is simply omitted from the output.
        try:
            return table, adapter.columns(table, include_pseudo_columns=True)
        except Exception as e:
            logger.warning(f"Unable to get schema for '{table}': '{e}'.")
            return table, None

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Sorting keeps the resulting YAML stable across runs.
        fetched = sorted(pool.map(_fetch, candidates))

    schemas = []
    for table, columns in fetched:
        if not columns:
            continue
        schemas.append(
            {
                "name": exp.to_table(table).sql(dialect=dialect),
                "columns": {name: dtype.sql(dialect=dialect) for name, dtype in columns.items()},
            }
        )

    with open(path, "w", encoding="utf-8") as file:
        yaml.dump(schemas, file)
Create or replace a YAML file with model schemas.
Arguments:
- path: The path to store the YAML file.
- models: Mapping of fully qualified model names (FQNs) to their model definitions.
- adapter: The engine adapter.
- state_reader: The state reader.
- dialect: The dialect to serialize the schema as.
- max_workers: The max concurrent workers to fetch columns.