Edit on GitHub

sqlmesh.dbt.adapter

  1from __future__ import annotations
  2
  3import abc
  4import logging
  5import typing as t
  6
  7from sqlglot import exp, parse_one
  8
  9from sqlmesh.core.dialect import normalize_and_quote, normalize_model_name, schema_
 10from sqlmesh.core.engine_adapter import EngineAdapter
 11from sqlmesh.core.snapshot import DeployabilityIndex, Snapshot, to_table_mapping
 12from sqlmesh.utils.errors import ConfigError, ParsetimeAdapterCallError
 13from sqlmesh.utils.jinja import JinjaMacroRegistry
 14from sqlmesh.utils import AttributeDict
 15from sqlmesh.core.schema_diff import TableAlterOperation
 16
 17if t.TYPE_CHECKING:
 18    import agate
 19    from dbt.adapters.base import BaseRelation
 20    from dbt.adapters.base.column import Column
 21    from dbt.adapters.base.impl import AdapterResponse
 22    from sqlmesh.core.engine_adapter.base import DataObject
 23    from sqlmesh.dbt.relation import Policy
 24
 25
 26logger = logging.getLogger(__name__)
 27
 28
 29class BaseAdapter(abc.ABC):
 30    def __init__(
 31        self,
 32        jinja_macros: JinjaMacroRegistry,
 33        jinja_globals: t.Optional[t.Dict[str, t.Any]] = None,
 34        project_dialect: t.Optional[str] = None,
 35        quote_policy: t.Optional[Policy] = None,
 36    ):
 37        from dbt.adapters.base.relation import Policy
 38
 39        self.jinja_macros = jinja_macros
 40        self.jinja_globals = jinja_globals.copy() if jinja_globals else {}
 41        self.jinja_globals["adapter"] = self
 42        self.project_dialect = project_dialect
 43        self.quote_policy = quote_policy or Policy()
 44
 45    @abc.abstractmethod
 46    def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
 47        """Returns a single relation that matches the provided path."""
 48
 49    @abc.abstractmethod
 50    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
 51        """Returns a single relation that matches the provided relation if present."""
 52
 53    @abc.abstractmethod
 54    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
 55        """Gets all relations in a given schema and optionally database.
 56
 57        TODO: Add caching functionality to avoid repeat visits to DB
 58        """
 59
 60    @abc.abstractmethod
 61    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
 62        """Using the engine adapter, gets all the relations that match the given schema grain relation."""
 63
 64    @abc.abstractmethod
 65    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
 66        """Returns the columns for a given table grained relation."""
 67
 68    @abc.abstractmethod
 69    def get_missing_columns(
 70        self, from_relation: BaseRelation, to_relation: BaseRelation
 71    ) -> t.List[Column]:
 72        """Returns the columns in from_relation missing from to_relation."""
 73
 74    @abc.abstractmethod
 75    def create_schema(self, relation: BaseRelation) -> None:
 76        """Creates a schema in the target database."""
 77
 78    @abc.abstractmethod
 79    def drop_schema(self, relation: BaseRelation) -> None:
 80        """Drops a schema in the target database."""
 81
 82    @abc.abstractmethod
 83    def drop_relation(self, relation: BaseRelation) -> None:
 84        """Drops a relation (table) in the target database."""
 85
 86    @abc.abstractmethod
 87    def expand_target_column_types(
 88        self, from_relation: BaseRelation, to_relation: BaseRelation
 89    ) -> None:
 90        """Expand to_relation's column types to match those of from_relation."""
 91
 92    @abc.abstractmethod
 93    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
 94        """Renames a relation (table) in the target database."""
 95
 96    @abc.abstractmethod
 97    def execute(
 98        self, sql: str, auto_begin: bool = False, fetch: bool = False
 99    ) -> t.Tuple[AdapterResponse, agate.Table]:
100        """Executes the given SQL statement and returns the results as an agate table."""
101
102    @abc.abstractmethod
103    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
104        """Resolves the relation's schema to its physical schema."""
105
106    @abc.abstractmethod
107    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
108        """Resolves the relation's schema to its physical identifier."""
109
110    def quote(self, identifier: str) -> str:
111        """Returns a quoted identifier."""
112        return exp.to_column(identifier).sql(dialect=self.project_dialect, identify=True)
113
114    def quote_as_configured(self, value: str, component_type: str) -> str:
115        """Returns the value quoted according to the quote policy."""
116        return self.quote(value) if getattr(self.quote_policy, component_type, False) else value
117
118    def dispatch(
119        self,
120        macro_name: str,
121        macro_namespace: t.Optional[str] = None,
122    ) -> t.Callable:
123        """Returns a dialect-specific version of a macro with the given name."""
124        target_type = self.jinja_globals["target"]["type"]
125        macro_suffix = f"__{macro_name}"
126
127        def _relevance(package_name_pair: t.Tuple[t.Optional[str], str]) -> t.Tuple[int, int]:
128            """Lower scores more relevant."""
129            macro_package, name = package_name_pair
130
131            package_score = 0 if macro_package == macro_namespace else 1
132            name_score = 1
133
134            if name.startswith("default"):
135                name_score = 2
136            elif name.startswith(target_type):
137                name_score = 0
138
139            return name_score, package_score
140
141        jinja_env = self.jinja_macros.build_environment(**self.jinja_globals).globals
142
143        packages_to_check: t.List[t.Optional[str]] = [None]
144        if macro_namespace is not None:
145            if macro_namespace in jinja_env:
146                packages_to_check = [self.jinja_macros.root_package_name, macro_namespace]
147
148        # Add dbt packages as fallback
149        packages_to_check.extend(k for k in jinja_env if k.startswith("dbt"))
150
151        candidates = {}
152        for macro_package in packages_to_check:
153            macros = jinja_env.get(macro_package, {}) if macro_package else jinja_env
154            if not isinstance(macros, dict):
155                continue
156            candidates.update(
157                {
158                    (macro_package, macro_name): macro_callable
159                    for macro_name, macro_callable in macros.items()
160                    if macro_name.endswith(macro_suffix)
161                }
162            )
163
164        if candidates:
165            sorted_candidates = sorted(candidates, key=_relevance)
166            return candidates[sorted_candidates[0]]
167
168        raise ConfigError(f"Macro '{macro_name}', package '{macro_namespace}' was not found.")
169
170    def type(self) -> str:
171        return self.project_dialect or ""
172
173    def compare_dbr_version(self, major: int, minor: int) -> int:
174        # This method is specific to the Databricks dbt adapter implementation and is used in some macros.
175        # Always return -1 to fallback to Spark macro implementations.
176        return -1
177
178    @property
179    def graph(self) -> t.Any:
180        flat_graph = self.jinja_globals.get("flat_graph", None)
181        return flat_graph or AttributeDict(
182            {
183                "exposures": {},
184                "groups": {},
185                "metrics": {},
186                "nodes": {},
187                "sources": {},
188                "semantic_models": {},
189                "saved_queries": {},
190            }
191        )
192
193
class ParsetimeAdapter(BaseAdapter):
    """Adapter whose engine-facing operations are all rejected.

    Every operation other than ``resolve_schema`` and ``resolve_identifier``
    raises ``ParsetimeAdapterCallError`` with the message
    ``"Can't <action> at parse time."``. The two resolve methods simply echo the
    relation's own schema and identifier.
    """

    def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("get relation")

    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("load relation")

    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("list relation")

    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("list relation")

    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("get columns")

    def get_missing_columns(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> t.List[Column]:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("get missing columns")

    def create_schema(self, relation: BaseRelation) -> None:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("create schema")

    def drop_schema(self, relation: BaseRelation) -> None:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("drop schema")

    def drop_relation(self, relation: BaseRelation) -> None:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("drop relation")

    def expand_target_column_types(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> None:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("expand target column types")

    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("rename relation")

    def execute(
        self, sql: str, auto_begin: bool = False, fetch: bool = False
    ) -> t.Tuple[AdapterResponse, agate.Table]:
        """Always raises ParsetimeAdapterCallError."""
        self._raise_parsetime_adapter_call_error("execute SQL")

    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
        """Returns the relation's own schema, unchanged."""
        return relation.schema

    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
        """Returns the relation's own identifier, unchanged."""
        return relation.identifier

    @staticmethod
    def _raise_parsetime_adapter_call_error(action: str) -> t.NoReturn:
        """Raises the error for an operation that is rejected by this adapter.

        Annotated as ``NoReturn`` so that callers with a non-``None`` return type
        need no unreachable statement after the call to satisfy the type checker.

        Args:
            action: Human-readable description of the rejected operation.

        Raises:
            ParsetimeAdapterCallError: Always.
        """
        raise ParsetimeAdapterCallError(f"Can't {action} at parse time.")
253
254
class RuntimeAdapter(BaseAdapter):
    """Adapter that carries out every operation through an ``EngineAdapter``.

    Table names are normalized for the project dialect and, for lookups and SQL
    execution, translated through ``table_mapping`` before reaching the engine.
    """

    def __init__(
        self,
        engine_adapter: EngineAdapter,
        jinja_macros: JinjaMacroRegistry,
        jinja_globals: t.Optional[t.Dict[str, t.Any]] = None,
        relation_type: t.Optional[t.Type[BaseRelation]] = None,
        column_type: t.Optional[t.Type[Column]] = None,
        quote_policy: t.Optional[Policy] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        project_dialect: t.Optional[str] = None,
    ):
        """Builds the adapter and its table mapping.

        Args:
            engine_adapter: Engine adapter that performs the actual work.
            jinja_macros: Registry used for macro dispatch.
            jinja_globals: Extra Jinja globals.
            relation_type: Relation class to instantiate; defaults to dbt's ``BaseRelation``.
            column_type: Column class to instantiate; defaults to dbt's ``Column``.
            quote_policy: Quoting policy passed to created relations.
            snapshots: Snapshots from which a table mapping is derived.
            table_mapping: Explicit mapping entries; these override the entries
                derived from ``snapshots``.
            deployability_index: Passed to ``to_table_mapping`` together with the snapshots.
            project_dialect: SQL dialect; defaults to the engine adapter's dialect.
        """
        # Runtime imports: the module-level imports of these names are type-checking only.
        from dbt.adapters.base import BaseRelation
        from dbt.adapters.base.column import Column

        super().__init__(
            jinja_macros,
            jinja_globals=jinja_globals,
            project_dialect=project_dialect or engine_adapter.dialect,
            quote_policy=quote_policy,
        )

        table_mapping = table_mapping or {}

        self.engine_adapter = engine_adapter
        self.relation_type = relation_type or BaseRelation
        self.column_type = column_type or Column
        # Later keys win, so explicit entries take precedence over snapshot-derived ones.
        self.table_mapping = {
            **to_table_mapping((snapshots or {}).values(), deployability_index),
            **table_mapping,
        }

    def get_relation(
        self, database: t.Optional[str], schema: str, identifier: str
    ) -> t.Optional[BaseRelation]:
        """Returns the relation at the given path, or None if the engine has no such object."""
        target_table = exp.table_(identifier, db=schema, catalog=database)
        # Normalize before converting to a relation; otherwise, it will be too late,
        # as quotes will have already been applied.
        target_table = self._normalize(target_table)
        return self.load_relation(self._table_to_relation(target_table))

    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
        """Returns a relation built from the engine's data object, or None if it does not exist."""
        data_object = self.engine_adapter.get_data_object(self._physical_table(relation))
        return self._data_object_to_relation(data_object) if data_object is not None else None

    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
        """Returns all relations the engine reports for the given schema."""
        target_schema = schema_(schema, catalog=database)
        # Normalize before converting to a relation; otherwise, it will be too late,
        # as quotes will have already been applied.
        target_schema = self._normalize(target_schema)
        return self.list_relations_without_caching(self._table_to_relation(target_schema))

    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
        """Returns one relation per data object the engine reports for the relation's schema."""
        schema = self._normalize(self._schema(schema_relation))
        return [
            self._data_object_to_relation(do) for do in self.engine_adapter.get_data_objects(schema)
        ]

    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
        """Returns the columns of the (mapped) table behind the relation.

        For the ``bigquery`` dialect the columns are built with
        ``column_type.create_from_field`` from ``engine_adapter.get_bq_schema``
        when both are available as callables; otherwise dbt's base ``Column`` is
        used. Every other dialect uses the configured ``column_type``.
        """
        physical_table = self._physical_table(relation)

        column_cls = self.column_type
        if self.project_dialect == "bigquery":
            # dbt.adapters.bigquery.column.BigQueryColumn has a different constructor signature
            # We need to use BigQueryColumn.create_from_field() to create the column instead
            create_from_field = getattr(self.column_type, "create_from_field", None)
            get_bq_schema = getattr(self.engine_adapter, "get_bq_schema", None)
            if callable(create_from_field) and callable(get_bq_schema):
                return [create_from_field(field) for field in get_bq_schema(physical_table)]

            from dbt.adapters.base.column import Column

            column_cls = Column

        return [
            column_cls.from_description(
                name=name, raw_data_type=dtype.sql(dialect=self.project_dialect)
            )
            for name, dtype in self.engine_adapter.columns(table_name=physical_table).items()
        ]

    def get_missing_columns(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> t.List[Column]:
        """Returns the columns of from_relation whose names do not occur in to_relation."""
        target_columns = {col.name for col in self.get_columns_in_relation(to_relation)}

        return [
            col
            for col in self.get_columns_in_relation(from_relation)
            if col.name not in target_columns
        ]

    def create_schema(self, relation: BaseRelation) -> None:
        """Creates the relation's schema; does nothing when the relation has no schema."""
        if relation.schema is not None:
            self.engine_adapter.create_schema(self._normalize(self._schema(relation)))

    def drop_schema(self, relation: BaseRelation) -> None:
        """Drops the relation's schema; does nothing when the relation has no schema."""
        if relation.schema is not None:
            self.engine_adapter.drop_schema(self._normalize(self._schema(relation)))

    def drop_relation(self, relation: BaseRelation) -> None:
        """Drops the relation's table; does nothing unless both schema and identifier are set."""
        if relation.schema is not None and relation.identifier is not None:
            self.engine_adapter.drop_table(self._normalize(self._relation_to_table(relation)))

    def expand_target_column_types(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> None:
        """Alters to_relation so that expandable column types match those of from_relation.

        Only columns present in both relations, and for which the target column
        reports ``can_expand_to`` the source column, are considered. The alter
        operations are computed with ``ignore_destructive=True``.
        """
        from_dbt_columns = {c.name: c for c in self.get_columns_in_relation(from_relation)}
        to_dbt_columns = {c.name: c for c in self.get_columns_in_relation(to_relation)}

        # NOTE(review): the dbt columns above are read through the table mapping,
        # whereas the names below are only normalized - confirm this asymmetry is intended.
        from_table_name = self._normalize(self._relation_to_table(from_relation))
        to_table_name = self._normalize(self._relation_to_table(to_relation))

        from_columns = self.engine_adapter.columns(from_table_name)
        to_columns = self.engine_adapter.columns(to_table_name)

        current_columns = {}
        new_columns = {}
        for column_name, from_column in from_dbt_columns.items():
            target_column = to_dbt_columns.get(column_name)
            if target_column is not None and target_column.can_expand_to(from_column):
                current_columns[column_name] = to_columns[column_name]
                new_columns[column_name] = from_columns[column_name]

        alter_expressions = t.cast(
            t.List[TableAlterOperation],
            self.engine_adapter.schema_differ.compare_columns(
                to_table_name,
                current_columns,
                new_columns,
                ignore_destructive=True,
            ),
        )

        if alter_expressions:
            self.engine_adapter.alter_table(alter_expressions)

    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
        """Renames the table behind from_relation to the name of to_relation."""
        old_table_name = self._normalize(self._relation_to_table(from_relation))
        new_table_name = self._normalize(self._relation_to_table(to_relation))

        self.engine_adapter.rename_table(old_table_name, new_table_name)

    def execute(
        self, sql: str, auto_begin: bool = False, fetch: bool = False
    ) -> t.Tuple[AdapterResponse, agate.Table]:
        """Executes the given SQL statement and returns the results as an agate table.

        The statement is parsed with the project dialect, normalized and quoted,
        and its table references are replaced using the table mapping.

        Args:
            sql: The SQL statement to run.
            auto_begin: Run the statement inside an engine adapter transaction.
            fetch: Fetch the result as a DataFrame; otherwise an empty table is returned.
        """
        import pandas as pd
        from dbt.adapters.base.impl import AdapterResponse

        from sqlmesh.dbt.util import pandas_to_agate, empty_table

        # mypy bug: https://github.com/python/mypy/issues/10740
        exec_func: t.Callable[..., None | pd.DataFrame] = (
            self.engine_adapter.fetchdf if fetch else self.engine_adapter.execute  # type: ignore
        )

        expression = parse_one(sql, read=self.project_dialect)
        with normalize_and_quote(
            expression, t.cast(str, self.project_dialect), self.engine_adapter.default_catalog
        ) as expression:
            expression = exp.replace_tables(
                expression, self.table_mapping, dialect=self.project_dialect, copy=False
            )

        if auto_begin:
            # TODO: This could be a bug. I think dbt leaves the transaction open while we close immediately.
            with self.engine_adapter.transaction():
                resp = exec_func(expression, quote_identifiers=False)
        else:
            resp = exec_func(expression, quote_identifiers=False)

        # TODO: Properly fill in adapter response
        if fetch:
            assert isinstance(resp, pd.DataFrame)
            return AdapterResponse("Success"), pandas_to_agate(resp)
        return AdapterResponse("Success"), empty_table()

    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
        """Returns the schema of the mapped table, or None when it is empty."""
        schema = self._physical_table(relation).db
        return schema if schema else None

    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
        """Returns the name of the mapped table, or None when it is empty."""
        identifier = self._physical_table(relation).name
        return identifier if identifier else None

    def _physical_table(self, relation: BaseRelation) -> exp.Table:
        """Converts a relation to its normalized table and applies the table mapping."""
        return self._map_table_name(self._normalize(self._relation_to_table(relation)))

    def _map_table_name(self, table: exp.Table) -> exp.Table:
        """Returns the mapped table for the given table, or the table itself when unmapped."""
        # Use the default dialect since this is the dialect used to normalize and quote keys in the
        # mapping table.
        name = table.sql(identify=True)
        physical_table_name = self.table_mapping.get(name)
        if not physical_table_name:
            return table

        logger.debug("Resolved ref '%s' to snapshot table '%s'", name, physical_table_name)

        return exp.to_table(physical_table_name, dialect=self.project_dialect)

    def _relation_to_table(self, relation: BaseRelation) -> exp.Table:
        """Parses the rendered relation into a table expression using the project dialect."""
        return exp.to_table(relation.render(), dialect=self.project_dialect)

    def _data_object_to_relation(self, data_object: DataObject) -> BaseRelation:
        """Builds a relation from an engine data object.

        Unknown object types become ``External`` and managed tables become
        ``Table``; any other type is looked up by its lower-cased value.
        """
        from sqlmesh.dbt.relation import RelationType

        if data_object.type.is_unknown:
            dbt_relation_type = RelationType.External
        elif data_object.type.is_managed_table:
            dbt_relation_type = RelationType.Table
        else:
            dbt_relation_type = RelationType(data_object.type.lower())

        return self.relation_type.create(
            database=data_object.catalog,
            schema=data_object.schema_name,
            identifier=data_object.name,
            quote_policy=self.quote_policy,
            type=dbt_relation_type,
        )

    def _table_to_relation(self, table: exp.Table) -> BaseRelation:
        """Builds a relation from a table expression; an empty catalog becomes None."""
        return self.relation_type.create(
            database=table.catalog or None,
            schema=table.db,
            identifier=table.name,
            quote_policy=self.quote_policy,
        )

    def _schema(self, schema_relation: BaseRelation) -> exp.Table:
        """Returns a table expression carrying only the relation's schema and database.

        Both parts are quoted according to the quote policy. The relation must
        have a schema.
        """
        assert schema_relation.schema is not None
        return exp.Table(
            this=None,
            db=exp.to_identifier(schema_relation.schema, quoted=self.quote_policy.schema),
            catalog=exp.to_identifier(schema_relation.database, quoted=self.quote_policy.database),
        )

    def _normalize(self, input_table: exp.Table) -> exp.Table:
        """Normalizes a table (or schema-only) expression for the project dialect."""
        normalized_name = normalize_model_name(
            input_table, self.engine_adapter.default_catalog, self.project_dialect
        )
        normalized_table = exp.to_table(normalized_name)
        if not input_table.this:
            # The input had no table part, so every parsed part sits one slot too
            # far to the right: shift db -> catalog and this -> db, then clear this.
            normalized_table.set("catalog", normalized_table.args.get("db"))
            normalized_table.set("db", normalized_table.this)
            normalized_table.set("this", None)
        return normalized_table
logger = <Logger sqlmesh.dbt.adapter (WARNING)>
class BaseAdapter(abc.ABC):
 30class BaseAdapter(abc.ABC):
 31    def __init__(
 32        self,
 33        jinja_macros: JinjaMacroRegistry,
 34        jinja_globals: t.Optional[t.Dict[str, t.Any]] = None,
 35        project_dialect: t.Optional[str] = None,
 36        quote_policy: t.Optional[Policy] = None,
 37    ):
 38        from dbt.adapters.base.relation import Policy
 39
 40        self.jinja_macros = jinja_macros
 41        self.jinja_globals = jinja_globals.copy() if jinja_globals else {}
 42        self.jinja_globals["adapter"] = self
 43        self.project_dialect = project_dialect
 44        self.quote_policy = quote_policy or Policy()
 45
 46    @abc.abstractmethod
 47    def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
 48        """Returns a single relation that matches the provided path."""
 49
 50    @abc.abstractmethod
 51    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
 52        """Returns a single relation that matches the provided relation if present."""
 53
 54    @abc.abstractmethod
 55    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
 56        """Gets all relations in a given schema and optionally database.
 57
 58        TODO: Add caching functionality to avoid repeat visits to DB
 59        """
 60
 61    @abc.abstractmethod
 62    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
 63        """Using the engine adapter, gets all the relations that match the given schema grain relation."""
 64
 65    @abc.abstractmethod
 66    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
 67        """Returns the columns for a given table grained relation."""
 68
 69    @abc.abstractmethod
 70    def get_missing_columns(
 71        self, from_relation: BaseRelation, to_relation: BaseRelation
 72    ) -> t.List[Column]:
 73        """Returns the columns in from_relation missing from to_relation."""
 74
 75    @abc.abstractmethod
 76    def create_schema(self, relation: BaseRelation) -> None:
 77        """Creates a schema in the target database."""
 78
 79    @abc.abstractmethod
 80    def drop_schema(self, relation: BaseRelation) -> None:
 81        """Drops a schema in the target database."""
 82
 83    @abc.abstractmethod
 84    def drop_relation(self, relation: BaseRelation) -> None:
 85        """Drops a relation (table) in the target database."""
 86
 87    @abc.abstractmethod
 88    def expand_target_column_types(
 89        self, from_relation: BaseRelation, to_relation: BaseRelation
 90    ) -> None:
 91        """Expand to_relation's column types to match those of from_relation."""
 92
 93    @abc.abstractmethod
 94    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
 95        """Renames a relation (table) in the target database."""
 96
 97    @abc.abstractmethod
 98    def execute(
 99        self, sql: str, auto_begin: bool = False, fetch: bool = False
100    ) -> t.Tuple[AdapterResponse, agate.Table]:
101        """Executes the given SQL statement and returns the results as an agate table."""
102
103    @abc.abstractmethod
104    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
105        """Resolves the relation's schema to its physical schema."""
106
107    @abc.abstractmethod
108    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
109        """Resolves the relation's schema to its physical identifier."""
110
111    def quote(self, identifier: str) -> str:
112        """Returns a quoted identifier."""
113        return exp.to_column(identifier).sql(dialect=self.project_dialect, identify=True)
114
115    def quote_as_configured(self, value: str, component_type: str) -> str:
116        """Returns the value quoted according to the quote policy."""
117        return self.quote(value) if getattr(self.quote_policy, component_type, False) else value
118
119    def dispatch(
120        self,
121        macro_name: str,
122        macro_namespace: t.Optional[str] = None,
123    ) -> t.Callable:
124        """Returns a dialect-specific version of a macro with the given name."""
125        target_type = self.jinja_globals["target"]["type"]
126        macro_suffix = f"__{macro_name}"
127
128        def _relevance(package_name_pair: t.Tuple[t.Optional[str], str]) -> t.Tuple[int, int]:
129            """Lower scores more relevant."""
130            macro_package, name = package_name_pair
131
132            package_score = 0 if macro_package == macro_namespace else 1
133            name_score = 1
134
135            if name.startswith("default"):
136                name_score = 2
137            elif name.startswith(target_type):
138                name_score = 0
139
140            return name_score, package_score
141
142        jinja_env = self.jinja_macros.build_environment(**self.jinja_globals).globals
143
144        packages_to_check: t.List[t.Optional[str]] = [None]
145        if macro_namespace is not None:
146            if macro_namespace in jinja_env:
147                packages_to_check = [self.jinja_macros.root_package_name, macro_namespace]
148
149        # Add dbt packages as fallback
150        packages_to_check.extend(k for k in jinja_env if k.startswith("dbt"))
151
152        candidates = {}
153        for macro_package in packages_to_check:
154            macros = jinja_env.get(macro_package, {}) if macro_package else jinja_env
155            if not isinstance(macros, dict):
156                continue
157            candidates.update(
158                {
159                    (macro_package, macro_name): macro_callable
160                    for macro_name, macro_callable in macros.items()
161                    if macro_name.endswith(macro_suffix)
162                }
163            )
164
165        if candidates:
166            sorted_candidates = sorted(candidates, key=_relevance)
167            return candidates[sorted_candidates[0]]
168
169        raise ConfigError(f"Macro '{macro_name}', package '{macro_namespace}' was not found.")
170
171    def type(self) -> str:
172        return self.project_dialect or ""
173
174    def compare_dbr_version(self, major: int, minor: int) -> int:
175        # This method is specific to the Databricks dbt adapter implementation and is used in some macros.
176        # Always return -1 to fallback to Spark macro implementations.
177        return -1
178
179    @property
180    def graph(self) -> t.Any:
181        flat_graph = self.jinja_globals.get("flat_graph", None)
182        return flat_graph or AttributeDict(
183            {
184                "exposures": {},
185                "groups": {},
186                "metrics": {},
187                "nodes": {},
188                "sources": {},
189                "semantic_models": {},
190                "saved_queries": {},
191            }
192        )

Helper class that provides a standard way to create an ABC using inheritance.

jinja_macros
jinja_globals
project_dialect
quote_policy
@abc.abstractmethod
def get_relation( self, database: str, schema: str, identifier: str) -> Optional[dbt.adapters.base.relation.BaseRelation]:
    @abc.abstractmethod
    def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
        """Returns a single relation that matches the provided path.

        Args:
            database: Database part of the relation path.
            schema: Schema part of the relation path.
            identifier: Identifier part of the relation path.

        Returns:
            The matching relation, or None.
        """

Returns a single relation that matches the provided path.

@abc.abstractmethod
def load_relation( self, relation: dbt.adapters.base.relation.BaseRelation) -> Optional[dbt.adapters.base.relation.BaseRelation]:
    @abc.abstractmethod
    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
        """Returns a single relation that matches the provided relation if present.

        Args:
            relation: The relation to look up.

        Returns:
            The matching relation, or None when it is not present.
        """

Returns a single relation that matches the provided relation if present.

@abc.abstractmethod
def list_relations( self, database: Optional[str], schema: str) -> List[dbt.adapters.base.relation.BaseRelation]:
    @abc.abstractmethod
    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
        """Gets all relations in a given schema and optionally database.

        Args:
            database: Optional database that contains the schema.
            schema: Schema whose relations are listed.

        Returns:
            The relations found in the schema.

        TODO: Add caching functionality to avoid repeat visits to DB
        """

Gets all relations in a given schema and optionally database.

TODO: Add caching functionality to avoid repeat visits to DB

@abc.abstractmethod
def list_relations_without_caching( self, schema_relation: dbt.adapters.base.relation.BaseRelation) -> List[dbt.adapters.base.relation.BaseRelation]:
    @abc.abstractmethod
    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
        """Using the engine adapter, gets all the relations that match the given schema grain relation.

        Args:
            schema_relation: Relation that identifies the schema to list.

        Returns:
            The relations found for that schema.
        """

Using the engine adapter, gets all the relations that match the given schema grain relation.

@abc.abstractmethod
def get_columns_in_relation( self, relation: dbt.adapters.base.relation.BaseRelation) -> List[dbt.adapters.base.column.Column]:
65    @abc.abstractmethod
66    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
67        """Returns the columns for a given table grained relation."""

Returns the columns for a given table grained relation.

@abc.abstractmethod
def get_missing_columns( self, from_relation: dbt.adapters.base.relation.BaseRelation, to_relation: dbt.adapters.base.relation.BaseRelation) -> List[dbt.adapters.base.column.Column]:
69    @abc.abstractmethod
70    def get_missing_columns(
71        self, from_relation: BaseRelation, to_relation: BaseRelation
72    ) -> t.List[Column]:
73        """Returns the columns in from_relation missing from to_relation."""

Returns the columns in from_relation missing from to_relation.

@abc.abstractmethod
def create_schema(self, relation: dbt.adapters.base.relation.BaseRelation) -> None:
75    @abc.abstractmethod
76    def create_schema(self, relation: BaseRelation) -> None:
77        """Creates a schema in the target database."""

Creates a schema in the target database.

@abc.abstractmethod
def drop_schema(self, relation: dbt.adapters.base.relation.BaseRelation) -> None:
79    @abc.abstractmethod
80    def drop_schema(self, relation: BaseRelation) -> None:
81        """Drops a schema in the target database."""

Drops a schema in the target database.

@abc.abstractmethod
def drop_relation(self, relation: dbt.adapters.base.relation.BaseRelation) -> None:
83    @abc.abstractmethod
84    def drop_relation(self, relation: BaseRelation) -> None:
85        """Drops a relation (table) in the target database."""

Drops a relation (table) in the target database.

@abc.abstractmethod
def expand_target_column_types( self, from_relation: dbt.adapters.base.relation.BaseRelation, to_relation: dbt.adapters.base.relation.BaseRelation) -> None:
87    @abc.abstractmethod
88    def expand_target_column_types(
89        self, from_relation: BaseRelation, to_relation: BaseRelation
90    ) -> None:
91        """Expand to_relation's column types to match those of from_relation."""

Expand to_relation's column types to match those of from_relation.

@abc.abstractmethod
def rename_relation( self, from_relation: dbt.adapters.base.relation.BaseRelation, to_relation: dbt.adapters.base.relation.BaseRelation) -> None:
93    @abc.abstractmethod
94    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
95        """Renames a relation (table) in the target database."""

Renames a relation (table) in the target database.

@abc.abstractmethod
def execute( self, sql: str, auto_begin: bool = False, fetch: bool = False) -> Tuple[dbt.adapters.contracts.connection.AdapterResponse, agate.table.Table]:
 97    @abc.abstractmethod
 98    def execute(
 99        self, sql: str, auto_begin: bool = False, fetch: bool = False
100    ) -> t.Tuple[AdapterResponse, agate.Table]:
101        """Executes the given SQL statement and returns the results as an agate table."""

Executes the given SQL statement and returns the results as an agate table.

@abc.abstractmethod
def resolve_schema(self, relation: dbt.adapters.base.relation.BaseRelation) -> Optional[str]:
103    @abc.abstractmethod
104    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
105        """Resolves the relation's schema to its physical schema."""

Resolves the relation's schema to its physical schema.

@abc.abstractmethod
def resolve_identifier(self, relation: dbt.adapters.base.relation.BaseRelation) -> Optional[str]:
107    @abc.abstractmethod
108    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
109        """Resolves the relation's identifier to its physical identifier."""

Resolves the relation's identifier to its physical identifier.

def quote(self, identifier: str) -> str:
111    def quote(self, identifier: str) -> str:
112        """Returns a quoted identifier."""
113        return exp.to_column(identifier).sql(dialect=self.project_dialect, identify=True)

Returns a quoted identifier.

def quote_as_configured(self, value: str, component_type: str) -> str:
115    def quote_as_configured(self, value: str, component_type: str) -> str:
116        """Returns the value quoted according to the quote policy."""
117        return self.quote(value) if getattr(self.quote_policy, component_type, False) else value

Returns the value quoted according to the quote policy.

def dispatch(self, macro_name: str, macro_namespace: Optional[str] = None) -> Callable:
119    def dispatch(
120        self,
121        macro_name: str,
122        macro_namespace: t.Optional[str] = None,
123    ) -> t.Callable:
124        """Returns a dialect-specific version of a macro with the given name."""
125        target_type = self.jinja_globals["target"]["type"]
126        macro_suffix = f"__{macro_name}"
127
128        def _relevance(package_name_pair: t.Tuple[t.Optional[str], str]) -> t.Tuple[int, int]:
129            """Lower scores more relevant."""
130            macro_package, name = package_name_pair
131
132            package_score = 0 if macro_package == macro_namespace else 1
133            name_score = 1
134
135            if name.startswith("default"):
136                name_score = 2
137            elif name.startswith(target_type):
138                name_score = 0
139
140            return name_score, package_score
141
142        jinja_env = self.jinja_macros.build_environment(**self.jinja_globals).globals
143
144        packages_to_check: t.List[t.Optional[str]] = [None]
145        if macro_namespace is not None:
146            if macro_namespace in jinja_env:
147                packages_to_check = [self.jinja_macros.root_package_name, macro_namespace]
148
149        # Add dbt packages as fallback
150        packages_to_check.extend(k for k in jinja_env if k.startswith("dbt"))
151
152        candidates = {}
153        for macro_package in packages_to_check:
154            macros = jinja_env.get(macro_package, {}) if macro_package else jinja_env
155            if not isinstance(macros, dict):
156                continue
157            candidates.update(
158                {
159                    (macro_package, macro_name): macro_callable
160                    for macro_name, macro_callable in macros.items()
161                    if macro_name.endswith(macro_suffix)
162                }
163            )
164
165        if candidates:
166            sorted_candidates = sorted(candidates, key=_relevance)
167            return candidates[sorted_candidates[0]]
168
169        raise ConfigError(f"Macro '{macro_name}', package '{macro_namespace}' was not found.")

Returns a dialect-specific version of a macro with the given name.

def type(self) -> str:
171    def type(self) -> str:
172        return self.project_dialect or ""
def compare_dbr_version(self, major: int, minor: int) -> int:
174    def compare_dbr_version(self, major: int, minor: int) -> int:
175        # This method is specific to the Databricks dbt adapter implementation and is used in some macros.
176        # Always return -1 to fallback to Spark macro implementations.
177        return -1
graph: Any
179    @property
180    def graph(self) -> t.Any:
181        flat_graph = self.jinja_globals.get("flat_graph", None)
182        return flat_graph or AttributeDict(
183            {
184                "exposures": {},
185                "groups": {},
186                "metrics": {},
187                "nodes": {},
188                "sources": {},
189                "semantic_models": {},
190                "saved_queries": {},
191            }
192        )
class ParsetimeAdapter(BaseAdapter):
195class ParsetimeAdapter(BaseAdapter):
196    def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
197        self._raise_parsetime_adapter_call_error("get relation")
198        raise
199
200    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
201        self._raise_parsetime_adapter_call_error("load relation")
202        raise
203
204    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
205        self._raise_parsetime_adapter_call_error("list relation")
206        raise
207
208    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
209        self._raise_parsetime_adapter_call_error("list relation")
210        raise
211
212    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
213        self._raise_parsetime_adapter_call_error("get columns")
214        raise
215
216    def get_missing_columns(
217        self, from_relation: BaseRelation, to_relation: BaseRelation
218    ) -> t.List[Column]:
219        self._raise_parsetime_adapter_call_error("get missing columns")
220        raise
221
222    def create_schema(self, relation: BaseRelation) -> None:
223        self._raise_parsetime_adapter_call_error("create schema")
224
225    def drop_schema(self, relation: BaseRelation) -> None:
226        self._raise_parsetime_adapter_call_error("drop schema")
227
228    def drop_relation(self, relation: BaseRelation) -> None:
229        self._raise_parsetime_adapter_call_error("drop relation")
230
231    def expand_target_column_types(
232        self, from_relation: BaseRelation, to_relation: BaseRelation
233    ) -> None:
234        self._raise_parsetime_adapter_call_error("expand target column types")
235
236    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
237        self._raise_parsetime_adapter_call_error("rename relation")
238
239    def execute(
240        self, sql: str, auto_begin: bool = False, fetch: bool = False
241    ) -> t.Tuple[AdapterResponse, agate.Table]:
242        self._raise_parsetime_adapter_call_error("execute SQL")
243        raise
244
245    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
246        return relation.schema
247
248    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
249        return relation.identifier
250
251    @staticmethod
252    def _raise_parsetime_adapter_call_error(action: str) -> None:
253        raise ParsetimeAdapterCallError(f"Can't {action} at parse time.")

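Adapter used at parse time. Every relation, schema, and execute method raises ParsetimeAdapterCallError ("Can't <action> at parse time."); resolve_schema and resolve_identifier return the relation's own schema and identifier unchanged.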

def get_relation( self, database: str, schema: str, identifier: str) -> Optional[dbt.adapters.base.relation.BaseRelation]:
196    def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
197        self._raise_parsetime_adapter_call_error("get relation")
198        raise

Returns a single relation that matches the provided path.

def load_relation( self, relation: dbt.adapters.base.relation.BaseRelation) -> Optional[dbt.adapters.base.relation.BaseRelation]:
200    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
201        self._raise_parsetime_adapter_call_error("load relation")
202        raise

Returns a single relation that matches the provided relation if present.

def list_relations( self, database: Optional[str], schema: str) -> List[dbt.adapters.base.relation.BaseRelation]:
204    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
205        self._raise_parsetime_adapter_call_error("list relation")
206        raise

Gets all relations in a given schema and optionally database.

TODO: Add caching functionality to avoid repeat visits to DB

def list_relations_without_caching( self, schema_relation: dbt.adapters.base.relation.BaseRelation) -> List[dbt.adapters.base.relation.BaseRelation]:
208    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
209        self._raise_parsetime_adapter_call_error("list relation")
210        raise

Using the engine adapter, gets all the relations that match the given schema grain relation.

def get_columns_in_relation( self, relation: dbt.adapters.base.relation.BaseRelation) -> List[dbt.adapters.base.column.Column]:
212    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
213        self._raise_parsetime_adapter_call_error("get columns")
214        raise

Returns the columns for a given table grained relation.

def get_missing_columns( self, from_relation: dbt.adapters.base.relation.BaseRelation, to_relation: dbt.adapters.base.relation.BaseRelation) -> List[dbt.adapters.base.column.Column]:
216    def get_missing_columns(
217        self, from_relation: BaseRelation, to_relation: BaseRelation
218    ) -> t.List[Column]:
219        self._raise_parsetime_adapter_call_error("get missing columns")
220        raise

Returns the columns in from_relation missing from to_relation.

def create_schema(self, relation: dbt.adapters.base.relation.BaseRelation) -> None:
222    def create_schema(self, relation: BaseRelation) -> None:
223        self._raise_parsetime_adapter_call_error("create schema")

Creates a schema in the target database.

def drop_schema(self, relation: dbt.adapters.base.relation.BaseRelation) -> None:
225    def drop_schema(self, relation: BaseRelation) -> None:
226        self._raise_parsetime_adapter_call_error("drop schema")

Drops a schema in the target database.

def drop_relation(self, relation: dbt.adapters.base.relation.BaseRelation) -> None:
228    def drop_relation(self, relation: BaseRelation) -> None:
229        self._raise_parsetime_adapter_call_error("drop relation")

Drops a relation (table) in the target database.

def expand_target_column_types( self, from_relation: dbt.adapters.base.relation.BaseRelation, to_relation: dbt.adapters.base.relation.BaseRelation) -> None:
231    def expand_target_column_types(
232        self, from_relation: BaseRelation, to_relation: BaseRelation
233    ) -> None:
234        self._raise_parsetime_adapter_call_error("expand target column types")

Expand to_relation's column types to match those of from_relation.

def rename_relation( self, from_relation: dbt.adapters.base.relation.BaseRelation, to_relation: dbt.adapters.base.relation.BaseRelation) -> None:
236    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
237        self._raise_parsetime_adapter_call_error("rename relation")

Renames a relation (table) in the target database.

def execute( self, sql: str, auto_begin: bool = False, fetch: bool = False) -> Tuple[dbt.adapters.contracts.connection.AdapterResponse, agate.table.Table]:
239    def execute(
240        self, sql: str, auto_begin: bool = False, fetch: bool = False
241    ) -> t.Tuple[AdapterResponse, agate.Table]:
242        self._raise_parsetime_adapter_call_error("execute SQL")
243        raise

Executes the given SQL statement and returns the results as an agate table.

def resolve_schema(self, relation: dbt.adapters.base.relation.BaseRelation) -> Optional[str]:
245    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
246        return relation.schema

Resolves the relation's schema to its physical schema.

def resolve_identifier(self, relation: dbt.adapters.base.relation.BaseRelation) -> Optional[str]:
248    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
249        return relation.identifier

Resolves the relation's identifier to its physical identifier.

class RuntimeAdapter(BaseAdapter):
256class RuntimeAdapter(BaseAdapter):
257    def __init__(
258        self,
259        engine_adapter: EngineAdapter,
260        jinja_macros: JinjaMacroRegistry,
261        jinja_globals: t.Optional[t.Dict[str, t.Any]] = None,
262        relation_type: t.Optional[t.Type[BaseRelation]] = None,
263        column_type: t.Optional[t.Type[Column]] = None,
264        quote_policy: t.Optional[Policy] = None,
265        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
266        table_mapping: t.Optional[t.Dict[str, str]] = None,
267        deployability_index: t.Optional[DeployabilityIndex] = None,
268        project_dialect: t.Optional[str] = None,
269    ):
270        from dbt.adapters.base import BaseRelation
271        from dbt.adapters.base.column import Column
272
273        super().__init__(
274            jinja_macros,
275            jinja_globals=jinja_globals,
276            project_dialect=project_dialect or engine_adapter.dialect,
277            quote_policy=quote_policy,
278        )
279
280        table_mapping = table_mapping or {}
281
282        self.engine_adapter = engine_adapter
283        self.relation_type = relation_type or BaseRelation
284        self.column_type = column_type or Column
285        self.table_mapping = {
286            **to_table_mapping((snapshots or {}).values(), deployability_index),
287            **table_mapping,
288        }
289
290    def get_relation(
291        self, database: t.Optional[str], schema: str, identifier: str
292    ) -> t.Optional[BaseRelation]:
293        target_table = exp.table_(identifier, db=schema, catalog=database)
294        # Normalize before converting to a relation; otherwise, it will be too late,
295        # as quotes will have already been applied.
296        target_table = self._normalize(target_table)
297        return self.load_relation(self._table_to_relation(target_table))
298
299    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
300        mapped_table = self._map_table_name(self._normalize(self._relation_to_table(relation)))
301
302        data_object = self.engine_adapter.get_data_object(mapped_table)
303        return self._data_object_to_relation(data_object) if data_object is not None else None
304
305    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
306        target_schema = schema_(schema, catalog=database)
307        # Normalize before converting to a relation; otherwise, it will be too late,
308        # as quotes will have already been applied.
309        target_schema = self._normalize(target_schema)
310        return self.list_relations_without_caching(self._table_to_relation(target_schema))
311
312    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
313        schema = self._normalize(self._schema(schema_relation))
314
315        relations = [
316            self._data_object_to_relation(do) for do in self.engine_adapter.get_data_objects(schema)
317        ]
318        return relations
319
320    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
321        mapped_table = self._map_table_name(self._normalize(self._relation_to_table(relation)))
322
323        if self.project_dialect == "bigquery":
324            # dbt.adapters.bigquery.column.BigQueryColumn has a different constructor signature
325            # We need to use BigQueryColumn.create_from_field() to create the column instead
326            if (
327                hasattr(self.column_type, "create_from_field")
328                and callable(getattr(self.column_type, "create_from_field"))
329                and hasattr(self.engine_adapter, "get_bq_schema")
330                and callable(getattr(self.engine_adapter, "get_bq_schema"))
331            ):
332                return [
333                    self.column_type.create_from_field(field)  # type: ignore
334                    for field in self.engine_adapter.get_bq_schema(mapped_table)  # type: ignore
335                ]
336            from dbt.adapters.base.column import Column
337
338            return [
339                Column.from_description(
340                    name=name, raw_data_type=dtype.sql(dialect=self.project_dialect)
341                )
342                for name, dtype in self.engine_adapter.columns(table_name=mapped_table).items()
343            ]
344        return [
345            self.column_type.from_description(
346                name=name, raw_data_type=dtype.sql(dialect=self.project_dialect)
347            )
348            for name, dtype in self.engine_adapter.columns(table_name=mapped_table).items()
349        ]
350
351    def get_missing_columns(
352        self, from_relation: BaseRelation, to_relation: BaseRelation
353    ) -> t.List[Column]:
354        target_columns = {col.name for col in self.get_columns_in_relation(to_relation)}
355
356        return [
357            col
358            for col in self.get_columns_in_relation(from_relation)
359            if col.name not in target_columns
360        ]
361
362    def create_schema(self, relation: BaseRelation) -> None:
363        if relation.schema is not None:
364            self.engine_adapter.create_schema(self._normalize(self._schema(relation)))
365
366    def drop_schema(self, relation: BaseRelation) -> None:
367        if relation.schema is not None:
368            self.engine_adapter.drop_schema(self._normalize(self._schema(relation)))
369
370    def drop_relation(self, relation: BaseRelation) -> None:
371        if relation.schema is not None and relation.identifier is not None:
372            self.engine_adapter.drop_table(self._normalize(self._relation_to_table(relation)))
373
374    def expand_target_column_types(
375        self, from_relation: BaseRelation, to_relation: BaseRelation
376    ) -> None:
377        from_dbt_columns = {c.name: c for c in self.get_columns_in_relation(from_relation)}
378        to_dbt_columns = {c.name: c for c in self.get_columns_in_relation(to_relation)}
379
380        from_table_name = self._normalize(self._relation_to_table(from_relation))
381        to_table_name = self._normalize(self._relation_to_table(to_relation))
382
383        from_columns = self.engine_adapter.columns(from_table_name)
384        to_columns = self.engine_adapter.columns(to_table_name)
385
386        current_columns = {}
387        new_columns = {}
388        for column_name, from_column in from_dbt_columns.items():
389            target_column = to_dbt_columns.get(column_name)
390            if target_column is not None and target_column.can_expand_to(from_column):
391                current_columns[column_name] = to_columns[column_name]
392                new_columns[column_name] = from_columns[column_name]
393
394        alter_expressions = t.cast(
395            t.List[TableAlterOperation],
396            self.engine_adapter.schema_differ.compare_columns(
397                to_table_name,
398                current_columns,
399                new_columns,
400                ignore_destructive=True,
401            ),
402        )
403
404        if alter_expressions:
405            self.engine_adapter.alter_table(alter_expressions)
406
407    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
408        old_table_name = self._normalize(self._relation_to_table(from_relation))
409        new_table_name = self._normalize(self._relation_to_table(to_relation))
410
411        self.engine_adapter.rename_table(old_table_name, new_table_name)
412
413    def execute(
414        self, sql: str, auto_begin: bool = False, fetch: bool = False
415    ) -> t.Tuple[AdapterResponse, agate.Table]:
416        import pandas as pd
417        from dbt.adapters.base.impl import AdapterResponse
418
419        from sqlmesh.dbt.util import pandas_to_agate, empty_table
420
421        # mypy bug: https://github.com/python/mypy/issues/10740
422        exec_func: t.Callable[..., None | pd.DataFrame] = (
423            self.engine_adapter.fetchdf if fetch else self.engine_adapter.execute  # type: ignore
424        )
425
426        expression = parse_one(sql, read=self.project_dialect)
427        with normalize_and_quote(
428            expression, t.cast(str, self.project_dialect), self.engine_adapter.default_catalog
429        ) as expression:
430            expression = exp.replace_tables(
431                expression, self.table_mapping, dialect=self.project_dialect, copy=False
432            )
433
434        if auto_begin:
435            # TODO: This could be a bug. I think dbt leaves the transaction open while we close immediately.
436            with self.engine_adapter.transaction():
437                resp = exec_func(expression, quote_identifiers=False)
438        else:
439            resp = exec_func(expression, quote_identifiers=False)
440
441        # TODO: Properly fill in adapter response
442        if fetch:
443            assert isinstance(resp, pd.DataFrame)
444            return AdapterResponse("Success"), pandas_to_agate(resp)
445        return AdapterResponse("Success"), empty_table()
446
447    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
448        schema = self._map_table_name(self._normalize(self._relation_to_table(relation))).db
449        return schema if schema else None
450
451    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
452        identifier = self._map_table_name(self._normalize(self._relation_to_table(relation))).name
453        return identifier if identifier else None
454
455    def _map_table_name(self, table: exp.Table) -> exp.Table:
456        # Use the default dialect since this is the dialect used to normalize and quote keys in the
457        # mapping table.
458        name = table.sql(identify=True)
459        physical_table_name = self.table_mapping.get(name)
460        if not physical_table_name:
461            return table
462
463        logger.debug("Resolved ref '%s' to snapshot table '%s'", name, physical_table_name)
464
465        return exp.to_table(physical_table_name, dialect=self.project_dialect)
466
467    def _relation_to_table(self, relation: BaseRelation) -> exp.Table:
468        return exp.to_table(relation.render(), dialect=self.project_dialect)
469
470    def _data_object_to_relation(self, data_object: DataObject) -> BaseRelation:
471        from sqlmesh.dbt.relation import RelationType
472
473        if data_object.type.is_unknown:
474            dbt_relation_type = RelationType.External
475        elif data_object.type.is_managed_table:
476            dbt_relation_type = RelationType.Table
477        else:
478            dbt_relation_type = RelationType(data_object.type.lower())
479
480        return self.relation_type.create(
481            database=data_object.catalog,
482            schema=data_object.schema_name,
483            identifier=data_object.name,
484            quote_policy=self.quote_policy,
485            type=dbt_relation_type,
486        )
487
488    def _table_to_relation(self, table: exp.Table) -> BaseRelation:
489        return self.relation_type.create(
490            database=table.catalog or None,
491            schema=table.db,
492            identifier=table.name,
493            quote_policy=self.quote_policy,
494        )
495
496    def _schema(self, schema_relation: BaseRelation) -> exp.Table:
497        assert schema_relation.schema is not None
498        return exp.Table(
499            this=None,
500            db=exp.to_identifier(schema_relation.schema, quoted=self.quote_policy.schema),
501            catalog=exp.to_identifier(schema_relation.database, quoted=self.quote_policy.database),
502        )
503
504    def _normalize(self, input_table: exp.Table) -> exp.Table:
505        normalized_name = normalize_model_name(
506            input_table, self.engine_adapter.default_catalog, self.project_dialect
507        )
508        normalized_table = exp.to_table(normalized_name)
509        if not input_table.this:
510            normalized_table.set("catalog", normalized_table.args.get("db"))
511            normalized_table.set("db", normalized_table.this)
512            normalized_table.set("this", None)
513        return normalized_table

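Adapter that carries out adapter calls through a SQLMesh EngineAdapter. Relation names are normalized for the project dialect, and lookups such as load_relation and get_columns_in_relation first map them to physical tables via table_mapping, which is built from the supplied snapshots and any explicit table mapping.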

RuntimeAdapter( engine_adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, jinja_macros: sqlmesh.utils.jinja.JinjaMacroRegistry, jinja_globals: Optional[Dict[str, Any]] = None, relation_type: Optional[Type[dbt.adapters.base.relation.BaseRelation]] = None, column_type: Optional[Type[dbt.adapters.base.column.Column]] = None, quote_policy: Optional[dbt.adapters.contracts.relation.Policy] = None, snapshots: Optional[Dict[str, sqlmesh.core.snapshot.definition.Snapshot]] = None, table_mapping: Optional[Dict[str, str]] = None, deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, project_dialect: Optional[str] = None)
257    def __init__(
258        self,
259        engine_adapter: EngineAdapter,
260        jinja_macros: JinjaMacroRegistry,
261        jinja_globals: t.Optional[t.Dict[str, t.Any]] = None,
262        relation_type: t.Optional[t.Type[BaseRelation]] = None,
263        column_type: t.Optional[t.Type[Column]] = None,
264        quote_policy: t.Optional[Policy] = None,
265        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
266        table_mapping: t.Optional[t.Dict[str, str]] = None,
267        deployability_index: t.Optional[DeployabilityIndex] = None,
268        project_dialect: t.Optional[str] = None,
269    ):
270        from dbt.adapters.base import BaseRelation
271        from dbt.adapters.base.column import Column
272
273        super().__init__(
274            jinja_macros,
275            jinja_globals=jinja_globals,
276            project_dialect=project_dialect or engine_adapter.dialect,
277            quote_policy=quote_policy,
278        )
279
280        table_mapping = table_mapping or {}
281
282        self.engine_adapter = engine_adapter
283        self.relation_type = relation_type or BaseRelation
284        self.column_type = column_type or Column
285        self.table_mapping = {
286            **to_table_mapping((snapshots or {}).values(), deployability_index),
287            **table_mapping,
288        }
engine_adapter
relation_type
column_type
table_mapping
def get_relation( self, database: Optional[str], schema: str, identifier: str) -> Optional[dbt.adapters.base.relation.BaseRelation]:
290    def get_relation(
291        self, database: t.Optional[str], schema: str, identifier: str
292    ) -> t.Optional[BaseRelation]:
293        target_table = exp.table_(identifier, db=schema, catalog=database)
294        # Normalize before converting to a relation; otherwise, it will be too late,
295        # as quotes will have already been applied.
296        target_table = self._normalize(target_table)
297        return self.load_relation(self._table_to_relation(target_table))

Returns a single relation that matches the provided path.

def load_relation( self, relation: dbt.adapters.base.relation.BaseRelation) -> Optional[dbt.adapters.base.relation.BaseRelation]:
299    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
300        mapped_table = self._map_table_name(self._normalize(self._relation_to_table(relation)))
301
302        data_object = self.engine_adapter.get_data_object(mapped_table)
303        return self._data_object_to_relation(data_object) if data_object is not None else None

Returns a single relation that matches the provided relation if present.

def list_relations( self, database: Optional[str], schema: str) -> List[dbt.adapters.base.relation.BaseRelation]:
305    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
306        target_schema = schema_(schema, catalog=database)
307        # Normalize before converting to a relation; otherwise, it will be too late,
308        # as quotes will have already been applied.
309        target_schema = self._normalize(target_schema)
310        return self.list_relations_without_caching(self._table_to_relation(target_schema))

Gets all relations in a given schema and optionally database.

TODO: Add caching functionality to avoid repeat visits to DB

def list_relations_without_caching( self, schema_relation: dbt.adapters.base.relation.BaseRelation) -> List[dbt.adapters.base.relation.BaseRelation]:
def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
    """Ask the engine adapter for every data object in the relation's schema.

    Args:
        schema_relation: Relation whose schema should be listed.

    Returns:
        One relation per data object returned by the engine adapter.
    """
    normalized_schema = self._normalize(self._schema(schema_relation))
    data_objects = self.engine_adapter.get_data_objects(normalized_schema)
    return [self._data_object_to_relation(data_object) for data_object in data_objects]

Using the engine adapter, gets all the relations that match the given schema grain relation.

def get_columns_in_relation( self, relation: dbt.adapters.base.relation.BaseRelation) -> List[dbt.adapters.base.column.Column]:
def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
    """Return the columns of the table that ``relation`` maps to.

    The relation is converted to a table, normalized and passed through
    ``_map_table_name`` before the engine adapter is queried.

    When the project dialect is "bigquery" and both
    ``column_type.create_from_field`` and ``engine_adapter.get_bq_schema`` are
    callable, columns are built from the schema fields. Otherwise a BigQuery
    project falls back to dbt's base ``Column``, and every other dialect uses
    ``self.column_type``.
    """
    mapped = self._map_table_name(self._normalize(self._relation_to_table(relation)))

    if self.project_dialect == "bigquery":
        # dbt.adapters.bigquery.column.BigQueryColumn has a different constructor signature,
        # so BigQueryColumn.create_from_field() is used to create the column instead.
        create_from_field = getattr(self.column_type, "create_from_field", None)
        if callable(create_from_field):
            get_bq_schema = getattr(self.engine_adapter, "get_bq_schema", None)
            if callable(get_bq_schema):
                return [create_from_field(field) for field in get_bq_schema(mapped)]

        from dbt.adapters.base.column import Column

        column_factory = Column
    else:
        column_factory = self.column_type

    return [
        column_factory.from_description(
            name=column_name, raw_data_type=data_type.sql(dialect=self.project_dialect)
        )
        for column_name, data_type in self.engine_adapter.columns(table_name=mapped).items()
    ]

Returns the columns for a given table grained relation.

def get_missing_columns( self, from_relation: dbt.adapters.base.relation.BaseRelation, to_relation: dbt.adapters.base.relation.BaseRelation) -> List[dbt.adapters.base.column.Column]:
def get_missing_columns(
    self, from_relation: BaseRelation, to_relation: BaseRelation
) -> t.List[Column]:
    """Return the columns of ``from_relation`` that ``to_relation`` lacks.

    Columns are matched by their ``name`` attribute, and the result keeps the
    order in which ``from_relation``'s columns were returned.
    """
    existing_names = {column.name for column in self.get_columns_in_relation(to_relation)}
    source_columns = self.get_columns_in_relation(from_relation)
    return [column for column in source_columns if column.name not in existing_names]

Returns the columns in from_relation missing from to_relation.

def create_schema(self, relation: dbt.adapters.base.relation.BaseRelation) -> None:
def create_schema(self, relation: BaseRelation) -> None:
    """Create the relation's schema through the engine adapter.

    Does nothing when ``relation.schema`` is None.
    """
    if relation.schema is None:
        return
    normalized_schema = self._normalize(self._schema(relation))
    self.engine_adapter.create_schema(normalized_schema)

Creates a schema in the target database.

def drop_schema(self, relation: dbt.adapters.base.relation.BaseRelation) -> None:
def drop_schema(self, relation: BaseRelation) -> None:
    """Drop the relation's schema through the engine adapter.

    Does nothing when ``relation.schema`` is None.
    """
    if relation.schema is None:
        return
    normalized_schema = self._normalize(self._schema(relation))
    self.engine_adapter.drop_schema(normalized_schema)

Drops a schema in the target database.

def drop_relation(self, relation: dbt.adapters.base.relation.BaseRelation) -> None:
def drop_relation(self, relation: BaseRelation) -> None:
    """Drop the table behind ``relation`` through the engine adapter.

    Does nothing unless both ``relation.schema`` and ``relation.identifier``
    are set.
    """
    if relation.schema is None or relation.identifier is None:
        return
    table = self._normalize(self._relation_to_table(relation))
    self.engine_adapter.drop_table(table)

Drops a relation (table) in the target database.

def expand_target_column_types( self, from_relation: dbt.adapters.base.relation.BaseRelation, to_relation: dbt.adapters.base.relation.BaseRelation) -> None:
def expand_target_column_types(
    self, from_relation: BaseRelation, to_relation: BaseRelation
) -> None:
    """Widen column types in ``to_relation`` to match those in ``from_relation``.

    Only columns present in both relations, and for which the target dbt column
    reports ``can_expand_to`` the source column, are considered. The resulting
    alter operations, if any, are applied through the engine adapter.
    """
    source_dbt_columns = {
        column.name: column for column in self.get_columns_in_relation(from_relation)
    }
    target_dbt_columns = {
        column.name: column for column in self.get_columns_in_relation(to_relation)
    }

    source_table = self._normalize(self._relation_to_table(from_relation))
    target_table = self._normalize(self._relation_to_table(to_relation))

    source_types = self.engine_adapter.columns(source_table)
    target_types = self.engine_adapter.columns(target_table)

    # Engine-level types of the expandable columns: what the target has now
    # versus what the source says they should become.
    types_before = {}
    types_after = {}
    for name, source_column in source_dbt_columns.items():
        target_column = target_dbt_columns.get(name)
        if target_column is None or not target_column.can_expand_to(source_column):
            continue
        types_before[name] = target_types[name]
        types_after[name] = source_types[name]

    # NOTE(review): ignore_destructive=True presumably limits the diff to
    # non-destructive changes -- confirm against the schema differ.
    operations = t.cast(
        t.List[TableAlterOperation],
        self.engine_adapter.schema_differ.compare_columns(
            target_table,
            types_before,
            types_after,
            ignore_destructive=True,
        ),
    )

    if not operations:
        return
    self.engine_adapter.alter_table(operations)

Expand to_relation's column types to match those of from_relation.

def rename_relation( self, from_relation: dbt.adapters.base.relation.BaseRelation, to_relation: dbt.adapters.base.relation.BaseRelation) -> None:
def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
    """Rename the table behind ``from_relation`` to the name of ``to_relation``.

    Both relations are converted to tables and normalized before the engine
    adapter performs the rename.
    """
    current_name, desired_name = (
        self._normalize(self._relation_to_table(relation))
        for relation in (from_relation, to_relation)
    )
    self.engine_adapter.rename_table(current_name, desired_name)

Renames a relation (table) in the target database.

def execute( self, sql: str, auto_begin: bool = False, fetch: bool = False) -> Tuple[dbt.adapters.contracts.connection.AdapterResponse, agate.table.Table]:
def execute(
    self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> t.Tuple[AdapterResponse, agate.Table]:
    """Run a SQL statement through the engine adapter.

    The statement is parsed with the project dialect, normalized and quoted,
    and its table references are replaced using ``self.table_mapping`` before
    it is handed to the engine adapter.

    Args:
        sql: SQL text to run.
        auto_begin: When true, the statement runs inside
            ``engine_adapter.transaction()``.
        fetch: When true, ``engine_adapter.fetchdf`` is used and its DataFrame
            is returned as an agate table; otherwise ``engine_adapter.execute``
            is used and an empty table is returned.

    Returns:
        A tuple of ``AdapterResponse("Success")`` and the resulting agate table.
    """
    import pandas as pd
    from dbt.adapters.base.impl import AdapterResponse

    from sqlmesh.dbt.util import pandas_to_agate, empty_table

    # mypy bug: https://github.com/python/mypy/issues/10740
    run: t.Callable[..., None | pd.DataFrame]
    if fetch:
        run = self.engine_adapter.fetchdf  # type: ignore
    else:
        run = self.engine_adapter.execute  # type: ignore

    statement = parse_one(sql, read=self.project_dialect)
    with normalize_and_quote(
        statement, t.cast(str, self.project_dialect), self.engine_adapter.default_catalog
    ) as statement:
        statement = exp.replace_tables(
            statement, self.table_mapping, dialect=self.project_dialect, copy=False
        )

    if not auto_begin:
        result = run(statement, quote_identifiers=False)
    else:
        # TODO: This could be a bug. I think dbt leaves the transaction open while we close immediately.
        with self.engine_adapter.transaction():
            result = run(statement, quote_identifiers=False)

    # TODO: Properly fill in adapter response
    if not fetch:
        return AdapterResponse("Success"), empty_table()
    assert isinstance(result, pd.DataFrame)
    return AdapterResponse("Success"), pandas_to_agate(result)

Executes the given SQL statement and returns the results as an agate table.

def resolve_schema(self, relation: dbt.adapters.base.relation.BaseRelation) -> Optional[str]:
def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
    """Return the schema (``db``) of the table that ``relation`` maps to.

    The relation is converted to a table, normalized and passed through
    ``_map_table_name``. A falsy schema is reported as None.
    """
    table = self._normalize(self._relation_to_table(relation))
    mapped = self._map_table_name(table)
    return mapped.db or None

Resolves the relation's schema to its physical schema.

def resolve_identifier(self, relation: dbt.adapters.base.relation.BaseRelation) -> Optional[str]:
def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
    """Return the identifier (``name``) of the table that ``relation`` maps to.

    The relation is converted to a table, normalized and passed through
    ``_map_table_name``. A falsy name is reported as None.
    """
    table = self._normalize(self._relation_to_table(relation))
    mapped = self._map_table_name(table)
    return mapped.name or None

Resolves the relation's identifier to its physical identifier.