Edit on GitHub

sqlmesh.core.engine_adapter.base_postgres

  1from __future__ import annotations
  2
  3import typing as t
  4import logging
  5
  6from sqlglot import exp
  7
  8from sqlmesh.core.dialect import to_schema
  9from sqlmesh.core.engine_adapter.base import EngineAdapter, _get_data_object_cache_key
 10from sqlmesh.core.engine_adapter.shared import (
 11    CatalogSupport,
 12    CommentCreationTable,
 13    CommentCreationView,
 14    DataObject,
 15    DataObjectType,
 16)
 17from sqlmesh.utils.errors import SQLMeshError
 18
 19if t.TYPE_CHECKING:
 20    from sqlmesh.core._typing import SchemaName, TableName
 21    from sqlmesh.core.engine_adapter._typing import QueryOrDF
 22
 23
 24logger = logging.getLogger(__name__)
 25
 26
 27class BasePostgresEngineAdapter(EngineAdapter):
 28    DEFAULT_BATCH_SIZE = 400
 29    COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
 30    COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
 31    SUPPORTS_QUERY_EXECUTION_TRACKING = True
 32    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]
 33
 34    def columns(
 35        self, table_name: TableName, include_pseudo_columns: bool = False
 36    ) -> t.Dict[str, exp.DataType]:
 37        """Fetches column names and types for the target table."""
 38        table = exp.to_table(table_name)
 39
 40        sql = (
 41            exp.select(
 42                "attname AS column_name",
 43                "pg_catalog.format_type(atttypid, atttypmod) AS data_type",
 44            )
 45            .from_("pg_catalog.pg_attribute")
 46            .join("pg_catalog.pg_class", on="pg_class.oid = attrelid")
 47            .join("pg_catalog.pg_namespace", on="pg_namespace.oid = relnamespace")
 48            .where(
 49                exp.and_(
 50                    "attnum > 0",
 51                    "NOT attisdropped",
 52                    exp.column("relname").eq(table.alias_or_name),
 53                )
 54            )
 55        )
 56        if table.args.get("db"):
 57            sql = sql.where(exp.column("nspname").eq(table.args["db"].name))
 58
 59        self.execute(sql)
 60        resp = self.cursor.fetchall()
 61        if not resp:
 62            raise SQLMeshError(
 63                f"Could not get columns for table '{table.sql(dialect=self.dialect)}'. Table not found."
 64            )
 65
 66        return {
 67            column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True)
 68            for column_name, data_type in resp
 69        }
 70
 71    @property
 72    def catalog_support(self) -> CatalogSupport:
 73        return CatalogSupport.SINGLE_CATALOG_ONLY
 74
 75    def table_exists(self, table_name: TableName) -> bool:
 76        """
 77        Postgres doesn't support describe so I'm using what the redshift cursor does to check if a table
 78        exists. We don't use this directly in order for this to work as a base class for other postgres
 79
 80        Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553
 81        """
 82        table = exp.to_table(table_name)
 83        data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
 84        if data_object_cache_key in self._data_object_cache:
 85            logger.debug("Table existence cache hit: %s", data_object_cache_key)
 86            return self._data_object_cache[data_object_cache_key] is not None
 87
 88        sql = (
 89            exp.select("1")
 90            .from_("information_schema.tables")
 91            .where(f"table_name = '{table.alias_or_name}'")
 92        )
 93        database_name = table.db
 94        if database_name:
 95            sql = sql.where(f"table_schema = '{database_name}'")
 96
 97        self.execute(sql)
 98
 99        result = self.cursor.fetchone()
