Edit on GitHub

sqlmesh.core.engine_adapter.fabric

  1from __future__ import annotations
  2
  3import typing as t
  4import logging
  5import requests
  6import time
  7from functools import cached_property
  8from sqlglot import exp
  9from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result
 10from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
 11from sqlmesh.core.engine_adapter.shared import (
 12    InsertOverwriteStrategy,
 13)
 14from sqlmesh.utils.errors import SQLMeshError
 15from sqlmesh.utils.connection_pool import ConnectionPool
 16from sqlmesh.core.schema_diff import TableAlterOperation
 17from sqlmesh.utils import random_id
 18
 19
 20logger = logging.getLogger(__name__)
 21
 22
 23class FabricEngineAdapter(MSSQLEngineAdapter):
 24    """
 25    Adapter for Microsoft Fabric.
 26    """
 27
 28    DIALECT = "fabric"
 29    SUPPORTS_INDEXES = False
 30    SUPPORTS_TRANSACTIONS = False
 31    SUPPORTS_CREATE_DROP_CATALOG = True
 32    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
 33
 34    def __init__(
 35        self, connection_factory_or_pool: t.Union[t.Callable, t.Any], *args: t.Any, **kwargs: t.Any
 36    ) -> None:
 37        # Wrap connection factory to support changing the catalog dynamically at runtime
 38        if not isinstance(connection_factory_or_pool, ConnectionPool):
 39            original_connection_factory = connection_factory_or_pool
 40
 41            connection_factory_or_pool = lambda *args, **kwargs: original_connection_factory(
 42                target_catalog=self._target_catalog, *args, **kwargs
 43            )
 44
 45        super().__init__(connection_factory_or_pool, *args, **kwargs)
 46
 47    @property
 48    def _target_catalog(self) -> t.Optional[str]:
 49        return self._connection_pool.get_attribute("target_catalog")
 50
 51    @_target_catalog.setter
 52    def _target_catalog(self, value: t.Optional[str]) -> None:
 53        self._connection_pool.set_attribute("target_catalog", value)
 54
 55    @property
 56    def api_client(self) -> FabricHttpClient:
 57        # the requests Session is not guaranteed to be threadsafe
 58        # so we create a http client per thread on demand
 59        if existing_client := self._connection_pool.get_attribute("api_client"):
 60            return existing_client
 61
 62        tenant_id: t.Optional[str] = self._extra_config.get("tenant_id")
 63        workspace_id: t.Optional[str] = self._extra_config.get("workspace_id")
 64        client_id: t.Optional[str] = self._extra_config.get("user")
 65        client_secret: t.Optional[str] = self._extra_config.get("password")
 66
 67        if not tenant_id or not client_id or not client_secret:
 68            raise SQLMeshError(
 69                "Service Principal authentication requires tenant_id, client_id, and client_secret "
 70                "in the Fabric connection configuration"
 71            )
 72
 73        if not workspace_id:
 74            raise SQLMeshError(
 75                "Fabric requires the workspace_id to be configured in the connection configuration to create / drop catalogs"
 76            )
 77
 78        client = FabricHttpClient(
 79            tenant_id=tenant_id,
 80            workspace_id=workspace_id,
 81            client_id=client_id,
 82            client_secret=client_secret,
 83        )
 84
 85        self._connection_pool.set_attribute("api_client", client)
 86        return client
 87
 88    def _create_catalog(self, catalog_name: exp.Identifier) -> None:
 89        """Create a catalog (warehouse) in Microsoft Fabric via REST API."""
 90        warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
 91        logger.info(f"Creating Fabric warehouse: {warehouse_name}")
 92
 93        self.api_client.create_warehouse(warehouse_name)
 94
 95    def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
 96        """Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
 97        warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
 98        current_catalog = self.get_current_catalog()
 99
100        logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
101        self.api_client.delete_warehouse(warehouse_name)
102
103        if warehouse_name == current_catalog:
104            # Somewhere around 2025-09-08, Fabric started validating the "Database=" connection argument and throwing 'Authentication failed' if the database doesnt exist
105            # In addition, set_current_catalog() is implemented using a threadlocal variable "target_catalog"
106            # So, when we drop a warehouse, and there are still threads with "target_catalog" set to reference it, any operations on those threads
107            # that use an either use an existing connection pointing to this warehouse or trigger a new connection
108            # will fail with an 'Authentication Failed' error unless we close all connections here, which also clears all the threadlocal data
109            self.close()
110
111    def set_current_catalog(self, catalog_name: str) -> None:
112        """
113        Set the current catalog for Microsoft Fabric connections.
114
115        Override to handle Fabric's stateless session limitation where USE statements
116        don't persist across queries. Instead, we close existing connections and
117        recreate them with the new catalog in the connection configuration.
118
119        Args:
120            catalog_name: The name of the catalog (warehouse) to switch to
121
122        Note:
123            Fabric doesn't support catalog switching via USE statements because each
124            statement runs as an independent session. This method works around this
125            limitation by updating the connection pool with new catalog configuration.
126
127        See:
128            https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations
129        """
130        current_catalog = self.get_current_catalog()
131
132        # If already using the requested catalog, do nothing
133        if current_catalog and current_catalog == catalog_name:
134            logger.debug(f"Already using catalog '{catalog_name}', no action needed")
135            return
136
137        logger.info(f"Switching from catalog '{current_catalog}' to '{catalog_name}'")
138
139        # commit the transaction before closing the connection to help prevent errors like:
140        # > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a
141        # > DDL statement in another concurrent transaction since the start of this transaction
142        # on subsequent queries in the new connection
143        self._connection_pool.commit()
144
145        # note: we call close() on the connection pool instead of self.close() because self.close() calls close_all()
146        # on the connection pool but we just want to close the connection for this thread
147        self._connection_pool.close()
148        self._target_catalog = catalog_name  # new connections will use this catalog
149
150        catalog_after_switch = self.get_current_catalog()
151
152        if catalog_after_switch != catalog_name:
153            # We need to raise an error if the catalog switch failed to prevent the operation that needed the catalog switch from being run against the wrong catalog
154            raise SQLMeshError(
155                f"Unable to switch catalog to {catalog_name}, catalog ended up as {catalog_after_switch}"
156            )
157
158    def alter_table(
159        self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]]
160    ) -> None:
161        """
162        Applies alter expressions to a table. Fabric has limited support for ALTER TABLE,
163        so this method implements a workaround for column type changes.
164        This method is self-contained and sets its own catalog context.
165        """
166        if not alter_expressions:
167            return
168
169        # Get the target table from the first expression to determine the correct catalog.
170        first_op = alter_expressions[0]
171        expression = first_op.expression if isinstance(first_op, TableAlterOperation) else first_op
172        if not isinstance(expression, exp.Alter) or not expression.this.catalog:
173            # Fallback for unexpected scenarios
174            logger.warning(
175                "Could not determine catalog from alter expression, executing with current context."
176            )
177            super().alter_table(alter_expressions)
178            return
179
180        target_catalog = expression.this.catalog
181        self.set_current_catalog(target_catalog)
182
183        with self.transaction():
184            for op in alter_expressions:
185                expression = op.expression if isinstance(op, TableAlterOperation) else op
186
187                if not isinstance(expression, exp.Alter):
188                    self.execute(expression)
189                    continue
190
191                for action in expression.actions:
192                    table_name = expression.this
193
194                    table_name_without_catalog = table_name.copy()
195                    table_name_without_catalog.set("catalog", None)
196
197                    is_type_change = isinstance(action, exp.AlterColumn) and action.args.get(
198                        "dtype"
199                    )
200
201                    if is_type_change:
202                        column_to_alter = action.this
203                        new_type = action.args["dtype"]
204                        temp_column_name_str = f"{column_to_alter.name}__{random_id(short=True)}"
205                        temp_column_name = exp.to_identifier(temp_column_name_str)
206
207                        logger.info(
208                            "Applying workaround for column '%s' on table '%s' to change type to '%s'.",
209                            column_to_alter.sql(),
210                            table_name.sql(),
211                            new_type.sql(),
212                        )
213
214                        # Step 1: Add a temporary column.
215                        add_column_expr = exp.Alter(
216                            this=table_name_without_catalog.copy(),
217                            kind="TABLE",
218                            actions=[
219                                exp.ColumnDef(this=temp_column_name.copy(), kind=new_type.copy())
220                            ],
221                        )
222                        add_sql = self._to_sql(add_column_expr)
223                        self.execute(add_sql)
224
225                        # Step 2: Copy and cast data.
226                        update_sql = self._to_sql(
227                            exp.Update(
228                                this=table_name_without_catalog.copy(),
229                                expressions=[
230                                    exp.EQ(
231                                        this=temp_column_name.copy(),
232                                        expression=exp.Cast(
233                                            this=column_to_alter.copy(), to=new_type.copy()
234                                        ),
235                                    )
236                                ],
237                            )
238                        )
239                        self.execute(update_sql)
240
241                        # Step 3: Drop the original column.
242                        drop_sql = self._to_sql(
243                            exp.Alter(
244                                this=table_name_without_catalog.copy(),
245                                kind="TABLE",
246                                actions=[exp.Drop(this=column_to_alter.copy(), kind="COLUMN")],
247                            )
248                        )
249                        self.execute(drop_sql)
250
251                        # Step 4: Rename the temporary column.
252                        old_name_qualified = f"{table_name_without_catalog.sql(dialect=self.dialect)}.{temp_column_name.sql(dialect=self.dialect)}"
253                        new_name_unquoted = column_to_alter.sql(
254                            dialect=self.dialect, identify=False
255                        )
256                        rename_sql = f"EXEC sp_rename '{old_name_qualified}', '{new_name_unquoted}', 'COLUMN'"
257                        self.execute(rename_sql)
258                    else:
259                        # For other alterations, execute directly.
260                        direct_alter_expr = exp.Alter(
261                            this=table_name_without_catalog.copy(), kind="TABLE", actions=[action]
262                        )
263                        self.execute(direct_alter_expr)
264
265
class FabricHttpClient:
    """
    Small client for the warehouse endpoints of the Microsoft Fabric REST API.

    Authenticates as a service principal (OAuth2 client-credentials grant). The bearer token is
    fetched once, when `session` is first accessed, and is then reused for the lifetime of this
    client instance.
    NOTE(review): the token is never refreshed; a long-lived client presumably starts failing
    once the token expires - confirm whether callers recreate the client often enough.
    """

    def __init__(self, tenant_id: str, workspace_id: str, client_id: str, client_secret: str):
        """
        Args:
            tenant_id: Tenant used to build the token endpoint URL.
            workspace_id: Fabric workspace all endpoint URLs are scoped to.
            client_id: Service principal client id.
            client_secret: Service principal client secret.
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.workspace_id = workspace_id

    def create_warehouse(
        self, warehouse_name: str, if_not_exists: bool = True, attempt: int = 0
    ) -> None:
        """
        Create a catalog (warehouse) in Microsoft Fabric via REST API.

        Args:
            warehouse_name: Display name of the warehouse to create.
            if_not_exists: When True, an "already in use" response is treated as success and a
                "not available yet" response is retried after a 30 second wait.
            attempt: Internal retry counter; callers normally leave it at 0.

        Raises:
            SQLMeshError: If the retry budget is exhausted, the asynchronous creation fails, or
                the API answers with an unexpected success status.
            requests.HTTPError: If the API answers with an error status.
        """

        # attempt count is arbitrary, it essentially equates to 5 minutes of 30 second waits
        if attempt > 10:
            raise SQLMeshError(
                f"Gave up waiting for Fabric warehouse {warehouse_name} to become available"
            )

        logger.info(f"Creating Fabric warehouse: {warehouse_name}")

        request_data = {
            "displayName": warehouse_name,
            "description": f"Warehouse created by SQLMesh: {warehouse_name}",
        }

        response = self.session.post(self._endpoint_url("warehouses"), json=request_data)

        if if_not_exists and response.status_code == 400:
            # A 400 whose body is not JSON (or has no errorCode) falls through to the generic
            # error handling below instead of failing here with a JSON decode error.
            error_code = self._error_code(response)

            if error_code == "ItemDisplayNameAlreadyInUse":
                logger.warning(f"Fabric warehouse {warehouse_name} already exists")
                return
            if error_code == "ItemDisplayNameNotAvailableYet":
                logger.warning(f"Fabric warehouse {warehouse_name} is still spinning up; waiting")
                # Fabric error message is something like:
                #  - "Requested 'circleci_51d7087e__dev' is not available yet and is expected to become available in the upcoming minutes."
                # This seems to happen if a catalog is dropped and then a new one with the same name is immediately created.
                # There appears to be some delayed async process on the Fabric side that actually drops the warehouses and frees up the names to be used again
                time.sleep(30)
                return self.create_warehouse(
                    warehouse_name=warehouse_name, if_not_exists=if_not_exists, attempt=attempt + 1
                )

        try:
            response.raise_for_status()
        except requests.HTTPError:
            # the important information to actually debug anything is in the response body which Requests never prints
            logger.exception(
                f"Failed to create warehouse {warehouse_name}. status: {response.status_code}, body: {response.text}"
            )
            raise

        # Handle direct success (201) or async creation (202)
        if response.status_code == 201:
            logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
            return

        if response.status_code == 202 and (location_header := response.headers.get("location")):
            logger.info(f"Warehouse creation initiated for: {warehouse_name}")
            self._wait_for_completion(location_header, warehouse_name)
            logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
        else:
            logger.error(f"Unexpected response from Fabric API: {response}\n{response.text}")
            raise SQLMeshError(f"Unable to create warehouse: {response}")

    def delete_warehouse(self, warehouse_name: str, if_exists: bool = True) -> None:
        """
        Drop a catalog (warehouse) in Microsoft Fabric via REST API.

        The warehouse id is resolved by listing the workspace's warehouses, following the
        `continuationUri` of each page until the name is found or the listing ends.

        Args:
            warehouse_name: Display name of the warehouse to delete.
            if_exists: When True, a missing warehouse only logs a warning.

        Raises:
            SQLMeshError: If the warehouse does not exist and `if_exists` is False.
            requests.HTTPError: If listing or deleting fails with an error status.
        """
        logger.info(f"Deleting Fabric warehouse: {warehouse_name}")

        # Get the warehouse ID by listing warehouses, page by page
        # ref: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP#warehouses
        warehouse_name_to_id: t.Dict[t.Any, t.Any] = {}
        visited_urls: t.Set[str] = set()
        next_url: t.Optional[str] = self._endpoint_url("warehouses")

        # `visited_urls` guards against a listing that keeps pointing back at a page already read
        while next_url and next_url not in visited_urls:
            visited_urls.add(next_url)

            response = self.session.get(next_url)
            response.raise_for_status()
            page = response.json()

            for warehouse in page.get("value", []):
                warehouse_name_to_id[warehouse.get("displayName")] = warehouse.get("id")

            if warehouse_name in warehouse_name_to_id:
                break  # found it; no need to fetch the remaining pages

            next_url = page.get("continuationUri")

        warehouse_id = warehouse_name_to_id.get(warehouse_name, None)

        if not warehouse_id:
            # str() because a listing entry without a displayName is stored under the key None
            available_warehouses = ", ".join(str(name) for name in warehouse_name_to_id)
            logger.warning(
                f"Fabric warehouse does not exist: {warehouse_name}\n(available warehouses: {available_warehouses})"
            )
            if if_exists:
                return

            raise SQLMeshError(
                f"Unable to delete Fabric warehouse {warehouse_name} as it doesnt exist"
            )

        # Delete the warehouse by ID
        response = self.session.delete(self._endpoint_url(f"warehouses/{warehouse_id}"))
        response.raise_for_status()

        logger.info(f"Successfully deleted Fabric warehouse: {warehouse_name}")

    @cached_property
    def session(self) -> requests.Session:
        """A `requests.Session` carrying the bearer token; built once per client instance."""
        s = requests.Session()

        access_token = self._get_access_token()
        s.headers.update({"Authorization": f"Bearer {access_token}"})

        return s

    def _endpoint_url(self, endpoint: str) -> str:
        """Return the absolute URL of `endpoint` inside this client's workspace."""
        # Tolerate a single leading slash so both "warehouses" and "/warehouses" work
        if endpoint.startswith("/"):
            endpoint = endpoint[1:]

        return f"https://api.fabric.microsoft.com/v1/workspaces/{self.workspace_id}/{endpoint}"

    @staticmethod
    def _error_code(response: t.Any) -> t.Optional[str]:
        """Return the `errorCode` field of a JSON error body, or None if there is none."""
        try:
            payload = response.json()
        except ValueError:
            # Body is not valid JSON (JSON decode errors are ValueError subclasses)
            return None

        if not isinstance(payload, dict):
            return None

        return payload.get("errorCode", None)

    def _get_access_token(self) -> str:
        """
        Get access token using Service Principal authentication.

        Raises:
            requests.HTTPError: If the token endpoint answers with an error status.
        """

        # Use Azure AD OAuth2 token endpoint
        token_url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token"

        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "https://api.fabric.microsoft.com/.default",
        }

        response = requests.post(token_url, data=data)
        response.raise_for_status()
        token_data = response.json()
        return token_data["access_token"]

    def _wait_for_completion(self, location_url: str, operation_name: str) -> None:
        """
        Poll the operation status until completion.

        Args:
            location_url: URL to poll for the operation status.
            operation_name: Name used in log and error messages.

        Raises:
            SQLMeshError: If the operation reports "Failed", or if it has not reached
                "Succeeded" after the last polling attempt.
            requests.HTTPError: If a status request fails with an error status.
        """

        @retry(
            wait=wait_exponential(multiplier=1, min=1, max=30),
            stop=stop_after_attempt(20),
            retry=retry_if_result(lambda result: result not in ["Succeeded", "Failed"]),
            # Without this callback tenacity raises RetryError once the attempts are exhausted,
            # which would bypass the status check below. Hand back the last polled status instead.
            retry_error_callback=lambda retry_state: retry_state.outcome.result(),
        )
        def _poll() -> str:
            response = self.session.get(location_url)
            response.raise_for_status()

            result = response.json()
            status = result.get("status", "Unknown")

            logger.debug(f"Operation {operation_name} status: {status}")

            if status == "Failed":
                error_msg = result.get("error", {}).get("message", "Unknown error")
                raise SQLMeshError(f"Operation {operation_name} failed: {error_msg}")
            elif status in ["InProgress", "Running"]:
                logger.debug(f"Operation {operation_name} still in progress...")
            elif status not in ["Succeeded"]:
                logger.warning(f"Unknown status '{status}' for operation {operation_name}")

            return status

        final_status = _poll()
        if final_status != "Succeeded":
            raise SQLMeshError(f"Operation {operation_name} completed with status: {final_status}")
logger = <Logger sqlmesh.core.engine_adapter.fabric (WARNING)>
class FabricEngineAdapter(sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter):
 24class FabricEngineAdapter(MSSQLEngineAdapter):
 25    """
 26    Adapter for Microsoft Fabric.
 27    """
 28
 29    DIALECT = "fabric"
 30    SUPPORTS_INDEXES = False
 31    SUPPORTS_TRANSACTIONS = False
 32    SUPPORTS_CREATE_DROP_CATALOG = True
 33    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
 34
 35    def __init__(
 36        self, connection_factory_or_pool: t.Union[t.Callable, t.Any], *args: t.Any, **kwargs: t.Any
 37    ) -> None:
 38        # Wrap connection factory to support changing the catalog dynamically at runtime
 39        if not isinstance(connection_factory_or_pool, ConnectionPool):
 40            original_connection_factory = connection_factory_or_pool
 41
 42            connection_factory_or_pool = lambda *args, **kwargs: original_connection_factory(
 43                target_catalog=self._target_catalog, *args, **kwargs
 44            )
 45
 46        super().__init__(connection_factory_or_pool, *args, **kwargs)
 47
 48    @property
 49    def _target_catalog(self) -> t.Optional[str]:
 50        return self._connection_pool.get_attribute("target_catalog")
 51
 52    @_target_catalog.setter
 53    def _target_catalog(self, value: t.Optional[str]) -> None:
 54        self._connection_pool.set_attribute("target_catalog", value)
 55
 56    @property
 57    def api_client(self) -> FabricHttpClient:
 58        # the requests Session is not guaranteed to be threadsafe
 59        # so we create a http client per thread on demand
 60        if existing_client := self._connection_pool.get_attribute("api_client"):
 61            return existing_client
 62
 63        tenant_id: t.Optional[str] = self._extra_config.get("tenant_id")
 64        workspace_id: t.Optional[str] = self._extra_config.get("workspace_id")
 65        client_id: t.Optional[str] = self._extra_config.get("user")
 66        client_secret: t.Optional[str] = self._extra_config.get("password")
 67
 68        if not tenant_id or not client_id or not client_secret:
 69            raise SQLMeshError(
 70                "Service Principal authentication requires tenant_id, client_id, and client_secret "
 71                "in the Fabric connection configuration"
 72            )
 73
 74        if not workspace_id:
 75            raise SQLMeshError(
 76                "Fabric requires the workspace_id to be configured in the connection configuration to create / drop catalogs"
 77            )
 78
 79        client = FabricHttpClient(
 80            tenant_id=tenant_id,
 81            workspace_id=workspace_id,
 82            client_id=client_id,
 83            client_secret=client_secret,
 84        )
 85
 86        self._connection_pool.set_attribute("api_client", client)
 87        return client
 88
 89    def _create_catalog(self, catalog_name: exp.Identifier) -> None:
 90        """Create a catalog (warehouse) in Microsoft Fabric via REST API."""
 91        warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
 92        logger.info(f"Creating Fabric warehouse: {warehouse_name}")
 93
 94        self.api_client.create_warehouse(warehouse_name)
 95
 96    def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
 97        """Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
 98        warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
 99        current_catalog = self.get_current_catalog()
100
101        logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
102        self.api_client.delete_warehouse(warehouse_name)
103
104        if warehouse_name == current_catalog:
105            # Somewhere around 2025-09-08, Fabric started validating the "Database=" connection argument and throwing 'Authentication failed' if the database doesnt exist
106            # In addition, set_current_catalog() is implemented using a threadlocal variable "target_catalog"
107            # So, when we drop a warehouse, and there are still threads with "target_catalog" set to reference it, any operations on those threads
108            # that use an either use an existing connection pointing to this warehouse or trigger a new connection
109            # will fail with an 'Authentication Failed' error unless we close all connections here, which also clears all the threadlocal data
110            self.close()
111
112    def set_current_catalog(self, catalog_name: str) -> None:
113        """
114        Set the current catalog for Microsoft Fabric connections.
115
116        Override to handle Fabric's stateless session limitation where USE statements
117        don't persist across queries. Instead, we close existing connections and
118        recreate them with the new catalog in the connection configuration.
119
120        Args:
121            catalog_name: The name of the catalog (warehouse) to switch to
122
123        Note:
124            Fabric doesn't support catalog switching via USE statements because each
125            statement runs as an independent session. This method works around this
126            limitation by updating the connection pool with new catalog configuration.
127
128        See:
129            https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations
130        """
131        current_catalog = self.get_current_catalog()
132
133        # If already using the requested catalog, do nothing
134        if current_catalog and current_catalog == catalog_name:
135            logger.debug(f"Already using catalog '{catalog_name}', no action needed")
136            return
137
138        logger.info(f"Switching from catalog '{current_catalog}' to '{catalog_name}'")
139
140        # commit the transaction before closing the connection to help prevent errors like:
141        # > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a
142        # > DDL statement in another concurrent transaction since the start of this transaction
143        # on subsequent queries in the new connection
144        self._connection_pool.commit()
145
146        # note: we call close() on the connection pool instead of self.close() because self.close() calls close_all()
147        # on the connection pool but we just want to close the connection for this thread
148        self._connection_pool.close()
149        self._target_catalog = catalog_name  # new connections will use this catalog
150
151        catalog_after_switch = self.get_current_catalog()
152
153        if catalog_after_switch != catalog_name:
154            # We need to raise an error if the catalog switch failed to prevent the operation that needed the catalog switch from being run against the wrong catalog
155            raise SQLMeshError(
156                f"Unable to switch catalog to {catalog_name}, catalog ended up as {catalog_after_switch}"
157            )
158
159    def alter_table(
160        self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]]
161    ) -> None:
162        """
163        Applies alter expressions to a table. Fabric has limited support for ALTER TABLE,
164        so this method implements a workaround for column type changes.
165        This method is self-contained and sets its own catalog context.
166        """
167        if not alter_expressions:
168            return
169
170        # Get the target table from the first expression to determine the correct catalog.
171        first_op = alter_expressions[0]
172        expression = first_op.expression if isinstance(first_op, TableAlterOperation) else first_op
173        if not isinstance(expression, exp.Alter) or not expression.this.catalog:
174            # Fallback for unexpected scenarios
175            logger.warning(
176                "Could not determine catalog from alter expression, executing with current context."
177            )
178            super().alter_table(alter_expressions)
179            return
180
181        target_catalog = expression.this.catalog
182        self.set_current_catalog(target_catalog)
183
184        with self.transaction():
185            for op in alter_expressions:
186                expression = op.expression if isinstance(op, TableAlterOperation) else op
187
188                if not isinstance(expression, exp.Alter):
189                    self.execute(expression)
190                    continue
191
192                for action in expression.actions:
193                    table_name = expression.this
194
195                    table_name_without_catalog = table_name.copy()
196                    table_name_without_catalog.set("catalog", None)
197
198                    is_type_change = isinstance(action, exp.AlterColumn) and action.args.get(
199                        "dtype"
200                    )
201
202                    if is_type_change:
203                        column_to_alter = action.this
204                        new_type = action.args["dtype"]
205                        temp_column_name_str = f"{column_to_alter.name}__{random_id(short=True)}"
206                        temp_column_name = exp.to_identifier(temp_column_name_str)
207
208                        logger.info(
209                            "Applying workaround for column '%s' on table '%s' to change type to '%s'.",
210                            column_to_alter.sql(),
211                            table_name.sql(),
212                            new_type.sql(),
213                        )
214
215                        # Step 1: Add a temporary column.
216                        add_column_expr = exp.Alter(
217                            this=table_name_without_catalog.copy(),
218                            kind="TABLE",
219                            actions=[
220                                exp.ColumnDef(this=temp_column_name.copy(), kind=new_type.copy())
221                            ],
222                        )
223                        add_sql = self._to_sql(add_column_expr)
224                        self.execute(add_sql)
225
226                        # Step 2: Copy and cast data.
227                        update_sql = self._to_sql(
228                            exp.Update(
229                                this=table_name_without_catalog.copy(),
230                                expressions=[
231                                    exp.EQ(
232                                        this=temp_column_name.copy(),
233                                        expression=exp.Cast(
234                                            this=column_to_alter.copy(), to=new_type.copy()
235                                        ),
236                                    )
237                                ],
238                            )
239                        )
240                        self.execute(update_sql)
241
242                        # Step 3: Drop the original column.
243                        drop_sql = self._to_sql(
244                            exp.Alter(
245                                this=table_name_without_catalog.copy(),
246                                kind="TABLE",
247                                actions=[exp.Drop(this=column_to_alter.copy(), kind="COLUMN")],
248                            )
249                        )
250                        self.execute(drop_sql)
251
252                        # Step 4: Rename the temporary column.
253                        old_name_qualified = f"{table_name_without_catalog.sql(dialect=self.dialect)}.{temp_column_name.sql(dialect=self.dialect)}"
254                        new_name_unquoted = column_to_alter.sql(
255                            dialect=self.dialect, identify=False
256                        )
257                        rename_sql = f"EXEC sp_rename '{old_name_qualified}', '{new_name_unquoted}', 'COLUMN'"
258                        self.execute(rename_sql)
259                    else:
260                        # For other alterations, execute directly.
261                        direct_alter_expr = exp.Alter(
262                            this=table_name_without_catalog.copy(), kind="TABLE", actions=[action]
263                        )
264                        self.execute(direct_alter_expr)

Adapter for Microsoft Fabric.

FabricEngineAdapter( connection_factory_or_pool: Union[Callable, Any], *args: Any, **kwargs: Any)
35    def __init__(
36        self, connection_factory_or_pool: t.Union[t.Callable, t.Any], *args: t.Any, **kwargs: t.Any
37    ) -> None:
38        # Wrap connection factory to support changing the catalog dynamically at runtime
39        if not isinstance(connection_factory_or_pool, ConnectionPool):
40            original_connection_factory = connection_factory_or_pool
41
42            connection_factory_or_pool = lambda *args, **kwargs: original_connection_factory(
43                target_catalog=self._target_catalog, *args, **kwargs
44            )
45
46        super().__init__(connection_factory_or_pool, *args, **kwargs)
DIALECT = 'fabric'
SUPPORTS_INDEXES = False
SUPPORTS_TRANSACTIONS = False
SUPPORTS_CREATE_DROP_CATALOG = True
INSERT_OVERWRITE_STRATEGY = <InsertOverwriteStrategy.DELETE_INSERT: 1>
api_client: FabricHttpClient
56    @property
57    def api_client(self) -> FabricHttpClient:
58        # the requests Session is not guaranteed to be threadsafe
59        # so we create a http client per thread on demand
60        if existing_client := self._connection_pool.get_attribute("api_client"):
61            return existing_client
62
63        tenant_id: t.Optional[str] = self._extra_config.get("tenant_id")
64        workspace_id: t.Optional[str] = self._extra_config.get("workspace_id")
65        client_id: t.Optional[str] = self._extra_config.get("user")
66        client_secret: t.Optional[str] = self._extra_config.get("password")
67
68        if not tenant_id or not client_id or not client_secret:
69            raise SQLMeshError(
70                "Service Principal authentication requires tenant_id, client_id, and client_secret "
71                "in the Fabric connection configuration"
72            )
73
74        if not workspace_id:
75            raise SQLMeshError(
76                "Fabric requires the workspace_id to be configured in the connection configuration to create / drop catalogs"
77            )
78
79        client = FabricHttpClient(
80            tenant_id=tenant_id,
81            workspace_id=workspace_id,
82            client_id=client_id,
83            client_secret=client_secret,
84        )
85
86        self._connection_pool.set_attribute("api_client", client)
87        return client
def set_current_catalog(self, catalog_name: str) -> None:
112    def set_current_catalog(self, catalog_name: str) -> None:
113        """
114        Set the current catalog for Microsoft Fabric connections.
115
116        Override to handle Fabric's stateless session limitation where USE statements
117        don't persist across queries. Instead, we close existing connections and
118        recreate them with the new catalog in the connection configuration.
119
120        Args:
121            catalog_name: The name of the catalog (warehouse) to switch to
122
123        Note:
124            Fabric doesn't support catalog switching via USE statements because each
125            statement runs as an independent session. This method works around this
126            limitation by updating the connection pool with new catalog configuration.
127
128        See:
129            https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations
130        """
131        current_catalog = self.get_current_catalog()
132
133        # If already using the requested catalog, do nothing
134        if current_catalog and current_catalog == catalog_name:
135            logger.debug(f"Already using catalog '{catalog_name}', no action needed")
136            return
137
138        logger.info(f"Switching from catalog '{current_catalog}' to '{catalog_name}'")
139
140        # commit the transaction before closing the connection to help prevent errors like:
141        # > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a
142        # > DDL statement in another concurrent transaction since the start of this transaction
143        # on subsequent queries in the new connection
144        self._connection_pool.commit()
145
146        # note: we call close() on the connection pool instead of self.close() because self.close() calls close_all()
147        # on the connection pool but we just want to close the connection for this thread
148        self._connection_pool.close()
149        self._target_catalog = catalog_name  # new connections will use this catalog
150
151        catalog_after_switch = self.get_current_catalog()
152
153        if catalog_after_switch != catalog_name:
154            # We need to raise an error if the catalog switch failed to prevent the operation that needed the catalog switch from being run against the wrong catalog
155            raise SQLMeshError(
156                f"Unable to switch catalog to {catalog_name}, catalog ended up as {catalog_after_switch}"
157            )

Set the current catalog for Microsoft Fabric connections.

Override to handle Fabric's stateless session limitation where USE statements don't persist across queries. Instead, we close existing connections and recreate them with the new catalog in the connection configuration.

Arguments:
  • catalog_name: The name of the catalog (warehouse) to switch to
Note:

Fabric doesn't support catalog switching via USE statements because each statement runs as an independent session. This method works around this limitation by updating the connection pool with new catalog configuration.

See:

https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations

def alter_table( self, alter_expressions: Union[List[sqlglot.expressions.ddl.Alter], List[sqlmesh.core.schema_diff.TableAlterOperation]]) -> None:
159    def alter_table(
160        self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]]
161    ) -> None:
162        """
163        Applies alter expressions to a table. Fabric has limited support for ALTER TABLE,
164        so this method implements a workaround for column type changes.
165        This method is self-contained and sets its own catalog context.
166        """
167        if not alter_expressions:
168            return
169
170        # Get the target table from the first expression to determine the correct catalog.
171        first_op = alter_expressions[0]
172        expression = first_op.expression if isinstance(first_op, TableAlterOperation) else first_op
173        if not isinstance(expression, exp.Alter) or not expression.this.catalog:
174            # Fallback for unexpected scenarios
175            logger.warning(
176                "Could not determine catalog from alter expression, executing with current context."
177            )
178            super().alter_table(alter_expressions)
179            return
180
181        target_catalog = expression.this.catalog
182        self.set_current_catalog(target_catalog)
183
184        with self.transaction():
185            for op in alter_expressions:
186                expression = op.expression if isinstance(op, TableAlterOperation) else op
187
188                if not isinstance(expression, exp.Alter):
189                    self.execute(expression)
190                    continue
191
192                for action in expression.actions:
193                    table_name = expression.this
194
195                    table_name_without_catalog = table_name.copy()
196                    table_name_without_catalog.set("catalog", None)
197
198                    is_type_change = isinstance(action, exp.AlterColumn) and action.args.get(
199                        "dtype"
200                    )
201
202                    if is_type_change:
203                        column_to_alter = action.this
204                        new_type = action.args["dtype"]
205                        temp_column_name_str = f"{column_to_alter.name}__{random_id(short=True)}"
206                        temp_column_name = exp.to_identifier(temp_column_name_str)
207
208                        logger.info(
209                            "Applying workaround for column '%s' on table '%s' to change type to '%s'.",
210                            column_to_alter.sql(),
211                            table_name.sql(),
212                            new_type.sql(),
213                        )
214
215                        # Step 1: Add a temporary column.
216                        add_column_expr = exp.Alter(
217                            this=table_name_without_catalog.copy(),
218                            kind="TABLE",
219                            actions=[
220                                exp.ColumnDef(this=temp_column_name.copy(), kind=new_type.copy())
221                            ],
222                        )
223                        add_sql = self._to_sql(add_column_expr)
224                        self.execute(add_sql)
225
226                        # Step 2: Copy and cast data.
227                        update_sql = self._to_sql(
228                            exp.Update(
229                                this=table_name_without_catalog.copy(),
230                                expressions=[
231                                    exp.EQ(
232                                        this=temp_column_name.copy(),
233                                        expression=exp.Cast(
234                                            this=column_to_alter.copy(), to=new_type.copy()
235                                        ),
236                                    )
237                                ],
238                            )
239                        )
240                        self.execute(update_sql)
241
242                        # Step 3: Drop the original column.
243                        drop_sql = self._to_sql(
244                            exp.Alter(
245                                this=table_name_without_catalog.copy(),
246                                kind="TABLE",
247                                actions=[exp.Drop(this=column_to_alter.copy(), kind="COLUMN")],
248                            )
249                        )
250                        self.execute(drop_sql)
251
252                        # Step 4: Rename the temporary column.
253                        old_name_qualified = f"{table_name_without_catalog.sql(dialect=self.dialect)}.{temp_column_name.sql(dialect=self.dialect)}"
254                        new_name_unquoted = column_to_alter.sql(
255                            dialect=self.dialect, identify=False
256                        )
257                        rename_sql = f"EXEC sp_rename '{old_name_qualified}', '{new_name_unquoted}', 'COLUMN'"
258                        self.execute(rename_sql)
259                    else:
260                        # For other alterations, execute directly.
261                        direct_alter_expr = exp.Alter(
262                            this=table_name_without_catalog.copy(), kind="TABLE", actions=[action]
263                        )
264                        self.execute(direct_alter_expr)

Applies alter expressions to a table. Fabric has limited support for ALTER TABLE, so this method implements a workaround for column type changes. This method is self-contained and sets its own catalog context.

Inherited Members
sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter
SUPPORTS_TUPLE_IN
SUPPORTS_MATERIALIZED_VIEWS
CURRENT_CATALOG_EXPRESSION
COMMENT_CREATION_TABLE
COMMENT_CREATION_VIEW
SUPPORTS_REPLACE_TABLE
MAX_IDENTIFIER_LENGTH
SUPPORTS_QUERY_EXECUTION_TRACKING
SCHEMA_DIFFER_KWARGS
VARIABLE_LENGTH_DATA_TYPES
catalog_support
columns
table_exists
drop_schema
merge
delete_from
sqlmesh.core.engine_adapter.mixins.GetCurrentCatalogFromFunctionMixin
get_current_catalog
sqlmesh.core.engine_adapter.mixins.RowDiffMixin
MAX_TIMESTAMP_PRECISION
concat_columns
normalize_value
sqlmesh.core.engine_adapter.base.EngineAdapter
DEFAULT_BATCH_SIZE
DATA_OBJECT_FILTER_BATCH_SIZE
MAX_TABLE_COMMENT_LENGTH
MAX_COLUMN_COMMENT_LENGTH
SUPPORTS_MATERIALIZED_VIEW_SCHEMA
SUPPORTS_VIEW_SCHEMA
SUPPORTS_CLONING
SUPPORTS_MANAGED_MODELS
SUPPORTED_DROP_CASCADE_OBJECT_KINDS
HAS_VIEW_BINDING
SUPPORTS_GRANTS
DEFAULT_CATALOG_TYPE
QUOTE_IDENTIFIERS_IN_VIEWS
ATTACH_CORRELATION_ID
SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS
dialect
correlation_id
with_settings
cursor
connection
spark
snowpark
bigframe
comments_enabled
schema_differ
default_catalog
engine_run_mode
recycle
close
get_catalog_type
get_catalog_type_from_table
current_catalog_type
replace_query
create_index
create_table
create_managed_table
ctas
create_state_table
create_table_like
clone_table
drop_data_object
drop_table
drop_managed_table
get_alter_operations
create_view
create_schema
drop_view
create_catalog
drop_catalog
insert_append
insert_overwrite_by_partition
insert_overwrite_by_time_partition
update_table
scd_type_2_by_time
scd_type_2_by_column
rename_table
get_data_object
get_data_objects
fetchone
fetchall
fetchdf
fetch_pyspark_df
wap_enabled
wap_supported
wap_table_name
wap_prepare
wap_publish
sync_grants_config
transaction
session
execute
temp_table
drop_data_object_on_type_mismatch
ensure_nulls_for_unmatched_after_join
use_server_nulls_for_unmatched_after_join
ping
get_table_last_modified_ts
class FabricHttpClient:
class FabricHttpClient:
    """Small client for the Microsoft Fabric REST API, scoped to one workspace.

    Authenticates as a Service Principal using the OAuth2 client-credentials grant and
    exposes the warehouse create / delete operations.
    """

    def __init__(self, tenant_id: str, workspace_id: str, client_id: str, client_secret: str):
        """Store the workspace and Service Principal details.

        Nothing is requested over the network here; the access token is only fetched
        when `session` is first accessed.
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.workspace_id = workspace_id

    def create_warehouse(
        self, warehouse_name: str, if_not_exists: bool = True, attempt: int = 0
    ) -> None:
        """Create a catalog (warehouse) in Microsoft Fabric via REST API.

        Args:
            warehouse_name: Display name of the warehouse to create.
            if_not_exists: When True, a 400 response whose errorCode says the name is
                already in use is treated as success, and one saying the name is not
                available yet causes a 30 second wait followed by another attempt.
            attempt: Number of retries already made; used by the retry path.

        Raises:
            SQLMeshError: If the name never became available, the asynchronous creation
                did not succeed, or the API answered with a non-error status that is
                neither 201 nor 202 with a location header.
            requests.HTTPError: If the API returned any other error status.
        """

        # attempt count is arbitrary, it essentially equates to 5 minutes of 30 second waits
        if attempt > 10:
            raise SQLMeshError(
                f"Gave up waiting for Fabric warehouse {warehouse_name} to become available"
            )

        logger.info(f"Creating Fabric warehouse: {warehouse_name}")

        request_data = {
            "displayName": warehouse_name,
            "description": f"Warehouse created by SQLMesh: {warehouse_name}",
        }

        response = self.session.post(self._endpoint_url("warehouses"), json=request_data)

        if (
            if_not_exists
            and response.status_code == 400
            and (error_code := response.json().get("errorCode", None))
        ):
            if error_code == "ItemDisplayNameAlreadyInUse":
                logger.warning(f"Fabric warehouse {warehouse_name} already exists")
                return
            if error_code == "ItemDisplayNameNotAvailableYet":
                logger.warning(f"Fabric warehouse {warehouse_name} is still spinning up; waiting")
                # Fabric error message is something like:
                #  - "Requested 'circleci_51d7087e__dev' is not available yet and is expected to become available in the upcoming minutes."
                # This seems to happen if a catalog is dropped and then a new one with the same name is immediately created.
                # There appears to be some delayed async process on the Fabric side that actually drops the warehouses and frees up the names to be used again
                time.sleep(30)
                return self.create_warehouse(
                    warehouse_name=warehouse_name, if_not_exists=if_not_exists, attempt=attempt + 1
                )

        try:
            response.raise_for_status()
        except requests.HTTPError:
            # the important information to actually debug anything is in the response body which Requests never prints
            logger.exception(
                f"Failed to create warehouse {warehouse_name}. status: {response.status_code}, body: {response.text}"
            )
            raise

        # Handle direct success (201) or async creation (202)
        if response.status_code == 201:
            logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
            return

        if response.status_code == 202 and (location_header := response.headers.get("location")):
            logger.info(f"Warehouse creation initiated for: {warehouse_name}")
            self._wait_for_completion(location_header, warehouse_name)
            logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
        else:
            logger.error(f"Unexpected response from Fabric API: {response}\n{response.text}")
            raise SQLMeshError(f"Unable to create warehouse: {response}")

    def delete_warehouse(self, warehouse_name: str, if_exists: bool = True) -> None:
        """Drop a catalog (warehouse) in Microsoft Fabric via REST API.

        The warehouse id is resolved by listing the workspace's warehouses and matching
        on display name; only the first page of that listing is read.

        Args:
            warehouse_name: Display name of the warehouse to delete.
            if_exists: When True, a warehouse that cannot be found is only logged.

        Raises:
            SQLMeshError: If the warehouse is not found and `if_exists` is False.
            requests.HTTPError: If the list or delete request returns an error status.
        """
        logger.info(f"Deleting Fabric warehouse: {warehouse_name}")

        # Get the warehouse ID by listing warehouses
        # TODO: handle continuationUri for pagination, ref: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP#warehouses
        listing = self.session.get(self._endpoint_url("warehouses"))
        listing.raise_for_status()

        ids_by_name = {
            entry.get("displayName"): entry.get("id") for entry in listing.json().get("value", [])
        }
        warehouse_id = ids_by_name.get(warehouse_name)

        if not warehouse_id:
            logger.warning(
                f"Fabric warehouse does not exist: {warehouse_name}\n(available warehouses: {', '.join(ids_by_name)})"
            )
            if not if_exists:
                raise SQLMeshError(
                    f"Unable to delete Fabric warehouse {warehouse_name} as it doesnt exist"
                )
            return

        # Delete the warehouse by ID
        deletion = self.session.delete(self._endpoint_url(f"warehouses/{warehouse_id}"))
        deletion.raise_for_status()

        logger.info(f"Successfully deleted Fabric warehouse: {warehouse_name}")

    @cached_property
    def session(self) -> requests.Session:
        """HTTP session carrying the bearer token on every request.

        Built on first access and cached on the instance, so the access token is
        requested once and this code never refreshes it.
        """
        access_token = self._get_access_token()

        http = requests.Session()
        http.headers.update({"Authorization": f"Bearer {access_token}"})
        return http

    def _endpoint_url(self, endpoint: str) -> str:
        """Return the absolute URL of `endpoint` within this client's workspace.

        A single leading slash on `endpoint` is dropped so both "warehouses" and
        "/warehouses" give the same URL.
        """
        if endpoint.startswith("/"):
            endpoint = endpoint[1:]

        return f"https://api.fabric.microsoft.com/v1/workspaces/{self.workspace_id}/{endpoint}"

    def _get_access_token(self) -> str:
        """Get access token using Service Principal authentication.

        Raises:
            requests.HTTPError: If the token endpoint returns an error status.
        """

        # Use Azure AD OAuth2 token endpoint
        token_url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token"

        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "https://api.fabric.microsoft.com/.default",
        }

        response = requests.post(token_url, data=data)
        response.raise_for_status()
        token_data = response.json()
        return token_data["access_token"]

    def _wait_for_completion(self, location_url: str, operation_name: str) -> None:
        """Poll the operation status until completion.

        Args:
            location_url: URL to poll for the operation's status.
            operation_name: Label used in log and error messages.

        Raises:
            SQLMeshError: If the operation reports "Failed", or has not reported
                "Succeeded" once the polling attempts are used up.
            requests.HTTPError: If a status request returns an error status.
        """
        # Imported locally: only this method needs it.
        from tenacity import RetryError

        @retry(
            wait=wait_exponential(multiplier=1, min=1, max=30),
            stop=stop_after_attempt(20),
            retry=retry_if_result(lambda result: result not in ["Succeeded", "Failed"]),
        )
        def _poll() -> str:
            response = self.session.get(location_url)
            response.raise_for_status()

            result = response.json()
            status = result.get("status", "Unknown")

            logger.debug(f"Operation {operation_name} status: {status}")

            if status == "Failed":
                error_msg = result.get("error", {}).get("message", "Unknown error")
                raise SQLMeshError(f"Operation {operation_name} failed: {error_msg}")
            elif status in ["InProgress", "Running"]:
                logger.debug(f"Operation {operation_name} still in progress...")
            elif status not in ["Succeeded"]:
                logger.warning(f"Unknown status '{status}' for operation {operation_name}")

            return status

        try:
            final_status = _poll()
        except RetryError as ex:
            # Raised when every attempt returned a non-terminal status; surface it as a
            # SQLMeshError like the other failures instead of leaking the tenacity exception.
            raise SQLMeshError(
                f"Gave up waiting for operation {operation_name} to complete"
            ) from ex

        if final_status != "Succeeded":
            raise SQLMeshError(f"Operation {operation_name} completed with status: {final_status}")
