Edit on GitHub

sqlmesh.core.engine_adapter

 1from __future__ import annotations
 2
 3import typing as t
 4
 5from sqlmesh.core.engine_adapter.base import (
 6    EngineAdapter,
 7    EngineAdapterWithIndexSupport,
 8)
 9from sqlmesh.core.engine_adapter.bigquery import BigQueryEngineAdapter
10from sqlmesh.core.engine_adapter.databricks import DatabricksEngineAdapter
11from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
12from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
13from sqlmesh.core.engine_adapter.mysql import MySQLEngineAdapter
14from sqlmesh.core.engine_adapter.postgres import PostgresEngineAdapter
15from sqlmesh.core.engine_adapter.redshift import RedshiftEngineAdapter
16from sqlmesh.core.engine_adapter.snowflake import SnowflakeEngineAdapter
17from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter
18from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
19
# Lookup table from lower-case dialect name to the dedicated adapter class.
# create_engine_adapter() consults this table after alias resolution; a
# dialect missing from it falls back to the generic EngineAdapter.
DIALECT_TO_ENGINE_ADAPTER = {
    "spark": SparkEngineAdapter,
    "bigquery": BigQueryEngineAdapter,
    "duckdb": DuckDBEngineAdapter,
    "snowflake": SnowflakeEngineAdapter,
    "databricks": DatabricksEngineAdapter,
    "redshift": RedshiftEngineAdapter,
    "postgres": PostgresEngineAdapter,
    "mysql": MySQLEngineAdapter,
    "mssql": MSSQLEngineAdapter,
    "trino": TrinoEngineAdapter,
}
32
# Alternative dialect spellings mapped to the canonical key used in
# DIALECT_TO_ENGINE_ADAPTER. Keys are matched after lower-casing.
DIALECT_ALIASES = {
    "postgresql": "postgres",
}
36
37
def create_engine_adapter(
    connection_factory: t.Callable[[], t.Any], dialect: str, **kwargs: t.Any
) -> EngineAdapter:
    """Instantiate the engine adapter registered for ``dialect``.

    The dialect name is lower-cased and passed through ``DIALECT_ALIASES``
    before being looked up in ``DIALECT_TO_ENGINE_ADAPTER``.

    Args:
        connection_factory: Zero-argument callable forwarded as the first
            positional argument to the adapter constructor (presumably it
            yields a database connection -- confirm against the adapters).
        dialect: Dialect name; matching is case-insensitive and alias-aware.
        **kwargs: Extra keyword arguments forwarded unchanged to the adapter
            constructor.

    Returns:
        An instance of the dialect-specific adapter class when one is
        registered, otherwise a generic ``EngineAdapter`` built with the
        resolved dialect name.
    """
    lowered = dialect.lower()
    canonical = DIALECT_ALIASES.get(lowered, lowered)
    adapter_cls = DIALECT_TO_ENGINE_ADAPTER.get(canonical)

    # Dialect-specific adapters are constructed without a dialect argument.
    if adapter_cls is not None and adapter_cls is not EngineAdapterWithIndexSupport:
        return adapter_cls(connection_factory, **kwargs)

    # The generic adapters are not tied to one dialect, so the resolved
    # dialect name is passed to them explicitly.
    generic_cls = EngineAdapter if adapter_cls is None else EngineAdapterWithIndexSupport
    return generic_cls(connection_factory, canonical, **kwargs)
def create_engine_adapter(connection_factory: Callable[[], Any], dialect: str, **kwargs: Any) -> sqlmesh.core.engine_adapter.base.EngineAdapter:
def create_engine_adapter(
    connection_factory: t.Callable[[], t.Any], dialect: str, **kwargs: t.Any
) -> EngineAdapter:
    """Instantiate the engine adapter registered for ``dialect``.

    The dialect name is lower-cased and passed through ``DIALECT_ALIASES``
    before being looked up in ``DIALECT_TO_ENGINE_ADAPTER``.

    Args:
        connection_factory: Zero-argument callable forwarded as the first
            positional argument to the adapter constructor (presumably it
            yields a database connection -- confirm against the adapters).
        dialect: Dialect name; matching is case-insensitive and alias-aware.
        **kwargs: Extra keyword arguments forwarded unchanged to the adapter
            constructor.

    Returns:
        An instance of the dialect-specific adapter class when one is
        registered, otherwise a generic ``EngineAdapter`` built with the
        resolved dialect name.
    """
    lowered = dialect.lower()
    canonical = DIALECT_ALIASES.get(lowered, lowered)
    adapter_cls = DIALECT_TO_ENGINE_ADAPTER.get(canonical)

    # Dialect-specific adapters are constructed without a dialect argument.
    if adapter_cls is not None and adapter_cls is not EngineAdapterWithIndexSupport:
        return adapter_cls(connection_factory, **kwargs)

    # The generic adapters are not tied to one dialect, so the resolved
    # dialect name is passed to them explicitly.
    generic_cls = EngineAdapter if adapter_cls is None else EngineAdapterWithIndexSupport
    return generic_cls(connection_factory, canonical, **kwargs)