100
101        return result[0] == 1 if result is not None else False
102
103    def create_view(
104        self,
105        view_name: TableName,
106        query_or_df: QueryOrDF,
107        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
108        replace: bool = True,
109        materialized: bool = False,
110        materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
111        table_description: t.Optional[str] = None,
112        column_descriptions: t.Optional[t.Dict[str, str]] = None,
113        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
114        source_columns: t.Optional[t.List[str]] = None,
115        **create_kwargs: t.Any,
116    ) -> None:
117        """
118        Postgres has very strict rules around view replacement. For example the new query must generate an identical setFormatter
119        of columns, using the same column names and data types as the old one. We have to delete the old view instead of replacing it
120        to work around these constraints.
121
122        Reference: https://www.postgresql.org/docs/current/sql-createview.html
123        """
124        with self.transaction():
125            if replace:
126                self.drop_view(view_name, materialized=materialized)
127            super().create_view(
128                view_name,
129                query_or_df,
130                target_columns_to_types=target_columns_to_types,
131                replace=False,
132                materialized=materialized,
133                materialized_properties=materialized_properties,
134                table_description=table_description,
135                column_descriptions=column_descriptions,
136                view_properties=view_properties,
137                source_columns=source_columns,
138                **create_kwargs,
139            )
140
141    def drop_view(
142        self,
143        view_name: TableName,
144        ignore_if_not_exists: bool = True,
145        materialized: bool = False,
146        **kwargs: t.Any,
147    ) -> None:
148        kwargs["cascade"] = kwargs.get("cascade", True)
149        return super().drop_view(
150            view_name,
151            ignore_if_not_exists=ignore_if_not_exists,
152            materialized=materialized,
153            **kwargs,
154        )
155
156    def _get_data_objects(
157        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
158    ) -> t.List[DataObject]:
159        """
160        Returns all the data objects that exist in the given schema and optionally catalog.
161        """
162        catalog = self.get_current_catalog()
163        table_query = exp.select(
164            exp.column("schemaname").as_("schema_name"),
165            exp.column("tablename").as_("name"),
166            exp.Literal.string("TABLE").as_("type"),
167        ).from_("pg_tables")
168        view_query = exp.select(
169            exp.column("schemaname").as_("schema_name"),
170            exp.column("viewname").as_("name"),
171            exp.Literal.string("VIEW").as_("type"),
172        ).from_("pg_views")
173        materialized_view_query = exp.select(
174            exp.column("schemaname").as_("schema_name"),
175            exp.column("matviewname").as_("name"),
176            exp.Literal.string("MATERIALIZED_VIEW").as_("type"),
177        ).from_("pg_matviews")
178        subquery = exp.union(
179            table_query,
180            exp.union(view_query, materialized_view_query, distinct=False),
181            distinct=False,
182        )
183        query = (
184            exp.select("*")
185            .from_(subquery.subquery(alias="objs"))
186            .where(exp.column("schema_name").eq(to_schema(schema_name).db))
187        )
188        if object_names:
189            query = query.where(exp.column("name").isin(*object_names))
190        df = self.fetchdf(query)
191        return [
192            DataObject(
193                catalog=catalog,
194                schema=row.schema_name,
195                name=row.name,
196                type=DataObjectType.from_str(row.type),  # type: ignore
197            )
198            for row in df.itertuples()
199        ]
200
201    def _get_current_schema(self) -> str:
202        """Returns the current default schema for the connection."""
203        result = self.fetchone(exp.select(exp.func("current_schema")))
204        if result and result[0]:
205            return result[0]
206        return "public"
logger = <Logger sqlmesh.core.engine_adapter.base_postgres (WARNING)>
class BasePostgresEngineAdapter(sqlmesh.core.engine_adapter.base.EngineAdapter):
 28class BasePostgresEngineAdapter(EngineAdapter):
 29    DEFAULT_BATCH_SIZE = 400
 30    COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
 31    COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
 32    SUPPORTS_QUERY_EXECUTION_TRACKING = True
 33    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]
 34
 35    def columns(
 36        self, table_name: TableName, include_pseudo_columns: bool = False
 37    ) -> t.Dict[str, exp.DataType]:
 38        """Fetches column names and types for the target table."""
 39        table = exp.to_table(table_name)
 40
 41        sql = (
 42            exp.select(
 43                "attname AS column_name",
 44                "pg_catalog.format_type(atttypid, atttypmod) AS data_type",
 45            )
 46            .from_("pg_catalog.pg_attribute")
 47            .join("pg_catalog.pg_class", on="pg_class.oid = attrelid")
 48            .join("pg_catalog.pg_namespace", on="pg_namespace.oid = relnamespace")
 49            .where(
 50                exp.and_(
 51                    "attnum > 0",
 52                    "NOT attisdropped",
 53                    exp.column("relname").eq(table.alias_or_name),
 54                )
 55            )
 56        )
 57        if table.args.get("db"):
 58            sql = sql.where(exp.column("nspname").eq(table.args["db"].name))
 59
 60        self.execute(sql)
 61        resp = self.cursor.fetchall()
 62        if not resp:
 63            raise SQLMeshError(
 64                f"Could not get columns for table '{table.sql(dialect=self.dialect)}'. Table not found."
 65            )
 66
 67        return {
 68            column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True)
 69            for column_name, data_type in resp
 70        }
 71
 72    @property
 73    def catalog_support(self) -> CatalogSupport:
 74        return CatalogSupport.SINGLE_CATALOG_ONLY
 75
 76    def table_exists(self, table_name: TableName) -> bool:
 77        """
 78        Postgres doesn't support describe so I'm using what the redshift cursor does to check if a table
 79        exists. We don't use this directly in order for this to work as a base class for other postgres
 80
 81        Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553
 82        """
 83        table = exp.to_table(table_name)
 84        data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
 85        if data_object_cache_key in self._data_object_cache:
 86            logger.debug("Table existence cache hit: %s", data_object_cache_key)
 87            return self._data_object_cache[data_object_cache_key] is not None
 88
 89        sql = (
 90            exp.select("1")
 91            .from_("information_schema.tables")
 92            .where(f"table_name = '{table.alias_or_name}'")
 93        )
 94        database_name = table.db
 95        if database_name:
 96            sql = sql.where(f"table_schema = '{database_name}'")
 97
 98        self.execute(sql)
 99
100        result = self.cursor.fetchone()
101
102        return result[0] == 1 if result is not None else False
103
104    def create_view(
105        self,
106        view_name: TableName,
107        query_or_df: QueryOrDF,
108        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
109        replace: bool = True,
110        materialized: bool = False,
111        materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
112        table_description: t.Optional[str] = None,
113        column_descriptions: t.Optional[t.Dict[str, str]] = None,
114        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
115        source_columns: t.Optional[t.List[str]] = None,
116        **create_kwargs: t.Any,
117    ) -> None:
118        """
119        Postgres has very strict rules around view replacement. For example the new query must generate an identical setFormatter
120        of columns, using the same column names and data types as the old one. We have to delete the old view instead of replacing it
121        to work around these constraints.
122
123        Reference: https://www.postgresql.org/docs/current/sql-createview.html
124        """
125        with self.transaction():
126            if replace:
127                self.drop_view(view_name, materialized=materialized)
128            super().create_view(
129                view_name,
130                query_or_df,
131                target_columns_to_types=target_columns_to_types,
132                replace=False,
133                materialized=materialized,
134                materialized_properties=materialized_properties,
135                table_description=table_description,
136                column_descriptions=column_descriptions,
137                view_properties=view_properties,
138                source_columns=source_columns,
139                **create_kwargs,
140            )
141
142    def drop_view(
143        self,
144        view_name: TableName,
145        ignore_if_not_exists: bool = True,
146        materialized: bool = False,
147        **kwargs: t.Any,
148    ) -> None:
149        kwargs["cascade"] = kwargs.get("cascade", True)
150        return super().drop_view(
151            view_name,
152            ignore_if_not_exists=ignore_if_not_exists,
153            materialized=materialized,
154            **kwargs,
155        )
156
157    def _get_data_objects(
158        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
159    ) -> t.List[DataObject]:
160        """
161        Returns all the data objects that exist in the given schema and optionally catalog.
162        """
163        catalog = self.get_current_catalog()
164        table_query = exp.select(
165            exp.column("schemaname").as_("schema_name"),
166            exp.column("tablename").as_("name"),
167            exp.Literal.string("TABLE").as_("type"),
168        ).from_("pg_tables")
169        view_query = exp.select(
170            exp.column("schemaname").as_("schema_name"),
171            exp.column("viewname").as_("name"),
172            exp.Literal.string("VIEW").as_("type"),
173        ).from_("pg_views")
174        materialized_view_query = exp.select(
175            exp.column("schemaname").as_("schema_name"),
176            exp.column("matviewname").as_("name"),
177            exp.Literal.string("MATERIALIZED_VIEW").as_("type"),
178        ).from_("pg_matviews")
179        subquery = exp.union(
180            table_query,
181            exp.union(view_query, materialized_view_query, distinct=False),
182            distinct=False,
183        )
184        query = (
185            exp.select("*")
186            .from_(subquery.subquery(alias="objs"))
187            .where(exp.column("schema_name").eq(to_schema(schema_name).db))
188        )
189        if object_names:
190            query = query.where(exp.column("name").isin(*object_names))
191        df = self.fetchdf(query)
192        return [
193            DataObject(
194                catalog=catalog,
195                schema=row.schema_name,
196                name=row.name,
197                type=DataObjectType.from_str(row.type),  # type: ignore
198            )
199            for row in df.itertuples()
200        ]
201
202    def _get_current_schema(self) -> str:
203        """Returns the current default schema for the connection."""
204        result = self.fetchone(exp.select(exp.func("current_schema")))
205        if result and result[0]:
206            return result[0]
207        return "public"

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
DEFAULT_BATCH_SIZE = 400
COMMENT_CREATION_TABLE = <CommentCreationTable.COMMENT_COMMAND_ONLY: 4>
COMMENT_CREATION_VIEW = <CommentCreationView.COMMENT_COMMAND_ONLY: 4>
SUPPORTS_QUERY_EXECUTION_TRACKING = True
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ['SCHEMA', 'TABLE', 'VIEW']
def columns( self, table_name: Union[str, sqlglot.expressions.query.Table], include_pseudo_columns: bool = False) -> Dict[str, sqlglot.expressions.datatypes.DataType]:
35    def columns(
36        self, table_name: TableName, include_pseudo_columns: bool = False
37    ) -> t.Dict[str, exp.DataType]:
38        """Fetches column names and types for the target table."""
39        table = exp.to_table(table_name)
40
41        sql = (
42            exp.select(
43                "attname AS column_name",
44                "pg_catalog.format_type(atttypid, atttypmod) AS data_type",
45            )
46            .from_("pg_catalog.pg_attribute")
47            .join("pg_catalog.pg_class", on="pg_class.oid = attrelid")
48            .join("pg_catalog.pg_namespace", on="pg_namespace.oid = relnamespace")
49            .where(
50                exp.and_(
51                    "attnum > 0",
52                    "NOT attisdropped",
53                    exp.column("relname").eq(table.alias_or_name),
54                )
55            )
56        )
57        if table.args.get("db"):
58            sql = sql.where(exp.column("nspname").eq(table.args["db"].name))
59
60        self.execute(sql)
61        resp = self.cursor.fetchall()
62        if not resp:
63            raise SQLMeshError(
64                f"Could not get columns for table '{table.sql(dialect=self.dialect)}'. Table not found."
65            )
66
67        return {
68            column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True)
69            for column_name, data_type in resp
70        }

