sqlmesh.core.engine_adapter.shared
1from __future__ import annotations 2 3import functools 4import inspect 5import logging 6import types 7import typing as t 8from enum import Enum 9 10from pydantic import Field 11from sqlglot import exp 12 13from sqlmesh.core.dialect import to_schema 14from sqlmesh.utils.errors import UnsupportedCatalogOperationError, SQLMeshError 15from sqlmesh.utils.pydantic import PydanticModel 16 17if t.TYPE_CHECKING: 18 from sqlmesh.core.engine_adapter._typing import Query 19 from sqlmesh.core.engine_adapter.base import EngineAdapter 20 21 22logger = logging.getLogger(__name__) 23 24 25class DataObjectType(str, Enum): 26 UNKNOWN = "unknown" 27 TABLE = "table" 28 VIEW = "view" 29 MATERIALIZED_VIEW = "materialized_view" 30 MANAGED_TABLE = "managed_table" 31 32 @property 33 def is_unknown(self) -> bool: 34 return self == DataObjectType.UNKNOWN 35 36 @property 37 def is_table(self) -> bool: 38 return self == DataObjectType.TABLE 39 40 @property 41 def is_view(self) -> bool: 42 return self == DataObjectType.VIEW 43 44 @property 45 def is_materialized_view(self) -> bool: 46 return self == DataObjectType.MATERIALIZED_VIEW 47 48 @property 49 def is_managed_table(self) -> bool: 50 return self == DataObjectType.MANAGED_TABLE 51 52 @classmethod 53 def from_str(cls, s: str) -> DataObjectType: 54 s = s.lower() 55 if s == "table": 56 return DataObjectType.TABLE 57 if s == "view": 58 return DataObjectType.VIEW 59 if s == "materialized_view": 60 return DataObjectType.MATERIALIZED_VIEW 61 if s == "managed_table": 62 return DataObjectType.MANAGED_TABLE 63 return DataObjectType.UNKNOWN 64 65 66class CommentCreationTable(Enum): 67 """ 68 Enum for SQL engine TABLE comment support. 
69 70 UNSUPPORTED = no comments at all 71 IN_SCHEMA_DEF_CTAS = comments can be registered in CREATE schema definitions, including CTAS calls 72 IN_SCHEMA_DEF_NO_CTAS = comments can be registered in CREATE schema definitions, excluding CTAS calls 73 COMMENT_COMMAND_ONLY = comments can only be registered via a post-creation command like `COMMENT` or `ALTER` 74 """ 75 76 UNSUPPORTED = 1 77 IN_SCHEMA_DEF_CTAS = 2 78 IN_SCHEMA_DEF_NO_CTAS = 3 79 COMMENT_COMMAND_ONLY = 4 80 81 @property 82 def is_unsupported(self) -> bool: 83 return self == CommentCreationTable.UNSUPPORTED 84 85 @property 86 def is_in_schema_def_ctas(self) -> bool: 87 return self == CommentCreationTable.IN_SCHEMA_DEF_CTAS 88 89 @property 90 def is_in_schema_def_no_ctas(self) -> bool: 91 return self == CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS 92 93 @property 94 def is_comment_command_only(self) -> bool: 95 return self == CommentCreationTable.COMMENT_COMMAND_ONLY 96 97 @property 98 def is_supported(self) -> bool: 99 return self != CommentCreationTable.UNSUPPORTED 100 101 @property 102 def supports_schema_def(self) -> bool: 103 return self in ( 104 CommentCreationTable.IN_SCHEMA_DEF_CTAS, 105 CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS, 106 ) 107 108 109class CommentCreationView(Enum): 110 """ 111 Enum for SQL engine VIEW comment support. 
112 113 UNSUPPORTED = no comments at all 114 IN_SCHEMA_DEF_AND_COMMANDS = all comments can be registered in CREATE VIEW schema definitions 115 and in post-creation commands 116 IN_SCHEMA_DEF_NO_COMMANDS = all comments can be registered in CREATE VIEW schema definitions, 117 but not in post-creation commands 118 COMMENT_COMMAND_ONLY = comments can only be registered via a post-creation command like `COMMENT` or `ALTER` 119 """ 120 121 UNSUPPORTED = 1 122 IN_SCHEMA_DEF_AND_COMMANDS = 2 123 IN_SCHEMA_DEF_NO_COMMANDS = 3 124 COMMENT_COMMAND_ONLY = 4 125 126 @property 127 def is_unsupported(self) -> bool: 128 return self == CommentCreationView.UNSUPPORTED 129 130 @property 131 def is_in_schema_def_and_commands(self) -> bool: 132 return self == CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS 133 134 @property 135 def is_in_schema_def_no_commands(self) -> bool: 136 return self == CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS 137 138 @property 139 def is_comment_command_only(self) -> bool: 140 return self == CommentCreationView.COMMENT_COMMAND_ONLY 141 142 @property 143 def is_supported(self) -> bool: 144 return self != CommentCreationView.UNSUPPORTED 145 146 @property 147 def supports_schema_def(self) -> bool: 148 return self in ( 149 CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS, 150 CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS, 151 ) 152 153 @property 154 def supports_column_comment_commands(self) -> bool: 155 return self in ( 156 CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS, 157 CommentCreationView.COMMENT_COMMAND_ONLY, 158 ) 159 160 161class DataObject(PydanticModel): 162 catalog: t.Optional[str] = None 163 schema_name: str = Field(alias="schema") 164 name: str 165 type: DataObjectType 166 167 # for type=DataObjectType.Table, only if the DB supports it 168 clustering_key: t.Optional[str] = None 169 170 @property 171 def is_clustered(self) -> bool: 172 return bool(self.clustering_key) 173 174 def to_table(self) -> exp.Table: 175 return exp.table_(self.name, 
db=self.schema_name, catalog=self.catalog, quoted=True) 176 177 178class CatalogSupport(Enum): 179 # The engine has no concept of catalogs 180 UNSUPPORTED = 1 181 182 # The engine has a concept of catalogs, but they are isolated from each other and cannot reference each others tables 183 SINGLE_CATALOG_ONLY = 2 184 185 # The engine supports multiple catalogs but some operations require a SET CATALOG query to set the active catalog before proceeding 186 REQUIRES_SET_CATALOG = 3 187 188 # The engine supports multiple catalogs and can unambiguously target a specific catalog when performing operations (without running SET CATALOG first) 189 FULL_SUPPORT = 4 190 191 @property 192 def is_unsupported(self) -> bool: 193 return self == CatalogSupport.UNSUPPORTED 194 195 @property 196 def is_single_catalog_only(self) -> bool: 197 return self == CatalogSupport.SINGLE_CATALOG_ONLY 198 199 @property 200 def is_requires_set_catalog(self) -> bool: 201 return self == CatalogSupport.REQUIRES_SET_CATALOG 202 203 @property 204 def is_full_support(self) -> bool: 205 return self == CatalogSupport.FULL_SUPPORT 206 207 @property 208 def is_multi_catalog_supported(self) -> bool: 209 return self.is_requires_set_catalog or self.is_full_support 210 211 212class EngineRunMode(Enum): 213 SINGLE_MODE_ENGINE = 1 214 STANDALONE = 2 215 CLUSTER = 3 216 CLOUD = 4 217 218 @property 219 def is_single_mode_engine(self) -> bool: 220 return self == EngineRunMode.SINGLE_MODE_ENGINE 221 222 @property 223 def is_standalone(self) -> bool: 224 return self == EngineRunMode.STANDALONE 225 226 @property 227 def is_cluster(self) -> bool: 228 return self == EngineRunMode.CLUSTER 229 230 @property 231 def is_cloud(self) -> bool: 232 return self == EngineRunMode.CLOUD 233 234 235class InsertOverwriteStrategy(Enum): 236 # First, issue a DELETE to clear the data range. Then, issue an INSERT query to insert the new data 237 DELETE_INSERT = 1 238 # Issue a single INSERT OVERWRITE query to replace a data range. 
239 INSERT_OVERWRITE = 2 240 # Issue a single INSERT INTO... REPLACE WHERE query 241 # Note: Replace where on Databricks requires that `spark.sql.sources.partitionOverwriteMode` be set to `static` 242 REPLACE_WHERE = 3 243 # Issue a single INSERT query to replace a data range. The assumption is that the query engine will transparently match partition bounds 244 # and replace data rather than append to it. Trino is an example of this when `hive.insert-existing-partitions-behavior=OVERWRITE` is configured 245 INTO_IS_OVERWRITE = 4 246 # Do the INSERT OVERWRITE using merge since the engine doesn't support it natively 247 MERGE = 5 248 249 @property 250 def is_delete_insert(self) -> bool: 251 return self == InsertOverwriteStrategy.DELETE_INSERT 252 253 @property 254 def is_insert_overwrite(self) -> bool: 255 return self == InsertOverwriteStrategy.INSERT_OVERWRITE 256 257 @property 258 def is_replace_where(self) -> bool: 259 return self == InsertOverwriteStrategy.REPLACE_WHERE 260 261 @property 262 def is_into_is_overwrite(self) -> bool: 263 return self == InsertOverwriteStrategy.INTO_IS_OVERWRITE 264 265 @property 266 def is_merge(self) -> bool: 267 return self == InsertOverwriteStrategy.MERGE 268 269 270class SourceQuery: 271 def __init__( 272 self, 273 query_factory: t.Callable[[], Query], 274 cleanup_func: t.Optional[t.Callable[[], None]] = None, 275 transforms: t.Optional[t.List[t.Callable[[Query], Query]]] = None, 276 **kwargs: t.Any, 277 ) -> None: 278 self.query_factory = query_factory 279 self.cleanup_func = cleanup_func 280 self._transforms = transforms or [] 281 282 def add_transform(self, transform: t.Callable[[Query], Query]) -> None: 283 self._transforms.append(transform) 284 285 def __enter__(self) -> Query: 286 query = self.query_factory() 287 for transform in self._transforms: 288 query = t.cast(exp.Query, query.transform(transform)) 289 return query 290 291 def __exit__( 292 self, 293 exc_type: t.Optional[t.Type[BaseException]], 294 exc_val: 
t.Optional[BaseException], 295 exc_tb: t.Optional[types.TracebackType], 296 ) -> t.Optional[bool]: 297 if self.cleanup_func: 298 self.cleanup_func() 299 return None 300 301 302def set_catalog(override_mapping: t.Optional[t.Dict[str, CatalogSupport]] = None) -> t.Callable: 303 def set_catalog_decorator( 304 func: t.Callable, 305 target_name: str, 306 target_pos: int, 307 target_type: str, 308 override: t.Optional[CatalogSupport] = None, 309 ) -> t.Callable: 310 @functools.wraps(func) 311 def internal_wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any: 312 # Need to convert args to list in order to later do assignment to the object 313 list_args = list(args) 314 engine_adapter = list_args[0] 315 catalog_support = override or engine_adapter.catalog_support 316 # If there is full catalog support then we have nothing to do 317 if catalog_support.is_full_support: 318 return func(*list_args, **kwargs) 319 320 obj, container, key = t.cast( 321 t.Tuple[t.Union[str, exp.Table], t.Union[t.Dict, t.List], t.Union[int, str]], 322 ( 323 (kwargs.get(target_name), kwargs, target_name) 324 if kwargs.get(target_name) 325 else (list_args[target_pos], list_args, target_pos) 326 ), 327 ) 328 to_expression_func = t.cast( 329 t.Callable[[t.Union[str, exp.Table]], exp.Table], 330 exp.to_table if target_type == "TableName" else to_schema, 331 ) 332 expression = to_expression_func(obj.copy() if isinstance(obj, exp.Table) else obj) 333 catalog_name = expression.catalog 334 if not catalog_name: 335 return func(*list_args, **kwargs) 336 337 # If we have a catalog and this engine doesn't support catalogs then we need to error 338 if catalog_support.is_unsupported: 339 raise UnsupportedCatalogOperationError( 340 f"{engine_adapter.dialect} does not support catalogs and a catalog was provided: {catalog_name}" 341 ) 342 # Remove the catalog name from the argument so the engine adapter doesn't try to use it 343 expression.set("catalog", None) 344 container[key] = expression # type: ignore 345 if 
catalog_support.is_single_catalog_only: 346 if catalog_name != engine_adapter._default_catalog: 347 raise SQLMeshError( 348 f"{engine_adapter.dialect} requires that all catalog operations be against a single catalog: {engine_adapter._default_catalog}. Provided catalog: {catalog_name}" 349 ) 350 return func(*list_args, **kwargs) 351 # Set the catalog name on the engine adapter if needed 352 current_catalog = engine_adapter.get_current_catalog() 353 if catalog_name != current_catalog: 354 engine_adapter.set_current_catalog(catalog_name) 355 resp = func(*list_args, **kwargs) 356 engine_adapter.set_current_catalog(current_catalog) 357 else: 358 resp = func(*list_args, **kwargs) 359 return resp 360 361 return internal_wrapper 362 363 inclusion_list = { 364 "_get_data_objects", 365 } 366 367 # Exclude this to avoid a circular dependency from inspecting the classproperty 368 exclusion_list = { 369 "can_access_spark_session", 370 } 371 372 override_mapping = override_mapping or {} 373 374 def wrapper(cls: t.Type[EngineAdapter]) -> t.Callable: 375 for name in dir(cls): 376 if name in exclusion_list or (name.startswith("_") and name not in inclusion_list): 377 continue 378 m = getattr(cls, name) 379 if inspect.isfunction(m): 380 spec = inspect.getfullargspec(m) 381 for i, obj_name in enumerate(spec.args): 382 obj_type = spec.annotations.get(obj_name) 383 if obj_type not in {"SchemaName", "TableName"}: 384 continue 385 setattr( 386 cls, 387 name, 388 set_catalog_decorator( 389 m, 390 target_name=obj_name, 391 target_pos=i, 392 target_type=obj_type, 393 override=override_mapping.get(name), 394 ), 395 ) 396 return cls 397 398 return wrapper
class DataObjectType(str, Enum):
    """Data object types distinguished by this module.

    Members are also ``str`` instances whose value is the lowercase type name.
    """

    UNKNOWN = "unknown"
    TABLE = "table"
    VIEW = "view"
    MATERIALIZED_VIEW = "materialized_view"
    MANAGED_TABLE = "managed_table"

    @property
    def is_unknown(self) -> bool:
        return self is DataObjectType.UNKNOWN

    @property
    def is_table(self) -> bool:
        return self is DataObjectType.TABLE

    @property
    def is_view(self) -> bool:
        return self is DataObjectType.VIEW

    @property
    def is_materialized_view(self) -> bool:
        return self is DataObjectType.MATERIALIZED_VIEW

    @property
    def is_managed_table(self) -> bool:
        return self is DataObjectType.MANAGED_TABLE

    @classmethod
    def from_str(cls, s: str) -> DataObjectType:
        """Return the member whose value equals ``s``, compared case-insensitively.

        Strings that match no member map to ``UNKNOWN`` instead of raising.
        """
        # Look the value up on the enum itself so new members never need a
        # matching branch here.
        try:
            return cls(s.lower())
        except ValueError:
            return cls.UNKNOWN
