Edit on GitHub

sqlmesh.core.engine_adapter.mysql

  1from __future__ import annotations
  2
  3import logging
  4import typing as t
  5
  6from sqlglot import exp, parse_one
  7
  8from sqlmesh.core.dialect import to_schema
  9from sqlmesh.core.engine_adapter.mixins import (
 10    LogicalMergeMixin,
 11    NonTransactionalTruncateMixin,
 12    PandasNativeFetchDFSupportMixin,
 13    RowDiffMixin,
 14)
 15from sqlmesh.core.engine_adapter.shared import (
 16    CommentCreationTable,
 17    CommentCreationView,
 18    DataObject,
 19    DataObjectType,
 20    set_catalog,
 21)
 22
 23if t.TYPE_CHECKING:
 24    from sqlmesh.core._typing import SchemaName, TableName
 25
 26logger = logging.getLogger(__name__)
 27
 28
 29@set_catalog()
 30class MySQLEngineAdapter(
 31    LogicalMergeMixin, PandasNativeFetchDFSupportMixin, NonTransactionalTruncateMixin, RowDiffMixin
 32):
 33    DEFAULT_BATCH_SIZE = 200
 34    DIALECT = "mysql"
 35    SUPPORTS_INDEXES = True
 36    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
 37    COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
 38    MAX_TABLE_COMMENT_LENGTH = 2048
 39    MAX_COLUMN_COMMENT_LENGTH = 1024
 40    SUPPORTS_REPLACE_TABLE = False
 41    MAX_IDENTIFIER_LENGTH = 64
 42    SUPPORTS_QUERY_EXECUTION_TRACKING = True
 43    SCHEMA_DIFFER_KWARGS = {
 44        "parameterized_type_defaults": {
 45            exp.DataType.build("BIT", dialect=DIALECT).this: [(1,)],
 46            exp.DataType.build("BINARY", dialect=DIALECT).this: [(1,)],
 47            exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(10, 0), (0,)],
 48            exp.DataType.build("CHAR", dialect=DIALECT).this: [(1,)],
 49            exp.DataType.build("NCHAR", dialect=DIALECT).this: [(1,)],
 50            exp.DataType.build("TEXT", dialect=DIALECT).this: [(65535,)],
 51            exp.DataType.build("TIME", dialect=DIALECT).this: [(0,)],
 52            exp.DataType.build("DATETIME", dialect=DIALECT).this: [(0,)],
 53            exp.DataType.build("TIMESTAMP", dialect=DIALECT).this: [(0,)],
 54        },
 55    }
 56
 57    def get_current_catalog(self) -> t.Optional[str]:
 58        """Returns the catalog name of the current connection."""
 59        return None
 60
 61    def create_index(
 62        self,
 63        table_name: TableName,
 64        index_name: str,
 65        columns: t.Tuple[str, ...],
 66        exists: bool = True,
 67    ) -> None:
 68        # MySQL doesn't support IF EXISTS clause for indexes.
 69        super().create_index(table_name, index_name, columns, exists=False)
 70
 71    def drop_schema(
 72        self,
 73        schema_name: SchemaName,
 74        ignore_if_not_exists: bool = True,
 75        cascade: bool = False,
 76        **drop_args: t.Dict[str, exp.Expr],
 77    ) -> None:
 78        # MySQL doesn't support CASCADE clause and drops schemas unconditionally.
 79        super().drop_schema(schema_name, ignore_if_not_exists=ignore_if_not_exists, cascade=False)
 80
 81    def _get_data_objects(
 82        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
 83    ) -> t.List[DataObject]:
 84        """
 85        Returns all the data objects that exist in the given schema and optionally catalog.
 86        """
 87        query = (
 88            exp.select(
 89                exp.column("table_name").as_("name"),
 90                exp.column("table_schema").as_("schema_name"),
 91                exp.case()
 92                .when(
 93                    exp.column("table_type").eq("BASE TABLE"),
 94                    exp.Literal.string("table"),
 95                )
 96                .when(
 97                    exp.column("table_type").eq("VIEW"),
 98                    exp.Literal.string("view"),
 99                )
100                .else_("table_type")
101                .as_("type"),
102            )
103            .from_(exp.table_("tables", db="information_schema"))
104            .where(exp.column("table_schema").eq(to_schema(schema_name).db))
105        )
106        if object_names:
107            query = query.where(exp.column("table_name").isin(*object_names))
108        df = self.fetchdf(query)
109        return [
110            DataObject(
111                schema=row.schema_name,
112                name=row.name,
113                type=DataObjectType.from_str(row.type),  # type: ignore
114            )
115            for row in df.itertuples()
116        ]
117
118    def _build_create_comment_table_exp(
119        self, table: exp.Table, table_comment: str, table_kind: str
120    ) -> exp.Comment | str:
121        table_sql = table.sql(dialect=self.dialect, identify=True)
122
123        truncated_comment = self._truncate_table_comment(table_comment)
124        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
125
126        return f"ALTER TABLE {table_sql} COMMENT = {comment_sql}"
127
128    def _create_column_comments(
129        self,
130        table_name: TableName,
131        column_comments: t.Dict[str, str],
132        table_kind: str = "TABLE",
133        materialized_view: bool = False,
134    ) -> None:
135        table = exp.to_table(table_name)
136        table_sql = table.sql(dialect=self.dialect, identify=True)
137
138        # MySQL ALTER TABLE MODIFY completely replaces the column (overwriting options and constraints).
139        # self.columns() only returns the column types so doesn't allow us to fully/correctly replace a column definition.
140        # To get the full column definition we retrieve and parse the table's CREATE TABLE statement.
141        create_table_exp = parse_one(
142            self.fetchone(f"SHOW CREATE TABLE {table_sql}")[1],  # type: ignore
143            dialect=self.dialect,
144        )
145        col_def_exps = {
146            col_def.name: col_def.copy()
147            for col_def in create_table_exp.find(exp.Schema).find_all(exp.ColumnDef)  # type: ignore
148        }
149
150        for col in column_comments:
151            col_def = col_def_exps.get(col)
152            if col_def:
153                col_def.args["constraints"].extend(
154                    self._build_col_comment_exp(col_def.alias_or_name, column_comments)
155                )
156
157                try:
158                    self.execute(
159                        f"ALTER TABLE {table_sql} MODIFY {col_def.sql(dialect=self.dialect, identify=True)}",
160                    )
161                except Exception:
162                    logger.warning(
163                        f"Column comments for column '{col_def.alias_or_name}' in table '{table.alias_or_name}' not registered - this may be due to limited permissions.",
164                        exc_info=True,
165                    )
166
167    def _create_table_like(
168        self,
169        target_table_name: TableName,
170        source_table_name: TableName,
171        exists: bool,
172        **kwargs: t.Any,
173    ) -> None:
174        self.execute(
175            exp.Create(
176                this=exp.to_table(target_table_name),
177                kind="TABLE",
178                exists=exists,
179                properties=exp.Properties(
180                    expressions=[
181                        exp.LikeProperty(
182                            this=exp.to_table(source_table_name),
183                        ),
184                    ],
185                ),
186            )
187        )
188
189    def ping(self) -> None:
190        self._connection_pool.get().ping(reconnect=False)
logger = <Logger sqlmesh.core.engine_adapter.mysql (WARNING)>
 30@set_catalog()
 31class MySQLEngineAdapter(
 32    LogicalMergeMixin, PandasNativeFetchDFSupportMixin, NonTransactionalTruncateMixin, RowDiffMixin
 33):
 34    DEFAULT_BATCH_SIZE = 200
 35    DIALECT = "mysql"
 36    SUPPORTS_INDEXES = True
 37    COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
 38    COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
 39    MAX_TABLE_COMMENT_LENGTH = 2048
 40    MAX_COLUMN_COMMENT_LENGTH = 1024
 41    SUPPORTS_REPLACE_TABLE = False
 42    MAX_IDENTIFIER_LENGTH = 64
 43    SUPPORTS_QUERY_EXECUTION_TRACKING = True
 44    SCHEMA_DIFFER_KWARGS = {
 45        "parameterized_type_defaults": {
 46            exp.DataType.build("BIT", dialect=DIALECT).this: [(1,)],
 47            exp.DataType.build("BINARY", dialect=DIALECT).this: [(1,)],
 48            exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(10, 0), (0,)],
 49            exp.DataType.build("CHAR", dialect=DIALECT).this: [(1,)],
 50            exp.DataType.build("NCHAR", dialect=DIALECT).this: [(1,)],
 51            exp.DataType.build("TEXT", dialect=DIALECT).this: [(65535,)],
 52            exp.DataType.build("TIME", dialect=DIALECT).this: [(0,)],
 53            exp.DataType.build("DATETIME", dialect=DIALECT).this: [(0,)],
 54            exp.DataType.build("TIMESTAMP", dialect=DIALECT).this: [(0,)],
 55        },
 56    }
 57
 58    def get_current_catalog(self) -> t.Optional[str]:
 59        """Returns the catalog name of the current connection."""
 60        return None
 61
 62    def create_index(
 63        self,
 64        table_name: TableName,
 65        index_name: str,
 66        columns: t.Tuple[str, ...],
 67        exists: bool = True,
 68    ) -> None:
 69        # MySQL doesn't support IF EXISTS clause for indexes.
 70        super().create_index(table_name, index_name, columns, exists=False)
 71
 72    def drop_schema(
 73        self,
 74        schema_name: SchemaName,
 75        ignore_if_not_exists: bool = True,
 76        cascade: bool = False,
 77        **drop_args: t.Dict[str, exp.Expr],
 78    ) -> None:
 79        # MySQL doesn't support CASCADE clause and drops schemas unconditionally.
 80        super().drop_schema(schema_name, ignore_if_not_exists=ignore_if_not_exists, cascade=False)
 81
 82    def _get_data_objects(
 83        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
 84    ) -> t.List[DataObject]:
 85        """
 86        Returns all the data objects that exist in the given schema and optionally catalog.
 87        """
 88        query = (
 89            exp.select(
 90                exp.column("table_name").as_("name"),
 91                exp.column("table_schema").as_("schema_name"),
 92                exp.case()
 93                .when(
 94                    exp.column("table_type").eq("BASE TABLE"),
 95                    exp.Literal.string("table"),
 96                )
 97                .when(
 98                    exp.column("table_type").eq("VIEW"),
 99                    exp.Literal.string("view"),
100                )
101                .else_("table_type")
102                .as_("type"),
103            )
104            .from_(exp.table_("tables", db="information_schema"))
105            .where(exp.column("table_schema").eq(to_schema(schema_name).db))
106        )
107        if object_names:
108            query = query.where(exp.column("table_name").isin(*object_names))
109        df = self.fetchdf(query)
110        return [
111            DataObject(
112                schema=row.schema_name,
113                name=row.name,
114                type=DataObjectType.from_str(row.type),  # type: ignore
115            )
116            for row in df.itertuples()
117        ]
118
119    def _build_create_comment_table_exp(
120        self, table: exp.Table, table_comment: str, table_kind: str
121    ) -> exp.Comment | str:
122        table_sql = table.sql(dialect=self.dialect, identify=True)
123
124        truncated_comment = self._truncate_table_comment(table_comment)
125        comment_sql = exp.Literal.string(truncated_comment).sql(dialect=self.dialect)
126
127        return f"ALTER TABLE {table_sql} COMMENT = {comment_sql}"
128
129    def _create_column_comments(
130        self,
131        table_name: TableName,
132        column_comments: t.Dict[str, str],
133        table_kind: str = "TABLE",
134        materialized_view: bool = False,
135    ) -> None:
136        table = exp.to_table(table_name)
137        table_sql = table.sql(dialect=self.dialect, identify=True)
138
139        # MySQL ALTER TABLE MODIFY completely replaces the column (overwriting options and constraints).
140        # self.columns() only returns the column types so doesn't allow us to fully/correctly replace a column definition.
141        # To get the full column definition we retrieve and parse the table's CREATE TABLE statement.
142        create_table_exp = parse_one(
143            self.fetchone(f"SHOW CREATE TABLE {table_sql}")[1],  # type: ignore
144            dialect=self.dialect,
145        )
146        col_def_exps = {
147            col_def.name: col_def.copy()
148            for col_def in create_table_exp.find(exp.Schema).find_all(exp.ColumnDef)  # type: ignore
149        }
150
151        for col in column_comments:
152            col_def = col_def_exps.get(col)
153            if col_def:
154                col_def.args["constraints"].extend(
155                    self._build_col_comment_exp(col_def.alias_or_name, column_comments)
156                )
157
158                try:
159                    self.execute(
160                        f"ALTER TABLE {table_sql} MODIFY {col_def.sql(dialect=self.dialect, identify=True)}",
161                    )
162                except Exception:
163                    logger.warning(
164                        f"Column comments for column '{col_def.alias_or_name}' in table '{table.alias_or_name}' not registered - this may be due to limited permissions.",
165                        exc_info=True,
166                    )
167
168    def _create_table_like(
169        self,
170        target_table_name: TableName,
171        source_table_name: TableName,
172        exists: bool,
173        **kwargs: t.Any,
174    ) -> None:
175        self.execute(
176            exp.Create(
177                this=exp.to_table(target_table_name),
178                kind="TABLE",
179                exists=exists,
180                properties=exp.Properties(
181                    expressions=[
182                        exp.LikeProperty(
183                            this=exp.to_table(source_table_name),
184                        ),
185                    ],
186                ),
187            )
188        )
189
190    def ping(self) -> None:
191        self._connection_pool.get().ping(reconnect=False)

Base class wrapping a Database API-compliant connection.

The EngineAdapter is an easily-subclassable interface that interacts with the underlying engine and data store.

Arguments:
  • connection_factory_or_pool: a callable which produces a new Database API-compliant connection on every call.
  • dialect: The dialect with which this adapter is associated.
  • multithreaded: Indicates whether this adapter will be used by more than one thread.
DEFAULT_BATCH_SIZE = 200
DIALECT = 'mysql'
SUPPORTS_INDEXES = True
COMMENT_CREATION_TABLE = <CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS: 3>
COMMENT_CREATION_VIEW = <CommentCreationView.UNSUPPORTED: 1>
MAX_TABLE_COMMENT_LENGTH = 2048
MAX_COLUMN_COMMENT_LENGTH = 1024
SUPPORTS_REPLACE_TABLE = False
MAX_IDENTIFIER_LENGTH = 64
SUPPORTS_QUERY_EXECUTION_TRACKING = True
SCHEMA_DIFFER_KWARGS = {'parameterized_type_defaults': {<DType.BIT: 'BIT'>: [(1,)], <DType.BINARY: 'BINARY'>: [(1,)], <DType.DECIMAL: 'DECIMAL'>: [(10, 0), (0,)], <DType.CHAR: 'CHAR'>: [(1,)], <DType.NCHAR: 'NCHAR'>: [(1,)], <DType.TEXT: 'TEXT'>: [(65535,)], <DType.TIME: 'TIME'>: [(0,)], <DType.DATETIME: 'DATETIME'>: [(0,)], <DType.TIMESTAMPTZ: 'TIMESTAMPTZ'>: [(0,)]}}
def get_current_catalog(self) -> Optional[str]:
def get_current_catalog(self) -> t.Optional[str]:
    """Report the catalog of the current connection.

    This adapter never exposes a catalog, so the answer is always ``None``.
    """
    no_catalog: t.Optional[str] = None
    return no_catalog

Returns the catalog name of the current connection.

def create_index( self, table_name: Union[str, sqlglot.expressions.query.Table], index_name: str, columns: Tuple[str, ...], exists: bool = True) -> None:
def create_index(
    self,
    table_name: TableName,
    index_name: str,
    columns: t.Tuple[str, ...],
    exists: bool = True,
) -> None:
    """Creates an index on ``table_name`` over ``columns``.

    ``exists`` is accepted for interface compatibility but ignored: MySQL's
    ``CREATE INDEX`` has no ``IF NOT EXISTS`` clause, so it is always omitted.
    """
    super().create_index(table_name, index_name, columns, exists=False)

Creates a new index for the given table, if supported.

Arguments:
  • table_name: The name of the target table.
  • index_name: The name of the index.
  • columns: The list of columns that constitute the index.
  • exists: Indicates whether to include the IF NOT EXISTS check.
def drop_schema( self, schema_name: Union[str, sqlglot.expressions.query.Table], ignore_if_not_exists: bool = True, cascade: bool = False, **drop_args: Dict[str, sqlglot.expressions.core.Expr]) -> None:
def drop_schema(
    self,
    schema_name: SchemaName,
    ignore_if_not_exists: bool = True,
    cascade: bool = False,
    **drop_args: t.Dict[str, exp.Expr],
) -> None:
    """Drops ``schema_name``.

    ``cascade`` is ignored: MySQL doesn't support the ``CASCADE`` clause and drops
    schemas (with their contents) unconditionally. Any extra ``drop_args`` are
    forwarded to the base implementation rather than silently discarded.
    """
    super().drop_schema(
        schema_name,
        ignore_if_not_exists=ignore_if_not_exists,
        cascade=False,
        **drop_args,
    )
def ping(self) -> None:
def ping(self) -> None:
    """Check that a pooled connection is alive, without letting the driver reconnect."""
    connection = self._connection_pool.get()
    connection.ping(reconnect=False)
def merge( self, target_table: Union[str, sqlglot.expressions.query.Table], source_table: QueryOrDF, target_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]], unique_key: Sequence[sqlglot.expressions.core.Expr], when_matched: Optional[sqlglot.expressions.dml.Whens] = None, merge_filter: Optional[sqlglot.expressions.core.Expr] = None, source_columns: Optional[List[str]] = None, **kwargs: Any) -> None:
def merge(
    self,
    target_table: TableName,
    source_table: QueryOrDF,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
    unique_key: t.Sequence[exp.Expr],
    when_matched: t.Optional[exp.Whens] = None,
    merge_filter: t.Optional[exp.Expr] = None,
    source_columns: t.Optional[t.List[str]] = None,
    **kwargs: t.Any,
) -> None:
    """Merge ``source_table`` into ``target_table`` by delegating to ``logical_merge``.

    Extra ``kwargs`` are accepted for interface compatibility but are not forwarded.
    """
    optional_clauses = dict(
        when_matched=when_matched,
        merge_filter=merge_filter,
        source_columns=source_columns,
    )
    logical_merge(
        self,
        target_table,
        source_table,
        target_columns_to_types,
        unique_key,
        **optional_clauses,
    )
Inherited Members
sqlmesh.core.engine_adapter.base.EngineAdapter
EngineAdapter
DATA_OBJECT_FILTER_BATCH_SIZE
SUPPORTS_TRANSACTIONS
INSERT_OVERWRITE_STRATEGY
SUPPORTS_MATERIALIZED_VIEWS
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTS_CREATE_DROP_CATALOG
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
SUPPORTS_TUPLE_IN
HAS_VIEW_BINDING
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
ATTACH_CORRELATION_ID
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
catalog_support
schema_differ
default_catalog
engine_run_mode
recycle
close
set_current_catalog
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
alter_table
create_view
create_schema
drop_view
create_catalog
drop_catalog
columns
table_exists
delete_from
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
get_table_last_modified_ts
sqlmesh.core.engine_adapter.mixins.RowDiffMixin
MAX_TIMESTAMP_PRECISION
concat_columns
normalize_value