Fetches column names and types for the target table.

    @property
    def catalog_support(self) -> CatalogSupport:
        """Catalog support level of this adapter: a single catalog only."""
        return CatalogSupport.SINGLE_CATALOG_ONLY
def table_exists(self, table_name: Union[str, sqlglot.expressions.query.Table]) -> bool:
 76    def table_exists(self, table_name: TableName) -> bool:
 77        """
 78        Postgres doesn't support describe so I'm using what the redshift cursor does to check if a table
 79        exists. We don't use this directly in order for this to work as a base class for other postgres
 80
 81        Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553
 82        """
 83        table = exp.to_table(table_name)
 84        data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
 85        if data_object_cache_key in self._data_object_cache:
 86            logger.debug("Table existence cache hit: %s", data_object_cache_key)
 87            return self._data_object_cache[data_object_cache_key] is not None
 88
 89        sql = (
 90            exp.select("1")
 91            .from_("information_schema.tables")
 92            .where(f"table_name = '{table.alias_or_name}'")
 93        )
 94        database_name = table.db
 95        if database_name:
 96            sql = sql.where(f"table_schema = '{database_name}'")
 97
 98        self.execute(sql)
 99
100        result = self.cursor.fetchone()
101
102        return result[0] == 1 if result is not None else False

Postgres doesn't support DESCRIBE, so this uses the same approach as the Redshift cursor to check if a table exists. The Redshift implementation is not used directly so that this class can work as a base class for other Postgres-based adapters.

Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553

