sqlmesh.core.engine_adapter.mysql
from __future__ import annotations

import logging
import typing as t

from sqlglot import exp, parse_one

from sqlmesh.core.dialect import to_schema
from sqlmesh.core.engine_adapter.mixins import (
    LogicalMergeMixin,
    NonTransactionalTruncateMixin,
    PandasNativeFetchDFSupportMixin,
    RowDiffMixin,
)
from sqlmesh.core.engine_adapter.shared import (
    CommentCreationTable,
    CommentCreationView,
    DataObject,
    DataObjectType,
    set_catalog,
)

if t.TYPE_CHECKING:
    from sqlmesh.core._typing import SchemaName, TableName

logger = logging.getLogger(__name__)


@set_catalog()
class MySQLEngineAdapter(
    LogicalMergeMixin, PandasNativeFetchDFSupportMixin, NonTransactionalTruncateMixin, RowDiffMixin
):
    """Engine adapter for the MySQL dialect.

    The overrides below cover index creation, schema dropping, data object
    discovery through ``information_schema.tables``, table and column
    comments, ``CREATE TABLE ... LIKE`` and connection pings.
    """

    DEFAULT_BATCH_SIZE = 200
    DIALECT = "mysql"
    SUPPORTS_INDEXES = True
    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
    COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
    MAX_TABLE_COMMENT_LENGTH = 2048
    MAX_COLUMN_COMMENT_LENGTH = 1024
    SUPPORTS_REPLACE_TABLE = False
    MAX_IDENTIFIER_LENGTH = 64
    SUPPORTS_QUERY_EXECUTION_TRACKING = True
    # Keys are the sqlglot type identifiers built for this dialect; values are the
    # parameter tuples registered as defaults for each type.
    SCHEMA_DIFFER_KWARGS = {
        "parameterized_type_defaults": {
            exp.DataType.build("BIT", dialect=DIALECT).this: [(1,)],
            exp.DataType.build("BINARY", dialect=DIALECT).this: [(1,)],
            exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(10, 0), (0,)],
            exp.DataType.build("CHAR", dialect=DIALECT).this: [(1,)],
            exp.DataType.build("NCHAR", dialect=DIALECT).this: [(1,)],
            exp.DataType.build("TEXT", dialect=DIALECT).this: [(65535,)],
            exp.DataType.build("TIME", dialect=DIALECT).this: [(0,)],
            exp.DataType.build("DATETIME", dialect=DIALECT).this: [(0,)],
            exp.DataType.build("TIMESTAMP", dialect=DIALECT).this: [(0,)],
        },
    }

    def get_current_catalog(self) -> t.Optional[str]:
        """Return the current connection's catalog name; this adapter always reports ``None``."""
        return None

    def create_index(
        self,
        table_name: TableName,
        index_name: str,
        columns: t.Tuple[str, ...],
        exists: bool = True,
    ) -> None:
        """Create an index on ``table_name`` by delegating to the parent implementation.

        Args:
            table_name: The name of the target table.
            index_name: The name of the index.
            columns: The columns that constitute the index.
            exists: Accepted for interface compatibility; not forwarded.
        """
        # MySQL doesn't support an existence-check clause for indexes, so the
        # parent is always called with exists=False regardless of the argument.
        super().create_index(table_name, index_name, columns, exists=False)

    def drop_schema(
        self,
        schema_name: SchemaName,
        ignore_if_not_exists: bool = True,
        cascade: bool = False,
        **drop_args: t.Dict[str, exp.Expr],
    ) -> None:
        """Drop ``schema_name`` by delegating to the parent implementation.

        Args:
            schema_name: The schema to drop.
            ignore_if_not_exists: Forwarded unchanged to the parent.
            cascade: Accepted for interface compatibility; never forwarded as True.
            **drop_args: Accepted for interface compatibility; not forwarded.
        """
        # MySQL doesn't support the CASCADE clause and drops schemas unconditionally.
        super().drop_schema(
            schema_name,
            ignore_if_not_exists=ignore_if_not_exists,
            cascade=False,
        )

    def _get_data_objects(
        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
    ) -> t.List[DataObject]:
        """List the data objects found in ``information_schema.tables`` for a schema.

        Args:
            schema_name: The schema whose objects are listed.
            object_names: Optional set of names; when non-empty only these are returned.

        Returns:
            One ``DataObject`` per row returned by the query.
        """
        # 'BASE TABLE' and 'VIEW' are normalized to 'table' / 'view'; any other
        # value falls through as the raw table_type column.
        object_type = (
            exp.case()
            .when(exp.column("table_type").eq("BASE TABLE"), exp.Literal.string("table"))
            .when(exp.column("table_type").eq("VIEW"), exp.Literal.string("view"))
            .else_("table_type")
        )
        projection = exp.select(
            exp.column("table_name").as_("name"),
            exp.column("table_schema").as_("schema_name"),
            object_type.as_("type"),
        )
        query = projection.from_(exp.table_("tables", db="information_schema")).where(
            exp.column("table_schema").eq(to_schema(schema_name).db)
        )
        if object_names:
            query = query.where(exp.column("table_name").isin(*object_names))

        rows = self.fetchdf(query).itertuples()
        return [
            DataObject(
                schema=record.schema_name,
                name=record.name,
                type=DataObjectType.from_str(record.type),  # type: ignore
            )
            for record in rows
        ]

    def _build_create_comment_table_exp(
        self, table: exp.Table, table_comment: str, table_kind: str
    ) -> exp.Comment | str:
        """Build the ``ALTER TABLE ... COMMENT = ...`` statement for a table comment.

        The comment is passed through ``_truncate_table_comment`` and rendered as a
        string literal. ``table_kind`` is not used here.
        """
        quoted_table = table.sql(dialect=self.dialect, identify=True)
        comment_literal = exp.Literal.string(self._truncate_table_comment(table_comment))
        rendered_comment = comment_literal.sql(dialect=self.dialect)
        return f"ALTER TABLE {quoted_table} COMMENT = {rendered_comment}"

    def _create_column_comments(
        self,
        table_name: TableName,
        column_comments: t.Dict[str, str],
        table_kind: str = "TABLE",
        materialized_view: bool = False,
    ) -> None:
        """Register column comments by re-issuing each column definition with a comment.

        Args:
            table_name: The table whose columns are commented.
            column_comments: Mapping of column name to comment text.
            table_kind: Not used here.
            materialized_view: Not used here.
        """
        table = exp.to_table(table_name)
        quoted_table = table.sql(dialect=self.dialect, identify=True)

        # MySQL ALTER TABLE MODIFY completely replaces the column (overwriting options and
        # constraints), and self.columns() only returns column types. The full definitions
        # are therefore recovered by parsing the table's SHOW CREATE TABLE output.
        show_create_row = self.fetchone(f"SHOW CREATE TABLE {quoted_table}")
        create_table_exp = parse_one(show_create_row[1], dialect=self.dialect)  # type: ignore
        schema_exp = create_table_exp.find(exp.Schema)
        column_defs = {
            definition.name: definition.copy()
            for definition in schema_exp.find_all(exp.ColumnDef)  # type: ignore
        }

        for column_name in column_comments:
            column_def = column_defs.get(column_name)
            if not column_def:
                # Columns absent from the parsed definition are skipped.
                continue

            column_def.args["constraints"].extend(
                self._build_col_comment_exp(column_def.alias_or_name, column_comments)
            )

            # Best effort: a failure for one column is logged and the loop continues.
            try:
                self.execute(
                    f"ALTER TABLE {quoted_table} MODIFY {column_def.sql(dialect=self.dialect, identify=True)}",
                )
            except Exception:
                logger.warning(
                    f"Column comments for column '{column_def.alias_or_name}' in table '{table.alias_or_name}' not registered - this may be due to limited permissions.",
                    exc_info=True,
                )

    def _create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        exists: bool,
        **kwargs: t.Any,
    ) -> None:
        """Execute a CREATE TABLE for the target carrying a LIKE property for the source.

        Args:
            target_table_name: The table to create.
            source_table_name: The table referenced by the LIKE property.
            exists: Passed as the ``exists`` flag of the CREATE expression.
            **kwargs: Not used here.
        """
        target = exp.to_table(target_table_name)
        like_source = exp.LikeProperty(this=exp.to_table(source_table_name))
        create_exp = exp.Create(
            this=target,
            kind="TABLE",
            exists=exists,
            properties=exp.Properties(expressions=[like_source]),
        )
        self.execute(create_exp)

    def ping(self) -> None:
        """Ping the pooled connection without asking the driver to reconnect."""
        connection = self._connection_pool.get()
        connection.ping(reconnect=False)
logger =
<Logger sqlmesh.core.engine_adapter.mysql (WARNING)>
@set_catalog()
class MySQLEngineAdapter(
    LogicalMergeMixin, PandasNativeFetchDFSupportMixin, NonTransactionalTruncateMixin, RowDiffMixin
):
    """Engine adapter for the MySQL dialect.

    The overrides below cover index creation, schema dropping, data object
    discovery through ``information_schema.tables``, table and column
    comments, ``CREATE TABLE ... LIKE`` and connection pings.
    """

    DEFAULT_BATCH_SIZE = 200
    DIALECT = "mysql"
    SUPPORTS_INDEXES = True
    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
    COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
    MAX_TABLE_COMMENT_LENGTH = 2048
    MAX_COLUMN_COMMENT_LENGTH = 1024
    SUPPORTS_REPLACE_TABLE = False
    MAX_IDENTIFIER_LENGTH = 64
    SUPPORTS_QUERY_EXECUTION_TRACKING = True
    # Keys are the sqlglot type identifiers built for this dialect; values are the
    # parameter tuples registered as defaults for each type.
    SCHEMA_DIFFER_KWARGS = {
        "parameterized_type_defaults": {
            exp.DataType.build("BIT", dialect=DIALECT).this: [(1,)],
            exp.DataType.build("BINARY", dialect=DIALECT).this: [(1,)],
            exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(10, 0), (0,)],
            exp.DataType.build("CHAR", dialect=DIALECT).this: [(1,)],
            exp.DataType.build("NCHAR", dialect=DIALECT).this: [(1,)],
            exp.DataType.build("TEXT", dialect=DIALECT).this: [(65535,)],
            exp.DataType.build("TIME", dialect=DIALECT).this: [(0,)],
            exp.DataType.build("DATETIME", dialect=DIALECT).this: [(0,)],
            exp.DataType.build("TIMESTAMP", dialect=DIALECT).this: [(0,)],
        },
    }

    def get_current_catalog(self) -> t.Optional[str]:
        """Return the current connection's catalog name; this adapter always reports ``None``."""
        return None

    def create_index(
        self,
        table_name: TableName,
        index_name: str,
        columns: t.Tuple[str, ...],
        exists: bool = True,
    ) -> None:
        """Create an index on ``table_name`` by delegating to the parent implementation.

        Args:
            table_name: The name of the target table.
            index_name: The name of the index.
            columns: The columns that constitute the index.
            exists: Accepted for interface compatibility; not forwarded.
        """
        # MySQL doesn't support an existence-check clause for indexes, so the
        # parent is always called with exists=False regardless of the argument.
        super().create_index(table_name, index_name, columns, exists=False)

    def drop_schema(
        self,
        schema_name: SchemaName,
        ignore_if_not_exists: bool = True,
        cascade: bool = False,
        **drop_args: t.Dict[str, exp.Expr],
    ) -> None:
        """Drop ``schema_name`` by delegating to the parent implementation.

        Args:
            schema_name: The schema to drop.
            ignore_if_not_exists: Forwarded unchanged to the parent.
            cascade: Accepted for interface compatibility; never forwarded as True.
            **drop_args: Accepted for interface compatibility; not forwarded.
        """
        # MySQL doesn't support the CASCADE clause and drops schemas unconditionally.
        super().drop_schema(
            schema_name,
            ignore_if_not_exists=ignore_if_not_exists,
            cascade=False,
        )

    def _get_data_objects(
        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
    ) -> t.List[DataObject]:
        """List the data objects found in ``information_schema.tables`` for a schema.

        Args:
            schema_name: The schema whose objects are listed.
            object_names: Optional set of names; when non-empty only these are returned.

        Returns:
            One ``DataObject`` per row returned by the query.
        """
        # 'BASE TABLE' and 'VIEW' are normalized to 'table' / 'view'; any other
        # value falls through as the raw table_type column.
        object_type = (
            exp.case()
            .when(exp.column("table_type").eq("BASE TABLE"), exp.Literal.string("table"))
            .when(exp.column("table_type").eq("VIEW"), exp.Literal.string("view"))
            .else_("table_type")
        )
        projection = exp.select(
            exp.column("table_name").as_("name"),
            exp.column("table_schema").as_("schema_name"),
            object_type.as_("type"),
        )
        query = projection.from_(exp.table_("tables", db="information_schema")).where(
            exp.column("table_schema").eq(to_schema(schema_name).db)
        )
        if object_names:
            query = query.where(exp.column("table_name").isin(*object_names))

        rows = self.fetchdf(query).itertuples()
        return [
            DataObject(
                schema=record.schema_name,
                name=record.name,
                type=DataObjectType.from_str(record.type),  # type: ignore
            )
            for record in rows
        ]

    def _build_create_comment_table_exp(
        self, table: exp.Table, table_comment: str, table_kind: str
    ) -> exp.Comment | str:
        """Build the ``ALTER TABLE ... COMMENT = ...`` statement for a table comment.

        The comment is passed through ``_truncate_table_comment`` and rendered as a
        string literal. ``table_kind`` is not used here.
        """
        quoted_table = table.sql(dialect=self.dialect, identify=True)
        comment_literal = exp.Literal.string(self._truncate_table_comment(table_comment))
        rendered_comment = comment_literal.sql(dialect=self.dialect)
        return f"ALTER TABLE {quoted_table} COMMENT = {rendered_comment}"

    def _create_column_comments(
        self,
        table_name: TableName,
        column_comments: t.Dict[str, str],
        table_kind: str = "TABLE",
        materialized_view: bool = False,
    ) -> None:
        """Register column comments by re-issuing each column definition with a comment.

        Args:
            table_name: The table whose columns are commented.
            column_comments: Mapping of column name to comment text.
            table_kind: Not used here.
            materialized_view: Not used here.
        """
        table = exp.to_table(table_name)
        quoted_table = table.sql(dialect=self.dialect, identify=True)

        # MySQL ALTER TABLE MODIFY completely replaces the column (overwriting options and
        # constraints), and self.columns() only returns column types. The full definitions
        # are therefore recovered by parsing the table's SHOW CREATE TABLE output.
        show_create_row = self.fetchone(f"SHOW CREATE TABLE {quoted_table}")
        create_table_exp = parse_one(show_create_row[1], dialect=self.dialect)  # type: ignore
        schema_exp = create_table_exp.find(exp.Schema)
        column_defs = {
            definition.name: definition.copy()
            for definition in schema_exp.find_all(exp.ColumnDef)  # type: ignore
        }

        for column_name in column_comments:
            column_def = column_defs.get(column_name)
            if not column_def:
                # Columns absent from the parsed definition are skipped.
                continue

            column_def.args["constraints"].extend(
                self._build_col_comment_exp(column_def.alias_or_name, column_comments)
            )

            # Best effort: a failure for one column is logged and the loop continues.
            try:
                self.execute(
                    f"ALTER TABLE {quoted_table} MODIFY {column_def.sql(dialect=self.dialect, identify=True)}",
                )
            except Exception:
                logger.warning(
                    f"Column comments for column '{column_def.alias_or_name}' in table '{table.alias_or_name}' not registered - this may be due to limited permissions.",
                    exc_info=True,
                )

    def _create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
        exists: bool,
        **kwargs: t.Any,
    ) -> None:
        """Execute a CREATE TABLE for the target carrying a LIKE property for the source.

        Args:
            target_table_name: The table to create.
            source_table_name: The table referenced by the LIKE property.
            exists: Passed as the ``exists`` flag of the CREATE expression.
            **kwargs: Not used here.
        """
        target = exp.to_table(target_table_name)
        like_source = exp.LikeProperty(this=exp.to_table(source_table_name))
        create_exp = exp.Create(
            this=target,
            kind="TABLE",
            exists=exists,
            properties=exp.Properties(expressions=[like_source]),
        )
        self.execute(create_exp)

    def ping(self) -> None:
        """Ping the pooled connection without asking the driver to reconnect."""
        connection = self._connection_pool.get()
        connection.ping(reconnect=False)
