sqlmesh.core.engine_adapter.base_postgres
1from __future__ import annotations 2 3import typing as t 4import logging 5 6from sqlglot import exp 7 8from sqlmesh.core.dialect import to_schema 9from sqlmesh.core.engine_adapter.base import EngineAdapter, _get_data_object_cache_key 10from sqlmesh.core.engine_adapter.shared import ( 11 CatalogSupport, 12 CommentCreationTable, 13 CommentCreationView, 14 DataObject, 15 DataObjectType, 16) 17from sqlmesh.utils.errors import SQLMeshError 18 19if t.TYPE_CHECKING: 20 from sqlmesh.core._typing import SchemaName, TableName 21 from sqlmesh.core.engine_adapter._typing import QueryOrDF 22 23 24logger = logging.getLogger(__name__) 25 26 27class BasePostgresEngineAdapter(EngineAdapter): 28 DEFAULT_BATCH_SIZE = 400 29 COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY 30 COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY 31 SUPPORTS_QUERY_EXECUTION_TRACKING = True 32 SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"] 33 34 def columns( 35 self, table_name: TableName, include_pseudo_columns: bool = False 36 ) -> t.Dict[str, exp.DataType]: 37 """Fetches column names and types for the target table.""" 38 table = exp.to_table(table_name) 39 40 sql = ( 41 exp.select( 42 "attname AS column_name", 43 "pg_catalog.format_type(atttypid, atttypmod) AS data_type", 44 ) 45 .from_("pg_catalog.pg_attribute") 46 .join("pg_catalog.pg_class", on="pg_class.oid = attrelid") 47 .join("pg_catalog.pg_namespace", on="pg_namespace.oid = relnamespace") 48 .where( 49 exp.and_( 50 "attnum > 0", 51 "NOT attisdropped", 52 exp.column("relname").eq(table.alias_or_name), 53 ) 54 ) 55 ) 56 if table.args.get("db"): 57 sql = sql.where(exp.column("nspname").eq(table.args["db"].name)) 58 59 self.execute(sql) 60 resp = self.cursor.fetchall() 61 if not resp: 62 raise SQLMeshError( 63 f"Could not get columns for table '{table.sql(dialect=self.dialect)}'. Table not found." 
64 ) 65 66 return { 67 column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True) 68 for column_name, data_type in resp 69 } 70 71 @property 72 def catalog_support(self) -> CatalogSupport: 73 return CatalogSupport.SINGLE_CATALOG_ONLY 74 75 def table_exists(self, table_name: TableName) -> bool: 76 """ 77 Postgres doesn't support describe so I'm using what the redshift cursor does to check if a table 78 exists. We don't use this directly in order for this to work as a base class for other postgres 79 80 Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553 81 """ 82 table = exp.to_table(table_name) 83 data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name) 84 if data_object_cache_key in self._data_object_cache: 85 logger.debug("Table existence cache hit: %s", data_object_cache_key) 86 return self._data_object_cache[data_object_cache_key] is not None 87 88 sql = ( 89 exp.select("1") 90 .from_("information_schema.tables") 91 .where(f"table_name = '{table.alias_or_name}'") 92 ) 93 database_name = table.db 94 if database_name: 95 sql = sql.where(f"table_schema = '{database_name}'") 96 97 self.execute(sql) 98 99 result = self.cursor.fetchone() 100 101 return result[0] == 1 if result is not None else False 102 103 def create_view( 104 self, 105 view_name: TableName, 106 query_or_df: QueryOrDF, 107 target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, 108 replace: bool = True, 109 materialized: bool = False, 110 materialized_properties: t.Optional[t.Dict[str, t.Any]] = None, 111 table_description: t.Optional[str] = None, 112 column_descriptions: t.Optional[t.Dict[str, str]] = None, 113 view_properties: t.Optional[t.Dict[str, exp.Expr]] = None, 114 source_columns: t.Optional[t.List[str]] = None, 115 **create_kwargs: t.Any, 116 ) -> None: 117 """ 118 Postgres has very strict rules around view replacement. 
For example the new query must generate an identical setFormatter 119 of columns, using the same column names and data types as the old one. We have to delete the old view instead of replacing it 120 to work around these constraints. 121 122 Reference: https://www.postgresql.org/docs/current/sql-createview.html 123 """ 124 with self.transaction(): 125 if replace: 126 self.drop_view(view_name, materialized=materialized) 127 super().create_view( 128 view_name, 129 query_or_df, 130 target_columns_to_types=target_columns_to_types, 131 replace=False, 132 materialized=materialized, 133 materialized_properties=materialized_properties, 134 table_description=table_description, 135 column_descriptions=column_descriptions, 136 view_properties=view_properties, 137 source_columns=source_columns, 138 **create_kwargs, 139 ) 140 141 def drop_view( 142 self, 143 view_name: TableName, 144 ignore_if_not_exists: bool = True, 145 materialized: bool = False, 146 **kwargs: t.Any, 147 ) -> None: 148 kwargs["cascade"] = kwargs.get("cascade", True) 149 return super().drop_view( 150 view_name, 151 ignore_if_not_exists=ignore_if_not_exists, 152 materialized=materialized, 153 **kwargs, 154 ) 155 156 def _get_data_objects( 157 self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None 158 ) -> t.List[DataObject]: 159 """ 160 Returns all the data objects that exist in the given schema and optionally catalog. 
161 """ 162 catalog = self.get_current_catalog() 163 table_query = exp.select( 164 exp.column("schemaname").as_("schema_name"), 165 exp.column("tablename").as_("name"), 166 exp.Literal.string("TABLE").as_("type"), 167 ).from_("pg_tables") 168 view_query = exp.select( 169 exp.column("schemaname").as_("schema_name"), 170 exp.column("viewname").as_("name"), 171 exp.Literal.string("VIEW").as_("type"), 172 ).from_("pg_views") 173 materialized_view_query = exp.select( 174 exp.column("schemaname").as_("schema_name"), 175 exp.column("matviewname").as_("name"), 176 exp.Literal.string("MATERIALIZED_VIEW").as_("type"), 177 ).from_("pg_matviews") 178 subquery = exp.union( 179 table_query, 180 exp.union(view_query, materialized_view_query, distinct=False), 181 distinct=False, 182 ) 183 query = ( 184 exp.select("*") 185 .from_(subquery.subquery(alias="objs")) 186 .where(exp.column("schema_name").eq(to_schema(schema_name).db)) 187 ) 188 if object_names: 189 query = query.where(exp.column("name").isin(*object_names)) 190 df = self.fetchdf(query) 191 return [ 192 DataObject( 193 catalog=catalog, 194 schema=row.schema_name, 195 name=row.name, 196 type=DataObjectType.from_str(row.type), # type: ignore 197 ) 198 for row in df.itertuples() 199 ] 200 201 def _get_current_schema(self) -> str: 202 """Returns the current default schema for the connection.""" 203 result = self.fetchone(exp.select(exp.func("current_schema"))) 204 if result and result[0]: 205 return result[0] 206 return "public"
class BasePostgresEngineAdapter(EngineAdapter):
    """Engine adapter behavior shared by Postgres and Postgres-compatible engines."""

    DEFAULT_BATCH_SIZE = 400
    COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
    COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
    SUPPORTS_QUERY_EXECUTION_TRACKING = True
    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]

    def columns(
        self, table_name: TableName, include_pseudo_columns: bool = False
    ) -> t.Dict[str, exp.DataType]:
        """Fetches column names and types for the target table.

        Reads ``pg_catalog.pg_attribute`` joined with ``pg_class`` and ``pg_namespace``,
        keeping only attributes with ``attnum > 0`` that are not dropped. The schema
        (``nspname``) filter is only applied when ``table_name`` is schema-qualified.

        Args:
            table_name: The table to inspect.
            include_pseudo_columns: Accepted for interface compatibility; not used here.

        Returns:
            A mapping of column name to its data type, parsed in this adapter's dialect.

        Raises:
            SQLMeshError: If the query returns no columns for the table.
        """
        table = exp.to_table(table_name)

        sql = (
            exp.select(
                "attname AS column_name",
                "pg_catalog.format_type(atttypid, atttypmod) AS data_type",
            )
            .from_("pg_catalog.pg_attribute")
            .join("pg_catalog.pg_class", on="pg_class.oid = attrelid")
            .join("pg_catalog.pg_namespace", on="pg_namespace.oid = relnamespace")
            .where(
                exp.and_(
                    "attnum > 0",
                    "NOT attisdropped",
                    exp.column("relname").eq(table.alias_or_name),
                )
            )
        )
        if table.args.get("db"):
            sql = sql.where(exp.column("nspname").eq(table.args["db"].name))

        self.execute(sql)
        resp = self.cursor.fetchall()
        if not resp:
            raise SQLMeshError(
                f"Could not get columns for table '{table.sql(dialect=self.dialect)}'. Table not found."
            )

        return {
            column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True)
            for column_name, data_type in resp
        }

    @property
    def catalog_support(self) -> CatalogSupport:
        """This adapter only supports the single catalog of the current connection."""
        return CatalogSupport.SINGLE_CATALOG_ONLY

    def table_exists(self, table_name: TableName) -> bool:
        """Check whether a table exists.

        Postgres doesn't support ``DESCRIBE``, so this mirrors the approach used by the
        Redshift cursor: look for a matching row in ``information_schema.tables``. We don't
        call the Redshift implementation directly so that this class can serve as a base for
        other Postgres-compatible engine adapters.

        Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553

        Args:
            table_name: The table to look up, optionally schema-qualified.

        Returns:
            The cached answer if the data-object cache has an entry for the table;
            otherwise True iff ``information_schema.tables`` has a matching row.
        """
        table = exp.to_table(table_name)
        data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
        if data_object_cache_key in self._data_object_cache:
            logger.debug("Table existence cache hit: %s", data_object_cache_key)
            return self._data_object_cache[data_object_cache_key] is not None

        # Build the predicates as AST nodes instead of interpolating names into raw SQL, so
        # names containing quotes are rendered as properly escaped string literals.
        sql = (
            exp.select("1")
            .from_("information_schema.tables")
            .where(exp.column("table_name").eq(table.alias_or_name))
        )
        database_name = table.db
        if database_name:
            sql = sql.where(exp.column("table_schema").eq(database_name))

        self.execute(sql)
        result = self.cursor.fetchone()
        return result is not None and result[0] == 1

    def create_view(
        self,
        view_name: TableName,
        query_or_df: QueryOrDF,
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        replace: bool = True,
        materialized: bool = False,
        materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
        table_description: t.Optional[str] = None,
        column_descriptions: t.Optional[t.Dict[str, str]] = None,
        view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
        source_columns: t.Optional[t.List[str]] = None,
        **create_kwargs: t.Any,
    ) -> None:
        """Create a view, dropping any existing one first when ``replace`` is True.

        Postgres has very strict rules around view replacement. For example, the new query
        must generate an identical set of columns, using the same column names and data types
        as the old one. To work around these constraints we drop the old view instead of
        replacing it. The drop and the create run inside a single transaction.

        Reference: https://www.postgresql.org/docs/current/sql-createview.html
        """
        with self.transaction():
            if replace:
                self.drop_view(view_name, materialized=materialized)
            # The existing view (if any) is already gone, so never ask the base class to replace.
            super().create_view(
                view_name,
                query_or_df,
                target_columns_to_types=target_columns_to_types,
                replace=False,
                materialized=materialized,
                materialized_properties=materialized_properties,
                table_description=table_description,
                column_descriptions=column_descriptions,
                view_properties=view_properties,
                source_columns=source_columns,
                **create_kwargs,
            )

    def drop_view(
        self,
        view_name: TableName,
        ignore_if_not_exists: bool = True,
        materialized: bool = False,
        **kwargs: t.Any,
    ) -> None:
        """Drop a view, defaulting ``cascade`` to True unless the caller passes it explicitly.

        All other arguments are forwarded unchanged to the base implementation.
        """
        kwargs.setdefault("cascade", True)
        return super().drop_view(
            view_name,
            ignore_if_not_exists=ignore_if_not_exists,
            materialized=materialized,
            **kwargs,
        )

    def _get_data_objects(
        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
    ) -> t.List[DataObject]:
        """Return the tables, views and materialized views that exist in the given schema.

        Args:
            schema_name: The schema to list objects from.
            object_names: If given and non-empty, only objects with these names are returned.

        Returns:
            One ``DataObject`` per matching row. The catalog is always the connection's
            current catalog.
        """
        catalog = self.get_current_catalog()
        table_query = exp.select(
            exp.column("schemaname").as_("schema_name"),
            exp.column("tablename").as_("name"),
            exp.Literal.string("TABLE").as_("type"),
        ).from_("pg_tables")
        view_query = exp.select(
            exp.column("schemaname").as_("schema_name"),
            exp.column("viewname").as_("name"),
            exp.Literal.string("VIEW").as_("type"),
        ).from_("pg_views")
        materialized_view_query = exp.select(
            exp.column("schemaname").as_("schema_name"),
            exp.column("matviewname").as_("name"),
            exp.Literal.string("MATERIALIZED_VIEW").as_("type"),
        ).from_("pg_matviews")
        # UNION ALL (distinct=False) across the three catalog views.
        subquery = exp.union(
            table_query,
            exp.union(view_query, materialized_view_query, distinct=False),
            distinct=False,
        )
        query = (
            exp.select("*")
            .from_(subquery.subquery(alias="objs"))
            .where(exp.column("schema_name").eq(to_schema(schema_name).db))
        )
        if object_names:
            query = query.where(exp.column("name").isin(*object_names))
        df = self.fetchdf(query)
        return [
            DataObject(
                catalog=catalog,
                schema=row.schema_name,
                name=row.name,
                type=DataObjectType.from_str(row.type),  # type: ignore
            )
            for row in df.itertuples()
        ]

    def _get_current_schema(self) -> str:
        """Returns the current default schema for the connection, or ``"public"`` if unset."""
        result = self.fetchone(exp.select(exp.func("current_schema")))
        if result and result[0]:
            return result[0]
        return "public"
Base class wrapping a Database API-compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def columns(
    self, table_name: TableName, include_pseudo_columns: bool = False
) -> t.Dict[str, exp.DataType]:
    """Fetches column names and types for the target table.

    Reads ``pg_catalog.pg_attribute`` joined with ``pg_class`` and ``pg_namespace``,
    keeping only attributes with ``attnum > 0`` that are not dropped. The schema
    (``nspname``) filter is only added when ``table_name`` is schema-qualified.

    Args:
        table_name: The table to inspect.
        include_pseudo_columns: Accepted for interface compatibility; not used here.

    Returns:
        A mapping of column name to its data type, parsed in this adapter's dialect.

    Raises:
        SQLMeshError: If the query returns no columns for the table.
    """
    table = exp.to_table(table_name)

    attribute_filter = exp.and_(
        "attnum > 0",
        "NOT attisdropped",
        exp.column("relname").eq(table.alias_or_name),
    )
    query = (
        exp.select(
            "attname AS column_name",
            "pg_catalog.format_type(atttypid, atttypmod) AS data_type",
        )
        .from_("pg_catalog.pg_attribute")
        .join("pg_catalog.pg_class", on="pg_class.oid = attrelid")
        .join("pg_catalog.pg_namespace", on="pg_namespace.oid = relnamespace")
        .where(attribute_filter)
    )

    schema_identifier = table.args.get("db")
    if schema_identifier:
        query = query.where(exp.column("nspname").eq(schema_identifier.name))

    self.execute(query)
    rows = self.cursor.fetchall()
    if not rows:
        raise SQLMeshError(
            f"Could not get columns for table '{table.sql(dialect=self.dialect)}'. Table not found."
        )

    columns_to_types: t.Dict[str, exp.DataType] = {}
    for name, type_sql in rows:
        columns_to_types[name] = exp.DataType.build(type_sql, dialect=self.dialect, udt=True)
    return columns_to_types
Fetches column names and types for the target table.
def table_exists(self, table_name: TableName) -> bool:
    """Check whether a table exists.

    Postgres doesn't support ``DESCRIBE``, so this mirrors the approach used by the
    Redshift cursor: look for a matching row in ``information_schema.tables``. We don't
    call the Redshift implementation directly so that this class can serve as a base for
    other Postgres-compatible engine adapters.

    Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553

    Args:
        table_name: The table to look up, optionally schema-qualified.

    Returns:
        The cached answer if the data-object cache has an entry for the table;
        otherwise True iff ``information_schema.tables`` has a matching row.
    """
    table = exp.to_table(table_name)
    data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
    if data_object_cache_key in self._data_object_cache:
        logger.debug("Table existence cache hit: %s", data_object_cache_key)
        return self._data_object_cache[data_object_cache_key] is not None

    # Build the predicates as AST nodes instead of interpolating names into raw SQL, so
    # names containing quotes are rendered as properly escaped string literals.
    sql = (
        exp.select("1")
        .from_("information_schema.tables")
        .where(exp.column("table_name").eq(table.alias_or_name))
    )
    database_name = table.db
    if database_name:
        sql = sql.where(exp.column("table_schema").eq(database_name))

    self.execute(sql)
    result = self.cursor.fetchone()
    return result is not None and result[0] == 1
Postgres doesn't support DESCRIBE, so this mirrors the approach the Redshift cursor uses to check whether a table exists. We don't call the Redshift implementation directly so that this class can serve as a base class for other Postgres-compatible engine adapters.
def create_view(
    self,
    view_name: TableName,
    query_or_df: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    replace: bool = True,
    materialized: bool = False,
    materialized_properties: t.Optional[t.Dict[str, t.Any]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    view_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **create_kwargs: t.Any,
) -> None:
    """Create a view, dropping any existing one first when ``replace`` is True.

    Postgres has very strict rules around view replacement. For example, the new query
    must generate an identical set of columns, using the same column names and data types
    as the old one. To work around these constraints we drop the old view instead of
    replacing it. The drop and the create run inside a single transaction.

    Reference: https://www.postgresql.org/docs/current/sql-createview.html

    Args:
        view_name: The view to create.
        query_or_df: The query (or dataframe) defining the view.
        replace: If True, drop any existing view of this name before creating it.
        materialized: Whether the view is a materialized view.
        Remaining arguments are forwarded unchanged to the base implementation.
    """
    with self.transaction():
        if replace:
            self.drop_view(view_name, materialized=materialized)
        # The existing view (if any) is already gone, so never ask the base class to replace.
        super().create_view(
            view_name,
            query_or_df,
            target_columns_to_types=target_columns_to_types,
            replace=False,
            materialized=materialized,
            materialized_properties=materialized_properties,
            table_description=table_description,
            column_descriptions=column_descriptions,
            view_properties=view_properties,
            source_columns=source_columns,
            **create_kwargs,
        )
Postgres has very strict rules around view replacement. For example, the new query must generate an identical set of columns, using the same column names and data types as the old one. To work around these constraints, we drop the old view instead of replacing it.
Reference: https://www.postgresql.org/docs/current/sql-createview.html
def drop_view(
    self,
    view_name: TableName,
    ignore_if_not_exists: bool = True,
    materialized: bool = False,
    **kwargs: t.Any,
) -> None:
    """Drop a view, defaulting ``cascade`` to True unless the caller passes it explicitly.

    Args:
        view_name: The view to drop.
        ignore_if_not_exists: Forwarded to the base implementation.
        materialized: Whether the view is a materialized view.
        **kwargs: Extra options forwarded to the base implementation; ``cascade`` is
            filled in with True when absent.
    """
    kwargs.setdefault("cascade", True)
    return super().drop_view(
        view_name,
        materialized=materialized,
        ignore_if_not_exists=ignore_if_not_exists,
        **kwargs,
    )
Drop a view.
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DIALECT
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- SUPPORTS_INDEXES
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SCHEMA_DIFFER_KWARGS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- MAX_IDENTIFIER_LENGTH
- ATTACH_CORRELATION_ID
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_current_catalog
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_schema
- drop_schema
- create_catalog
- drop_catalog
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- merge
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts