sqlmesh.core.schema_loader
"""Helpers for generating the YAML schema file that describes external models."""

from __future__ import annotations

import typing as t
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from sqlglot import exp
from sqlglot.dialects.dialect import DialectType

from sqlmesh.core.console import get_console
from sqlmesh.core.engine_adapter import EngineAdapter
from sqlmesh.core.model.definition import Model
from sqlmesh.core.state_sync import StateReader
from sqlmesh.utils import UniqueKeyDict, yaml
from sqlmesh.utils.errors import SQLMeshError


def create_external_models_file(
    path: Path,
    models: UniqueKeyDict[str, Model],
    adapter: EngineAdapter,
    state_reader: StateReader,
    dialect: DialectType,
    gateway: t.Optional[str] = None,
    max_workers: int = 1,
    strict: bool = False,
) -> None:
    """Create or replace a YAML file with the columns and column types of all external models.

    A name is considered external when its model's kind is external, or when it is
    listed as a dependency of some model but is not itself a key of `models`.
    Entries already present in the file that belong to a different gateway are kept.

    Args:
        path: The path to store the YAML file.
        models: Mapping of fully qualified name (FQN) to model.
        adapter: The engine adapter used to fetch the columns.
        state_reader: The state reader, used to check which of the candidate names
            already exist as non-external nodes.
        dialect: The dialect to serialize the schema as.
        gateway: The gateway key, if the models should be associated with a specific gateway.
        max_workers: The max concurrent workers to fetch columns.
        strict: If True, raise an error if the schema of an external model can't be
            fetched (for example, because it is missing in the database).

    Raises:
        SQLMeshError: If `strict` is True and the columns of a table can't be fetched.
    """
    external_model_fqns = set()

    for fqn, model in models.items():
        if model.kind.is_external:
            external_model_fqns.add(fqn)
        # Dependencies that aren't defined in `models` are external as well.
        for dep in model.depends_on:
            if dep not in models:
                external_model_fqns.add(dep)

    # Make sure we don't convert internal models into external ones.
    existing_model_fqns = state_reader.nodes_exist(external_model_fqns, exclude_external=True)
    if existing_model_fqns:
        existing_model_fqns_str = ", ".join(existing_model_fqns)
        get_console().log_warning(
            f"The following models already exist and can't be converted to external: {existing_model_fqns_str}. "
            "Perhaps these models have been removed, while downstream models that reference them weren't updated accordingly."
        )
        external_model_fqns -= existing_model_fqns

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        gateway_part = {"gateway": gateway} if gateway else {}

        # Sorting the (table, columns) pairs keeps the output ordered by table name;
        # tables whose columns couldn't be fetched (or are empty) are left out.
        schemas = [
            {
                "name": exp.to_table(table).sql(dialect=dialect),
                "columns": columns,
                **gateway_part,
            }
            for table, columns in sorted(
                pool.map(
                    lambda table: (table, get_columns(adapter, dialect, table, strict)),
                    external_model_fqns,
                )
            )
            if columns
        ]

    # Don't clobber existing entries from other gateways.
    entries_to_keep = (
        [e for e in yaml.load(path) if e.get("gateway", None) != gateway]
        if path.exists()
        else []
    )

    with open(path, "w", encoding="utf-8") as file:
        yaml.dump(entries_to_keep + schemas, file)


def get_columns(
    adapter: EngineAdapter, dialect: DialectType, table: str, strict: bool
) -> t.Optional[t.Dict[str, t.Any]]:
    """Return the columns of a table and their types as a dictionary.

    Args:
        adapter: The engine adapter used to look up the columns of the table.
        dialect: The dialect to serialize the column types as.
        table: The name of the table to fetch the columns for.
        strict: If True, raise an error when the columns can't be fetched instead
            of logging a warning.

    Returns:
        A mapping of column name to its type rendered as SQL in `dialect`, or None
        if the columns couldn't be fetched and `strict` is False.

    Raises:
        SQLMeshError: If the columns can't be fetched and `strict` is True.
    """
    try:
        columns = adapter.columns(table, include_pseudo_columns=True)
        return {c: dtype.sql(dialect=dialect) for c, dtype in columns.items()}
    except Exception as e:
        msg = f"Unable to get schema for '{table}': '{e}'."
        if strict:
            raise SQLMeshError(msg) from e
        # Best effort when not strict: report the failure and let the caller skip the table.
        get_console().log_warning(msg)
        return None
