Edit on GitHub

sqlmesh.core.engine_adapter.shared

  1from __future__ import annotations
  2
  3import functools
  4import inspect
  5import logging
  6import types
  7import typing as t
  8from enum import Enum
  9
 10from pydantic import Field
 11from sqlglot import exp
 12
 13from sqlmesh.core.dialect import to_schema
 14from sqlmesh.utils.errors import UnsupportedCatalogOperationError, SQLMeshError
 15from sqlmesh.utils.pydantic import PydanticModel
 16
 17if t.TYPE_CHECKING:
 18    from sqlmesh.core.engine_adapter._typing import Query
 19    from sqlmesh.core.engine_adapter.base import EngineAdapter
 20
 21
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
 23
 24
 25class DataObjectType(str, Enum):
 26    UNKNOWN = "unknown"
 27    TABLE = "table"
 28    VIEW = "view"
 29    MATERIALIZED_VIEW = "materialized_view"
 30    MANAGED_TABLE = "managed_table"
 31
 32    @property
 33    def is_unknown(self) -> bool:
 34        return self == DataObjectType.UNKNOWN
 35
 36    @property
 37    def is_table(self) -> bool:
 38        return self == DataObjectType.TABLE
 39
 40    @property
 41    def is_view(self) -> bool:
 42        return self == DataObjectType.VIEW
 43
 44    @property
 45    def is_materialized_view(self) -> bool:
 46        return self == DataObjectType.MATERIALIZED_VIEW
 47
 48    @property
 49    def is_managed_table(self) -> bool:
 50        return self == DataObjectType.MANAGED_TABLE
 51
 52    @classmethod
 53    def from_str(cls, s: str) -> DataObjectType:
 54        s = s.lower()
 55        if s == "table":
 56            return DataObjectType.TABLE
 57        if s == "view":
 58            return DataObjectType.VIEW
 59        if s == "materialized_view":
 60            return DataObjectType.MATERIALIZED_VIEW
 61        if s == "managed_table":
 62            return DataObjectType.MANAGED_TABLE
 63        return DataObjectType.UNKNOWN
 64
 65
 66class CommentCreationTable(Enum):
 67    """
 68    Enum for SQL engine TABLE comment support.
 69
 70    UNSUPPORTED = no comments at all
 71    IN_SCHEMA_DEF_CTAS = comments can be registered in CREATE schema definitions, including CTAS calls
 72    IN_SCHEMA_DEF_NO_CTAS = comments can be registered in CREATE schema definitions, excluding CTAS calls
 73    COMMENT_COMMAND_ONLY = comments can only be registered via a post-creation command like `COMMENT` or `ALTER`
 74    """
 75
 76    UNSUPPORTED = 1
 77    IN_SCHEMA_DEF_CTAS = 2
 78    IN_SCHEMA_DEF_NO_CTAS = 3
 79    COMMENT_COMMAND_ONLY = 4
 80
 81    @property
 82    def is_unsupported(self) -> bool:
 83        return self == CommentCreationTable.UNSUPPORTED
 84
 85    @property
 86    def is_in_schema_def_ctas(self) -> bool:
 87        return self == CommentCreationTable.IN_SCHEMA_DEF_CTAS
 88
 89    @property
 90    def is_in_schema_def_no_ctas(self) -> bool:
 91        return self == CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
 92
 93    @property
 94    def is_comment_command_only(self) -> bool:
 95        return self == CommentCreationTable.COMMENT_COMMAND_ONLY
 96
 97    @property
 98    def is_supported(self) -> bool:
 99        return self != CommentCreationTable.UNSUPPORTED
100
101    @property
102    def supports_schema_def(self) -> bool:
103        return self in (
104            CommentCreationTable.IN_SCHEMA_DEF_CTAS,
105            CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS,
106        )
107
108
class CommentCreationView(Enum):
    """
    How an engine lets comments be attached to VIEW objects.

    UNSUPPORTED = the engine cannot store comments at all
    IN_SCHEMA_DEF_AND_COMMANDS = every comment can go into the CREATE VIEW schema definition,
                                 and post-creation commands work as well
    IN_SCHEMA_DEF_NO_COMMANDS = every comment can go into the CREATE VIEW schema definition,
                                but post-creation commands cannot be used
    COMMENT_COMMAND_ONLY = comments can only be added after creation via a command such as `COMMENT` or `ALTER`
    """

    UNSUPPORTED = 1
    IN_SCHEMA_DEF_AND_COMMANDS = 2
    IN_SCHEMA_DEF_NO_COMMANDS = 3
    COMMENT_COMMAND_ONLY = 4

    @property
    def is_unsupported(self) -> bool:
        return self is CommentCreationView.UNSUPPORTED

    @property
    def is_in_schema_def_and_commands(self) -> bool:
        return self is CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS

    @property
    def is_in_schema_def_no_commands(self) -> bool:
        return self is CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS

    @property
    def is_comment_command_only(self) -> bool:
        return self is CommentCreationView.COMMENT_COMMAND_ONLY

    @property
    def is_supported(self) -> bool:
        """Any mode other than ``UNSUPPORTED``."""
        return self is not CommentCreationView.UNSUPPORTED

    @property
    def supports_schema_def(self) -> bool:
        """Comments may be placed in the CREATE VIEW schema definition."""
        return self.is_in_schema_def_and_commands or self.is_in_schema_def_no_commands

    @property
    def supports_column_comment_commands(self) -> bool:
        """Column comments may be registered via post-creation commands."""
        return self.is_in_schema_def_and_commands or self.is_comment_command_only
159
160
class DataObject(PydanticModel):
    """A database object identified by catalog, schema and name, together with its type."""

    catalog: t.Optional[str] = None
    # Populated from the "schema" key on input (pydantic alias).
    schema_name: str = Field(alias="schema")
    name: str
    type: DataObjectType

    # Only meaningful for type=DataObjectType.TABLE, and only on engines that support clustering.
    clustering_key: t.Optional[str] = None

    @property
    def is_clustered(self) -> bool:
        """True when a non-empty ``clustering_key`` is set."""
        key = self.clustering_key
        return key is not None and len(key) > 0

    def to_table(self) -> exp.Table:
        """Build a quoted ``exp.Table`` reference (catalog.schema.name) for this object."""
        return exp.table_(
            self.name,
            db=self.schema_name,
            catalog=self.catalog,
            quoted=True,
        )
176
177
class CatalogSupport(Enum):
    """Level of catalog support an engine provides."""

    # No notion of catalogs at all
    UNSUPPORTED = 1

    # Catalogs exist but are isolated: tables in one catalog cannot reference tables in another
    SINGLE_CATALOG_ONLY = 2

    # Multiple catalogs, but some operations need a SET CATALOG query first to choose the active one
    REQUIRES_SET_CATALOG = 3

    # Multiple catalogs, any of which can be targeted directly without running SET CATALOG
    FULL_SUPPORT = 4

    @property
    def is_unsupported(self) -> bool:
        return self is CatalogSupport.UNSUPPORTED

    @property
    def is_single_catalog_only(self) -> bool:
        return self is CatalogSupport.SINGLE_CATALOG_ONLY

    @property
    def is_requires_set_catalog(self) -> bool:
        return self is CatalogSupport.REQUIRES_SET_CATALOG

    @property
    def is_full_support(self) -> bool:
        return self is CatalogSupport.FULL_SUPPORT

    @property
    def is_multi_catalog_supported(self) -> bool:
        """True for the two modes that can work with more than one catalog."""
        return self in (CatalogSupport.REQUIRES_SET_CATALOG, CatalogSupport.FULL_SUPPORT)
210
211
class EngineRunMode(Enum):
    """Mode in which an engine runs."""

    SINGLE_MODE_ENGINE = 1
    STANDALONE = 2
    CLUSTER = 3
    CLOUD = 4

    @property
    def is_single_mode_engine(self) -> bool:
        return self is EngineRunMode.SINGLE_MODE_ENGINE

    @property
    def is_standalone(self) -> bool:
        return self is EngineRunMode.STANDALONE

    @property
    def is_cluster(self) -> bool:
        return self is EngineRunMode.CLUSTER

    @property
    def is_cloud(self) -> bool:
        return self is EngineRunMode.CLOUD
233
234
class InsertOverwriteStrategy(Enum):
    """Ways an engine can replace a range of data in an existing table."""

    # Two queries: DELETE the data range, then INSERT the new rows
    DELETE_INSERT = 1
    # One INSERT OVERWRITE query replaces the data range.
    INSERT_OVERWRITE = 2
    # One INSERT INTO ... REPLACE WHERE query
    # Note: on Databricks, REPLACE WHERE requires `spark.sql.sources.partitionOverwriteMode` to be `static`
    REPLACE_WHERE = 3
    # One plain INSERT query; the engine is assumed to match partition bounds itself and replace
    # rather than append. Trino with `hive.insert-existing-partitions-behavior=OVERWRITE` works this way
    INTO_IS_OVERWRITE = 4
    # Emulate INSERT OVERWRITE with a MERGE because the engine has no native support for it
    MERGE = 5

    @property
    def is_delete_insert(self) -> bool:
        return self is InsertOverwriteStrategy.DELETE_INSERT

    @property
    def is_insert_overwrite(self) -> bool:
        return self is InsertOverwriteStrategy.INSERT_OVERWRITE

    @property
    def is_replace_where(self) -> bool:
        return self is InsertOverwriteStrategy.REPLACE_WHERE

    @property
    def is_into_is_overwrite(self) -> bool:
        return self is InsertOverwriteStrategy.INTO_IS_OVERWRITE

    @property
    def is_merge(self) -> bool:
        return self is InsertOverwriteStrategy.MERGE
268
269
class SourceQuery:
    """Context manager that builds a query lazily and runs an optional cleanup on exit.

    Entering calls ``query_factory`` and applies every registered transform, in
    registration order, through ``query.transform``. Exiting calls ``cleanup_func``
    if one was given. Exceptions raised inside the ``with`` body are never suppressed.
    """

    def __init__(
        self,
        query_factory: t.Callable[[], Query],
        cleanup_func: t.Optional[t.Callable[[], None]] = None,
        transforms: t.Optional[t.List[t.Callable[[Query], Query]]] = None,
        **kwargs: t.Any,
    ) -> None:
        """
        Args:
            query_factory: Zero-argument callable that produces the query on ``__enter__``.
            cleanup_func: Optional zero-argument callable invoked on ``__exit__``.
            transforms: Optional initial transforms. The list is copied, so later
                ``add_transform`` calls never mutate the caller's list.
            **kwargs: Accepted and ignored.
        """
        self.query_factory = query_factory
        self.cleanup_func = cleanup_func
        # Copy instead of aliasing: add_transform() appends to this list, and the
        # caller may reuse the list it passed in for other SourceQuery instances.
        self._transforms = list(transforms or [])

    def add_transform(self, transform: t.Callable[[Query], Query]) -> None:
        """Register a transform to apply, after the existing ones, on ``__enter__``."""
        self._transforms.append(transform)

    def __enter__(self) -> Query:
        query = self.query_factory()
        for transform in self._transforms:
            query = t.cast(exp.Query, query.transform(transform))
        return query

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_val: t.Optional[BaseException],
        exc_tb: t.Optional[types.TracebackType],
    ) -> t.Optional[bool]:
        if self.cleanup_func:
            self.cleanup_func()
        # Returning None lets any in-flight exception propagate.
        return None
300
301
def set_catalog(override_mapping: t.Optional[t.Dict[str, CatalogSupport]] = None) -> t.Callable:
    """Class decorator that makes an engine adapter's methods catalog-aware.

    Each public method of the decorated class (plus the private methods listed in
    ``inclusion_list``) that has a parameter whose annotation is the string ``SchemaName`` or
    ``TableName`` is replaced by a wrapper. The wrapper's behavior depends on the adapter's
    ``catalog_support``, or on the per-method override from ``override_mapping``. A catalog
    found in that argument is handled as follows:

    * ``FULL_SUPPORT``: the method is called unchanged.
    * ``UNSUPPORTED``: ``UnsupportedCatalogOperationError`` is raised.
    * ``SINGLE_CATALOG_ONLY``: the catalog is stripped from the argument. ``SQLMeshError`` is
      raised if it differs from the adapter's ``_default_catalog``.
    * otherwise (``REQUIRES_SET_CATALOG``): the catalog is stripped from the argument and the
      adapter's current catalog is switched to it for the call. The previous catalog is
      restored afterwards, even if the call raises.

    Args:
        override_mapping: Optional mapping from method name to the ``CatalogSupport`` to use
            for that method instead of ``engine_adapter.catalog_support``.

    Returns:
        A class decorator that patches the class in place and returns it.
    """

    def set_catalog_decorator(
        func: t.Callable,
        target_name: str,
        target_pos: int,
        target_type: str,
        override: t.Optional[CatalogSupport] = None,
    ) -> t.Callable:
        """Wrap ``func`` so that its ``target_name`` argument (positional index ``target_pos``)
        is catalog-normalized as described in ``set_catalog``."""

        @functools.wraps(func)
        def internal_wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
            # Positional args become a list so the target argument can be replaced in place.
            list_args = list(args)
            engine_adapter = list_args[0]
            catalog_support = override or engine_adapter.catalog_support
            # With full catalog support there is nothing to rewrite.
            if catalog_support.is_full_support:
                return func(*list_args, **kwargs)

            # Locate the target argument: a truthy keyword wins, then the positional slot.
            obj: t.Union[str, exp.Table]
            container: t.Union[t.Dict[str, t.Any], t.List[t.Any]]
            key: t.Union[int, str]
            if kwargs.get(target_name):
                obj, container, key = kwargs[target_name], kwargs, target_name
            elif target_pos < len(list_args):
                obj, container, key = list_args[target_pos], list_args, target_pos
            else:
                # The argument was not supplied (e.g. its default is in use) or was an empty
                # keyword. There is no catalog to strip, and indexing list_args would raise
                # IndexError.
                return func(*list_args, **kwargs)

            to_expression_func = t.cast(
                t.Callable[[t.Union[str, exp.Table]], exp.Table],
                exp.to_table if target_type == "TableName" else to_schema,
            )
            # Copy Table inputs so the caller's expression is not mutated by set("catalog", None).
            expression = to_expression_func(obj.copy() if isinstance(obj, exp.Table) else obj)
            catalog_name = expression.catalog
            if not catalog_name:
                return func(*list_args, **kwargs)

            # A catalog was given, but this engine has no notion of catalogs.
            if catalog_support.is_unsupported:
                raise UnsupportedCatalogOperationError(
                    f"{engine_adapter.dialect} does not support catalogs and a catalog was provided: {catalog_name}"
                )
            # Strip the catalog so the wrapped method does not try to use it.
            expression.set("catalog", None)
            container[key] = expression  # type: ignore
            if catalog_support.is_single_catalog_only:
                if catalog_name != engine_adapter._default_catalog:
                    raise SQLMeshError(
                        f"{engine_adapter.dialect} requires that all catalog operations be against a single catalog: {engine_adapter._default_catalog}. Provided catalog: {catalog_name}"
                    )
                return func(*list_args, **kwargs)

            # Switch the adapter's current catalog for the duration of the call if needed.
            current_catalog = engine_adapter.get_current_catalog()
            if catalog_name == current_catalog:
                return func(*list_args, **kwargs)
            engine_adapter.set_current_catalog(catalog_name)
            try:
                return func(*list_args, **kwargs)
            finally:
                # Restore even when func raises. Otherwise the adapter would stay pointed at
                # catalog_name, and later unrelated calls would run against the wrong catalog.
                engine_adapter.set_current_catalog(current_catalog)

        return internal_wrapper

    # Private methods that are wrapped despite their leading underscore.
    inclusion_list = {
        "_get_data_objects",
    }

    # Exclude this to avoid a circular dependency from inspecting the classproperty
    exclusion_list = {
        "can_access_spark_session",
    }

    override_mapping = override_mapping or {}

    def wrapper(cls: t.Type[EngineAdapter]) -> t.Type[EngineAdapter]:
        """Patch every eligible method of ``cls`` in place and return ``cls``."""
        for name in dir(cls):
            if name in exclusion_list or (name.startswith("_") and name not in inclusion_list):
                continue
            m = getattr(cls, name)
            if inspect.isfunction(m):
                spec = inspect.getfullargspec(m)
                # spec.args includes `self` at index 0, matching list_args in internal_wrapper.
                for i, obj_name in enumerate(spec.args):
                    obj_type = spec.annotations.get(obj_name)
                    if obj_type not in {"SchemaName", "TableName"}:
                        continue
                    # NOTE(review): every match re-wraps the original `m`. When several
                    # parameters carry these annotations, only the last one ends up handled.
                    # Confirm this is intended.
                    setattr(
                        cls,
                        name,
                        set_catalog_decorator(
                            m,
                            target_name=obj_name,
                            target_pos=i,
                            target_type=obj_type,
                            override=override_mapping.get(name),
                        ),
                    )
        return cls

    return wrapper
logger = <Logger sqlmesh.core.engine_adapter.shared (WARNING)>
class DataObjectType(builtins.str, enum.Enum):
class DataObjectType(str, Enum):
    """Type of a data object in a database.

    Members are also ``str`` instances whose value is the lowercase member name.
    """

    UNKNOWN = "unknown"
    TABLE = "table"
    VIEW = "view"
    MATERIALIZED_VIEW = "materialized_view"
    MANAGED_TABLE = "managed_table"

    @property
    def is_unknown(self) -> bool:
        return self is DataObjectType.UNKNOWN

    @property
    def is_table(self) -> bool:
        return self is DataObjectType.TABLE

    @property
    def is_view(self) -> bool:
        return self is DataObjectType.VIEW

    @property
    def is_materialized_view(self) -> bool:
        return self is DataObjectType.MATERIALIZED_VIEW

    @property
    def is_managed_table(self) -> bool:
        return self is DataObjectType.MANAGED_TABLE

    @classmethod
    def from_str(cls, s: str) -> DataObjectType:
        """Map ``s`` (case-insensitive) to a member; any unrecognized string yields ``UNKNOWN``."""
        try:
            # Value lookup matches exactly the lowercase member values.
            return DataObjectType(s.lower())
        except ValueError:
            return DataObjectType.UNKNOWN

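Type of a data object in a database: table, view, materialized view, managed table, or unknown (used for any unrecognized type string).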

UNKNOWN = <DataObjectType.UNKNOWN: 'unknown'>
TABLE = <DataObjectType.TABLE: 'table'>
VIEW = <DataObjectType.VIEW: 'view'>
MATERIALIZED_VIEW = <DataObjectType.MATERIALIZED_VIEW: 'materialized_view'>
MANAGED_TABLE = <DataObjectType.MANAGED_TABLE: 'managed_table'>
is_unknown: bool
33    @property
34    def is_unknown(self) -> bool:
35        return self == DataObjectType.UNKNOWN
is_table: bool
37    @property
38    def is_table(self) -> bool:
39        return self == DataObjectType.TABLE
is_view: bool
41    @property
42    def is_view(self) -> bool:
43        return self == DataObjectType.VIEW
is_materialized_view: bool
45    @property
46    def is_materialized_view(self) -> bool:
47        return self == DataObjectType.MATERIALIZED_VIEW
is_managed_table: bool
49    @property
50    def is_managed_table(self) -> bool:
51        return self == DataObjectType.MANAGED_TABLE
@classmethod
def from_str(cls, s: str) -> DataObjectType:
53    @classmethod
54    def from_str(cls, s: str) -> DataObjectType:
55        s = s.lower()
56        if s == "table":
57            return DataObjectType.TABLE
58        if s == "view":
59            return DataObjectType.VIEW
60        if s == "materialized_view":
61            return DataObjectType.MATERIALIZED_VIEW
62        if s == "managed_table":
63            return DataObjectType.MANAGED_TABLE
64        return DataObjectType.UNKNOWN
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class CommentCreationTable(enum.Enum):
 67class CommentCreationTable(Enum):
 68    """
 69    Enum for SQL engine TABLE comment support.
 70
 71    UNSUPPORTED = no comments at all
 72    IN_SCHEMA_DEF_CTAS = comments can be registered in CREATE schema definitions, including CTAS calls
 73    IN_SCHEMA_DEF_NO_CTAS = comments can be registered in CREATE schema definitions, excluding CTAS calls
 74    COMMENT_COMMAND_ONLY = comments can only be registered via a post-creation command like `COMMENT` or `ALTER`
 75    """
 76
 77    UNSUPPORTED = 1
 78    IN_SCHEMA_DEF_CTAS = 2
 79    IN_SCHEMA_DEF_NO_CTAS = 3
 80    COMMENT_COMMAND_ONLY = 4
 81
 82    @property
 83    def is_unsupported(self) -> bool:
 84        return self == CommentCreationTable.UNSUPPORTED
 85
 86    @property
 87    def is_in_schema_def_ctas(self) -> bool:
 88        return self == CommentCreationTable.IN_SCHEMA_DEF_CTAS
 89
 90    @property
 91    def is_in_schema_def_no_ctas(self) -> bool:
 92        return self == CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
 93
 94    @property
 95    def is_comment_command_only(self) -> bool:
 96        return self == CommentCreationTable.COMMENT_COMMAND_ONLY
 97
 98    @property
 99    def is_supported(self) -> bool:
100        return self != CommentCreationTable.UNSUPPORTED
101
102    @property
103    def supports_schema_def(self) -> bool:
104        return self in (
105            CommentCreationTable.IN_SCHEMA_DEF_CTAS,
106            CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS,
107        )

Enum for SQL engine TABLE comment support.

UNSUPPORTED = no comments at all
IN_SCHEMA_DEF_CTAS = comments can be registered in CREATE schema definitions, including CTAS calls
IN_SCHEMA_DEF_NO_CTAS = comments can be registered in CREATE schema definitions, excluding CTAS calls
COMMENT_COMMAND_ONLY = comments can only be registered via a post-creation command like COMMENT or ALTER

UNSUPPORTED = <CommentCreationTable.UNSUPPORTED: 1>
IN_SCHEMA_DEF_CTAS = <CommentCreationTable.IN_SCHEMA_DEF_CTAS: 2>
IN_SCHEMA_DEF_NO_CTAS = <CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS: 3>
COMMENT_COMMAND_ONLY = <CommentCreationTable.COMMENT_COMMAND_ONLY: 4>
is_unsupported: bool
82    @property
83    def is_unsupported(self) -> bool:
84        return self == CommentCreationTable.UNSUPPORTED
is_in_schema_def_ctas: bool
86    @property
87    def is_in_schema_def_ctas(self) -> bool:
88        return self == CommentCreationTable.IN_SCHEMA_DEF_CTAS
is_in_schema_def_no_ctas: bool
90    @property
91    def is_in_schema_def_no_ctas(self) -> bool:
92        return self == CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
is_comment_command_only: bool
94    @property
95    def is_comment_command_only(self) -> bool:
96        return self == CommentCreationTable.COMMENT_COMMAND_ONLY
is_supported: bool
 98    @property
 99    def is_supported(self) -> bool:
100        return self != CommentCreationTable.UNSUPPORTED
supports_schema_def: bool
102    @property
103    def supports_schema_def(self) -> bool:
104        return self in (
105            CommentCreationTable.IN_SCHEMA_DEF_CTAS,
106            CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS,
107        )
Inherited Members
enum.Enum
name
value
class CommentCreationView(enum.Enum):
class CommentCreationView(Enum):
    """
    How an engine lets comments be attached to VIEW objects.

    UNSUPPORTED = the engine cannot store comments at all
    IN_SCHEMA_DEF_AND_COMMANDS = every comment can go into the CREATE VIEW schema definition,
                                 and post-creation commands work as well
    IN_SCHEMA_DEF_NO_COMMANDS = every comment can go into the CREATE VIEW schema definition,
                                but post-creation commands cannot be used
    COMMENT_COMMAND_ONLY = comments can only be added after creation via a command such as `COMMENT` or `ALTER`
    """

    UNSUPPORTED = 1
    IN_SCHEMA_DEF_AND_COMMANDS = 2
    IN_SCHEMA_DEF_NO_COMMANDS = 3
    COMMENT_COMMAND_ONLY = 4

    @property
    def is_unsupported(self) -> bool:
        return self is CommentCreationView.UNSUPPORTED

    @property
    def is_in_schema_def_and_commands(self) -> bool:
        return self is CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS

    @property
    def is_in_schema_def_no_commands(self) -> bool:
        return self is CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS

    @property
    def is_comment_command_only(self) -> bool:
        return self is CommentCreationView.COMMENT_COMMAND_ONLY

    @property
    def is_supported(self) -> bool:
        """Any mode other than ``UNSUPPORTED``."""
        return self is not CommentCreationView.UNSUPPORTED

    @property
    def supports_schema_def(self) -> bool:
        """Comments may be placed in the CREATE VIEW schema definition."""
        return self.is_in_schema_def_and_commands or self.is_in_schema_def_no_commands

    @property
    def supports_column_comment_commands(self) -> bool:
        """Column comments may be registered via post-creation commands."""
        return self.is_in_schema_def_and_commands or self.is_comment_command_only

Enum for SQL engine VIEW comment support.

UNSUPPORTED = no comments at all
IN_SCHEMA_DEF_AND_COMMANDS = all comments can be registered in CREATE VIEW schema definitions and in post-creation commands
IN_SCHEMA_DEF_NO_COMMANDS = all comments can be registered in CREATE VIEW schema definitions, but not in post-creation commands
COMMENT_COMMAND_ONLY = comments can only be registered via a post-creation command like COMMENT or ALTER

UNSUPPORTED = <CommentCreationView.UNSUPPORTED: 1>
IN_SCHEMA_DEF_AND_COMMANDS = <CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS: 2>
IN_SCHEMA_DEF_NO_COMMANDS = <CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS: 3>
COMMENT_COMMAND_ONLY = <CommentCreationView.COMMENT_COMMAND_ONLY: 4>
is_unsupported: bool
127    @property
128    def is_unsupported(self) -> bool:
129        return self == CommentCreationView.UNSUPPORTED
is_in_schema_def_and_commands: bool
131    @property
132    def is_in_schema_def_and_commands(self) -> bool:
133        return self == CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS
is_in_schema_def_no_commands: bool
135    @property
136    def is_in_schema_def_no_commands(self) -> bool:
137        return self == CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS
is_comment_command_only: bool
139    @property
140    def is_comment_command_only(self) -> bool:
141        return self == CommentCreationView.COMMENT_COMMAND_ONLY
is_supported: bool
143    @property
144    def is_supported(self) -> bool:
145        return self != CommentCreationView.UNSUPPORTED
supports_schema_def: bool
147    @property
148    def supports_schema_def(self) -> bool:
149        return self in (
150            CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS,
151            CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS,
152        )
supports_column_comment_commands: bool
154    @property
155    def supports_column_comment_commands(self) -> bool:
156        return self in (
157            CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS,
158            CommentCreationView.COMMENT_COMMAND_ONLY,
159        )
Inherited Members
enum.Enum
name
value
class DataObject(sqlmesh.utils.pydantic.PydanticModel):
class DataObject(PydanticModel):
    """A database object identified by catalog, schema and name, together with its type."""

    catalog: t.Optional[str] = None
    # Populated from the "schema" key on input (pydantic alias).
    schema_name: str = Field(alias="schema")
    name: str
    type: DataObjectType

    # Only meaningful for type=DataObjectType.TABLE, and only on engines that support clustering.
    clustering_key: t.Optional[str] = None

    @property
    def is_clustered(self) -> bool:
        """True when a non-empty ``clustering_key`` is set."""
        key = self.clustering_key
        return key is not None and len(key) > 0

    def to_table(self) -> exp.Table:
        """Build a quoted ``exp.Table`` reference (catalog.schema.name) for this object."""
        return exp.table_(
            self.name,
            db=self.schema_name,
            catalog=self.catalog,
            quoted=True,
        )

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
catalog: Optional[str]
schema_name: str
name: str
clustering_key: Optional[str]
is_clustered: bool
171    @property
172    def is_clustered(self) -> bool:
173        return bool(self.clustering_key)
def to_table(self) -> sqlglot.expressions.query.Table:
175    def to_table(self) -> exp.Table:
176        return exp.table_(self.name, db=self.schema_name, catalog=self.catalog, quoted=True)
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class CatalogSupport(enum.Enum):
class CatalogSupport(Enum):
    """Level of catalog support an engine provides."""

    # No notion of catalogs at all
    UNSUPPORTED = 1

    # Catalogs exist but are isolated: tables in one catalog cannot reference tables in another
    SINGLE_CATALOG_ONLY = 2

    # Multiple catalogs, but some operations need a SET CATALOG query first to choose the active one
    REQUIRES_SET_CATALOG = 3

    # Multiple catalogs, any of which can be targeted directly without running SET CATALOG
    FULL_SUPPORT = 4

    @property
    def is_unsupported(self) -> bool:
        return self is CatalogSupport.UNSUPPORTED

    @property
    def is_single_catalog_only(self) -> bool:
        return self is CatalogSupport.SINGLE_CATALOG_ONLY

    @property
    def is_requires_set_catalog(self) -> bool:
        return self is CatalogSupport.REQUIRES_SET_CATALOG

    @property
    def is_full_support(self) -> bool:
        return self is CatalogSupport.FULL_SUPPORT

    @property
    def is_multi_catalog_supported(self) -> bool:
        """True for the two modes that can work with more than one catalog."""
        return self in (CatalogSupport.REQUIRES_SET_CATALOG, CatalogSupport.FULL_SUPPORT)

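Level of catalog support an engine provides, ranging from no catalog concept (UNSUPPORTED) to unambiguous multi-catalog targeting (FULL_SUPPORT).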

UNSUPPORTED = <CatalogSupport.UNSUPPORTED: 1>
SINGLE_CATALOG_ONLY = <CatalogSupport.SINGLE_CATALOG_ONLY: 2>
REQUIRES_SET_CATALOG = <CatalogSupport.REQUIRES_SET_CATALOG: 3>
FULL_SUPPORT = <CatalogSupport.FULL_SUPPORT: 4>
is_unsupported: bool
192    @property
193    def is_unsupported(self) -> bool:
194        return self == CatalogSupport.UNSUPPORTED
is_single_catalog_only: bool
196    @property
197    def is_single_catalog_only(self) -> bool:
198        return self == CatalogSupport.SINGLE_CATALOG_ONLY
is_requires_set_catalog: bool
200    @property
201    def is_requires_set_catalog(self) -> bool:
202        return self == CatalogSupport.REQUIRES_SET_CATALOG
is_full_support: bool
204    @property
205    def is_full_support(self) -> bool:
206        return self == CatalogSupport.FULL_SUPPORT
is_multi_catalog_supported: bool
208    @property
209    def is_multi_catalog_supported(self) -> bool:
210        return self.is_requires_set_catalog or self.is_full_support
Inherited Members
enum.Enum
name
value
class EngineRunMode(enum.Enum):
class EngineRunMode(Enum):
    """Mode in which the underlying engine runs."""

    SINGLE_MODE_ENGINE = 1
    STANDALONE = 2
    CLUSTER = 3
    CLOUD = 4

    @property
    def is_single_mode_engine(self) -> bool:
        """Whether this mode is ``SINGLE_MODE_ENGINE``."""
        return self is EngineRunMode.SINGLE_MODE_ENGINE

    @property
    def is_standalone(self) -> bool:
        """Whether this mode is ``STANDALONE``."""
        return self is EngineRunMode.STANDALONE

    @property
    def is_cluster(self) -> bool:
        """Whether this mode is ``CLUSTER``."""
        return self is EngineRunMode.CLUSTER

    @property
    def is_cloud(self) -> bool:
        """Whether this mode is ``CLOUD``."""
        return self is EngineRunMode.CLOUD

The mode in which the engine runs: a single-mode engine, standalone, cluster, or cloud.

SINGLE_MODE_ENGINE = <EngineRunMode.SINGLE_MODE_ENGINE: 1>
STANDALONE = <EngineRunMode.STANDALONE: 2>
CLUSTER = <EngineRunMode.CLUSTER: 3>
CLOUD = <EngineRunMode.CLOUD: 4>
is_single_mode_engine: bool
219    @property
220    def is_single_mode_engine(self) -> bool:
221        return self == EngineRunMode.SINGLE_MODE_ENGINE
is_standalone: bool
223    @property
224    def is_standalone(self) -> bool:
225        return self == EngineRunMode.STANDALONE
is_cluster: bool
227    @property
228    def is_cluster(self) -> bool:
229        return self == EngineRunMode.CLUSTER
is_cloud: bool
231    @property
232    def is_cloud(self) -> bool:
233        return self == EngineRunMode.CLOUD
Inherited Members
enum.Enum
name
value
class InsertOverwriteStrategy(enum.Enum):
class InsertOverwriteStrategy(Enum):
    """How an adapter replaces a range of data in an existing table."""

    # DELETE the target range first, then INSERT the new rows.
    DELETE_INSERT = 1
    # A single INSERT OVERWRITE statement replaces the range.
    INSERT_OVERWRITE = 2
    # A single INSERT INTO ... REPLACE WHERE statement.
    # Note: on Databricks, REPLACE WHERE requires `spark.sql.sources.partitionOverwriteMode`
    # to be set to `static`.
    REPLACE_WHERE = 3
    # A plain INSERT that the engine itself treats as an overwrite: it transparently matches
    # partition bounds and replaces data instead of appending. Trino behaves this way when
    # `hive.insert-existing-partitions-behavior=OVERWRITE` is configured.
    INTO_IS_OVERWRITE = 4
    # Emulate INSERT OVERWRITE with a MERGE because the engine has no native support for it.
    MERGE = 5

    @property
    def is_delete_insert(self) -> bool:
        """Whether this strategy is ``DELETE_INSERT``."""
        return self is InsertOverwriteStrategy.DELETE_INSERT

    @property
    def is_insert_overwrite(self) -> bool:
        """Whether this strategy is ``INSERT_OVERWRITE``."""
        return self is InsertOverwriteStrategy.INSERT_OVERWRITE

    @property
    def is_replace_where(self) -> bool:
        """Whether this strategy is ``REPLACE_WHERE``."""
        return self is InsertOverwriteStrategy.REPLACE_WHERE

    @property
    def is_into_is_overwrite(self) -> bool:
        """Whether this strategy is ``INTO_IS_OVERWRITE``."""
        return self is InsertOverwriteStrategy.INTO_IS_OVERWRITE

    @property
    def is_merge(self) -> bool:
        """Whether this strategy is ``MERGE``."""
        return self is InsertOverwriteStrategy.MERGE

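Strategy used to overwrite a range of data in a table: a DELETE followed by an INSERT (DELETE_INSERT), a native INSERT OVERWRITE (INSERT_OVERWRITE), INSERT INTO ... REPLACE WHERE (REPLACE_WHERE), a plain INSERT that the engine treats as an overwrite (INTO_IS_OVERWRITE), or a MERGE (MERGE).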

is_delete_insert: bool
250    @property
251    def is_delete_insert(self) -> bool:
252        return self == InsertOverwriteStrategy.DELETE_INSERT
is_insert_overwrite: bool
254    @property
255    def is_insert_overwrite(self) -> bool:
256        return self == InsertOverwriteStrategy.INSERT_OVERWRITE
is_replace_where: bool
258    @property
259    def is_replace_where(self) -> bool:
260        return self == InsertOverwriteStrategy.REPLACE_WHERE
is_into_is_overwrite: bool
262    @property
263    def is_into_is_overwrite(self) -> bool:
264        return self == InsertOverwriteStrategy.INTO_IS_OVERWRITE
is_merge: bool
266    @property
267    def is_merge(self) -> bool:
268        return self == InsertOverwriteStrategy.MERGE
Inherited Members
enum.Enum
name
value
class SourceQuery:
class SourceQuery:
    """Context manager that builds a query lazily and runs an optional cleanup on exit.

    Entering calls ``query_factory`` and then applies every registered transform, in
    registration order, through ``query.transform``. Exiting calls ``cleanup_func`` (if one
    was provided) and returns None, so any in-flight exception is not suppressed.
    """

    def __init__(
        self,
        query_factory: t.Callable[[], Query],
        cleanup_func: t.Optional[t.Callable[[], None]] = None,
        transforms: t.Optional[t.List[t.Callable[[Query], Query]]] = None,
        **kwargs: t.Any,
    ) -> None:
        """
        Args:
            query_factory: Zero-argument callable that produces the query on ``__enter__``.
            cleanup_func: Optional callable invoked on ``__exit__``.
            transforms: Optional initial transforms. The list is copied, so later
                ``add_transform`` calls never mutate a list owned by the caller.
            **kwargs: Accepted and ignored.
        """
        self.query_factory = query_factory
        self.cleanup_func = cleanup_func
        # Copy rather than alias: ``transforms or []`` alone would alias a non-empty caller
        # list (but not an empty one), so appends would leak back to the caller inconsistently.
        self._transforms: t.List[t.Callable[[Query], Query]] = list(transforms or [])

    def add_transform(self, transform: t.Callable[[Query], Query]) -> None:
        """Register ``transform`` to run after all previously registered transforms."""
        self._transforms.append(transform)

    def __enter__(self) -> Query:
        """Build the query and apply the registered transforms in order."""
        query = self.query_factory()
        for transform in self._transforms:
            query = t.cast(exp.Query, query.transform(transform))
        return query

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_val: t.Optional[BaseException],
        exc_tb: t.Optional[types.TracebackType],
    ) -> t.Optional[bool]:
        """Run the cleanup callback, if any; returning None lets exceptions propagate."""
        if self.cleanup_func:
            self.cleanup_func()
        return None
SourceQuery( query_factory: Callable[[], Query], cleanup_func: Optional[Callable[[], NoneType]] = None, transforms: Optional[List[Callable[[Query], Query]]] = None, **kwargs: Any)
272    def __init__(
273        self,
274        query_factory: t.Callable[[], Query],
275        cleanup_func: t.Optional[t.Callable[[], None]] = None,
276        transforms: t.Optional[t.List[t.Callable[[Query], Query]]] = None,
277        **kwargs: t.Any,
278    ) -> None:
279        self.query_factory = query_factory
280        self.cleanup_func = cleanup_func
281        self._transforms = transforms or []
query_factory
cleanup_func
def add_transform( self, transform: Callable[[Query], Query]) -> None:
283    def add_transform(self, transform: t.Callable[[Query], Query]) -> None:
284        self._transforms.append(transform)
def set_catalog( override_mapping: Optional[Dict[str, CatalogSupport]] = None) -> Callable:
def set_catalog(override_mapping: t.Optional[t.Dict[str, CatalogSupport]] = None) -> t.Callable:
    """Build a class decorator that makes an engine adapter's methods catalog-aware.

    The returned decorator walks the public methods of the decorated class, plus the
    private methods named in ``inclusion_list``. It wraps every method that has a parameter
    annotated as ``"SchemaName"`` or ``"TableName"``. Depending on the effective
    ``CatalogSupport`` level, the wrapper does one of the following with the catalog found
    in that argument:

    - passes the call through unchanged;
    - raises an error;
    - strips the catalog from the argument;
    - temporarily switches the adapter's current catalog.

    Args:
        override_mapping: Optional mapping from method name to the ``CatalogSupport`` level
            to use for that method instead of ``engine_adapter.catalog_support``.

    Returns:
        A class decorator that rewrites the class's methods in place and returns the class.
    """

    def set_catalog_decorator(
        func: t.Callable,
        target_name: str,
        target_pos: int,
        target_type: str,
        override: t.Optional[CatalogSupport] = None,
    ) -> t.Callable:
        """Wrap ``func`` so the catalog in its ``target_name`` argument is handled.

        Args:
            func: The unbound adapter method to wrap.
            target_name: Name of the SchemaName/TableName parameter.
            target_pos: Positional index of that parameter (``self`` is index 0).
            target_type: ``"TableName"`` parses with ``exp.to_table``; anything else uses
                ``to_schema``.
            override: Catalog support level that takes precedence over the adapter's own.
        """

        @functools.wraps(func)
        def internal_wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
            # A list is needed so the rewritten argument can be assigned back in place.
            list_args = list(args)
            engine_adapter = list_args[0]
            catalog_support = override or engine_adapter.catalog_support
            # With full catalog support the engine handles catalogs itself.
            if catalog_support.is_full_support:
                return func(*list_args, **kwargs)

            # Prefer the keyword argument when it was passed with a truthy value;
            # otherwise read the positional slot.
            obj, container, key = t.cast(
                t.Tuple[t.Union[str, exp.Table], t.Union[t.Dict, t.List], t.Union[int, str]],
                (
                    (kwargs.get(target_name), kwargs, target_name)
                    if kwargs.get(target_name)
                    else (list_args[target_pos], list_args, target_pos)
                ),
            )
            to_expression_func = t.cast(
                t.Callable[[t.Union[str, exp.Table]], exp.Table],
                exp.to_table if target_type == "TableName" else to_schema,
            )
            # Copy Table inputs so stripping the catalog below does not mutate the caller's object.
            expression = to_expression_func(obj.copy() if isinstance(obj, exp.Table) else obj)
            catalog_name = expression.catalog
            if not catalog_name:
                return func(*list_args, **kwargs)

            # If we have a catalog and this engine doesn't support catalogs then we need to error
            if catalog_support.is_unsupported:
                raise UnsupportedCatalogOperationError(
                    f"{engine_adapter.dialect} does not support catalogs and a catalog was provided: {catalog_name}"
                )
            # Remove the catalog name from the argument so the engine adapter doesn't try to use it
            expression.set("catalog", None)
            container[key] = expression  # type: ignore
            if catalog_support.is_single_catalog_only:
                if catalog_name != engine_adapter._default_catalog:
                    raise SQLMeshError(
                        f"{engine_adapter.dialect} requires that all catalog operations be against a single catalog: {engine_adapter._default_catalog}. Provided catalog: {catalog_name}"
                    )
                return func(*list_args, **kwargs)

            # Remaining levels: switch the adapter's active catalog for the call if needed.
            current_catalog = engine_adapter.get_current_catalog()
            if catalog_name == current_catalog:
                return func(*list_args, **kwargs)
            engine_adapter.set_current_catalog(catalog_name)
            try:
                return func(*list_args, **kwargs)
            finally:
                # Restore the previous catalog even when ``func`` raises. Otherwise a failed
                # operation would leave the adapter pointed at a different catalog.
                engine_adapter.set_current_catalog(current_catalog)

        return internal_wrapper

    # Private methods are skipped unless listed here.
    inclusion_list = {
        "_get_data_objects",
    }

    # Exclude this to avoid a circular dependency from inspecting the classproperty
    exclusion_list = {
        "can_access_spark_session",
    }

    override_mapping = override_mapping or {}

    def wrapper(cls: t.Type[EngineAdapter]) -> t.Type[EngineAdapter]:
        """Wrap every eligible method of ``cls`` in place and return ``cls``."""
        for name in dir(cls):
            if name in exclusion_list or (name.startswith("_") and name not in inclusion_list):
                continue
            m = getattr(cls, name)
            if inspect.isfunction(m):
                # getfullargspec ignores __wrapped__, so a method already wrapped by this
                # decorator (e.g. inherited) reports no named args and is not re-wrapped.
                spec = inspect.getfullargspec(m)
                # NOTE(review): annotations are compared as strings, which assumes they are
                # unevaluated (``from __future__ import annotations``) in the adapter modules.
                # Every matching parameter wraps the same original ``m`` and overwrites the
                # attribute, so only the LAST SchemaName/TableName parameter is handled for
                # methods that have several. Preserved as-is.
                for i, obj_name in enumerate(spec.args):
                    obj_type = spec.annotations.get(obj_name)
                    if obj_type not in {"SchemaName", "TableName"}:
                        continue
                    setattr(
                        cls,
                        name,
                        set_catalog_decorator(
                            m,
                            target_name=obj_name,
                            target_pos=i,
                            target_type=obj_type,
                            override=override_mapping.get(name),
                        ),
                    )
        return cls

    return wrapper