Edit on GitHub

sqlmesh.core.engine_adapter.postgres

  1from __future__ import annotations
  2
  3import logging
  4import re
  5import typing as t
  6from functools import cached_property, partial
  7from sqlglot import exp
  8
  9from sqlmesh.core.engine_adapter.base_postgres import BasePostgresEngineAdapter
 10from sqlmesh.core.engine_adapter.mixins import (
 11    GetCurrentCatalogFromFunctionMixin,
 12    PandasNativeFetchDFSupportMixin,
 13    RowDiffMixin,
 14    logical_merge,
 15    GrantsFromInfoSchemaMixin,
 16)
 17from sqlmesh.core.engine_adapter.shared import set_catalog
 18
 19if t.TYPE_CHECKING:
 20    from sqlmesh.core._typing import TableName
 21    from sqlmesh.core.engine_adapter._typing import DF, QueryOrDF
 22
 23logger = logging.getLogger(__name__)
 24
 25
 26@set_catalog()
 27class PostgresEngineAdapter(
 28    BasePostgresEngineAdapter,
 29    PandasNativeFetchDFSupportMixin,
 30    GetCurrentCatalogFromFunctionMixin,
 31    RowDiffMixin,
 32    GrantsFromInfoSchemaMixin,
 33):
 34    DIALECT = "postgres"
 35    SUPPORTS_GRANTS = True
 36    SUPPORTS_INDEXES = True
 37    HAS_VIEW_BINDING = True
 38    CURRENT_CATALOG_EXPRESSION = exp.column("current_catalog")
 39    SUPPORTS_REPLACE_TABLE = False
 40    MAX_IDENTIFIER_LENGTH: t.Optional[int] = 63
 41    SUPPORTS_QUERY_EXECUTION_TRACKING = True
 42    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "role_table_grants"
 43    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.column("current_role")
 44    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True
 45    SCHEMA_DIFFER_KWARGS = {
 46        "parameterized_type_defaults": {
 47            # DECIMAL without precision is "up to 131072 digits before the decimal point; up to 16383 digits after the decimal point"
 48            exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(131072 + 16383, 16383), (0,)],
 49            exp.DataType.build("CHAR", dialect=DIALECT).this: [(1,)],
 50            exp.DataType.build("TIME", dialect=DIALECT).this: [(6,)],
 51            exp.DataType.build("TIMESTAMP", dialect=DIALECT).this: [(6,)],
 52        },
 53        "types_with_unlimited_length": {
 54            # all can ALTER to `TEXT`
 55            exp.DataType.build("TEXT", dialect=DIALECT).this: {
 56                exp.DataType.build("VARCHAR", dialect=DIALECT).this,
 57                exp.DataType.build("CHAR", dialect=DIALECT).this,
 58                exp.DataType.build("BPCHAR", dialect=DIALECT).this,
 59            },
 60            # all can ALTER to unparameterized `VARCHAR`
 61            exp.DataType.build("VARCHAR", dialect=DIALECT).this: {
 62                exp.DataType.build("VARCHAR", dialect=DIALECT).this,
 63                exp.DataType.build("CHAR", dialect=DIALECT).this,
 64                exp.DataType.build("BPCHAR", dialect=DIALECT).this,
 65                exp.DataType.build("TEXT", dialect=DIALECT).this,
 66            },
 67            # parameterized `BPCHAR(n)` can ALTER to unparameterized `BPCHAR`
 68            exp.DataType.build("BPCHAR", dialect=DIALECT).this: {
 69                exp.DataType.build("BPCHAR", dialect=DIALECT).this
 70            },
 71        },
 72        "drop_cascade": True,
 73    }
 74
 75    def _fetch_native_df(
 76        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
 77    ) -> DF:
 78        """
 79        `read_sql_query` when using psycopg will result on a hanging transaction that must be committed
 80
 81        https://github.com/pandas-dev/pandas/pull/42277
 82        """
 83        df = super()._fetch_native_df(query, quote_identifiers)
 84        if not self._connection_pool.is_transaction_active:
 85            self._connection_pool.commit()
 86        return df
 87
 88    def _create_table_like(
 89        self,
 90        target_table_name: TableName,
 91        source_table_name: TableName,
 92        exists: bool,
 93        **kwargs: t.Any,
 94    ) -> None:
 95        self.execute(
 96            exp.Create(
 97                this=exp.Schema(
 98                    this=exp.to_table(target_table_name),
 99                    expressions=[
100                        exp.LikeProperty(
101                            this=exp.to_table(source_table_name),
102                            expressions=[exp.Property(this="INCLUDING", value=exp.Var(this="ALL"))],
103                        )
104                    ],
105                ),
106                kind="TABLE",
107                exists=exists,
108            )
109        )
110
111    def merge(
112        self,
113        target_table: TableName,
114        source_table: QueryOrDF,
115        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
116        unique_key: t.Sequence[exp.Expr],
117        when_matched: t.Optional[exp.Whens] = None,
118        merge_filter: t.Optional[exp.Expr] = None,
119        source_columns: t.Optional[t.List[str]] = None,
120        **kwargs: t.Any,
121    ) -> None:
122        # Merge isn't supported until Postgres 15
123        major, minor = self.server_version
124        merge_impl = super().merge if major >= 15 else partial(logical_merge, self)
125        merge_impl(  # type: ignore
126            target_table,
127            source_table,
128            target_columns_to_types,
129            unique_key,
130            when_matched=when_matched,
131            merge_filter=merge_filter,
132            source_columns=source_columns,
133        )
134
135    @cached_property
136    def server_version(self) -> t.Tuple[int, int]:
137        """Lazily fetch and cache major and minor server version"""
138        if result := self.fetchone("SHOW server_version"):
139            server_version, *_ = result
140            match = re.search(r"(\d+)\.(\d+)", server_version)
141            if match:
142                return int(match.group(1)), int(match.group(2))
143        return 0, 0
logger = <Logger sqlmesh.core.engine_adapter.postgres (WARNING)>
 27@set_catalog()
 28class PostgresEngineAdapter(
 29    BasePostgresEngineAdapter,
 30    PandasNativeFetchDFSupportMixin,
 31    GetCurrentCatalogFromFunctionMixin,
 32    RowDiffMixin,
 33    GrantsFromInfoSchemaMixin,
 34):
 35    DIALECT = "postgres"
 36    SUPPORTS_GRANTS = True
 37    SUPPORTS_INDEXES = True
 38    HAS_VIEW_BINDING = True
 39    CURRENT_CATALOG_EXPRESSION = exp.column("current_catalog")
 40    SUPPORTS_REPLACE_TABLE = False
 41    MAX_IDENTIFIER_LENGTH: t.Optional[int] = 63
 42    SUPPORTS_QUERY_EXECUTION_TRACKING = True
 43    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "role_table_grants"
 44    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.column("current_role")
 45    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True
 46    SCHEMA_DIFFER_KWARGS = {
 47        "parameterized_type_defaults": {
 48            # DECIMAL without precision is "up to 131072 digits before the decimal point; up to 16383 digits after the decimal point"
 49            exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(131072 + 16383, 16383), (0,)],
 50            exp.DataType.build("CHAR", dialect=DIALECT).this: [(1,)],
 51            exp.DataType.build("TIME", dialect=DIALECT).this: [(6,)],
 52            exp.DataType.build("TIMESTAMP", dialect=DIALECT).this: [(6,)],
 53        },
 54        "types_with_unlimited_length": {
 55            # all can ALTER to `TEXT`
 56            exp.DataType.build("TEXT", dialect=DIALECT).this: {
 57                exp.DataType.build("VARCHAR", dialect=DIALECT).this,
 58                exp.DataType.build("CHAR", dialect=DIALECT).this,
 59                exp.DataType.build("BPCHAR", dialect=DIALECT).this,
 60            },
 61            # all can ALTER to unparameterized `VARCHAR`
 62            exp.DataType.build("VARCHAR", dialect=DIALECT).this: {
 63                exp.DataType.build("VARCHAR", dialect=DIALECT).this,
 64                exp.DataType.build("CHAR", dialect=DIALECT).this,
 65                exp.DataType.build("BPCHAR", dialect=DIALECT).this,
 66                exp.DataType.build("TEXT", dialect=DIALECT).this,
 67            },
 68            # parameterized `BPCHAR(n)` can ALTER to unparameterized `BPCHAR`
 69            exp.DataType.build("BPCHAR", dialect=DIALECT).this: {
 70                exp.DataType.build("BPCHAR", dialect=DIALECT).this
 71            },
 72        },
 73        "drop_cascade": True,
 74    }
 75
 76    def _fetch_native_df(
 77        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
 78    ) -> DF:
 79        """
 80        `read_sql_query` when using psycopg will result on a hanging transaction that must be committed
 81
 82        https://github.com/pandas-dev/pandas/pull/42277
 83        """
 84        df = super()._fetch_native_df(query, quote_identifiers)
 85        if not self._connection_pool.is_transaction_active:
 86            self._connection_pool.commit()
 87        return df
 88
 89    def _create_table_like(
 90        self,
 91        target_table_name: TableName,
 92        source_table_name: TableName,
 93        exists: bool,
 94        **kwargs: t.Any,
 95    ) -> None:
 96        self.execute(
 97            exp.Create(
 98                this=exp.Schema(
 99                    this=exp.to_table(target_table_name),
100                    expressions=[
101                        exp.LikeProperty(
102                            this=exp.to_table(source_table_name),
103                            expressions=[exp.Property(this="INCLUDING", value=exp.Var(this="ALL"))],
104                        )
105                    ],
106                ),
107                kind="TABLE",
108                exists=exists,
109            )
110        )
111
112    def merge(
113        self,
114        target_table: TableName,
115        source_table: QueryOrDF,
116        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
117        unique_key: t.Sequence[exp.Expr],
118        when_matched: t.Optional[exp.Whens] = None,
119        merge_filter: t.Optional[exp.Expr] = None,
120        source_columns: t.Optional[t.List[str]] = None,
121        **kwargs: t.Any,
122    ) -> None:
123        # Merge isn't supported until Postgres 15
124        major, minor = self.server_version
125        merge_impl = super().merge if major >= 15 else partial(logical_merge, self)
126        merge_impl(  # type: ignore
127            target_table,
128            source_table,
129            target_columns_to_types,
130            unique_key,
131            when_matched=when_matched,
132            merge_filter=merge_filter,
133            source_columns=source_columns,
134        )
135
136    @cached_property
137    def server_version(self) -> t.Tuple[int, int]:
138        """Lazily fetch and cache major and minor server version"""
139        if result := self.fetchone("SHOW server_version"):
140            server_version, *_ = result
141            match = re.search(r"(\d+)\.(\d+)", server_version)
142            if match:
143                return int(match.group(1)), int(match.group(2))
144        return 0, 0

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
DIALECT = 'postgres'
SUPPORTS_GRANTS = True
SUPPORTS_INDEXES = True
HAS_VIEW_BINDING = True
CURRENT_CATALOG_EXPRESSION = Column( this=Identifier(this=current_catalog, quoted=False))
SUPPORTS_REPLACE_TABLE = False
MAX_IDENTIFIER_LENGTH: Optional[int] = 63
SUPPORTS_QUERY_EXECUTION_TRACKING = True
GRANT_INFORMATION_SCHEMA_TABLE_NAME = 'role_table_grants'
CURRENT_USER_OR_ROLE_EXPRESSION: sqlglot.expressions.core.Expr = Column( this=Identifier(this=current_role, quoted=False))
SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True
SCHEMA_DIFFER_KWARGS = {'parameterized_type_defaults': {<DType.DECIMAL: 'DECIMAL'>: [(147455, 16383), (0,)], <DType.CHAR: 'CHAR'>: [(1,)], <DType.TIME: 'TIME'>: [(6,)], <DType.TIMESTAMP: 'TIMESTAMP'>: [(6,)]}, 'types_with_unlimited_length': {<DType.TEXT: 'TEXT'>: {<DType.CHAR: 'CHAR'>, <DType.BPCHAR: 'BPCHAR'>, <DType.VARCHAR: 'VARCHAR'>}, <DType.VARCHAR: 'VARCHAR'>: {<DType.CHAR: 'CHAR'>, <DType.TEXT: 'TEXT'>, <DType.BPCHAR: 'BPCHAR'>, <DType.VARCHAR: 'VARCHAR'>}, <DType.BPCHAR: 'BPCHAR'>: {<DType.BPCHAR: 'BPCHAR'>}}, 'drop_cascade': True}
def merge( self, target_table: Union[str, sqlglot.expressions.query.Table], source_table: <MagicMock id='132726889869376'>, target_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]], unique_key: Sequence[sqlglot.expressions.core.Expr], when_matched: Optional[sqlglot.expressions.dml.Whens] = None, merge_filter: Optional[sqlglot.expressions.core.Expr] = None, source_columns: Optional[List[str]] = None, **kwargs: Any) -> None:
112    def merge(
113        self,
114        target_table: TableName,
115        source_table: QueryOrDF,
116        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
117        unique_key: t.Sequence[exp.Expr],
118        when_matched: t.Optional[exp.Whens] = None,
119        merge_filter: t.Optional[exp.Expr] = None,
120        source_columns: t.Optional[t.List[str]] = None,
121        **kwargs: t.Any,
122    ) -> None:
123        # Merge isn't supported until Postgres 15
124        major, minor = self.server_version
125        merge_impl = super().merge if major >= 15 else partial(logical_merge, self)
126        merge_impl(  # type: ignore
127            target_table,
128            source_table,
129            target_columns_to_types,
130            unique_key,
131            when_matched=when_matched,
132            merge_filter=merge_filter,
133            source_columns=source_columns,
134        )
server_version: Tuple[int, int]
136    @cached_property
137    def server_version(self) -> t.Tuple[int, int]:
138        """Lazily fetch and cache major and minor server version"""
139        if result := self.fetchone("SHOW server_version"):
140            server_version, *_ = result
141            match = re.search(r"(\d+)\.(\d+)", server_version)
142            if match:
143                return int(match.group(1)), int(match.group(2))
144        return 0, 0