Base class wrapping a Database API-compliant connection.
The EngineAdapter is an easily subclassable interface that interacts with the underlying engine and data store.
Arguments:
- connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
- dialect: The dialect with which this adapter is associated.
- multithreaded: Indicates whether this adapter will be used by more than one thread.
SCHEMA_DIFFER_KWARGS =
{'parameterized_type_defaults': {<DType.BIT: 'BIT'>: [(1,)], <DType.BINARY: 'BINARY'>: [(1,)], <DType.DECIMAL: 'DECIMAL'>: [(10, 0), (0,)], <DType.CHAR: 'CHAR'>: [(1,)], <DType.NCHAR: 'NCHAR'>: [(1,)], <DType.TEXT: 'TEXT'>: [(65535,)], <DType.TIME: 'TIME'>: [(0,)], <DType.DATETIME: 'DATETIME'>: [(0,)], <DType.TIMESTAMPTZ: 'TIMESTAMPTZ'>: [(0,)]}}
def
get_current_catalog(self) -> Optional[str]:
def get_current_catalog(self) -> t.Optional[str]:
    """Return the current connection's catalog name; this adapter always reports ``None``."""
    current_catalog: t.Optional[str] = None
    return current_catalog
Returns the catalog name of the current connection.
def
create_index( self, table_name: Union[str, sqlglot.expressions.query.Table], index_name: str, columns: Tuple[str, ...], exists: bool = True) -> None:
def create_index(
    self,
    table_name: TableName,
    index_name: str,
    columns: t.Tuple[str, ...],
    exists: bool = True,
) -> None:
    """Create an index on ``table_name`` by delegating to the parent implementation.

    Args:
        table_name: The name of the target table.
        index_name: The name of the index.
        columns: The columns that constitute the index.
        exists: Accepted for interface compatibility; not forwarded.
    """
    # MySQL doesn't support an existence-check clause for indexes, so the
    # parent is always called with exists=False regardless of the argument.
    parent = super()
    parent.create_index(table_name, index_name, columns, exists=False)