FabricHttpClient( tenant_id: str, workspace_id: str, client_id: str, client_secret: str)
268    def __init__(self, tenant_id: str, workspace_id: str, client_id: str, client_secret: str):
269        self.tenant_id = tenant_id
270        self.client_id = client_id
271        self.client_secret = client_secret
272        self.workspace_id = workspace_id
tenant_id
client_id
client_secret
workspace_id
def create_warehouse( self, warehouse_name: str, if_not_exists: bool = True, attempt: int = 0) -> None:
274    def create_warehouse(
275        self, warehouse_name: str, if_not_exists: bool = True, attempt: int = 0
276    ) -> None:
277        """Create a catalog (warehouse) in Microsoft Fabric via REST API."""
278
279        # attempt count is arbitrary, it essentially equates to 5 minutes of 30 second waits
280        if attempt > 10:
281            raise SQLMeshError(
282                f"Gave up waiting for Fabric warehouse {warehouse_name} to become available"
283            )
284
285        logger.info(f"Creating Fabric warehouse: {warehouse_name}")
286
287        request_data = {
288            "displayName": warehouse_name,
289            "description": f"Warehouse created by SQLMesh: {warehouse_name}",
290        }
291
292        response = self.session.post(self._endpoint_url("warehouses"), json=request_data)
293
294        if (
295            if_not_exists
296            and response.status_code == 400
297            and (errorCode := response.json().get("errorCode", None))
298        ):
299            if errorCode == "ItemDisplayNameAlreadyInUse":
300                logger.warning(f"Fabric warehouse {warehouse_name} already exists")
301                return
302            if errorCode == "ItemDisplayNameNotAvailableYet":
303                logger.warning(f"Fabric warehouse {warehouse_name} is still spinning up; waiting")
304                # Fabric error message is something like:
305                #  - "Requested 'circleci_51d7087e__dev' is not available yet and is expected to become available in the upcoming minutes."
306                # This seems to happen if a catalog is dropped and then a new one with the same name is immediately created.
307                # There appears to be some delayed async process on the Fabric side that actually drops the warehouses and frees up the names to be used again
308                time.sleep(30)
309                return self.create_warehouse(
310                    warehouse_name=warehouse_name, if_not_exists=if_not_exists, attempt=attempt + 1
311                )
312
313        try:
314            response.raise_for_status()
315        except:
316            # the important information to actually debug anything is in the response body which Requests never prints
317            logger.exception(
318                f"Failed to create warehouse {warehouse_name}. status: {response.status_code}, body: {response.text}"
319            )
320            raise
321
322        # Handle direct success (201) or async creation (202)
323        if response.status_code == 201:
324            logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
325            return
326
327        if response.status_code == 202 and (location_header := response.headers.get("location")):
328            logger.info(f"Warehouse creation initiated for: {warehouse_name}")
329            self._wait_for_completion(location_header, warehouse_name)
330            logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
331        else:
332            logger.error(f"Unexpected response from Fabric API: {response}\n{response.text}")
333            raise SQLMeshError(f"Unable to create warehouse: {response}")