Lazily fetch and cache major and minor server version

def columns( self, table_name: Union[str, sqlglot.expressions.query.Table], include_pseudo_columns: bool = False) -> Dict[str, sqlglot.expressions.datatypes.DataType]:
35    def columns(
36        self, table_name: TableName, include_pseudo_columns: bool = False
37    ) -> t.Dict[str, exp.DataType]:
38        """Fetches column names and types for the target table."""
39        table = exp.to_table(table_name)
40
41        sql = (
42            exp.select(
43                "attname AS column_name",
44                "pg_catalog.format_type(atttypid, atttypmod) AS data_type",
45            )
46            .from_("pg_catalog.pg_attribute")
47            .join("pg_catalog.pg_class", on="pg_class.oid = attrelid")
48            .join("pg_catalog.pg_namespace", on="pg_namespace.oid = relnamespace")
49            .where(
50                exp.and_(
51                    "attnum > 0",
52                    "NOT attisdropped",
53                    exp.column("relname").eq(table.alias_or_name),
54                )
55            )
56        )
57        if table.args.get("db"):
58            sql = sql.where(exp.column("nspname").eq(table.args["db"].name))
59
60        self.execute(sql)
61        resp = self.cursor.fetchall()
62        if not resp:
63            raise SQLMeshError(
64                f"Could not get columns for table '{table.sql(dialect=self.dialect)}'. Table not found."
65            )
66
67        return {
68            column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True)
69            for column_name, data_type in resp
70        }

Fetches column names and types for the target table.

def table_exists(self, table_name: Union[str, sqlglot.expressions.query.Table]) -> bool:
 76    def table_exists(self, table_name: TableName) -> bool:
 77        """
 78        Postgres doesn't support describe so I'm using what the redshift cursor does to check if a table
 79        exists. We don't use this directly in order for this to work as a base class for other postgres
 80
 81        Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553
 82        """
 83        table = exp.to_table(table_name)
 84        data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
 85        if data_object_cache_key in self._data_object_cache:
 86            logger.debug("Table existence cache hit: %s", data_object_cache_key)
 87            return self._data_object_cache[data_object_cache_key] is not None
 88
 89        sql = (
 90            exp.select("1")
 91            .from_("information_schema.tables")
 92            .where(f"table_name = '{table.alias_or_name}'")
 93        )
 94        database_name = table.db
 95        if database_name:
 96            sql = sql.where(f"table_schema = '{database_name}'")
 97
 98        self.execute(sql)
 99
100        result = self.cursor.fetchone()
101
102        return result[0] == 1 if result is not None else False

Postgres doesn't support DESCRIBE, so this uses the same approach as the Redshift cursor to check whether a table exists. The Redshift implementation is not used directly so that this class can serve as a base class for other Postgres-based engine adapters.

Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553

def create_view( self, view_name: Union[str, sqlglot.expressions.query.Table], query_or_df: <MagicMock id='132726898885728'>, target_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]] = None, replace: bool = True, materialized: bool = False, materialized_properties: Optional[Dict[str, Any]] = None, table_description: Optional[str] = None, column_descriptions: Optional[Dict[str, str]] = None, view_properties: Optional[Dict[str, sqlglot.expressions.core.Expr]] = None, source_columns: Optional[List[str]] = None, **create_kwargs: Any) -> None:
104    def create_view(
105        self,
106        view_name: TableName,
107        query_or_df: QueryOrDF,
108        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
109        replace: bool = True,
110        materialized: bool = False,
111        materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
112        table_description: t.Optional[str] = None,
113        column_descriptions: t.Optional[t.Dict[str, str]] = None,
114        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
115        source_columns: t.Optional[t.List[str]] = None,
116        **create_kwargs: t.Any,
117    ) -> None:
118        """
119        Postgres has very strict rules around view replacement. For example the new query must generate an identical setFormatter
120        of columns, using the same column names and data types as the old one. We have to delete the old view instead of replacing it
121        to work around these constraints.
122
123        Reference: https://www.postgresql.org/docs/current/sql-createview.html
124        """
125        with self.transaction():
126            if replace:
127                self.drop_view(view_name, materialized=materialized)
128            super().create_view(
129                view_name,
130                query_or_df,
131                target_columns_to_types=target_columns_to_types,
132                replace=False,
133                materialized=materialized,
134                materialized_properties=materialized_properties,
135                table_description=table_description,
136                column_descriptions=column_descriptions,
137                view_properties=view_properties,
138                source_columns=source_columns,
139                **create_kwargs,
140            )

Postgres has very strict rules around view replacement. For example, the new query must generate an identical set of columns, using the same column names and data types as the old one. We have to drop the old view instead of replacing it to work around these constraints.

Reference: https://www.postgresql.org/docs/current/sql-createview.html

def drop_view( self, view_name: Union[str, sqlglot.expressions.query.Table], ignore_if_not_exists: bool = True, materialized: bool = False, **kwargs: Any) -> None:
142    def drop_view(
143        self,
144        view_name: TableName,
145        ignore_if_not_exists: bool = True,
146        materialized: bool = False,
147        **kwargs: t.Any,
148    ) -> None:
149        kwargs["cascade"] = kwargs.get("cascade", True)
150        return super().drop_view(
151            view_name,
152            ignore_if_not_exists=ignore_if_not_exists,
153            materialized=materialized,
154            **kwargs,
155        )

Drop a view.

Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTS_TUPLE_IN
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
ATTACH_CORRELATION_ID
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
schema_differ
default_catalog
engine_run_mode
recycle
close
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_schema
drop_schema
create_catalog
drop_catalog
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
sqlmesh.core.engine_adapter.base_postgres.BasePostgresEngineAdapter
DEFAULT_BATCH_SIZE
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
catalog_support
sqlmesh.core.engine_adapter.mixins.GetCurrentCatalogFromFunctionMixin
get_current_catalog
sqlmesh.core.engine_adapter.mixins.RowDiffMixin
MAX_TIMESTAMP_PRECISION
concat_columns
normalize_value
sqlmesh.core.engine_adapter.mixins.GrantsFromInfoSchemaMixin
USE_CATALOG_IN_GRANTS