Creates a new index for the given table, if supported.
Arguments:
- table_name: The name of the target table.
- index_name: The name of the index.
- columns: The list of columns that constitute the index.
- exists: Indicates whether to include the IF NOT EXISTS check.
def
drop_schema( self, schema_name: Union[str, sqlglot.expressions.query.Table], ignore_if_not_exists: bool = True, cascade: bool = False, **drop_args: Dict[str, sqlglot.expressions.core.Expr]) -> None:
def drop_schema(
    self,
    schema_name: SchemaName,
    ignore_if_not_exists: bool = True,
    cascade: bool = False,
    **drop_args: t.Dict[str, exp.Expr],
) -> None:
    """Drop ``schema_name`` by delegating to the parent implementation.

    Args:
        schema_name: The schema to drop.
        ignore_if_not_exists: Forwarded unchanged to the parent.
        cascade: Accepted for interface compatibility; never forwarded as True.
        **drop_args: Accepted for interface compatibility; not forwarded.
    """
    # MySQL doesn't support the CASCADE clause and drops schemas unconditionally.
    parent = super()
    parent.drop_schema(
        schema_name,
        ignore_if_not_exists=ignore_if_not_exists,
        cascade=False,
    )
def
merge( self, target_table: Union[str, sqlglot.expressions.query.Table], source_table: <MagicMock id='132726895264336'>, target_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]], unique_key: Sequence[sqlglot.expressions.core.Expr], when_matched: Optional[sqlglot.expressions.dml.Whens] = None, merge_filter: Optional[sqlglot.expressions.core.Expr] = None, source_columns: Optional[List[str]] = None, **kwargs: Any) -> None:
def merge(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expr],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expr] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Merge ``source_table`` into ``target_table`` by delegating to ``logical_merge``.

    The four leading arguments are passed positionally after ``self``;
    ``when_matched``, ``merge_filter`` and ``source_columns`` are passed by
    keyword. Extra ``**kwargs`` are accepted but not forwarded.
    """
    optional_args = {
        "when_matched": when_matched,
        "merge_filter": merge_filter,
        "source_columns": source_columns,
    }
    logical_merge(
        self,
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        **optional_args,
    )
Inherited Members
- sqlmesh.core.engine_adapter.base.EngineAdapter
- EngineAdapter
- DATA_OBJECT_FILTER_BATCH_SIZE
- SUPPORTS_TRANSACTIONS
- INSERT_OVERWRITE_STRATEGY
- SUPPORTS_MATERIALIZED_VIEWS
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTS_CREATE_DROP_CATALOG
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- SUPPORTS_TUPLE_IN
- HAS_VIEW_BINDING
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- ATTACH_CORRELATION_ID
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- catalog_support
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- set_current_catalog
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- alter_table
- create_view
- create_schema
- drop_view
- create_catalog
- drop_catalog
- columns
- table_exists
- delete_from
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- get_table_last_modified_ts