sqlmesh.core.engine_adapter.fabric
1from __future__ import annotations 2 3import typing as t 4import logging 5import requests 6import time 7from functools import cached_property 8from sqlglot import exp 9from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result 10from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter 11from sqlmesh.core.engine_adapter.shared import ( 12 InsertOverwriteStrategy, 13) 14from sqlmesh.utils.errors import SQLMeshError 15from sqlmesh.utils.connection_pool import ConnectionPool 16from sqlmesh.core.schema_diff import TableAlterOperation 17from sqlmesh.utils import random_id 18 19 20logger = logging.getLogger(__name__) 21 22 23class FabricEngineAdapter(MSSQLEngineAdapter): 24 """ 25 Adapter for Microsoft Fabric. 26 """ 27 28 DIALECT = "fabric" 29 SUPPORTS_INDEXES = False 30 SUPPORTS_TRANSACTIONS = False 31 SUPPORTS_CREATE_DROP_CATALOG = True 32 INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT 33 34 def __init__( 35 self, connection_factory_or_pool: t.Union[t.Callable, t.Any], *args: t.Any, **kwargs: t.Any 36 ) -> None: 37 # Wrap connection factory to support changing the catalog dynamically at runtime 38 if not isinstance(connection_factory_or_pool, ConnectionPool): 39 original_connection_factory = connection_factory_or_pool 40 41 connection_factory_or_pool = lambda *args, **kwargs: original_connection_factory( 42 target_catalog=self._target_catalog, *args, **kwargs 43 ) 44 45 super().__init__(connection_factory_or_pool, *args, **kwargs) 46 47 @property 48 def _target_catalog(self) -> t.Optional[str]: 49 return self._connection_pool.get_attribute("target_catalog") 50 51 @_target_catalog.setter 52 def _target_catalog(self, value: t.Optional[str]) -> None: 53 self._connection_pool.set_attribute("target_catalog", value) 54 55 @property 56 def api_client(self) -> FabricHttpClient: 57 # the requests Session is not guaranteed to be threadsafe 58 # so we create a http client per thread on demand 59 if existing_client := 
self._connection_pool.get_attribute("api_client"): 60 return existing_client 61 62 tenant_id: t.Optional[str] = self._extra_config.get("tenant_id") 63 workspace_id: t.Optional[str] = self._extra_config.get("workspace_id") 64 client_id: t.Optional[str] = self._extra_config.get("user") 65 client_secret: t.Optional[str] = self._extra_config.get("password") 66 67 if not tenant_id or not client_id or not client_secret: 68 raise SQLMeshError( 69 "Service Principal authentication requires tenant_id, client_id, and client_secret " 70 "in the Fabric connection configuration" 71 ) 72 73 if not workspace_id: 74 raise SQLMeshError( 75 "Fabric requires the workspace_id to be configured in the connection configuration to create / drop catalogs" 76 ) 77 78 client = FabricHttpClient( 79 tenant_id=tenant_id, 80 workspace_id=workspace_id, 81 client_id=client_id, 82 client_secret=client_secret, 83 ) 84 85 self._connection_pool.set_attribute("api_client", client) 86 return client 87 88 def _create_catalog(self, catalog_name: exp.Identifier) -> None: 89 """Create a catalog (warehouse) in Microsoft Fabric via REST API.""" 90 warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False) 91 logger.info(f"Creating Fabric warehouse: {warehouse_name}") 92 93 self.api_client.create_warehouse(warehouse_name) 94 95 def _drop_catalog(self, catalog_name: exp.Identifier) -> None: 96 """Drop a catalog (warehouse) in Microsoft Fabric via REST API.""" 97 warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False) 98 current_catalog = self.get_current_catalog() 99 100 logger.info(f"Deleting Fabric warehouse: {warehouse_name}") 101 self.api_client.delete_warehouse(warehouse_name) 102 103 if warehouse_name == current_catalog: 104 # Somewhere around 2025-09-08, Fabric started validating the "Database=" connection argument and throwing 'Authentication failed' if the database doesnt exist 105 # In addition, set_current_catalog() is implemented using a threadlocal variable 
"target_catalog" 106 # So, when we drop a warehouse, and there are still threads with "target_catalog" set to reference it, any operations on those threads 107 # that use an either use an existing connection pointing to this warehouse or trigger a new connection 108 # will fail with an 'Authentication Failed' error unless we close all connections here, which also clears all the threadlocal data 109 self.close() 110 111 def set_current_catalog(self, catalog_name: str) -> None: 112 """ 113 Set the current catalog for Microsoft Fabric connections. 114 115 Override to handle Fabric's stateless session limitation where USE statements 116 don't persist across queries. Instead, we close existing connections and 117 recreate them with the new catalog in the connection configuration. 118 119 Args: 120 catalog_name: The name of the catalog (warehouse) to switch to 121 122 Note: 123 Fabric doesn't support catalog switching via USE statements because each 124 statement runs as an independent session. This method works around this 125 limitation by updating the connection pool with new catalog configuration. 
126 127 See: 128 https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations 129 """ 130 current_catalog = self.get_current_catalog() 131 132 # If already using the requested catalog, do nothing 133 if current_catalog and current_catalog == catalog_name: 134 logger.debug(f"Already using catalog '{catalog_name}', no action needed") 135 return 136 137 logger.info(f"Switching from catalog '{current_catalog}' to '{catalog_name}'") 138 139 # commit the transaction before closing the connection to help prevent errors like: 140 # > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a 141 # > DDL statement in another concurrent transaction since the start of this transaction 142 # on subsequent queries in the new connection 143 self._connection_pool.commit() 144 145 # note: we call close() on the connection pool instead of self.close() because self.close() calls close_all() 146 # on the connection pool but we just want to close the connection for this thread 147 self._connection_pool.close() 148 self._target_catalog = catalog_name # new connections will use this catalog 149 150 catalog_after_switch = self.get_current_catalog() 151 152 if catalog_after_switch != catalog_name: 153 # We need to raise an error if the catalog switch failed to prevent the operation that needed the catalog switch from being run against the wrong catalog 154 raise SQLMeshError( 155 f"Unable to switch catalog to {catalog_name}, catalog ended up as {catalog_after_switch}" 156 ) 157 158 def alter_table( 159 self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]] 160 ) -> None: 161 """ 162 Applies alter expressions to a table. Fabric has limited support for ALTER TABLE, 163 so this method implements a workaround for column type changes. 164 This method is self-contained and sets its own catalog context. 
165 """ 166 if not alter_expressions: 167 return 168 169 # Get the target table from the first expression to determine the correct catalog. 170 first_op = alter_expressions[0] 171 expression = first_op.expression if isinstance(first_op, TableAlterOperation) else first_op 172 if not isinstance(expression, exp.Alter) or not expression.this.catalog: 173 # Fallback for unexpected scenarios 174 logger.warning( 175 "Could not determine catalog from alter expression, executing with current context." 176 ) 177 super().alter_table(alter_expressions) 178 return 179 180 target_catalog = expression.this.catalog 181 self.set_current_catalog(target_catalog) 182 183 with self.transaction(): 184 for op in alter_expressions: 185 expression = op.expression if isinstance(op, TableAlterOperation) else op 186 187 if not isinstance(expression, exp.Alter): 188 self.execute(expression) 189 continue 190 191 for action in expression.actions: 192 table_name = expression.this 193 194 table_name_without_catalog = table_name.copy() 195 table_name_without_catalog.set("catalog", None) 196 197 is_type_change = isinstance(action, exp.AlterColumn) and action.args.get( 198 "dtype" 199 ) 200 201 if is_type_change: 202 column_to_alter = action.this 203 new_type = action.args["dtype"] 204 temp_column_name_str = f"{column_to_alter.name}__{random_id(short=True)}" 205 temp_column_name = exp.to_identifier(temp_column_name_str) 206 207 logger.info( 208 "Applying workaround for column '%s' on table '%s' to change type to '%s'.", 209 column_to_alter.sql(), 210 table_name.sql(), 211 new_type.sql(), 212 ) 213 214 # Step 1: Add a temporary column. 215 add_column_expr = exp.Alter( 216 this=table_name_without_catalog.copy(), 217 kind="TABLE", 218 actions=[ 219 exp.ColumnDef(this=temp_column_name.copy(), kind=new_type.copy()) 220 ], 221 ) 222 add_sql = self._to_sql(add_column_expr) 223 self.execute(add_sql) 224 225 # Step 2: Copy and cast data. 
226 update_sql = self._to_sql( 227 exp.Update( 228 this=table_name_without_catalog.copy(), 229 expressions=[ 230 exp.EQ( 231 this=temp_column_name.copy(), 232 expression=exp.Cast( 233 this=column_to_alter.copy(), to=new_type.copy() 234 ), 235 ) 236 ], 237 ) 238 ) 239 self.execute(update_sql) 240 241 # Step 3: Drop the original column. 242 drop_sql = self._to_sql( 243 exp.Alter( 244 this=table_name_without_catalog.copy(), 245 kind="TABLE", 246 actions=[exp.Drop(this=column_to_alter.copy(), kind="COLUMN")], 247 ) 248 ) 249 self.execute(drop_sql) 250 251 # Step 4: Rename the temporary column. 252 old_name_qualified = f"{table_name_without_catalog.sql(dialect=self.dialect)}.{temp_column_name.sql(dialect=self.dialect)}" 253 new_name_unquoted = column_to_alter.sql( 254 dialect=self.dialect, identify=False 255 ) 256 rename_sql = f"EXEC sp_rename '{old_name_qualified}', '{new_name_unquoted}', 'COLUMN'" 257 self.execute(rename_sql) 258 else: 259 # For other alterations, execute directly. 260 direct_alter_expr = exp.Alter( 261 this=table_name_without_catalog.copy(), kind="TABLE", actions=[action] 262 ) 263 self.execute(direct_alter_expr) 264 265 266class FabricHttpClient: 267 def __init__(self, tenant_id: str, workspace_id: str, client_id: str, client_secret: str): 268 self.tenant_id = tenant_id 269 self.client_id = client_id 270 self.client_secret = client_secret 271 self.workspace_id = workspace_id 272 273 def create_warehouse( 274 self, warehouse_name: str, if_not_exists: bool = True, attempt: int = 0 275 ) -> None: 276 """Create a catalog (warehouse) in Microsoft Fabric via REST API.""" 277 278 # attempt count is arbitrary, it essentially equates to 5 minutes of 30 second waits 279 if attempt > 10: 280 raise SQLMeshError( 281 f"Gave up waiting for Fabric warehouse {warehouse_name} to become available" 282 ) 283 284 logger.info(f"Creating Fabric warehouse: {warehouse_name}") 285 286 request_data = { 287 "displayName": warehouse_name, 288 "description": f"Warehouse created 
by SQLMesh: {warehouse_name}", 289 } 290 291 response = self.session.post(self._endpoint_url("warehouses"), json=request_data) 292 293 if ( 294 if_not_exists 295 and response.status_code == 400 296 and (errorCode := response.json().get("errorCode", None)) 297 ): 298 if errorCode == "ItemDisplayNameAlreadyInUse": 299 logger.warning(f"Fabric warehouse {warehouse_name} already exists") 300 return 301 if errorCode == "ItemDisplayNameNotAvailableYet": 302 logger.warning(f"Fabric warehouse {warehouse_name} is still spinning up; waiting") 303 # Fabric error message is something like: 304 # - "Requested 'circleci_51d7087e__dev' is not available yet and is expected to become available in the upcoming minutes." 305 # This seems to happen if a catalog is dropped and then a new one with the same name is immediately created. 306 # There appears to be some delayed async process on the Fabric side that actually drops the warehouses and frees up the names to be used again 307 time.sleep(30) 308 return self.create_warehouse( 309 warehouse_name=warehouse_name, if_not_exists=if_not_exists, attempt=attempt + 1 310 ) 311 312 try: 313 response.raise_for_status() 314 except: 315 # the important information to actually debug anything is in the response body which Requests never prints 316 logger.exception( 317 f"Failed to create warehouse {warehouse_name}. 
status: {response.status_code}, body: {response.text}" 318 ) 319 raise 320 321 # Handle direct success (201) or async creation (202) 322 if response.status_code == 201: 323 logger.info(f"Successfully created Fabric warehouse: {warehouse_name}") 324 return 325 326 if response.status_code == 202 and (location_header := response.headers.get("location")): 327 logger.info(f"Warehouse creation initiated for: {warehouse_name}") 328 self._wait_for_completion(location_header, warehouse_name) 329 logger.info(f"Successfully created Fabric warehouse: {warehouse_name}") 330 else: 331 logger.error(f"Unexpected response from Fabric API: {response}\n{response.text}") 332 raise SQLMeshError(f"Unable to create warehouse: {response}") 333 334 def delete_warehouse(self, warehouse_name: str, if_exists: bool = True) -> None: 335 """Drop a catalog (warehouse) in Microsoft Fabric via REST API.""" 336 logger.info(f"Deleting Fabric warehouse: {warehouse_name}") 337 338 # Get the warehouse ID by listing warehouses 339 # TODO: handle continuationUri for pagination, ref: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP#warehouses 340 response = self.session.get(self._endpoint_url("warehouses")) 341 response.raise_for_status() 342 343 warehouse_name_to_id = { 344 warehouse.get("displayName"): warehouse.get("id") 345 for warehouse in response.json().get("value", []) 346 } 347 348 warehouse_id = warehouse_name_to_id.get(warehouse_name, None) 349 350 if not warehouse_id: 351 logger.warning( 352 f"Fabric warehouse does not exist: {warehouse_name}\n(available warehouses: {', '.join(warehouse_name_to_id)})" 353 ) 354 if if_exists: 355 return 356 357 raise SQLMeshError( 358 f"Unable to delete Fabric warehouse {warehouse_name} as it doesnt exist" 359 ) 360 361 # Delete the warehouse by ID 362 response = self.session.delete(self._endpoint_url(f"warehouses/{warehouse_id}")) 363 response.raise_for_status() 364 365 logger.info(f"Successfully deleted Fabric 
warehouse: {warehouse_name}") 366 367 @cached_property 368 def session(self) -> requests.Session: 369 s = requests.Session() 370 371 access_token = self._get_access_token() 372 s.headers.update({"Authorization": f"Bearer {access_token}"}) 373 374 return s 375 376 def _endpoint_url(self, endpoint: str) -> str: 377 if endpoint.startswith("/"): 378 endpoint = endpoint[1:] 379 380 return f"https://api.fabric.microsoft.com/v1/workspaces/{self.workspace_id}/{endpoint}" 381 382 def _get_access_token(self) -> str: 383 """Get access token using Service Principal authentication.""" 384 385 # Use Azure AD OAuth2 token endpoint 386 token_url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token" 387 388 data = { 389 "grant_type": "client_credentials", 390 "client_id": self.client_id, 391 "client_secret": self.client_secret, 392 "scope": "https://api.fabric.microsoft.com/.default", 393 } 394 395 response = requests.post(token_url, data=data) 396 response.raise_for_status() 397 token_data = response.json() 398 return token_data["access_token"] 399 400 def _wait_for_completion(self, location_url: str, operation_name: str) -> None: 401 """Poll the operation status until completion.""" 402 403 @retry( 404 wait=wait_exponential(multiplier=1, min=1, max=30), 405 stop=stop_after_attempt(20), 406 retry=retry_if_result(lambda result: result not in ["Succeeded", "Failed"]), 407 ) 408 def _poll() -> str: 409 response = self.session.get(location_url) 410 response.raise_for_status() 411 412 result = response.json() 413 status = result.get("status", "Unknown") 414 415 logger.debug(f"Operation {operation_name} status: {status}") 416 417 if status == "Failed": 418 error_msg = result.get("error", {}).get("message", "Unknown error") 419 raise SQLMeshError(f"Operation {operation_name} failed: {error_msg}") 420 elif status in ["InProgress", "Running"]: 421 logger.debug(f"Operation {operation_name} still in progress...") 422 elif status not in ["Succeeded"]: 423 
logger.warning(f"Unknown status '{status}' for operation {operation_name}") 424 425 return status 426 427 final_status = _poll() 428 if final_status != "Succeeded": 429 raise SQLMeshError(f"Operation {operation_name} completed with status: {final_status}")
class FabricEngineAdapter(MSSQLEngineAdapter):
    """
    Adapter for Microsoft Fabric.

    Extends the MSSQL adapter. Catalogs (Fabric warehouses) are created and dropped
    through the Fabric REST API, and the current catalog is changed by reconnecting
    with a different ``target_catalog`` instead of issuing a USE statement.
    """

    DIALECT = "fabric"
    SUPPORTS_INDEXES = False
    SUPPORTS_TRANSACTIONS = False
    SUPPORTS_CREATE_DROP_CATALOG = True
    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT

    def __init__(
        self, connection_factory_or_pool: t.Union[t.Callable, t.Any], *args: t.Any, **kwargs: t.Any
    ) -> None:
        """
        Create the adapter.

        Args:
            connection_factory_or_pool: Either a ConnectionPool, which is passed through
                untouched, or a connection factory, which is wrapped so that every call
                also receives the ``target_catalog`` keyword argument.
            *args: Forwarded to the parent constructor.
            **kwargs: Forwarded to the parent constructor.
        """
        # Wrap connection factory to support changing the catalog dynamically at runtime
        if not isinstance(connection_factory_or_pool, ConnectionPool):
            original_connection_factory = connection_factory_or_pool

            # self._target_catalog is read when the factory is invoked, not when it is
            # defined, so each new connection picks up the catalog selected at that time
            connection_factory_or_pool = lambda *args, **kwargs: original_connection_factory(
                target_catalog=self._target_catalog, *args, **kwargs
            )

        super().__init__(connection_factory_or_pool, *args, **kwargs)

    @property
    def _target_catalog(self) -> t.Optional[str]:
        """The catalog new connections should use, stored as a connection pool attribute."""
        return self._connection_pool.get_attribute("target_catalog")

    @_target_catalog.setter
    def _target_catalog(self, value: t.Optional[str]) -> None:
        self._connection_pool.set_attribute("target_catalog", value)

    @property
    def api_client(self) -> FabricHttpClient:
        """
        Return the Fabric REST client, creating and caching it on first use.

        Raises:
            SQLMeshError: If tenant_id, user, password or workspace_id is missing from
                the connection configuration.
        """
        # the requests Session is not guaranteed to be threadsafe
        # so we create a http client per thread on demand
        if existing_client := self._connection_pool.get_attribute("api_client"):
            return existing_client

        tenant_id: t.Optional[str] = self._extra_config.get("tenant_id")
        workspace_id: t.Optional[str] = self._extra_config.get("workspace_id")
        # the service principal credentials are read from the "user" / "password" config keys
        client_id: t.Optional[str] = self._extra_config.get("user")
        client_secret: t.Optional[str] = self._extra_config.get("password")

        if not tenant_id or not client_id or not client_secret:
            raise SQLMeshError(
                "Service Principal authentication requires tenant_id, client_id, and client_secret "
                "in the Fabric connection configuration"
            )

        if not workspace_id:
            raise SQLMeshError(
                "Fabric requires the workspace_id to be configured in the connection configuration to create / drop catalogs"
            )

        client = FabricHttpClient(
            tenant_id=tenant_id,
            workspace_id=workspace_id,
            client_id=client_id,
            client_secret=client_secret,
        )

        self._connection_pool.set_attribute("api_client", client)
        return client

    def _create_catalog(self, catalog_name: exp.Identifier) -> None:
        """Create a catalog (warehouse) in Microsoft Fabric via REST API."""
        warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
        logger.info(f"Creating Fabric warehouse: {warehouse_name}")

        self.api_client.create_warehouse(warehouse_name)

    def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
        """Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
        warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
        # captured before the delete so we can tell whether this adapter was pointing at the dropped warehouse
        current_catalog = self.get_current_catalog()

        logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
        self.api_client.delete_warehouse(warehouse_name)

        if warehouse_name == current_catalog:
            # Somewhere around 2025-09-08, Fabric started validating the "Database=" connection argument and throwing
            # 'Authentication failed' if the database doesn't exist.
            # In addition, set_current_catalog() is implemented using a threadlocal variable "target_catalog".
            # So, when we drop a warehouse and there are still threads with "target_catalog" set to reference it,
            # any operations on those threads that either use an existing connection pointing to this warehouse
            # or trigger a new connection will fail with an 'Authentication Failed' error unless we close all
            # connections here, which also clears all the threadlocal data
            self.close()

    def set_current_catalog(self, catalog_name: str) -> None:
        """
        Set the current catalog for Microsoft Fabric connections.

        Override to handle Fabric's stateless session limitation where USE statements
        don't persist across queries. Instead, we close existing connections and
        recreate them with the new catalog in the connection configuration.

        Args:
            catalog_name: The name of the catalog (warehouse) to switch to

        Raises:
            SQLMeshError: If the catalog reported after the switch is not `catalog_name`.

        Note:
            Fabric doesn't support catalog switching via USE statements because each
            statement runs as an independent session. This method works around this
            limitation by updating the connection pool with new catalog configuration.

        See:
            https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations
        """
        current_catalog = self.get_current_catalog()

        # If already using the requested catalog, do nothing
        if current_catalog and current_catalog == catalog_name:
            logger.debug(f"Already using catalog '{catalog_name}', no action needed")
            return

        logger.info(f"Switching from catalog '{current_catalog}' to '{catalog_name}'")

        # commit the transaction before closing the connection to help prevent errors like:
        # > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a
        # > DDL statement in another concurrent transaction since the start of this transaction
        # on subsequent queries in the new connection
        self._connection_pool.commit()

        # note: we call close() on the connection pool instead of self.close() because self.close() calls close_all()
        # on the connection pool but we just want to close the connection for this thread
        self._connection_pool.close()
        self._target_catalog = catalog_name  # new connections will use this catalog

        catalog_after_switch = self.get_current_catalog()

        if catalog_after_switch != catalog_name:
            # We need to raise an error if the catalog switch failed to prevent the operation that needed the
            # catalog switch from being run against the wrong catalog
            raise SQLMeshError(
                f"Unable to switch catalog to {catalog_name}, catalog ended up as {catalog_after_switch}"
            )

    def alter_table(
        self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]]
    ) -> None:
        """
        Applies alter expressions to a table. Fabric has limited support for ALTER TABLE,
        so this method implements a workaround for column type changes.
        This method is self-contained and sets its own catalog context.

        The catalog is taken from the first expression only. If the first expression is
        not an ``exp.Alter`` or carries no catalog, the call is delegated to the parent
        implementation. Column type changes are emulated by adding a temporary column,
        copying the cast data into it, dropping the original column and renaming the
        temporary column. All other actions run as individual ALTER TABLE statements
        with the catalog removed from the table name.

        Args:
            alter_expressions: The alter expressions or table alter operations to apply.
        """
        if not alter_expressions:
            return

        # Get the target table from the first expression to determine the correct catalog.
        first_op = alter_expressions[0]
        expression = first_op.expression if isinstance(first_op, TableAlterOperation) else first_op
        if not isinstance(expression, exp.Alter) or not expression.this.catalog:
            # Fallback for unexpected scenarios
            logger.warning(
                "Could not determine catalog from alter expression, executing with current context."
            )
            super().alter_table(alter_expressions)
            return

        target_catalog = expression.this.catalog
        self.set_current_catalog(target_catalog)

        with self.transaction():
            for op in alter_expressions:
                expression = op.expression if isinstance(op, TableAlterOperation) else op

                if not isinstance(expression, exp.Alter):
                    self.execute(expression)
                    continue

                # These only depend on the expression, so build them once per expression.
                # Every use below takes its own copy, so sharing the template is safe.
                table_name = expression.this
                table_name_without_catalog = table_name.copy()
                table_name_without_catalog.set("catalog", None)

                for action in expression.actions:
                    is_type_change = isinstance(action, exp.AlterColumn) and action.args.get(
                        "dtype"
                    )

                    if is_type_change:
                        column_to_alter = action.this
                        new_type = action.args["dtype"]
                        temp_column_name_str = f"{column_to_alter.name}__{random_id(short=True)}"
                        temp_column_name = exp.to_identifier(temp_column_name_str)

                        logger.info(
                            "Applying workaround for column '%s' on table '%s' to change type to '%s'.",
                            column_to_alter.sql(),
                            table_name.sql(),
                            new_type.sql(),
                        )

                        # Step 1: Add a temporary column.
                        add_column_expr = exp.Alter(
                            this=table_name_without_catalog.copy(),
                            kind="TABLE",
                            actions=[
                                exp.ColumnDef(this=temp_column_name.copy(), kind=new_type.copy())
                            ],
                        )
                        add_sql = self._to_sql(add_column_expr)
                        self.execute(add_sql)

                        # Step 2: Copy and cast data.
                        update_sql = self._to_sql(
                            exp.Update(
                                this=table_name_without_catalog.copy(),
                                expressions=[
                                    exp.EQ(
                                        this=temp_column_name.copy(),
                                        expression=exp.Cast(
                                            this=column_to_alter.copy(), to=new_type.copy()
                                        ),
                                    )
                                ],
                            )
                        )
                        self.execute(update_sql)

                        # Step 3: Drop the original column.
                        drop_sql = self._to_sql(
                            exp.Alter(
                                this=table_name_without_catalog.copy(),
                                kind="TABLE",
                                actions=[exp.Drop(this=column_to_alter.copy(), kind="COLUMN")],
                            )
                        )
                        self.execute(drop_sql)

                        # Step 4: Rename the temporary column.
                        old_name_qualified = f"{table_name_without_catalog.sql(dialect=self.dialect)}.{temp_column_name.sql(dialect=self.dialect)}"
                        new_name_unquoted = column_to_alter.sql(
                            dialect=self.dialect, identify=False
                        )
                        # Both names are embedded in single-quoted string literals, so any
                        # single quote inside a name must be doubled to keep the statement valid
                        old_name_literal = old_name_qualified.replace("'", "''")
                        new_name_literal = new_name_unquoted.replace("'", "''")
                        rename_sql = f"EXEC sp_rename '{old_name_literal}', '{new_name_literal}', 'COLUMN'"
                        self.execute(rename_sql)
                    else:
                        # For other alterations, execute directly.
                        direct_alter_expr = exp.Alter(
                            this=table_name_without_catalog.copy(), kind="TABLE", actions=[action]
                        )
                        self.execute(direct_alter_expr)
Adapter for Microsoft Fabric.
def __init__(
    self, connection_factory_or_pool: t.Union[t.Callable, t.Any], *args: t.Any, **kwargs: t.Any
) -> None:
    """
    Create the adapter.

    A ConnectionPool is handed to the parent constructor untouched. Anything else is
    treated as a connection factory and wrapped so that each call also receives the
    ``target_catalog`` keyword argument, which lets the catalog change at runtime.
    """
    if not isinstance(connection_factory_or_pool, ConnectionPool):
        plain_factory = connection_factory_or_pool

        def catalog_aware_factory(*factory_args: t.Any, **factory_kwargs: t.Any) -> t.Any:
            # self._target_catalog is looked up on every call, so a catalog selected
            # after construction applies to connections opened later
            return plain_factory(
                *factory_args, target_catalog=self._target_catalog, **factory_kwargs
            )

        connection_factory_or_pool = catalog_aware_factory

    super().__init__(connection_factory_or_pool, *args, **kwargs)
@property
def api_client(self) -> FabricHttpClient:
    """
    Return the Fabric REST client, building and caching it on first access.

    The client is stored as a connection pool attribute. A requests Session is not
    guaranteed to be threadsafe, so the client is created on demand rather than shared.

    Raises:
        SQLMeshError: If tenant_id, user, password or workspace_id is missing from
            the connection configuration.
    """
    cached_client = self._connection_pool.get_attribute("api_client")
    if cached_client:
        return cached_client

    tenant_id: t.Optional[str] = self._extra_config.get("tenant_id")
    workspace_id: t.Optional[str] = self._extra_config.get("workspace_id")
    # service principal id / secret come from the "user" / "password" config keys
    client_id: t.Optional[str] = self._extra_config.get("user")
    client_secret: t.Optional[str] = self._extra_config.get("password")

    if not (tenant_id and client_id and client_secret):
        raise SQLMeshError(
            "Service Principal authentication requires tenant_id, client_id, and client_secret "
            "in the Fabric connection configuration"
        )

    if not workspace_id:
        raise SQLMeshError(
            "Fabric requires the workspace_id to be configured in the connection configuration to create / drop catalogs"
        )

    new_client = FabricHttpClient(
        tenant_id=tenant_id,
        workspace_id=workspace_id,
        client_id=client_id,
        client_secret=client_secret,
    )
    self._connection_pool.set_attribute("api_client", new_client)
    return new_client
def set_current_catalog(self, catalog_name: str) -> None:
    """
    Switch this adapter's connection to another Fabric catalog (warehouse).

    Fabric runs each statement as an independent session, so a USE statement does
    not persist across queries. The switch is therefore done by committing and
    closing the current connection and recording the new target catalog, which new
    connections are then created with.

    Args:
        catalog_name: The name of the catalog (warehouse) to switch to

    Raises:
        SQLMeshError: If the catalog reported after the switch is not `catalog_name`.

    See:
        https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations
    """
    previous_catalog = self.get_current_catalog()

    # Nothing to do when the requested catalog is already active
    if previous_catalog and previous_catalog == catalog_name:
        logger.debug(f"Already using catalog '{catalog_name}', no action needed")
        return

    logger.info(f"Switching from catalog '{previous_catalog}' to '{catalog_name}'")

    # Commit before closing to help prevent errors on later queries in the new connection, such as:
    # > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a
    # > DDL statement in another concurrent transaction since the start of this transaction
    self._connection_pool.commit()

    # Close through the pool rather than self.close(): self.close() calls close_all() on the
    # pool, whereas only the connection for this thread should be closed here
    self._connection_pool.close()

    # From here on, new connections are opened against the requested catalog
    self._target_catalog = catalog_name

    actual_catalog = self.get_current_catalog()
    if actual_catalog == catalog_name:
        return

    # Fail loudly so the operation that needed the switch never runs against the wrong catalog
    raise SQLMeshError(
        f"Unable to switch catalog to {catalog_name}, catalog ended up as {actual_catalog}"
    )
Set the current catalog for Microsoft Fabric connections.
Override to handle Fabric's stateless session limitation where USE statements don't persist across queries. Instead, we close existing connections and recreate them with the new catalog in the connection configuration.
Arguments:
- catalog_name: The name of the catalog (warehouse) to switch to
Note:
Fabric doesn't support catalog switching via USE statements because each statement runs as an independent session. This method works around this limitation by updating the connection pool with new catalog configuration.
See:
https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations
def alter_table(
    self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]]
) -> None:
    """
    Applies alter expressions to a table. Fabric has limited support for ALTER TABLE,
    so this method implements a workaround for column type changes.
    This method is self-contained and sets its own catalog context.

    The catalog is taken from the first expression only. If the first expression is
    not an ``exp.Alter`` or carries no catalog, the call is delegated to the parent
    implementation. Column type changes are emulated by adding a temporary column,
    copying the cast data into it, dropping the original column and renaming the
    temporary column. All other actions run as individual ALTER TABLE statements
    with the catalog removed from the table name.

    Args:
        alter_expressions: The alter expressions or table alter operations to apply.
    """
    if not alter_expressions:
        return

    # Get the target table from the first expression to determine the correct catalog.
    first_op = alter_expressions[0]
    expression = first_op.expression if isinstance(first_op, TableAlterOperation) else first_op
    if not isinstance(expression, exp.Alter) or not expression.this.catalog:
        # Fallback for unexpected scenarios
        logger.warning(
            "Could not determine catalog from alter expression, executing with current context."
        )
        super().alter_table(alter_expressions)
        return

    target_catalog = expression.this.catalog
    self.set_current_catalog(target_catalog)

    with self.transaction():
        for op in alter_expressions:
            expression = op.expression if isinstance(op, TableAlterOperation) else op

            if not isinstance(expression, exp.Alter):
                self.execute(expression)
                continue

            # These only depend on the expression, so build them once per expression.
            # Every use below takes its own copy, so sharing the template is safe.
            table_name = expression.this
            table_name_without_catalog = table_name.copy()
            table_name_without_catalog.set("catalog", None)

            for action in expression.actions:
                is_type_change = isinstance(action, exp.AlterColumn) and action.args.get(
                    "dtype"
                )

                if is_type_change:
                    column_to_alter = action.this
                    new_type = action.args["dtype"]
                    temp_column_name_str = f"{column_to_alter.name}__{random_id(short=True)}"
                    temp_column_name = exp.to_identifier(temp_column_name_str)

                    logger.info(
                        "Applying workaround for column '%s' on table '%s' to change type to '%s'.",
                        column_to_alter.sql(),
                        table_name.sql(),
                        new_type.sql(),
                    )

                    # Step 1: Add a temporary column.
                    add_column_expr = exp.Alter(
                        this=table_name_without_catalog.copy(),
                        kind="TABLE",
                        actions=[
                            exp.ColumnDef(this=temp_column_name.copy(), kind=new_type.copy())
                        ],
                    )
                    add_sql = self._to_sql(add_column_expr)
                    self.execute(add_sql)

                    # Step 2: Copy and cast data.
                    update_sql = self._to_sql(
                        exp.Update(
                            this=table_name_without_catalog.copy(),
                            expressions=[
                                exp.EQ(
                                    this=temp_column_name.copy(),
                                    expression=exp.Cast(
                                        this=column_to_alter.copy(), to=new_type.copy()
                                    ),
                                )
                            ],
                        )
                    )
                    self.execute(update_sql)

                    # Step 3: Drop the original column.
                    drop_sql = self._to_sql(
                        exp.Alter(
                            this=table_name_without_catalog.copy(),
                            kind="TABLE",
                            actions=[exp.Drop(this=column_to_alter.copy(), kind="COLUMN")],
                        )
                    )
                    self.execute(drop_sql)

                    # Step 4: Rename the temporary column.
                    old_name_qualified = f"{table_name_without_catalog.sql(dialect=self.dialect)}.{temp_column_name.sql(dialect=self.dialect)}"
                    new_name_unquoted = column_to_alter.sql(
                        dialect=self.dialect, identify=False
                    )
                    # Both names are embedded in single-quoted string literals, so any
                    # single quote inside a name must be doubled to keep the statement valid
                    old_name_literal = old_name_qualified.replace("'", "''")
                    new_name_literal = new_name_unquoted.replace("'", "''")
                    rename_sql = f"EXEC sp_rename '{old_name_literal}', '{new_name_literal}', 'COLUMN'"
                    self.execute(rename_sql)
                else:
                    # For other alterations, execute directly.
                    direct_alter_expr = exp.Alter(
                        this=table_name_without_catalog.copy(), kind="TABLE", actions=[action]
                    )
                    self.execute(direct_alter_expr)
Applies alter expressions to a table. Fabric has limited support for ALTER TABLE, so this method implements a workaround for column type changes. This method is self-contained and sets its own catalog context.
Inherited Members
- sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter
- SUPPORTS_TUPLE_IN
- SUPPORTS_MATERIALIZED_VIEWS
- CURRENT_CATALOG_EXPRESSION
- COMMENT_CREATION_TABLE
- COMMENT_CREATION_VIEW
- SUPPORTS_REPLACE_TABLE
- MAX_IDENTIFIER_LENGTH
- SUPPORTS_QUERY_EXECUTION_TRACKING
- SCHEMA_DIFFER_KWARGS
- VARIABLE_LENGTH_DATA_TYPES
- catalog_support
- columns
- table_exists
- drop_schema
- merge
- delete_from
- sqlmesh.core.engine_adapter.mixins.RowDiffMixin
- MAX_TIMESTAMP_PRECISION
- concat_columns
- normalize_value
- sqlmesh.core.engine_adapter.base.EngineAdapter
- DEFAULT_BATCH_SIZE
- DATA_OBJECT_FILTER_BATCH_SIZE
- MAX_TABLE_COMMENT_LENGTH
- MAX_COLUMN_COMMENT_LENGTH
- SUPPORTS_MATERIALIZED_VIEW_SCHEMA
- SUPPORTS_VIEW_SCHEMA
- SUPPORTS_CLONING
- SUPPORTS_MANAGED_MODELS
- SUPPORTED_DROP_CASCADE_OBJECT_KINDS
- HAS_VIEW_BINDING
- SUPPORTS_GRANTS
- DEFAULT_CATALOG_TYPE
- QUOTE_IDENTIFIERS_IN_VIEWS
- ATTACH_CORRELATION_ID
- SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
- dialect
- correlation_id
- with_settings
- cursor
- connection
- spark
- snowpark
- bigframe
- comments_enabled
- schema_differ
- default_catalog
- engine_run_mode
- recycle
- close
- get_catalog_type
- get_catalog_type_from_table
- current_catalog_type
- replace_query
- create_index
- create_table
- create_managed_table
- ctas
- create_state_table
- create_table_like
- clone_table
- drop_data_object
- drop_table
- drop_managed_table
- get_alter_operations
- create_view
- create_schema
- drop_view
- create_catalog
- drop_catalog
- insert_append
- insert_overwrite_by_partition
- insert_overwrite_by_time_partition
- update_table
- scd_type_2_by_time
- scd_type_2_by_column
- rename_table
- get_data_object
- get_data_objects
- fetchone
- fetchall
- fetchdf
- fetch_pyspark_df
- wap_enabled
- wap_supported
- wap_table_name
- wap_prepare
- wap_publish
- sync_grants_config
- transaction
- session
- execute
- temp_table
- drop_data_object_on_type_mismatch
- ensure_nulls_for_unmatched_after_join
- use_server_nulls_for_unmatched_after_join
- ping
- get_table_last_modified_ts
class FabricHttpClient:
    """
    Small client for the Microsoft Fabric REST API, scoped to a single workspace.

    Requests are authenticated with a bearer token obtained through the Service Principal
    (client credentials) flow. The token is fetched once, when `session` is first accessed.
    """

    def __init__(self, tenant_id: str, workspace_id: str, client_id: str, client_secret: str):
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.workspace_id = workspace_id

    def create_warehouse(
        self, warehouse_name: str, if_not_exists: bool = True, attempt: int = 0
    ) -> None:
        """Create a catalog (warehouse) in Microsoft Fabric via REST API.

        Args:
            warehouse_name: Display name of the warehouse to create.
            if_not_exists: When True, a 400 response saying the name is already in use is
                treated as success, and a 400 saying the name is not available yet is
                retried after a 30 second wait.
            attempt: Retry counter used by the recursive retry; callers normally leave it
                at the default.

        Raises:
            SQLMeshError: If more than 10 retries were needed, the asynchronous creation
                did not succeed, or the API returned an unexpected non-error status.
            requests.HTTPError: If the API returned an error status not handled above.
        """

        # attempt count is arbitrary, it essentially equates to 5 minutes of 30 second waits
        if attempt > 10:
            raise SQLMeshError(
                f"Gave up waiting for Fabric warehouse {warehouse_name} to become available"
            )

        logger.info(f"Creating Fabric warehouse: {warehouse_name}")

        request_data = {
            "displayName": warehouse_name,
            "description": f"Warehouse created by SQLMesh: {warehouse_name}",
        }

        response = self.session.post(self._endpoint_url("warehouses"), json=request_data)

        if if_not_exists and response.status_code == 400:
            try:
                error_code = response.json().get("errorCode", None)
            except (ValueError, AttributeError):
                # The body is not a JSON object; fall through so that the failure is logged
                # together with the raw body and raised below.
                error_code = None

            if error_code == "ItemDisplayNameAlreadyInUse":
                logger.warning(f"Fabric warehouse {warehouse_name} already exists")
                return
            if error_code == "ItemDisplayNameNotAvailableYet":
                logger.warning(f"Fabric warehouse {warehouse_name} is still spinning up; waiting")
                # Fabric error message is something like:
                # - "Requested 'circleci_51d7087e__dev' is not available yet and is expected to become available in the upcoming minutes."
                # This seems to happen if a catalog is dropped and then a new one with the same name is immediately created.
                # There appears to be some delayed async process on the Fabric side that actually drops the warehouses and frees up the names to be used again
                time.sleep(30)
                return self.create_warehouse(
                    warehouse_name=warehouse_name, if_not_exists=if_not_exists, attempt=attempt + 1
                )

        try:
            response.raise_for_status()
        except requests.HTTPError:
            # the important information to actually debug anything is in the response body which Requests never prints
            logger.exception(
                f"Failed to create warehouse {warehouse_name}. status: {response.status_code}, body: {response.text}"
            )
            raise

        # Handle direct success (201) or async creation (202)
        if response.status_code == 201:
            logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
            return

        if response.status_code == 202 and (location_header := response.headers.get("location")):
            logger.info(f"Warehouse creation initiated for: {warehouse_name}")
            self._wait_for_completion(location_header, warehouse_name)
            logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
        else:
            logger.error(f"Unexpected response from Fabric API: {response}\n{response.text}")
            raise SQLMeshError(f"Unable to create warehouse: {response}")

    def delete_warehouse(self, warehouse_name: str, if_exists: bool = True) -> None:
        """Drop a catalog (warehouse) in Microsoft Fabric via REST API.

        The warehouse id is looked up by display name from the workspace's warehouse
        listing, following `continuationUri` links so that every page is considered.

        Args:
            warehouse_name: Display name of the warehouse to delete.
            if_exists: When True, a missing warehouse is only logged; when False it raises.

        Raises:
            SQLMeshError: If the warehouse does not exist and `if_exists` is False.
            requests.HTTPError: If listing or deleting returns an error status.
        """
        logger.info(f"Deleting Fabric warehouse: {warehouse_name}")

        # Get the warehouse ID by listing warehouses, one page at a time
        # ref: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP#warehouses
        warehouse_name_to_id: t.Dict[t.Any, t.Any] = {}
        visited_urls: t.Set[str] = set()
        next_url: t.Optional[str] = self._endpoint_url("warehouses")

        # `visited_urls` guards against looping forever if a page links back to itself
        while next_url and next_url not in visited_urls:
            visited_urls.add(next_url)

            response = self.session.get(next_url)
            response.raise_for_status()

            page = response.json()
            for warehouse in page.get("value", []):
                warehouse_name_to_id[warehouse.get("displayName")] = warehouse.get("id")

            next_url = page.get("continuationUri")

        warehouse_id = warehouse_name_to_id.get(warehouse_name, None)

        if not warehouse_id:
            # names are stringified so an entry without a displayName cannot break the join
            logger.warning(
                f"Fabric warehouse does not exist: {warehouse_name}\n(available warehouses: {', '.join(map(str, warehouse_name_to_id))})"
            )
            if if_exists:
                return

            raise SQLMeshError(
                f"Unable to delete Fabric warehouse {warehouse_name} as it doesnt exist"
            )

        # Delete the warehouse by ID
        response = self.session.delete(self._endpoint_url(f"warehouses/{warehouse_id}"))
        response.raise_for_status()

        logger.info(f"Successfully deleted Fabric warehouse: {warehouse_name}")

    @cached_property
    def session(self) -> requests.Session:
        """Requests session carrying the bearer token; created on first access and cached."""
        s = requests.Session()

        access_token = self._get_access_token()
        s.headers.update({"Authorization": f"Bearer {access_token}"})

        return s

    def _endpoint_url(self, endpoint: str) -> str:
        """Return the absolute URL of `endpoint` within this client's workspace."""
        # a single leading slash is dropped so that the path is not doubled up
        if endpoint.startswith("/"):
            endpoint = endpoint[1:]

        return f"https://api.fabric.microsoft.com/v1/workspaces/{self.workspace_id}/{endpoint}"

    def _get_access_token(self) -> str:
        """Get access token using Service Principal authentication.

        Raises:
            requests.HTTPError: If the token endpoint returns an error status.
        """

        # Use Azure AD OAuth2 token endpoint
        token_url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token"

        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "https://api.fabric.microsoft.com/.default",
        }

        response = requests.post(token_url, data=data)
        response.raise_for_status()
        token_data = response.json()
        return token_data["access_token"]

    def _wait_for_completion(self, location_url: str, operation_name: str) -> None:
        """Poll the operation status until completion.

        Args:
            location_url: URL to poll for the operation status.
            operation_name: Name used in log and error messages.

        Raises:
            SQLMeshError: If the operation reports "Failed" or finishes with any status
                other than "Succeeded".
            requests.HTTPError: If a poll returns an error status.
        """

        # Poll up to 20 times with exponential backoff capped at 30 seconds, for as long as
        # the status is not terminal.
        # NOTE(review): once the attempts are exhausted the error raised by tenacity
        # (presumably `tenacity.RetryError`) propagates instead of SQLMeshError - confirm.
        @retry(
            wait=wait_exponential(multiplier=1, min=1, max=30),
            stop=stop_after_attempt(20),
            retry=retry_if_result(lambda result: result not in ["Succeeded", "Failed"]),
        )
        def _poll() -> str:
            response = self.session.get(location_url)
            response.raise_for_status()

            result = response.json()
            status = result.get("status", "Unknown")

            logger.debug(f"Operation {operation_name} status: {status}")

            if status == "Failed":
                error_msg = result.get("error", {}).get("message", "Unknown error")
                raise SQLMeshError(f"Operation {operation_name} failed: {error_msg}")
            elif status in ["InProgress", "Running"]:
                logger.debug(f"Operation {operation_name} still in progress...")
            elif status not in ["Succeeded"]:
                logger.warning(f"Unknown status '{status}' for operation {operation_name}")

            return status

        final_status = _poll()
        if final_status != "Succeeded":
            raise SQLMeshError(f"Operation {operation_name} completed with status: {final_status}")
