sqlmesh.core.engine_adapter
1from __future__ import annotations 2 3import typing as t 4 5from sqlmesh.core.engine_adapter.base import ( 6 EngineAdapter, 7 EngineAdapterWithIndexSupport, 8) 9from sqlmesh.core.engine_adapter.bigquery import BigQueryEngineAdapter 10from sqlmesh.core.engine_adapter.databricks import DatabricksEngineAdapter 11from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter 12from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter 13from sqlmesh.core.engine_adapter.mysql import MySQLEngineAdapter 14from sqlmesh.core.engine_adapter.postgres import PostgresEngineAdapter 15from sqlmesh.core.engine_adapter.redshift import RedshiftEngineAdapter 16from sqlmesh.core.engine_adapter.snowflake import SnowflakeEngineAdapter 17from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter 18from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter 19 20DIALECT_TO_ENGINE_ADAPTER = { 21 "spark": SparkEngineAdapter, 22 "bigquery": BigQueryEngineAdapter, 23 "duckdb": DuckDBEngineAdapter, 24 "snowflake": SnowflakeEngineAdapter, 25 "databricks": DatabricksEngineAdapter, 26 "redshift": RedshiftEngineAdapter, 27 "postgres": PostgresEngineAdapter, 28 "mysql": MySQLEngineAdapter, 29 "mssql": MSSQLEngineAdapter, 30 "trino": TrinoEngineAdapter, 31} 32 33DIALECT_ALIASES = { 34 "postgresql": "postgres", 35} 36 37 38def create_engine_adapter( 39 connection_factory: t.Callable[[], t.Any], dialect: str, **kwargs: t.Any 40) -> EngineAdapter: 41 dialect = dialect.lower() 42 dialect = DIALECT_ALIASES.get(dialect, dialect) 43 engine_adapter = DIALECT_TO_ENGINE_ADAPTER.get(dialect) 44 if engine_adapter is None: 45 return EngineAdapter(connection_factory, dialect, **kwargs) 46 if engine_adapter is EngineAdapterWithIndexSupport: 47 return EngineAdapterWithIndexSupport( 48 connection_factory, 49 dialect, 50 **kwargs, 51 ) 52 return engine_adapter(connection_factory, **kwargs)
def
create_engine_adapter( connection_factory: Callable[[], Any], dialect: str, **kwargs: Any) -> sqlmesh.core.engine_adapter.base.EngineAdapter:
39def create_engine_adapter( 40 connection_factory: t.Callable[[], t.Any], dialect: str, **kwargs: t.Any 41) -> EngineAdapter: 42 dialect = dialect.lower() 43 dialect = DIALECT_ALIASES.get(dialect, dialect) 44 engine_adapter = DIALECT_TO_ENGINE_ADAPTER.get(dialect) 45 if engine_adapter is None: 46 return EngineAdapter(connection_factory, dialect, **kwargs) 47 if engine_adapter is EngineAdapterWithIndexSupport: 48 return EngineAdapterWithIndexSupport( 49 connection_factory, 50 dialect, 51 **kwargs, 52 ) 53 return engine_adapter(connection_factory, **kwargs)