sqlmesh.core.engine_adapter.postgres
from __future__ import annotations

import logging
import re
import typing as t
from functools import cached_property, partial
from sqlglot import exp

from sqlmesh.core.engine_adapter.base_postgres import BasePostgresEngineAdapter
from sqlmesh.core.engine_adapter.mixins import (
    GetCurrentCatalogFromFunctionMixin,
    PandasNativeFetchDFSupportMixin,
    RowDiffMixin,
    logical_merge,
    GrantsFromInfoSchemaMixin,
)
from sqlmesh.core.engine_adapter.shared import set_catalog

if t.TYPE_CHECKING:
    from sqlmesh.core._typing import TableName
    from sqlmesh.core.engine_adapter._typing import DF, QueryOrDF

logger = logging.getLogger(__name__)


@set_catalog()
class PostgresEngineAdapter(
    BasePostgresEngineAdapter,
    PandasNativeFetchDFSupportMixin,
    GetCurrentCatalogFromFunctionMixin,
    RowDiffMixin,
    GrantsFromInfoSchemaMixin,
):
    # Engine adapter for the "postgres" dialect. It layers Postgres-specific
    # behaviour (native DataFrame fetching, CREATE TABLE ... LIKE, version-aware
    # MERGE) on top of the shared base Postgres adapter and the mixins above.
    DIALECT = "postgres"
    SUPPORTS_GRANTS = True
    SUPPORTS_INDEXES = True
    HAS_VIEW_BINDING = True
    CURRENT_CATALOG_EXPRESSION = exp.column("current_catalog")
    SUPPORTS_REPLACE_TABLE = False
    MAX_IDENTIFIER_LENGTH: t.Optional[int] = 63
    SUPPORTS_QUERY_EXECUTION_TRACKING = True
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "role_table_grants"
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.column("current_role")
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True
    SCHEMA_DIFFER_KWARGS = {
        "parameterized_type_defaults": {
            # DECIMAL without precision is "up to 131072 digits before the decimal point; up to 16383 digits after the decimal point"
            exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(131072 + 16383, 16383), (0,)],
            exp.DataType.build("CHAR", dialect=DIALECT).this: [(1,)],
            exp.DataType.build("TIME", dialect=DIALECT).this: [(6,)],
            exp.DataType.build("TIMESTAMP", dialect=DIALECT).this: [(6,)],
        },
        "types_with_unlimited_length": {
            # all can ALTER to `TEXT`
            exp.DataType.build("TEXT", dialect=DIALECT).this: {
                exp.DataType.build("VARCHAR", dialect=DIALECT).this,
                exp.DataType.build("CHAR", dialect=DIALECT).this,
                exp.DataType.build("BPCHAR", dialect=DIALECT).this,
            },
            # all can ALTER to unparameterized `VARCHAR`
            exp.DataType.build("VARCHAR", dialect=DIALECT).this: {
                exp.DataType.build("VARCHAR", dialect=DIALECT).this,
                exp.DataType.build("CHAR", dialect=DIALECT).this,
                exp.DataType.build("BPCHAR", dialect=DIALECT).this,
                exp.DataType.build("TEXT", dialect=DIALECT).this,
            },
            # parameterized `BPCHAR(n)` can ALTER to unparameterized `BPCHAR`
            exp.DataType.build("BPCHAR", dialect=DIALECT).this: {
                exp.DataType.build("BPCHAR", dialect=DIALECT).this
            },
        },
        "drop_cascade": True,
    }

    def _fetch_native_df(
        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
    ) -> DF:
        """
        `read_sql_query` when using psycopg will result in a hanging transaction that must be committed

        https://github.com/pandas-dev/pandas/pull/42277
        """
        df = super()._fetch_native_df(query, quote_identifiers)
        # Only commit when the pool reports no active transaction.
        if not self._connection_pool.is_transaction_active:
            self._connection_pool.commit()
        return df

    def _create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        exists: bool,
        **kwargs: t.Any,
    ) -> None:
        """Create `target_table_name` with a `LIKE source_table_name INCLUDING ALL` clause.

        Args:
            target_table_name: The table to create.
            source_table_name: The table whose definition is referenced by the LIKE clause.
            exists: Passed through as the `exists` flag of the CREATE expression.
            kwargs: Accepted for interface compatibility; not used here.
        """
        self.execute(
            exp.Create(
                this=exp.Schema(
                    this=exp.to_table(target_table_name),
                    expressions=[
                        exp.LikeProperty(
                            this=exp.to_table(source_table_name),
                            expressions=[exp.Property(this="INCLUDING", value=exp.Var(this="ALL"))],
                        )
                    ],
                ),
                kind="TABLE",
                exists=exists,
            )
        )

    def merge(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        unique_key: t.Sequence[exp.Expr],
        when_matched: t.Optional[exp.Whens] = None,
        merge_filter: t.Optional[exp.Expr] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Merge `source_table` into `target_table`, picking the strategy by server version.

        The inherited `merge` is used when the server's major version is at least 15;
        otherwise `logical_merge` is used. Extra `kwargs` are accepted but not forwarded.
        """
        # Merge isn't supported until Postgres 15
        major, _ = self.server_version
        merge_impl = super().merge if major >= 15 else partial(logical_merge, self)
        merge_impl(  # type: ignore
            target_table,
            source_table,
            target_columns_to_types,
            unique_key,
            when_matched=when_matched,
            merge_filter=merge_filter,
            source_columns=source_columns,
        )

    @cached_property
    def server_version(self) -> t.Tuple[int, int]:
        """Lazily fetch and cache major and minor server version

        Returns:
            A `(major, minor)` tuple parsed from `SHOW server_version`. The minor part
            is 0 when the reported string has no `major.minor` pair (e.g. a pre-release
            string such as `16beta1`). `(0, 0)` is returned when no number can be found.
        """
        if result := self.fetchone("SHOW server_version"):
            server_version, *_ = result
            match = re.search(r"(\d+)\.(\d+)", server_version)
            if match:
                return int(match.group(1)), int(match.group(2))
            # No dotted pair: fall back to the first number as the major version so
            # that version checks (such as the one in `merge`) still see it.
            major_only = re.search(r"\d+", server_version)
            if major_only:
                return int(major_only.group()), 0
        return 0, 0
@set_catalog()
class PostgresEngineAdapter(
    BasePostgresEngineAdapter,
    PandasNativeFetchDFSupportMixin,
    GetCurrentCatalogFromFunctionMixin,
    RowDiffMixin,
    GrantsFromInfoSchemaMixin,
):
    # Engine adapter for the "postgres" dialect. It layers Postgres-specific
    # behaviour (native DataFrame fetching, CREATE TABLE ... LIKE, version-aware
    # MERGE) on top of the shared base Postgres adapter and the listed mixins.
    DIALECT = "postgres"
    SUPPORTS_GRANTS = True
    SUPPORTS_INDEXES = True
    HAS_VIEW_BINDING = True
    CURRENT_CATALOG_EXPRESSION = exp.column("current_catalog")
    SUPPORTS_REPLACE_TABLE = False
    MAX_IDENTIFIER_LENGTH: t.Optional[int] = 63
    SUPPORTS_QUERY_EXECUTION_TRACKING = True
    GRANT_INFORMATION_SCHEMA_TABLE_NAME = "role_table_grants"
    CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expr = exp.column("current_role")
    SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True
    SCHEMA_DIFFER_KWARGS = {
        "parameterized_type_defaults": {
            # DECIMAL without precision is "up to 131072 digits before the decimal point; up to 16383 digits after the decimal point"
            exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(131072 + 16383, 16383), (0,)],
            exp.DataType.build("CHAR", dialect=DIALECT).this: [(1,)],
            exp.DataType.build("TIME", dialect=DIALECT).this: [(6,)],
            exp.DataType.build("TIMESTAMP", dialect=DIALECT).this: [(6,)],
        },
        "types_with_unlimited_length": {
            # all can ALTER to `TEXT`
            exp.DataType.build("TEXT", dialect=DIALECT).this: {
                exp.DataType.build("VARCHAR", dialect=DIALECT).this,
                exp.DataType.build("CHAR", dialect=DIALECT).this,
                exp.DataType.build("BPCHAR", dialect=DIALECT).this,
            },
            # all can ALTER to unparameterized `VARCHAR`
            exp.DataType.build("VARCHAR", dialect=DIALECT).this: {
                exp.DataType.build("VARCHAR", dialect=DIALECT).this,
                exp.DataType.build("CHAR", dialect=DIALECT).this,
                exp.DataType.build("BPCHAR", dialect=DIALECT).this,
                exp.DataType.build("TEXT", dialect=DIALECT).this,
            },
            # parameterized `BPCHAR(n)` can ALTER to unparameterized `BPCHAR`
            exp.DataType.build("BPCHAR", dialect=DIALECT).this: {
                exp.DataType.build("BPCHAR", dialect=DIALECT).this
            },
        },
        "drop_cascade": True,
    }

    def _fetch_native_df(
        self, query: t.Union[exp.Expr, str], quote_identifiers: bool = False
    ) -> DF:
        """
        `read_sql_query` when using psycopg will result in a hanging transaction that must be committed

        https://github.com/pandas-dev/pandas/pull/42277
        """
        df = super()._fetch_native_df(query, quote_identifiers)
        # Only commit when the pool reports no active transaction.
        if not self._connection_pool.is_transaction_active:
            self._connection_pool.commit()
        return df

    def _create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        exists: bool,
        **kwargs: t.Any,
    ) -> None:
        """Create `target_table_name` with a `LIKE source_table_name INCLUDING ALL` clause.

        Args:
            target_table_name: The table to create.
            source_table_name: The table whose definition is referenced by the LIKE clause.
            exists: Passed through as the `exists` flag of the CREATE expression.
            kwargs: Accepted for interface compatibility; not used here.
        """
        self.execute(
            exp.Create(
                this=exp.Schema(
                    this=exp.to_table(target_table_name),
                    expressions=[
                        exp.LikeProperty(
                            this=exp.to_table(source_table_name),
                            expressions=[exp.Property(this="INCLUDING", value=exp.Var(this="ALL"))],
                        )
                    ],
                ),
                kind="TABLE",
                exists=exists,
            )
        )

    def merge(
        self,
        target_table: TableName,
        source_table: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
        unique_key: t.Sequence[exp.Expr],
        when_matched: t.Optional[exp.Whens] = None,
        merge_filter: t.Optional[exp.Expr] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Merge `source_table` into `target_table`, picking the strategy by server version.

        The inherited `merge` is used when the server's major version is at least 15;
        otherwise `logical_merge` is used. Extra `kwargs` are accepted but not forwarded.
        """
        # Merge isn't supported until Postgres 15
        major, _ = self.server_version
        merge_impl = super().merge if major >= 15 else partial(logical_merge, self)
        merge_impl(  # type: ignore
            target_table,
            source_table,
            target_columns_to_types,
            unique_key,
            when_matched=when_matched,
            merge_filter=merge_filter,
            source_columns=source_columns,
        )

    @cached_property
    def server_version(self) -> t.Tuple[int, int]:
        """Lazily fetch and cache major and minor server version

        Returns:
            A `(major, minor)` tuple parsed from `SHOW server_version`. The minor part
            is 0 when the reported string has no `major.minor` pair (e.g. a pre-release
            string such as `16beta1`). `(0, 0)` is returned when no number can be found.
        """
        if result := self.fetchone("SHOW server_version"):
            server_version, *_ = result
            match = re.search(r"(\d+)\.(\d+)", server_version)
            if match:
                return int(match.group(1)), int(match.group(2))
            # No dotted pair: fall back to the first number as the major version so
            # that version checks (such as the one in `merge`) still see it.
            major_only = re.search(r"\d+", server_version)
            if major_only:
                return int(major_only.group()), 0
        return 0, 0
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def merge(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expr],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expr] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Merge `source_table` into `target_table` using a version-appropriate strategy.

    The inherited `merge` is used when the server's major version is 15 or higher;
    otherwise `logical_merge` is applied to this adapter. All named arguments are
    forwarded unchanged to whichever implementation is chosen. Extra `kwargs` are
    accepted but not forwarded.
    """
    # Merge isn't supported until Postgres 15
    major_version, _ = self.server_version
    if major_version >= 15:
        merge_impl = super().merge
    else:
        merge_impl = partial(logical_merge, self)

    merge_impl(  # type: ignore
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        when_matched=when_matched,
        merge_filter=merge_filter,
        source_columns=source_columns,
    )
@cached_property
def server_version(self) -> t.Tuple[int, int]:
    """Lazily fetch and cache major and minor server version

    Returns:
        A `(major, minor)` tuple parsed from `SHOW server_version`. The minor part
        is 0 when the reported string has no `major.minor` pair (e.g. a pre-release
        string such as `16beta1`). `(0, 0)` is returned when the query yields nothing
        or no number can be found.
    """
    if result := self.fetchone("SHOW server_version"):
        version_text, *_ = result
        dotted = re.search(r"(\d+)\.(\d+)", version_text)
        if dotted:
            return int(dotted.group(1)), int(dotted.group(2))
        # No dotted pair: fall back to the first number as the major version so
        # that callers comparing against a major version still see it.
        major_only = re.search(r"\d+", version_text)
        if major_only:
            return int(major_only.group()), 0
    return 0, 0
Lazily fetch and cache major and minor server version
def columns(
    self, table_name: TableName, include_pseudo_columns: bool = False
) -> t.Dict[str, exp.DataType]:
    """Fetches column names and types for the target table.

    Args:
        table_name: The table to inspect. When it carries a schema (`db`) part,
            the lookup is restricted to that schema.
        include_pseudo_columns: Not consulted by this implementation.

    Returns:
        A mapping of column name to the data type built from the text returned
        by `pg_catalog.format_type`.

    Raises:
        SQLMeshError: If the catalog query returns no rows for the table.
    """
    table = exp.to_table(table_name)

    # Positive attribute numbers that are not dropped, for the requested relation.
    column_filter = exp.and_(
        "attnum > 0",
        "NOT attisdropped",
        exp.column("relname").eq(table.alias_or_name),
    )
    query = (
        exp.select(
            "attname AS column_name",
            "pg_catalog.format_type(atttypid, atttypmod) AS data_type",
        )
        .from_("pg_catalog.pg_attribute")
        .join("pg_catalog.pg_class", on="pg_class.oid = attrelid")
        .join("pg_catalog.pg_namespace", on="pg_namespace.oid = relnamespace")
        .where(column_filter)
    )

    schema = table.args.get("db")
    if schema:
        query = query.where(exp.column("nspname").eq(schema.name))

    self.execute(query)
    rows = self.cursor.fetchall()
    if not rows:
        raise SQLMeshError(
            f"Could not get columns for table '{table.sql(dialect=self.dialect)}'. Table not found."
        )

    return {
        name: exp.DataType.build(type_text, dialect=self.dialect, udt=True)
        for name, type_text in rows
    }
Fetches column names and types for the target table.
def table_exists(self, table_name: TableName) -> bool:
    """
    Postgres doesn't support describe, so this uses the same approach as the redshift cursor to
    check if a table exists. The redshift implementation isn't used directly so that this can work
    as a base class for other postgres-based adapters.

    Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553

    Args:
        table_name: The table to look up. When it carries a schema (`db`) part, the lookup is
            restricted to that schema.

    Returns:
        True if the table is known to exist, either from the data object cache or from
        `information_schema.tables`; False otherwise.
    """
    table = exp.to_table(table_name)
    data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
    if data_object_cache_key in self._data_object_cache:
        logger.debug("Table existence cache hit: %s", data_object_cache_key)
        return self._data_object_cache[data_object_cache_key] is not None

    # Build the predicates as expressions rather than interpolating the names into
    # SQL text, so names containing quotes are emitted as properly escaped literals.
    sql = (
        exp.select("1")
        .from_("information_schema.tables")
        .where(exp.column("table_name").eq(table.alias_or_name))
    )
    database_name = table.db
    if database_name:
        sql = sql.where(exp.column("table_schema").eq(database_name))

    self.execute(sql)

    result = self.cursor.fetchone()

    return result[0] == 1 if result is not None else False
Postgres doesn't support describe, so this uses the same approach as the redshift cursor to check if a table exists. The redshift implementation isn't used directly so that this can work as a base class for other postgres-based adapters.
def create_view(
    self,
    view_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    replace: bool = True,
    materialized: bool = False,
    materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **create_kwargs: t.Any,
) -> None:
    """
    Postgres has very strict rules around view replacement. For example, the new query must
    generate an identical set of columns, using the same column names and data types as the old
    one. To work around these constraints, the old view is dropped and the view is then created
    without replacement.

    When `replace` is true, the drop and the create run inside a single `self.transaction()` block.
    The inherited `create_view` is always called with `replace=False`.

    Reference: https://www.postgresql.org/docs/current/sql-createview.html
    """
    with self.transaction():
        # Dropping first stands in for CREATE OR REPLACE.
        if replace:
            self.drop_view(view_name, materialized=materialized)

        super().create_view(
            view_name,
            query_or_df,
            target_columns_to_types=target_columns_to_types,
            replace=False,
            materialized=materialized,
            materialized_properties=materialized_properties,
            table_description=table_description,
            column_descriptions=column_descriptions,
            view_properties=view_properties,
            source_columns=source_columns,
            **create_kwargs,
        )
Postgres has very strict rules around view replacement. For example, the new query must generate an identical set of columns, using the same column names and data types as the old one. We have to delete the old view instead of replacing it to work around these constraints.
Reference: https://www.postgresql.org/docs/current/sql-createview.html
def drop_view(
    self,
    view_name: TableName,
    ignore_if_not_exists: bool = True,
    materialized: bool = False,
    **kwargs: t.Any,
) -> None:
    """Drop a view, cascading by default.

    Args:
        view_name: The view to drop.
        ignore_if_not_exists: Forwarded unchanged to the inherited `drop_view`.
        materialized: Forwarded unchanged to the inherited `drop_view`.
        kwargs: Extra options forwarded to the inherited `drop_view`. `cascade`
            is set to True unless the caller supplied it.
    """
    # A caller-supplied `cascade` value wins over the default.
    kwargs.setdefault("cascade", True)
    return super().drop_view(
        view_name,
        ignore_if_not_exists=ignore_if_not_exists,
        materialized=materialized,
        **kwargs,
    )
Drop a view.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTS_TUPLE_IN
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- ATTACH_CORRELATION_ID
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_schema
- drop_schema
- create_catalog
- drop_catalog
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
- sqlmesh.core.engine_adapter.base_postgres.BasePostgresEngineAdapter
- DEFAULT_BATCH_SIZE
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- catalog_support