An enumeration.
53 @classmethod 54 def from_str(cls, s: str) -> DataObjectType: 55 s = s.lower() 56 if s == "table": 57 return DataObjectType.TABLE 58 if s == "view": 59 return DataObjectType.VIEW 60 if s == "materialized_view": 61 return DataObjectType.MATERIALIZED_VIEW 62 if s == "managed_table": 63 return DataObjectType.MANAGED_TABLE 64 return DataObjectType.UNKNOWN
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class CommentCreationTable(Enum):
    """
    Enum for SQL engine TABLE comment support.

    UNSUPPORTED = no comments at all
    IN_SCHEMA_DEF_CTAS = comments can be registered in CREATE schema definitions, including CTAS calls
    IN_SCHEMA_DEF_NO_CTAS = comments can be registered in CREATE schema definitions, excluding CTAS calls
    COMMENT_COMMAND_ONLY = comments can only be registered via a post-creation command like `COMMENT` or `ALTER`
    """

    UNSUPPORTED = 1
    IN_SCHEMA_DEF_CTAS = 2
    IN_SCHEMA_DEF_NO_CTAS = 3
    COMMENT_COMMAND_ONLY = 4

    @property
    def is_unsupported(self) -> bool:
        return self is CommentCreationTable.UNSUPPORTED

    @property
    def is_in_schema_def_ctas(self) -> bool:
        return self is CommentCreationTable.IN_SCHEMA_DEF_CTAS

    @property
    def is_in_schema_def_no_ctas(self) -> bool:
        return self is CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS

    @property
    def is_comment_command_only(self) -> bool:
        return self is CommentCreationTable.COMMENT_COMMAND_ONLY

    @property
    def is_supported(self) -> bool:
        return not self.is_unsupported

    @property
    def supports_schema_def(self) -> bool:
        # True for both "in schema definition" variants, with or without CTAS.
        return self.is_in_schema_def_ctas or self.is_in_schema_def_no_ctas
Enum for SQL engine TABLE comment support.
UNSUPPORTED = no comments at all
IN_SCHEMA_DEF_CTAS = comments can be registered in CREATE schema definitions, including CTAS calls
IN_SCHEMA_DEF_NO_CTAS = comments can be registered in CREATE schema definitions, excluding CTAS calls
COMMENT_COMMAND_ONLY = comments can only be registered via a post-creation command like COMMENT or ALTER
Inherited Members
- enum.Enum
- name
- value
class CommentCreationView(Enum):
    """
    Enum for SQL engine VIEW comment support.

    UNSUPPORTED = no comments at all
    IN_SCHEMA_DEF_AND_COMMANDS = all comments can be registered in CREATE VIEW schema definitions
                                 and in post-creation commands
    IN_SCHEMA_DEF_NO_COMMANDS = all comments can be registered in CREATE VIEW schema definitions,
                                but not in post-creation commands
    COMMENT_COMMAND_ONLY = comments can only be registered via a post-creation command like `COMMENT` or `ALTER`
    """

    UNSUPPORTED = 1
    IN_SCHEMA_DEF_AND_COMMANDS = 2
    IN_SCHEMA_DEF_NO_COMMANDS = 3
    COMMENT_COMMAND_ONLY = 4

    @property
    def is_unsupported(self) -> bool:
        return self is CommentCreationView.UNSUPPORTED

    @property
    def is_in_schema_def_and_commands(self) -> bool:
        return self is CommentCreationView.IN_SCHEMA_DEF_AND_COMMANDS

    @property
    def is_in_schema_def_no_commands(self) -> bool:
        return self is CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS

    @property
    def is_comment_command_only(self) -> bool:
        return self is CommentCreationView.COMMENT_COMMAND_ONLY

    @property
    def is_supported(self) -> bool:
        return not self.is_unsupported

    @property
    def supports_schema_def(self) -> bool:
        # Either "in schema definition" variant qualifies.
        return self.is_in_schema_def_and_commands or self.is_in_schema_def_no_commands

    @property
    def supports_column_comment_commands(self) -> bool:
        # Post-creation commands are available in these two modes only.
        return self.is_in_schema_def_and_commands or self.is_comment_command_only
Enum for SQL engine VIEW comment support.
UNSUPPORTED = no comments at all
IN_SCHEMA_DEF_AND_COMMANDS = all comments can be registered in CREATE VIEW schema definitions
and in post-creation commands
IN_SCHEMA_DEF_NO_COMMANDS = all comments can be registered in CREATE VIEW schema definitions,
but not in post-creation commands
COMMENT_COMMAND_ONLY = comments can only be registered via a post-creation command like COMMENT or ALTER
Inherited Members
- enum.Enum
- name
- value
class DataObject(PydanticModel):
    """Pydantic model identifying a data object by catalog, schema and name, plus its type."""

    catalog: t.Optional[str] = None
    # Populated through the ``schema`` alias.
    schema_name: str = Field(alias="schema")
    name: str
    type: DataObjectType

    # for type=DataObjectType.Table, only if the DB supports it
    clustering_key: t.Optional[str] = None

    @property
    def is_clustered(self) -> bool:
        """Whether a non-empty clustering key is set."""
        return True if self.clustering_key else False

    def to_table(self) -> exp.Table:
        """Build a quoted ``exp.Table`` from this object's name, schema and catalog."""
        return exp.table_(
            self.name,
            db=self.schema_name,
            catalog=self.catalog,
            quoted=True,
        )
Usage documentation: Models
A base class for creating Pydantic models.
Attributes:
- `__class_vars__`: The names of the class variables defined on the model.
- `__private_attributes__`: Metadata about the private attributes of the model.
- `__signature__`: The synthesized `__init__` `Signature` (`inspect.Signature`) of the model.
- `__pydantic_complete__`: Whether model building is completed, or if there are still undefined fields.
- `__pydantic_core_schema__`: The core schema of the model.
- `__pydantic_custom_init__`: Whether the model has a custom `__init__` function.
- `__pydantic_decorators__`: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- `__pydantic_generic_metadata__`: Metadata for generic models; contains data used for a similar purpose to `__args__`, `__origin__`, `__parameters__` in typing-module generics. May eventually be replaced by these.
- `__pydantic_parent_namespace__`: Parent namespace of the model, used for automatic rebuilding of models.
- `__pydantic_post_init__`: The name of the post-init method for the model, if defined.
- `__pydantic_root_model__`: Whether the model is a `RootModel` (`pydantic.root_model.RootModel`).
- `__pydantic_serializer__`: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- `__pydantic_validator__`: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- `__pydantic_fields__`: A dictionary of field names and their corresponding `FieldInfo` (`pydantic.fields.FieldInfo`) objects.
- `__pydantic_computed_fields__`: A dictionary of computed field names and their corresponding `ComputedFieldInfo` (`pydantic.fields.ComputedFieldInfo`) objects.
- `__pydantic_extra__`: A dictionary containing extra values, if `extra` (`pydantic.config.ConfigDict.extra`) is set to `'allow'`.
- `__pydantic_fields_set__`: The names of fields explicitly set during instantiation.
- `__pydantic_private__`: Values of private attributes set on the model instance.
Configuration for the model; should be a dictionary conforming to `ConfigDict` (`pydantic.config.ConfigDict`).
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class CatalogSupport(Enum):
    """How an engine deals with catalogs."""

    # The engine has no concept of catalogs
    UNSUPPORTED = 1

    # The engine has a concept of catalogs, but they are isolated from each other and cannot reference each others tables
    SINGLE_CATALOG_ONLY = 2

    # The engine supports multiple catalogs but some operations require a SET CATALOG query to set the active catalog before proceeding
    REQUIRES_SET_CATALOG = 3

    # The engine supports multiple catalogs and can unambiguously target a specific catalog when performing operations (without running SET CATALOG first)
    FULL_SUPPORT = 4

    @property
    def is_unsupported(self) -> bool:
        return self is CatalogSupport.UNSUPPORTED

    @property
    def is_single_catalog_only(self) -> bool:
        return self is CatalogSupport.SINGLE_CATALOG_ONLY

    @property
    def is_requires_set_catalog(self) -> bool:
        return self is CatalogSupport.REQUIRES_SET_CATALOG

    @property
    def is_full_support(self) -> bool:
        return self is CatalogSupport.FULL_SUPPORT

    @property
    def is_multi_catalog_supported(self) -> bool:
        return self in (CatalogSupport.REQUIRES_SET_CATALOG, CatalogSupport.FULL_SUPPORT)
An enumeration.
Inherited Members
- enum.Enum
- name
- value
class EngineRunMode(Enum):
    """Run modes an engine can be in."""

    SINGLE_MODE_ENGINE = 1
    STANDALONE = 2
    CLUSTER = 3
    CLOUD = 4

    @property
    def is_single_mode_engine(self) -> bool:
        return self is EngineRunMode.SINGLE_MODE_ENGINE

    @property
    def is_standalone(self) -> bool:
        return self is EngineRunMode.STANDALONE

    @property
    def is_cluster(self) -> bool:
        return self is EngineRunMode.CLUSTER

    @property
    def is_cloud(self) -> bool:
        return self is EngineRunMode.CLOUD
An enumeration.
Inherited Members
- enum.Enum
- name
- value
class InsertOverwriteStrategy(Enum):
    """Strategies for replacing a range of data in a table."""

    # First, issue a DELETE to clear the data range. Then, issue an INSERT query to insert the new data
    DELETE_INSERT = 1
    # Issue a single INSERT OVERWRITE query to replace a data range.
    INSERT_OVERWRITE = 2
    # Issue a single INSERT INTO... REPLACE WHERE query
    # Note: Replace where on Databricks requires that `spark.sql.sources.partitionOverwriteMode` be set to `static`
    REPLACE_WHERE = 3
    # Issue a single INSERT query to replace a data range. The assumption is that the query engine will transparently match partition bounds
    # and replace data rather than append to it. Trino is an example of this when `hive.insert-existing-partitions-behavior=OVERWRITE` is configured
    INTO_IS_OVERWRITE = 4
    # Do the INSERT OVERWRITE using merge since the engine doesn't support it natively
    MERGE = 5

    @property
    def is_delete_insert(self) -> bool:
        return self is InsertOverwriteStrategy.DELETE_INSERT

    @property
    def is_insert_overwrite(self) -> bool:
        return self is InsertOverwriteStrategy.INSERT_OVERWRITE

    @property
    def is_replace_where(self) -> bool:
        return self is InsertOverwriteStrategy.REPLACE_WHERE

    @property
    def is_into_is_overwrite(self) -> bool:
        return self is InsertOverwriteStrategy.INTO_IS_OVERWRITE

    @property
    def is_merge(self) -> bool:
        return self is InsertOverwriteStrategy.MERGE
An enumeration.
Inherited Members
- enum.Enum
- name
- value
class SourceQuery:
    """Context manager that builds a query on entry and runs an optional cleanup on exit.

    Entering calls ``query_factory`` and applies every registered transform, in
    order, through the query's ``transform`` method. Exiting calls
    ``cleanup_func`` when one was given and never suppresses exceptions.
    """

    def __init__(
        self,
        query_factory: t.Callable[[], Query],
        cleanup_func: t.Optional[t.Callable[[], None]] = None,
        transforms: t.Optional[t.List[t.Callable[[Query], Query]]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Store the factory, the cleanup callback and a private copy of ``transforms``.

        Args:
            query_factory: Zero-argument callable producing the query.
            cleanup_func: Optional zero-argument callable invoked on exit.
            transforms: Optional initial transforms applied on entry.
            **kwargs: Accepted and ignored.
        """
        self.query_factory = query_factory
        self.cleanup_func = cleanup_func
        # Copy so that add_transform never mutates the list the caller passed in.
        self._transforms = list(transforms) if transforms else []

    def add_transform(self, transform: t.Callable[[Query], Query]) -> None:
        """Register one more transform to apply on entry, after the existing ones."""
        self._transforms.append(transform)

    def __enter__(self) -> Query:
        query = self.query_factory()
        for transform in self._transforms:
            query = t.cast("exp.Query", query.transform(transform))
        return query

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_val: t.Optional[BaseException],
        exc_tb: t.Optional[types.TracebackType],
    ) -> t.Optional[bool]:
        if self.cleanup_func:
            self.cleanup_func()
        # Returning None lets any in-flight exception propagate.
        return None
272 def __init__( 273 self, 274 query_factory: t.Callable[[], Query], 275 cleanup_func: t.Optional[t.Callable[[], None]] = None, 276 transforms: t.Optional[t.List[t.Callable[[Query], Query]]] = None, 277 **kwargs: t.Any, 278 ) -> None: 279 self.query_factory = query_factory 280 self.cleanup_func = cleanup_func 281 self._transforms = transforms or []
def set_catalog(override_mapping: t.Optional[t.Dict[str, CatalogSupport]] = None) -> t.Callable:
    """Class decorator factory that makes adapter methods respect the engine's catalog support.

    Each eligible method with a parameter annotated ``SchemaName`` or
    ``TableName`` is replaced by a wrapper that inspects the catalog part of
    that argument before delegating to the real method.

    Args:
        override_mapping: Optional map of method name to the ``CatalogSupport``
            level used for that method instead of ``engine_adapter.catalog_support``.

    Returns:
        A decorator that patches the class in place and returns it.
    """

    def set_catalog_decorator(
        func: t.Callable,
        target_name: str,
        target_pos: int,
        target_type: str,
        override: t.Optional[CatalogSupport] = None,
    ) -> t.Callable:
        @functools.wraps(func)
        def internal_wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
            # Need to convert args to list in order to later do assignment to the object
            list_args = list(args)
            engine_adapter = list_args[0]
            catalog_support = override or engine_adapter.catalog_support
            # If there is full catalog support then we have nothing to do
            if catalog_support.is_full_support:
                return func(*list_args, **kwargs)

            obj: t.Union[str, exp.Table]
            container: t.Union[t.Dict, t.List]
            key: t.Union[int, str]
            keyword_value = kwargs.get(target_name)
            if keyword_value:
                obj, container, key = keyword_value, kwargs, target_name
            elif target_pos < len(list_args):
                obj, container, key = list_args[target_pos], list_args, target_pos
            else:
                # The target was not supplied; let the wrapped function apply
                # its default or report the missing argument itself.
                return func(*list_args, **kwargs)

            to_expression_func = t.cast(
                t.Callable[[t.Union[str, exp.Table]], exp.Table],
                exp.to_table if target_type == "TableName" else to_schema,
            )
            # Copy Table inputs so the caller's expression is never mutated below.
            expression = to_expression_func(obj.copy() if isinstance(obj, exp.Table) else obj)
            catalog_name = expression.catalog
            if not catalog_name:
                return func(*list_args, **kwargs)

            # If we have a catalog and this engine doesn't support catalogs then we need to error
            if catalog_support.is_unsupported:
                raise UnsupportedCatalogOperationError(
                    f"{engine_adapter.dialect} does not support catalogs and a catalog was provided: {catalog_name}"
                )
            # Remove the catalog name from the argument so the engine adapter doesn't try to use it
            expression.set("catalog", None)
            container[key] = expression  # type: ignore
            if catalog_support.is_single_catalog_only:
                if catalog_name != engine_adapter._default_catalog:
                    raise SQLMeshError(
                        f"{engine_adapter.dialect} requires that all catalog operations be against a single catalog: {engine_adapter._default_catalog}. Provided catalog: {catalog_name}"
                    )
                return func(*list_args, **kwargs)

            # Set the catalog name on the engine adapter if needed
            current_catalog = engine_adapter.get_current_catalog()
            if catalog_name == current_catalog:
                return func(*list_args, **kwargs)
            engine_adapter.set_current_catalog(catalog_name)
            try:
                return func(*list_args, **kwargs)
            finally:
                # Restore the previous catalog even when the wrapped call raises.
                engine_adapter.set_current_catalog(current_catalog)

        return internal_wrapper

    # Private methods are skipped unless they are listed here.
    inclusion_list = {
        "_get_data_objects",
    }

    # Exclude this to avoid a circular dependency from inspecting the classproperty
    exclusion_list = {
        "can_access_spark_session",
    }

    override_mapping = override_mapping or {}

    def wrapper(cls: t.Type[EngineAdapter]) -> t.Callable:
        for name in dir(cls):
            # The name checks must run before getattr so excluded attributes are never read.
            if name in exclusion_list or (name.startswith("_") and name not in inclusion_list):
                continue
            m = getattr(cls, name)
            if not inspect.isfunction(m):
                continue
            spec = inspect.getfullargspec(m)
            for i, obj_name in enumerate(spec.args):
                # Annotations are compared as plain strings.
                obj_type = spec.annotations.get(obj_name)
                if obj_type not in {"SchemaName", "TableName"}:
                    continue
                # NOTE(review): every annotated parameter wraps the original `m`
                # and overwrites the previous setattr, so with several annotated
                # parameters only the last one ends up handled. Confirm intent.
                setattr(
                    cls,
                    name,
                    set_catalog_decorator(
                        m,
                        target_name=obj_name,
                        target_pos=i,
                        target_type=obj_type,
                        override=override_mapping.get(name),
                    ),
                )
        return cls

    return wrapper