def create_warehouse(
    self, warehouse_name: str, if_not_exists: bool = True, attempt: int = 0
) -> None:
    """Create a catalog (warehouse) in Microsoft Fabric via REST API.

    Args:
        warehouse_name: Display name of the warehouse to create.
        if_not_exists: When True, a 400 response saying the name is already in use is
            treated as success, and a 400 saying the name is not available yet is
            retried after a 30 second wait.
        attempt: Retry counter used by the recursive retry; callers normally leave it
            at the default.

    Raises:
        SQLMeshError: If more than 10 retries were needed, the asynchronous creation
            did not succeed, or the API returned an unexpected non-error status.
        requests.HTTPError: If the API returned an error status not handled above.
    """

    # attempt count is arbitrary, it essentially equates to 5 minutes of 30 second waits
    if attempt > 10:
        raise SQLMeshError(
            f"Gave up waiting for Fabric warehouse {warehouse_name} to become available"
        )

    logger.info(f"Creating Fabric warehouse: {warehouse_name}")

    request_data = {
        "displayName": warehouse_name,
        "description": f"Warehouse created by SQLMesh: {warehouse_name}",
    }

    response = self.session.post(self._endpoint_url("warehouses"), json=request_data)

    if if_not_exists and response.status_code == 400:
        try:
            error_code = response.json().get("errorCode", None)
        except (ValueError, AttributeError):
            # The body is not a JSON object; fall through so that the failure is logged
            # together with the raw body and raised below.
            error_code = None

        if error_code == "ItemDisplayNameAlreadyInUse":
            logger.warning(f"Fabric warehouse {warehouse_name} already exists")
            return
        if error_code == "ItemDisplayNameNotAvailableYet":
            logger.warning(f"Fabric warehouse {warehouse_name} is still spinning up; waiting")
            # Fabric error message is something like:
            # - "Requested 'circleci_51d7087e__dev' is not available yet and is expected to become available in the upcoming minutes."
            # This seems to happen if a catalog is dropped and then a new one with the same name is immediately created.
            # There appears to be some delayed async process on the Fabric side that actually drops the warehouses and frees up the names to be used again
            time.sleep(30)
            return self.create_warehouse(
                warehouse_name=warehouse_name, if_not_exists=if_not_exists, attempt=attempt + 1
            )

    try:
        response.raise_for_status()
    except requests.HTTPError:
        # the important information to actually debug anything is in the response body which Requests never prints
        logger.exception(
            f"Failed to create warehouse {warehouse_name}. status: {response.status_code}, body: {response.text}"
        )
        raise

    # Handle direct success (201) or async creation (202)
    if response.status_code == 201:
        logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
        return

    if response.status_code == 202 and (location_header := response.headers.get("location")):
        logger.info(f"Warehouse creation initiated for: {warehouse_name}")
        self._wait_for_completion(location_header, warehouse_name)
        logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
    else:
        logger.error(f"Unexpected response from Fabric API: {response}\n{response.text}")
        raise SQLMeshError(f"Unable to create warehouse: {response}")
Create a catalog (warehouse) in Microsoft Fabric via REST API.
def delete_warehouse(self, warehouse_name: str, if_exists: bool = True) -> None:
    """Drop a catalog (warehouse) in Microsoft Fabric via REST API.

    The warehouse id is looked up by display name from the workspace's warehouse
    listing, following `continuationUri` links so that every page is considered.

    Args:
        warehouse_name: Display name of the warehouse to delete.
        if_exists: When True, a missing warehouse is only logged; when False it raises.

    Raises:
        SQLMeshError: If the warehouse does not exist and `if_exists` is False.
        requests.HTTPError: If listing or deleting returns an error status.
    """
    logger.info(f"Deleting Fabric warehouse: {warehouse_name}")

    # Get the warehouse ID by listing warehouses, one page at a time
    # ref: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP#warehouses
    warehouse_name_to_id: t.Dict[t.Any, t.Any] = {}
    visited_urls: t.Set[str] = set()
    next_url: t.Optional[str] = self._endpoint_url("warehouses")

    # `visited_urls` guards against looping forever if a page links back to itself
    while next_url and next_url not in visited_urls:
        visited_urls.add(next_url)

        response = self.session.get(next_url)
        response.raise_for_status()

        page = response.json()
        for warehouse in page.get("value", []):
            warehouse_name_to_id[warehouse.get("displayName")] = warehouse.get("id")

        next_url = page.get("continuationUri")

    warehouse_id = warehouse_name_to_id.get(warehouse_name, None)

    if not warehouse_id:
        # names are stringified so an entry without a displayName cannot break the join
        logger.warning(
            f"Fabric warehouse does not exist: {warehouse_name}\n(available warehouses: {', '.join(map(str, warehouse_name_to_id))})"
        )
        if if_exists:
            return

        raise SQLMeshError(
            f"Unable to delete Fabric warehouse {warehouse_name} as it doesnt exist"
        )

    # Delete the warehouse by ID
    response = self.session.delete(self._endpoint_url(f"warehouses/{warehouse_id}"))
    response.raise_for_status()

    logger.info(f"Successfully deleted Fabric warehouse: {warehouse_name}")
Drop a catalog (warehouse) in Microsoft Fabric via REST API.