def
create_external_models_file( path: pathlib.Path, models: sqlmesh.utils.UniqueKeyDict[str, typing.Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]], adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, state_reader: sqlmesh.core.state_sync.base.StateReader, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], gateway: Optional[str] = None, max_workers: int = 1, strict: bool = False) -> None:
def create_external_models_file(
    path: Path,
    models: UniqueKeyDict[str, Model],
    adapter: EngineAdapter,
    state_reader: StateReader,
    dialect: DialectType,
    gateway: t.Optional[str] = None,
    max_workers: int = 1,
    strict: bool = False,
) -> None:
    """Create or replace a YAML file with the columns and column types of all external models.

    A name is considered external when its model's kind is external, or when it is
    listed as a dependency of some model but is not itself a key of `models`.
    Entries already present in the file that belong to a different gateway are kept.

    Args:
        path: The path to store the YAML file.
        models: Mapping of fully qualified name (FQN) to model.
        adapter: The engine adapter used to fetch the columns.
        state_reader: The state reader, used to check which of the candidate names
            already exist as non-external nodes.
        dialect: The dialect to serialize the schema as.
        gateway: The gateway key, if the models should be associated with a specific gateway.
        max_workers: The max concurrent workers to fetch columns.
        strict: If True, raise an error if the schema of an external model can't be
            fetched (for example, because it is missing in the database).

    Raises:
        SQLMeshError: If `strict` is True and the columns of a table can't be fetched.
    """
    external_model_fqns = set()

    for fqn, model in models.items():
        if model.kind.is_external:
            external_model_fqns.add(fqn)
        # Dependencies that aren't defined in `models` are external as well.
        for dep in model.depends_on:
            if dep not in models:
                external_model_fqns.add(dep)

    # Make sure we don't convert internal models into external ones.
    existing_model_fqns = state_reader.nodes_exist(external_model_fqns, exclude_external=True)
    if existing_model_fqns:
        existing_model_fqns_str = ", ".join(existing_model_fqns)
        get_console().log_warning(
            f"The following models already exist and can't be converted to external: {existing_model_fqns_str}. "
            "Perhaps these models have been removed, while downstream models that reference them weren't updated accordingly."
        )
        external_model_fqns -= existing_model_fqns

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        gateway_part = {"gateway": gateway} if gateway else {}

        # Sorting the (table, columns) pairs keeps the output ordered by table name;
        # tables whose columns couldn't be fetched (or are empty) are left out.
        schemas = [
            {
                "name": exp.to_table(table).sql(dialect=dialect),
                "columns": columns,
                **gateway_part,
            }
            for table, columns in sorted(
                pool.map(
                    lambda table: (table, get_columns(adapter, dialect, table, strict)),
                    external_model_fqns,
                )
            )
            if columns
        ]

    # Don't clobber existing entries from other gateways.
    entries_to_keep = (
        [e for e in yaml.load(path) if e.get("gateway", None) != gateway]
        if path.exists()
        else []
    )

    with open(path, "w", encoding="utf-8") as file:
        yaml.dump(entries_to_keep + schemas, file)
Create or replace a YAML file with the columns and column types of all external models.
Arguments:
- path: The path to store the YAML file.
- models: Mapping of fully qualified name (FQN) to model.
- adapter: The engine adapter.
- state_reader: The state reader.
- dialect: The dialect to serialize the schema as.
- gateway: The gateway key, if the models should be associated with a specific gateway.
- max_workers: The max concurrent workers to fetch columns.
- strict: If True, raise an error if the schema of an external model can't be fetched (for example, because it is missing in the database).
def
get_columns( adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], table: str, strict: bool) -> Optional[Dict[str, Any]]:
def get_columns(
    adapter: EngineAdapter, dialect: DialectType, table: str, strict: bool
) -> t.Optional[t.Dict[str, t.Any]]:
    """Return the columns of a table and their types as a dictionary.

    Args:
        adapter: The engine adapter used to look up the columns of the table.
        dialect: The dialect to serialize the column types as.
        table: The name of the table to fetch the columns for.
        strict: If True, raise an error when the columns can't be fetched instead
            of logging a warning.

    Returns:
        A mapping of column name to its type rendered as SQL in `dialect`, or None
        if the columns couldn't be fetched and `strict` is False.

    Raises:
        SQLMeshError: If the columns can't be fetched and `strict` is True.
    """
    try:
        columns = adapter.columns(table, include_pseudo_columns=True)
        return {c: dtype.sql(dialect=dialect) for c, dtype in columns.items()}
    except Exception as e:
        msg = f"Unable to get schema for '{table}': '{e}'."
        if strict:
            raise SQLMeshError(msg) from e
        # Best effort when not strict: report the failure and let the caller skip the table.
        get_console().log_warning(msg)
        return None
Return the columns of a table and their types as a dictionary.