def create_view( self, view_name: Union[str, sqlglot.expressions.query.Table], query_or_df: QueryOrDF, target_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]] = None, replace: bool = True, materialized: bool = False, materialized_properties: Optional[Dict[str, Any]] = None, table_description: Optional[str] = None, column_descriptions: Optional[Dict[str, str]] = None, view_properties: Optional[Dict[str, sqlglot.expressions.core.Expr]] = None, source_columns: Optional[List[str]] = None, **create_kwargs: Any) -> None:
    def create_view(
        self,
        view_name: TableName,
        query_or_df: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        replace: bool = True,
        materialized: bool = False,
        materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **create_kwargs: t.Any,
    ) -> None:
        """
        Postgres has very strict rules around view replacement. For example the new query must generate an identical set
        of columns, using the same column names and data types as the old one. We have to delete the old view instead of replacing it
        to work around these constraints.

        When `replace` is true the existing view is dropped via `drop_view` and the new one is created
        within the same `self.transaction()` block; the parent implementation is always called with `replace=False`.

        Reference: https://www.postgresql.org/docs/current/sql-createview.html
        """
        with self.transaction():
            if replace:
                self.drop_view(view_name, materialized=materialized)
            super().create_view(
                view_name,
                query_or_df,
                target_columns_to_types=target_columns_to_types,
                replace=False,
                materialized=materialized,
                materialized_properties=materialized_properties,
                table_description=table_description,
                column_descriptions=column_descriptions,
                view_properties=view_properties,
                source_columns=source_columns,
                **create_kwargs,
            )

Postgres has very strict rules around view replacement. For example, the new query must generate an identical set of columns, using the same column names and data types as the old one. We have to delete the old view instead of replacing it to work around these constraints.

Reference: https://www.postgresql.org/docs/current/sql-createview.html

def drop_view( self, view_name: Union[str, sqlglot.expressions.query.Table], ignore_if_not_exists: bool = True, materialized: bool = False, **kwargs: Any) -> None:
142    def drop_view(
143        self,
144        view_name: TableName,
145        ignore_if_not_exists: bool = True,
146        materialized: bool = False,
147        **kwargs: t.Any,
148    ) -> None:
149        kwargs["cascade"] = kwargs.get("cascade", True)
150        return super().drop_view(
151            view_name,
152            ignore_if_not_exists=ignore_if_not_exists,
153            materialized=materialized,
154            **kwargs,
155        )

Drop a view.

Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DIALECT
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
SUPPORTS_INDEXES
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SCHEMA_DIFFER_KWARGS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_REPLACE_TABLE
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
MAX_IDENTIFIER_LENGTH
ATTACH_CORRELATION_ID
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
schema_differ
default_catalog
engine_run_mode
recycle
close
get_current_catalog
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_schema
drop_schema
create_catalog
drop_catalog
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
merge
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts