Edit on GitHub

sqlmesh.core.engine_adapter.risingwave

 1from __future__ import annotations
 2
 3import logging
 4import typing as t
 5
 6
 7from sqlglot import exp
 8
 9from sqlmesh.core.engine_adapter.postgres import PostgresEngineAdapter
10from sqlmesh.core.engine_adapter.shared import (
11    set_catalog,
12    CatalogSupport,
13    CommentCreationView,
14    CommentCreationTable,
15)
16
17from sqlmesh.utils.errors import SQLMeshError
18
19if t.TYPE_CHECKING:
20    from sqlmesh.core._typing import TableName
21
22logger = logging.getLogger(__name__)
23
24
25@set_catalog()
26class RisingwaveEngineAdapter(PostgresEngineAdapter):
27    DIALECT = "risingwave"
28    DEFAULT_BATCH_SIZE = 400
29    CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY
30    COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
31    COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
32    SUPPORTS_MATERIALIZED_VIEWS = True
33    SUPPORTS_TRANSACTIONS = False
34    MAX_IDENTIFIER_LENGTH = None
35    SUPPORTS_GRANTS = False
36
37    def columns(
38        self, table_name: TableName, include_pseudo_columns: bool = False
39    ) -> t.Dict[str, exp.DataType]:
40        """Fetches column names and types for the target_table"""
41        table = exp.to_table(table_name)
42
43        sql = (
44            exp.select("rw_columns.name AS column_name", "rw_columns.data_type AS data_type")
45            .from_("rw_catalog.rw_columns")
46            .join("rw_catalog.rw_relations", on="rw_relations.id=rw_columns.relation_id")
47            .join("rw_catalog.rw_schemas", on="rw_schemas.id=rw_relations.schema_id")
48            .where(
49                exp.and_(
50                    exp.column("name", table="rw_relations").eq(table.alias_or_name),
51                    exp.column("name", table="rw_columns").neq("_row_id"),
52                    exp.column("name", table="rw_columns").neq("_rw_timestamp"),
53                )
54            )
55        )
56
57        if table.db:
58            sql = sql.where(exp.column("name", table="rw_schemas").eq(table.db))
59
60        self.execute(sql)
61        resp = self.cursor.fetchall()
62        if not resp:
63            raise SQLMeshError(f"Could not get columns for table {table_name}. Table not found.")
64        return {
65            column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True)
66            for column_name, data_type in resp
67        }
68
69    def _truncate_table(self, table_name: TableName) -> None:
70        return self.execute(exp.Delete(this=exp.to_table(table_name)))
logger = <Logger sqlmesh.core.engine_adapter.risingwave (WARNING)>
@set_catalog()
class RisingwaveEngineAdapter(sqlmesh.core.engine_adapter.postgres.PostgresEngineAdapter):
26@set_catalog()
27class RisingwaveEngineAdapter(PostgresEngineAdapter):
28    DIALECT = "risingwave"
29    DEFAULT_BATCH_SIZE = 400
30    CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY
31    COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
32    COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
33    SUPPORTS_MATERIALIZED_VIEWS = True
34    SUPPORTS_TRANSACTIONS = False
35    MAX_IDENTIFIER_LENGTH = None
36    SUPPORTS_GRANTS = False
37
38    def columns(
39        self, table_name: TableName, include_pseudo_columns: bool = False
40    ) -> t.Dict[str, exp.DataType]:
41        """Fetches column names and types for the target_table"""
42        table = exp.to_table(table_name)
43
44        sql = (
45            exp.select("rw_columns.name AS column_name", "rw_columns.data_type AS data_type")
46            .from_("rw_catalog.rw_columns")
47            .join("rw_catalog.rw_relations", on="rw_relations.id=rw_columns.relation_id")
48            .join("rw_catalog.rw_schemas", on="rw_schemas.id=rw_relations.schema_id")
49            .where(
50                exp.and_(
51                    exp.column("name", table="rw_relations").eq(table.alias_or_name),
52                    exp.column("name", table="rw_columns").neq("_row_id"),
53                    exp.column("name", table="rw_columns").neq("_rw_timestamp"),
54                )
55            )
56        )
57
58        if table.db:
59            sql = sql.where(exp.column("name", table="rw_schemas").eq(table.db))
60
61        self.execute(sql)
62        resp = self.cursor.fetchall()
63        if not resp:
64            raise SQLMeshError(f"Could not get columns for table {table_name}. Table not found.")
65        return {
66            column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True)
67            for column_name, data_type in resp
68        }
69
70    def _truncate_table(self, table_name: TableName) -> None:
71        return self.execute(exp.Delete(this=exp.to_table(table_name)))

