sqlmesh.core.engine_adapter.risingwave
1from __future__ import annotations 2 3import logging 4import typing as t 5 6 7from sqlglot import exp 8 9from sqlmesh.core.engine_adapter.postgres import PostgresEngineAdapter 10from sqlmesh.core.engine_adapter.shared import ( 11 set_catalog, 12 CatalogSupport, 13 CommentCreationView, 14 CommentCreationTable, 15) 16 17from sqlmesh.utils.errors import SQLMeshError 18 19if t.TYPE_CHECKING: 20 from sqlmesh.core._typing import TableName 21 22logger = logging.getLogger(__name__) 23 24 25@set_catalog() 26class RisingwaveEngineAdapter(PostgresEngineAdapter): 27 DIALECT = "risingwave" 28 DEFAULT_BATCH_SIZE = 400 29 CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY 30 COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY 31 COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED 32 SUPPORTS_MATERIALIZED_VIEWS = True 33 SUPPORTS_TRANSACTIONS = False 34 MAX_IDENTIFIER_LENGTH = None 35 SUPPORTS_GRANTS = False 36 37 def columns( 38 self, table_name: TableName, include_pseudo_columns: bool = False 39 ) -> t.Dict[str, exp.DataType]: 40 """Fetches column names and types for the target_table""" 41 table = exp.to_table(table_name) 42 43 sql = ( 44 exp.select("rw_columns.name AS column_name", "rw_columns.data_type AS data_type") 45 .from_("rw_catalog.rw_columns") 46 .join("rw_catalog.rw_relations", on="rw_relations.id=rw_columns.relation_id") 47 .join("rw_catalog.rw_schemas", on="rw_schemas.id=rw_relations.schema_id") 48 .where( 49 exp.and_( 50 exp.column("name", table="rw_relations").eq(table.alias_or_name), 51 exp.column("name", table="rw_columns").neq("_row_id"), 52 exp.column("name", table="rw_columns").neq("_rw_timestamp"), 53 ) 54 ) 55 ) 56 57 if table.db: 58 sql = sql.where(exp.column("name", table="rw_schemas").eq(table.db)) 59 60 self.execute(sql) 61 resp = self.cursor.fetchall() 62 if not resp: 63 raise SQLMeshError(f"Could not get columns for table {table_name}. Table not found.") 64 return { 65 column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True) 66 for column_name, data_type in resp 67 } 68 69 def _truncate_table(self, table_name: TableName) -> None: 70 return self.execute(exp.Delete(this=exp.to_table(table_name)))
logger =
<Logger sqlmesh.core.engine_adapter.risingwave (WARNING)>
@set_catalog()
class
RisingwaveEngineAdapter26@set_catalog() 27class RisingwaveEngineAdapter(PostgresEngineAdapter): 28 DIALECT = "risingwave" 29 DEFAULT_BATCH_SIZE = 400 30 CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY 31 COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY 32 COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED 33 SUPPORTS_MATERIALIZED_VIEWS = True 34 SUPPORTS_TRANSACTIONS = False 35 MAX_IDENTIFIER_LENGTH = None 36 SUPPORTS_GRANTS = False 37 38 def columns( 39 self, table_name: TableName, include_pseudo_columns: bool = False 40 ) -> t.Dict[str, exp.DataType]: 41 """Fetches column names and types for the target_table""" 42 table = exp.to_table(table_name) 43 44 sql = ( 45 exp.select("rw_columns.name AS column_name", "rw_columns.data_type AS data_type") 46 .from_("rw_catalog.rw_columns") 47 .join("rw_catalog.rw_relations", on="rw_relations.id=rw_columns.relation_id") 48 .join("rw_catalog.rw_schemas", on="rw_schemas.id=rw_relations.schema_id") 49 .where( 50 exp.and_( 51 exp.column("name", table="rw_relations").eq(table.alias_or_name), 52 exp.column("name", table="rw_columns").neq("_row_id"), 53 exp.column("name", table="rw_columns").neq("_rw_timestamp"), 54 ) 55 ) 56 ) 57 58 if table.db: 59 sql = sql.where(exp.column("name", table="rw_schemas").eq(table.db)) 60 61 self.execute(sql) 62 resp = self.cursor.fetchall() 63 if not resp: 64 raise SQLMeshError(f"Could not get columns for table {table_name}. Table not found.") 65 return { 66 column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True) 67 for column_name, data_type in resp 68 } 69 70 def _truncate_table(self, table_name: TableName) -> None: 71 return self.execute(exp.Delete(this=exp.to_table(table_name)))
Base class wrapping a Database API compliant connection.
The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
def
columns( self, table_name: Union[str, sqlglot.expressions.query.Table], include_pseudo_columns: bool = False) -> Dict[str, sqlglot.expressions.datatypes.DataType]:
38 def columns( 39 self, table_name: TableName, include_pseudo_columns: bool = False 40 ) -> t.Dict[str, exp.DataType]: 41 """Fetches column names and types for the target_table""" 42 table = exp.to_table(table_name) 43 44 sql = ( 45 exp.select("rw_columns.name AS column_name", "rw_columns.data_type AS data_type") 46 .from_("rw_catalog.rw_columns") 47 .join("rw_catalog.rw_relations", on="rw_relations.id=rw_columns.relation_id") 48 .join("rw_catalog.rw_schemas", on="rw_schemas.id=rw_relations.schema_id") 49 .where( 50 exp.and_( 51 exp.column("name", table="rw_relations").eq(table.alias_or_name), 52 exp.column("name", table="rw_columns").neq("_row_id"), 53 exp.column("name", table="rw_columns").neq("_rw_timestamp"), 54 ) 55 ) 56 ) 57 58 if table.db: 59 sql = sql.where(exp.column("name", table="rw_schemas").eq(table.db)) 60 61 self.execute(sql) 62 resp = self.cursor.fetchall() 63 if not resp: 64 raise SQLMeshError(f"Could not get columns for table {table_name}. Table not found.") 65 return { 66 column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True) 67 for column_name, data_type in resp 68 }
Fetches column names and types for the target_table
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DATA_OBJECT_FILTER_BATCH_SIZE
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTS_TUPLE_IN
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- ATTACH_CORRELATION_ID
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_schema
- drop_schema
- create_catalog
- drop_catalog
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
- sqlmesh.core.engine_adapter.postgres.PostgresEngineAdapter
- SUPPORTS_INDEXES
- HAS_VIEW_BINDING
- CURRENT_CATALOG_EXPRESSION
- SUPPORTS_REPLACE_TABLE
- SUPPORTS_QUERY_EXECUTION_TRACKING
- GRANT_INFORMATION_SCHEMA_TABLE_NAME
- CURRENT_USER_OR_ROLE_EXPRESSION
- SUPPORTS_MULTIPLE_GRANT_PRINCIPALS
- SCHEMA_DIFFER_KWARGS
- merge
- server_version
- table_exists
- create_view
- drop_view
- sqlmesh.core.engine_adapter.base_postgres.BasePostgresEngineAdapter
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- catalog_support