Create a catalog (warehouse) in Microsoft Fabric via REST API.

def delete_warehouse(self, warehouse_name: str, if_exists: bool = True) -> None:
335    def delete_warehouse(self, warehouse_name: str, if_exists: bool = True) -> None:
336        """Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
337        logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
338
339        # Get the warehouse ID by listing warehouses
340        # TODO: handle continuationUri for pagination, ref: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP#warehouses
341        response = self.session.get(self._endpoint_url("warehouses"))
342        response.raise_for_status()
343
344        warehouse_name_to_id = {
345            warehouse.get("displayName"): warehouse.get("id")
346            for warehouse in response.json().get("value", [])
347        }
348
349        warehouse_id = warehouse_name_to_id.get(warehouse_name, None)
350
351        if not warehouse_id:
352            logger.warning(
353                f"Fabric warehouse does not exist: {warehouse_name}\n(available warehouses: {', '.join(warehouse_name_to_id)})"
354            )
355            if if_exists:
356                return
357
358            raise SQLMeshError(
359                f"Unable to delete Fabric warehouse {warehouse_name} as it doesnt exist"
360            )
361
362        # Delete the warehouse by ID
363        response = self.session.delete(self._endpoint_url(f"warehouses/{warehouse_id}"))
364        response.raise_for_status()
365
366        logger.info(f"Successfully deleted Fabric warehouse: {warehouse_name}")

Drop a catalog (warehouse) in Microsoft Fabric via REST API.

session: requests.sessions.Session
368    @cached_property
369    def session(self) -> requests.Session:
370        s = requests.Session()
371
372        access_token = self._get_access_token()
373        s.headers.update({"Authorization": f"Bearer {access_token}"})
374
375        return s