Base class wrapping a Database API compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
DIALECT = 'risingwave'
DEFAULT_BATCH_SIZE = 400
CATALOG_SUPPORT = <CatalogSupport.SINGLE_CATALOG_ONLY: 2>
COMMENT_CREATION_TABLE = <CommentCreationTable.COMMENT_COMMAND_ONLY: 4>
COMMENT_CREATION_VIEW = <CommentCreationView.UNSUPPORTED: 1>
SUPPORTS_MATERIALIZED_VIEWS = True
SUPPORTS_TRANSACTIONS = False
MAX_IDENTIFIER_LENGTH = None
SUPPORTS_GRANTS = False
def columns( self, table_name: Union[str, sqlglot.expressions.query.Table], include_pseudo_columns: bool = False) -> Dict[str, sqlglot.expressions.datatypes.DataType]:
38    def columns(
39        self, table_name: TableName, include_pseudo_columns: bool = False
40    ) -> t.Dict[str, exp.DataType]:
41        """Fetches column names and types for the target_table"""
42        table = exp.to_table(table_name)
43
44        sql = (
45            exp.select("rw_columns.name AS column_name", "rw_columns.data_type AS data_type")
46            .from_("rw_catalog.rw_columns")
47            .join("rw_catalog.rw_relations", on="rw_relations.id=rw_columns.relation_id")
48            .join("rw_catalog.rw_schemas", on="rw_schemas.id=rw_relations.schema_id")
49            .where(
50                exp.and_(
51                    exp.column("name", table="rw_relations").eq(table.alias_or_name),
52                    exp.column("name", table="rw_columns").neq("_row_id"),
53                    exp.column("name", table="rw_columns").neq("_rw_timestamp"),
54                )
55            )
56        )
57
58        if table.db:
59            sql = sql.where(exp.column("name", table="rw_schemas").eq(table.db))
60
61        self.execute(sql)
62        resp = self.cursor.fetchall()
63        if not resp:
64            raise SQLMeshError(f"Could not get columns for table {table_name}. Table not found.")
65        return {
66            column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True)
67            for column_name, data_type in resp
68        }

Fetches column names and types for the target_table

Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DATA_OBJECT_FILTER_BATCH_SIZE
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTS_TUPLE_IN
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
ATTACH_CORRELATION_ID
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
schema_differ
default_catalog
engine_run_mode
recycle
close
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_schema
drop_schema
create_catalog
drop_catalog
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
sqlmesh.core.engine_adapter.postgres.PostgresEngineAdapter
SUPPORTS_INDEXES
HAS_VIEW_BINDING
CURRENT_CATALOG_EXPRESSION
SUPPORTS_REPLACE_TABLE
SUPPORTS_QUERY_EXECUTION_TRACKING
GRANT_INFORMATION_SCHEMA_TABLE_NAME
CURRENT_USER_OR_ROLE_EXPRESSION
SUPPORTS_MULTIPLE_GRANT_PRINCIPALS
SCHEMA_DIFFER_KWARGS
merge
server_version
table_exists
create_view
drop_view
sqlmesh.core.engine_adapter.base_postgres.BasePostgresEngineAdapter
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
catalog_support
sqlmesh.core.engine_adapter.mixins.GetCurrentCatalogFromFunctionMixin
get_current_catalog
sqlmesh.core.engine_adapter.mixins.RowDiffMixin
MAX_TIMESTAMP_PRECISION
concat_columns
normalize_value
sqlmesh.core.engine_adapter.mixins.GrantsFromInfoSchemaMixin
USE_CATALOG_IN_GRANTS