sqlmesh.dbt.adapter
from __future__ import annotations

import abc
import logging
import typing as t

from sqlglot import exp, parse_one

from sqlmesh.core.dialect import normalize_and_quote, normalize_model_name, schema_
from sqlmesh.core.engine_adapter import EngineAdapter
from sqlmesh.core.snapshot import DeployabilityIndex, Snapshot, to_table_mapping
from sqlmesh.utils.errors import ConfigError, ParsetimeAdapterCallError
from sqlmesh.utils.jinja import JinjaMacroRegistry
from sqlmesh.utils import AttributeDict
from sqlmesh.core.schema_diff import TableAlterOperation

if t.TYPE_CHECKING:
    import agate
    from dbt.adapters.base import BaseRelation
    from dbt.adapters.base.column import Column
    from dbt.adapters.base.impl import AdapterResponse
    from sqlmesh.core.engine_adapter.base import DataObject
    from sqlmesh.dbt.relation import Policy


logger = logging.getLogger(__name__)


class BaseAdapter(abc.ABC):
    """Common base for the ``adapter`` object exposed to dbt Jinja templates.

    Subclasses implement the relation / DDL / execution API declared below; this
    class provides quoting helpers, macro dispatch and a few dbt compatibility shims.
    """

    def __init__(
        self,
        jinja_macros: JinjaMacroRegistry,
        jinja_globals: t.Optional[t.Dict[str, t.Any]] = None,
        project_dialect: t.Optional[str] = None,
        quote_policy: t.Optional[Policy] = None,
    ):
        """
        Args:
            jinja_macros: Registry used to build the Jinja environment for macro dispatch.
            jinja_globals: Jinja globals. They are copied, and ``adapter`` is set to this instance.
            project_dialect: SQL dialect of the dbt project.
            quote_policy: dbt quote policy; defaults to dbt's ``Policy()``.
        """
        from dbt.adapters.base.relation import Policy

        self.jinja_macros = jinja_macros
        # Copy so that registering "adapter" below does not mutate the caller's dict.
        self.jinja_globals = jinja_globals.copy() if jinja_globals else {}
        self.jinja_globals["adapter"] = self
        self.project_dialect = project_dialect
        self.quote_policy = quote_policy or Policy()

    @abc.abstractmethod
    def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
        """Returns a single relation that matches the provided path, or None."""

    @abc.abstractmethod
    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
        """Returns a single relation that matches the provided relation if present, else None."""

    @abc.abstractmethod
    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
        """Gets all relations in the given schema and, optionally, database.

        TODO: Add caching functionality to avoid repeat visits to DB
        """

    @abc.abstractmethod
    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
        """Uses the engine adapter to get all relations within the given schema-grained relation."""

    @abc.abstractmethod
    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
        """Returns the columns of the given table-grained relation."""

    @abc.abstractmethod
    def get_missing_columns(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> t.List[Column]:
        """Returns the columns in from_relation missing from to_relation."""

    @abc.abstractmethod
    def create_schema(self, relation: BaseRelation) -> None:
        """Creates a schema in the target database."""

    @abc.abstractmethod
    def drop_schema(self, relation: BaseRelation) -> None:
        """Drops a schema in the target database."""

    @abc.abstractmethod
    def drop_relation(self, relation: BaseRelation) -> None:
        """Drops a relation (table) in the target database."""

    @abc.abstractmethod
    def expand_target_column_types(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> None:
        """Expand to_relation's column types to match those of from_relation."""

    @abc.abstractmethod
    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
        """Renames a relation (table) in the target database."""

    @abc.abstractmethod
    def execute(
        self, sql: str, auto_begin: bool = False, fetch: bool = False
    ) -> t.Tuple[AdapterResponse, agate.Table]:
        """Executes the given SQL statement and returns the results as an agate table."""

    @abc.abstractmethod
    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
        """Resolves the relation's schema to its physical schema."""

    @abc.abstractmethod
    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
        """Resolves the relation's identifier to its physical identifier."""

    def quote(self, identifier: str) -> str:
        """Returns the identifier fully quoted in the project dialect."""
        return exp.to_column(identifier).sql(dialect=self.project_dialect, identify=True)

    def quote_as_configured(self, value: str, component_type: str) -> str:
        """Quotes ``value`` only if the quote policy enables quoting for ``component_type``.

        Component types the policy has no attribute for are left unquoted.
        """
        return self.quote(value) if getattr(self.quote_policy, component_type, False) else value

    def dispatch(
        self,
        macro_name: str,
        macro_namespace: t.Optional[str] = None,
    ) -> t.Callable:
        """Returns a dialect-specific version of a macro with the given name.

        Candidates are macros whose names end with ``__<macro_name>``. Ranking, best first:
        a name starting with the target type, then any other prefix, then ``default``.
        Ties are broken in favour of ``macro_namespace``.

        Raises:
            ConfigError: If no candidate macro is found.
        """
        target_type = self.jinja_globals["target"]["type"]
        macro_suffix = f"__{macro_name}"

        def _relevance(package_name_pair: t.Tuple[t.Optional[str], str]) -> t.Tuple[int, int]:
            """Lower scores more relevant."""
            macro_package, name = package_name_pair

            package_score = 0 if macro_package == macro_namespace else 1
            name_score = 1

            if name.startswith("default"):
                name_score = 2
            elif name.startswith(target_type):
                name_score = 0

            return name_score, package_score

        jinja_env = self.jinja_macros.build_environment(**self.jinja_globals).globals

        # None means "search the top-level Jinja globals".
        packages_to_check: t.List[t.Optional[str]] = [None]
        if macro_namespace is not None:
            if macro_namespace in jinja_env:
                packages_to_check = [self.jinja_macros.root_package_name, macro_namespace]

        # Add dbt packages as fallback
        packages_to_check.extend(k for k in jinja_env if k.startswith("dbt"))

        candidates = {}
        for macro_package in packages_to_check:
            macros = jinja_env.get(macro_package, {}) if macro_package else jinja_env
            if not isinstance(macros, dict):
                continue
            candidates.update(
                {
                    (macro_package, candidate_name): macro_callable
                    for candidate_name, macro_callable in macros.items()
                    if candidate_name.endswith(macro_suffix)
                }
            )

        if candidates:
            sorted_candidates = sorted(candidates, key=_relevance)
            return candidates[sorted_candidates[0]]

        raise ConfigError(f"Macro '{macro_name}', package '{macro_namespace}' was not found.")

    def type(self) -> str:
        """Returns the project dialect, or an empty string if it is not set."""
        return self.project_dialect or ""

    def compare_dbr_version(self, major: int, minor: int) -> int:
        # This method is specific to the Databricks dbt adapter implementation and is used in some macros.
        # Always return -1 to fallback to Spark macro implementations.
        return -1

    @property
    def graph(self) -> t.Any:
        """The ``flat_graph`` Jinja global if set and truthy, else an empty graph skeleton."""
        flat_graph = self.jinja_globals.get("flat_graph", None)
        return flat_graph or AttributeDict(
            {
                "exposures": {},
                "groups": {},
                "metrics": {},
                "nodes": {},
                "sources": {},
                "semantic_models": {},
                "saved_queries": {},
            }
        )


class ParsetimeAdapter(BaseAdapter):
    """Adapter available while parsing a dbt project.

    Every operation that needs the database raises ``ParsetimeAdapterCallError``.
    ``resolve_schema`` / ``resolve_identifier`` return the relation's own values.
    """

    def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
        self._raise_parsetime_adapter_call_error("get relation")

    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
        self._raise_parsetime_adapter_call_error("load relation")

    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
        self._raise_parsetime_adapter_call_error("list relation")

    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
        self._raise_parsetime_adapter_call_error("list relation")

    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
        self._raise_parsetime_adapter_call_error("get columns")

    def get_missing_columns(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> t.List[Column]:
        self._raise_parsetime_adapter_call_error("get missing columns")

    def create_schema(self, relation: BaseRelation) -> None:
        self._raise_parsetime_adapter_call_error("create schema")

    def drop_schema(self, relation: BaseRelation) -> None:
        self._raise_parsetime_adapter_call_error("drop schema")

    def drop_relation(self, relation: BaseRelation) -> None:
        self._raise_parsetime_adapter_call_error("drop relation")

    def expand_target_column_types(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> None:
        self._raise_parsetime_adapter_call_error("expand target column types")

    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
        self._raise_parsetime_adapter_call_error("rename relation")

    def execute(
        self, sql: str, auto_begin: bool = False, fetch: bool = False
    ) -> t.Tuple[AdapterResponse, agate.Table]:
        self._raise_parsetime_adapter_call_error("execute SQL")

    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
        return relation.schema

    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
        return relation.identifier

    @staticmethod
    def _raise_parsetime_adapter_call_error(action: str) -> t.NoReturn:
        """Always raises ParsetimeAdapterCallError for ``action``.

        Annotated ``NoReturn`` so type checkers know callers never fall through,
        which removes the need for dead ``raise`` statements after each call.
        """
        raise ParsetimeAdapterCallError(f"Can't {action} at parse time.")


class RuntimeAdapter(BaseAdapter):
    """Adapter used when rendering/executing, backed by a SQLMesh ``EngineAdapter``.

    Relation references are normalized. Where ``table_mapping`` has an entry, they
    are resolved to the mapped physical table.
    """

    def __init__(
        self,
        engine_adapter: EngineAdapter,
        jinja_macros: JinjaMacroRegistry,
        jinja_globals: t.Optional[t.Dict[str, t.Any]] = None,
        relation_type: t.Optional[t.Type[BaseRelation]] = None,
        column_type: t.Optional[t.Type[Column]] = None,
        quote_policy: t.Optional[Policy] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        project_dialect: t.Optional[str] = None,
    ):
        from dbt.adapters.base import BaseRelation
        from dbt.adapters.base.column import Column

        super().__init__(
            jinja_macros,
            jinja_globals=jinja_globals,
            project_dialect=project_dialect or engine_adapter.dialect,
            quote_policy=quote_policy,
        )

        self.engine_adapter = engine_adapter
        self.relation_type = relation_type or BaseRelation
        self.column_type = column_type or Column
        # Explicitly provided entries override those derived from snapshots.
        self.table_mapping = {
            **to_table_mapping((snapshots or {}).values(), deployability_index),
            **(table_mapping or {}),
        }

    def get_relation(
        self, database: t.Optional[str], schema: str, identifier: str
    ) -> t.Optional[BaseRelation]:
        target_table = exp.table_(identifier, db=schema, catalog=database)
        # Normalize before converting to a relation; otherwise, it will be too late,
        # as quotes will have already been applied.
        target_table = self._normalize(target_table)
        return self.load_relation(self._table_to_relation(target_table))

    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
        mapped_table = self._map_table_name(self._normalize(self._relation_to_table(relation)))

        data_object = self.engine_adapter.get_data_object(mapped_table)
        return self._data_object_to_relation(data_object) if data_object is not None else None

    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
        target_schema = schema_(schema, catalog=database)
        # Normalize before converting to a relation; otherwise, it will be too late,
        # as quotes will have already been applied.
        target_schema = self._normalize(target_schema)
        return self.list_relations_without_caching(self._table_to_relation(target_schema))

    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
        schema = self._normalize(self._schema(schema_relation))
        return [
            self._data_object_to_relation(do) for do in self.engine_adapter.get_data_objects(schema)
        ]

    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
        mapped_table = self._map_table_name(self._normalize(self._relation_to_table(relation)))

        if self.project_dialect == "bigquery":
            # dbt.adapters.bigquery.column.BigQueryColumn has a different constructor signature
            # We need to use BigQueryColumn.create_from_field() to create the column instead
            if callable(getattr(self.column_type, "create_from_field", None)) and callable(
                getattr(self.engine_adapter, "get_bq_schema", None)
            ):
                return [
                    self.column_type.create_from_field(field)  # type: ignore
                    for field in self.engine_adapter.get_bq_schema(mapped_table)  # type: ignore
                ]
            # Otherwise fall back to dbt's base Column class rather than self.column_type.
            from dbt.adapters.base.column import Column

            column_cls: t.Type[Column] = Column
        else:
            column_cls = self.column_type

        return [
            column_cls.from_description(
                name=name, raw_data_type=dtype.sql(dialect=self.project_dialect)
            )
            for name, dtype in self.engine_adapter.columns(table_name=mapped_table).items()
        ]

    def get_missing_columns(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> t.List[Column]:
        target_columns = {col.name for col in self.get_columns_in_relation(to_relation)}

        return [
            col
            for col in self.get_columns_in_relation(from_relation)
            if col.name not in target_columns
        ]

    def create_schema(self, relation: BaseRelation) -> None:
        # Relations without a schema are silently ignored.
        if relation.schema is not None:
            self.engine_adapter.create_schema(self._normalize(self._schema(relation)))

    def drop_schema(self, relation: BaseRelation) -> None:
        # Relations without a schema are silently ignored.
        if relation.schema is not None:
            self.engine_adapter.drop_schema(self._normalize(self._schema(relation)))

    def drop_relation(self, relation: BaseRelation) -> None:
        # Only fully specified (schema + identifier) relations are dropped.
        if relation.schema is not None and relation.identifier is not None:
            self.engine_adapter.drop_table(self._normalize(self._relation_to_table(relation)))

    def expand_target_column_types(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> None:
        from_dbt_columns = {c.name: c for c in self.get_columns_in_relation(from_relation)}
        to_dbt_columns = {c.name: c for c in self.get_columns_in_relation(to_relation)}

        from_table_name = self._normalize(self._relation_to_table(from_relation))
        to_table_name = self._normalize(self._relation_to_table(to_relation))

        from_columns = self.engine_adapter.columns(from_table_name)
        to_columns = self.engine_adapter.columns(to_table_name)

        # Collect only the columns that dbt says can be widened to the source type.
        current_columns = {}
        new_columns = {}
        for column_name, from_column in from_dbt_columns.items():
            target_column = to_dbt_columns.get(column_name)
            if target_column is not None and target_column.can_expand_to(from_column):
                current_columns[column_name] = to_columns[column_name]
                new_columns[column_name] = from_columns[column_name]

        alter_expressions = t.cast(
            t.List[TableAlterOperation],
            self.engine_adapter.schema_differ.compare_columns(
                to_table_name,
                current_columns,
                new_columns,
                ignore_destructive=True,
            ),
        )

        if alter_expressions:
            self.engine_adapter.alter_table(alter_expressions)

    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
        old_table_name = self._normalize(self._relation_to_table(from_relation))
        new_table_name = self._normalize(self._relation_to_table(to_relation))

        self.engine_adapter.rename_table(old_table_name, new_table_name)

    def execute(
        self, sql: str, auto_begin: bool = False, fetch: bool = False
    ) -> t.Tuple[AdapterResponse, agate.Table]:
        """Runs ``sql`` after normalizing it and rewriting table references via ``table_mapping``.

        Returns:
            ``AdapterResponse("Success")`` paired with the fetched rows as an agate table
            when ``fetch`` is true, otherwise with an empty table.
        """
        import pandas as pd
        from dbt.adapters.base.impl import AdapterResponse

        from sqlmesh.dbt.util import pandas_to_agate, empty_table

        # mypy bug: https://github.com/python/mypy/issues/10740
        exec_func: t.Callable[..., None | pd.DataFrame] = (
            self.engine_adapter.fetchdf if fetch else self.engine_adapter.execute  # type: ignore
        )

        expression = parse_one(sql, read=self.project_dialect)
        with normalize_and_quote(
            expression, t.cast(str, self.project_dialect), self.engine_adapter.default_catalog
        ) as expression:
            expression = exp.replace_tables(
                expression, self.table_mapping, dialect=self.project_dialect, copy=False
            )

        if auto_begin:
            # TODO: This could be a bug. I think dbt leaves the transaction open while we close immediately.
            with self.engine_adapter.transaction():
                resp = exec_func(expression, quote_identifiers=False)
        else:
            resp = exec_func(expression, quote_identifiers=False)

        # TODO: Properly fill in adapter response
        if fetch:
            assert isinstance(resp, pd.DataFrame)
            return AdapterResponse("Success"), pandas_to_agate(resp)
        return AdapterResponse("Success"), empty_table()

    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
        # Empty strings are reported as None.
        return self._map_table_name(self._normalize(self._relation_to_table(relation))).db or None

    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
        # Empty strings are reported as None.
        return (
            self._map_table_name(self._normalize(self._relation_to_table(relation))).name or None
        )

    def _map_table_name(self, table: exp.Table) -> exp.Table:
        # Use the default dialect since this is the dialect used to normalize and quote keys in the
        # mapping table.
        name = table.sql(identify=True)
        physical_table_name = self.table_mapping.get(name)
        if not physical_table_name:
            return table

        logger.debug("Resolved ref '%s' to snapshot table '%s'", name, physical_table_name)

        return exp.to_table(physical_table_name, dialect=self.project_dialect)

    def _relation_to_table(self, relation: BaseRelation) -> exp.Table:
        return exp.to_table(relation.render(), dialect=self.project_dialect)

    def _data_object_to_relation(self, data_object: DataObject) -> BaseRelation:
        from sqlmesh.dbt.relation import RelationType

        # Unknown objects become External, managed tables become plain tables,
        # everything else maps by its lowercased type value.
        if data_object.type.is_unknown:
            dbt_relation_type = RelationType.External
        elif data_object.type.is_managed_table:
            dbt_relation_type = RelationType.Table
        else:
            dbt_relation_type = RelationType(data_object.type.lower())

        return self.relation_type.create(
            database=data_object.catalog,
            schema=data_object.schema_name,
            identifier=data_object.name,
            quote_policy=self.quote_policy,
            type=dbt_relation_type,
        )

    def _table_to_relation(self, table: exp.Table) -> BaseRelation:
        return self.relation_type.create(
            database=table.catalog or None,
            schema=table.db,
            identifier=table.name,
            quote_policy=self.quote_policy,
        )

    def _schema(self, schema_relation: BaseRelation) -> exp.Table:
        """Builds a table expression with no name (``this``), only schema and catalog parts."""
        assert schema_relation.schema is not None
        return exp.Table(
            this=None,
            db=exp.to_identifier(schema_relation.schema, quoted=self.quote_policy.schema),
            catalog=exp.to_identifier(schema_relation.database, quoted=self.quote_policy.database),
        )

    def _normalize(self, input_table: exp.Table) -> exp.Table:
        normalized_name = normalize_model_name(
            input_table, self.engine_adapter.default_catalog, self.project_dialect
        )
        normalized_table = exp.to_table(normalized_name)
        if not input_table.this:
            # Schema-level input: shift the parsed parts down one level so the schema
            # ends up in `db` and the catalog in `catalog`, leaving `this` empty.
            normalized_table.set("catalog", normalized_table.args.get("db"))
            normalized_table.set("db", normalized_table.this)
            normalized_table.set("this", None)
        return normalized_table
class BaseAdapter(abc.ABC):
    """Common base for the ``adapter`` object exposed to dbt Jinja templates.

    Subclasses implement the relation / DDL / execution API declared below; this
    class provides quoting helpers, macro dispatch and a few dbt compatibility shims.
    """

    def __init__(
        self,
        jinja_macros: JinjaMacroRegistry,
        jinja_globals: t.Optional[t.Dict[str, t.Any]] = None,
        project_dialect: t.Optional[str] = None,
        quote_policy: t.Optional[Policy] = None,
    ):
        """
        Args:
            jinja_macros: Registry used to build the Jinja environment for macro dispatch.
            jinja_globals: Jinja globals. They are copied, and ``adapter`` is set to this instance.
            project_dialect: SQL dialect of the dbt project.
            quote_policy: dbt quote policy; defaults to dbt's ``Policy()``.
        """
        from dbt.adapters.base.relation import Policy

        self.jinja_macros = jinja_macros
        # Copy so that registering "adapter" below does not mutate the caller's dict.
        self.jinja_globals = jinja_globals.copy() if jinja_globals else {}
        self.jinja_globals["adapter"] = self
        self.project_dialect = project_dialect
        self.quote_policy = quote_policy or Policy()

    @abc.abstractmethod
    def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
        """Returns a single relation that matches the provided path, or None."""

    @abc.abstractmethod
    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
        """Returns a single relation that matches the provided relation if present, else None."""

    @abc.abstractmethod
    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
        """Gets all relations in the given schema and, optionally, database.

        TODO: Add caching functionality to avoid repeat visits to DB
        """

    @abc.abstractmethod
    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
        """Uses the engine adapter to get all relations within the given schema-grained relation."""

    @abc.abstractmethod
    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
        """Returns the columns of the given table-grained relation."""

    @abc.abstractmethod
    def get_missing_columns(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> t.List[Column]:
        """Returns the columns in from_relation missing from to_relation."""

    @abc.abstractmethod
    def create_schema(self, relation: BaseRelation) -> None:
        """Creates a schema in the target database."""

    @abc.abstractmethod
    def drop_schema(self, relation: BaseRelation) -> None:
        """Drops a schema in the target database."""

    @abc.abstractmethod
    def drop_relation(self, relation: BaseRelation) -> None:
        """Drops a relation (table) in the target database."""

    @abc.abstractmethod
    def expand_target_column_types(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> None:
        """Expand to_relation's column types to match those of from_relation."""

    @abc.abstractmethod
    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
        """Renames a relation (table) in the target database."""

    @abc.abstractmethod
    def execute(
        self, sql: str, auto_begin: bool = False, fetch: bool = False
    ) -> t.Tuple[AdapterResponse, agate.Table]:
        """Executes the given SQL statement and returns the results as an agate table."""

    @abc.abstractmethod
    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
        """Resolves the relation's schema to its physical schema."""

    @abc.abstractmethod
    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
        """Resolves the relation's identifier to its physical identifier."""

    def quote(self, identifier: str) -> str:
        """Returns the identifier fully quoted in the project dialect."""
        return exp.to_column(identifier).sql(dialect=self.project_dialect, identify=True)

    def quote_as_configured(self, value: str, component_type: str) -> str:
        """Quotes ``value`` only if the quote policy enables quoting for ``component_type``.

        Component types the policy has no attribute for are left unquoted.
        """
        return self.quote(value) if getattr(self.quote_policy, component_type, False) else value

    def dispatch(
        self,
        macro_name: str,
        macro_namespace: t.Optional[str] = None,
    ) -> t.Callable:
        """Returns a dialect-specific version of a macro with the given name.

        Candidates are macros whose names end with ``__<macro_name>``. Ranking, best first:
        a name starting with the target type, then any other prefix, then ``default``.
        Ties are broken in favour of ``macro_namespace``.

        Raises:
            ConfigError: If no candidate macro is found.
        """
        target_type = self.jinja_globals["target"]["type"]
        macro_suffix = f"__{macro_name}"

        def _relevance(package_name_pair: t.Tuple[t.Optional[str], str]) -> t.Tuple[int, int]:
            """Lower scores more relevant."""
            macro_package, name = package_name_pair

            package_score = 0 if macro_package == macro_namespace else 1
            name_score = 1

            if name.startswith("default"):
                name_score = 2
            elif name.startswith(target_type):
                name_score = 0

            return name_score, package_score

        jinja_env = self.jinja_macros.build_environment(**self.jinja_globals).globals

        # None means "search the top-level Jinja globals".
        packages_to_check: t.List[t.Optional[str]] = [None]
        if macro_namespace is not None:
            if macro_namespace in jinja_env:
                packages_to_check = [self.jinja_macros.root_package_name, macro_namespace]

        # Add dbt packages as fallback
        packages_to_check.extend(k for k in jinja_env if k.startswith("dbt"))

        candidates = {}
        for macro_package in packages_to_check:
            macros = jinja_env.get(macro_package, {}) if macro_package else jinja_env
            if not isinstance(macros, dict):
                continue
            # `candidate_name` avoids shadowing the `macro_name` parameter used in the error below.
            candidates.update(
                {
                    (macro_package, candidate_name): macro_callable
                    for candidate_name, macro_callable in macros.items()
                    if candidate_name.endswith(macro_suffix)
                }
            )

        if candidates:
            sorted_candidates = sorted(candidates, key=_relevance)
            return candidates[sorted_candidates[0]]

        raise ConfigError(f"Macro '{macro_name}', package '{macro_namespace}' was not found.")

    def type(self) -> str:
        """Returns the project dialect, or an empty string if it is not set."""
        return self.project_dialect or ""

    def compare_dbr_version(self, major: int, minor: int) -> int:
        # This method is specific to the Databricks dbt adapter implementation and is used in some macros.
        # Always return -1 to fallback to Spark macro implementations.
        return -1

    @property
    def graph(self) -> t.Any:
        """The ``flat_graph`` Jinja global if set and truthy, else an empty graph skeleton."""
        flat_graph = self.jinja_globals.get("flat_graph", None)
        return flat_graph or AttributeDict(
            {
                "exposures": {},
                "groups": {},
                "metrics": {},
                "nodes": {},
                "sources": {},
                "semantic_models": {},
                "saved_queries": {},
            }
        )
Helper class that provides a standard way to create an ABC using inheritance.
@abc.abstractmethod
def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
    """Returns a single relation that matches the provided path.

    Args:
        database: Database (catalog) part of the path.
        schema: Schema part of the path.
        identifier: Table/view name.

    Returns:
        The matching relation, or None if there is no match.
    """
Returns a single relation that matches the provided path.
@abc.abstractmethod
def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
    """Returns a single relation that matches the provided relation if present.

    Args:
        relation: The relation to look up.

    Returns:
        The matching relation, or None if it is not present.
    """
Returns a single relation that matches the provided relation if present.
@abc.abstractmethod
def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
    """Gets all relations in the given schema and, optionally, database.

    Args:
        database: Optional database (catalog) containing the schema.
        schema: Schema whose relations are listed.

    TODO: Add caching functionality to avoid repeat visits to DB
    """
Gets all relations in the given schema and, optionally, database.
TODO: Add caching functionality to avoid repeat visits to DB
@abc.abstractmethod
def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
    """Uses the engine adapter to get all relations within the given schema-grained relation.

    Args:
        schema_relation: A relation identifying a schema (and optionally a database).
    """
Uses the engine adapter to get all relations within the given schema-grained relation.
@abc.abstractmethod
def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
    """Returns the columns of the given table-grained relation.

    Args:
        relation: The table/view relation to inspect.
    """
Returns the columns of the given table-grained relation.
@abc.abstractmethod
def get_missing_columns(
    self, from_relation: BaseRelation, to_relation: BaseRelation
) -> t.List[Column]:
    """Returns the columns in from_relation missing from to_relation.

    Args:
        from_relation: Relation whose columns are checked.
        to_relation: Relation the columns are compared against.
    """
Returns the columns in from_relation missing from to_relation.
@abc.abstractmethod
def create_schema(self, relation: BaseRelation) -> None:
    """Creates a schema in the target database.

    Args:
        relation: Relation whose schema (and database) identify the schema to create.
    """
Creates a schema in the target database.
@abc.abstractmethod
def drop_schema(self, relation: BaseRelation) -> None:
    """Drops a schema in the target database.

    Args:
        relation: Relation whose schema (and database) identify the schema to drop.
    """
Drops a schema in the target database.
@abc.abstractmethod
def drop_relation(self, relation: BaseRelation) -> None:
    """Drops a relation (table) in the target database.

    Args:
        relation: The relation to drop.
    """
Drops a relation (table) in the target database.
@abc.abstractmethod
def expand_target_column_types(
    self, from_relation: BaseRelation, to_relation: BaseRelation
) -> None:
    """Expand to_relation's column types to match those of from_relation.

    Args:
        from_relation: Relation providing the (wider) source column types.
        to_relation: Relation whose column types are expanded in place.
    """
Expand to_relation's column types to match those of from_relation.
@abc.abstractmethod
def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
    """Renames a relation (table) in the target database.

    Args:
        from_relation: The existing relation.
        to_relation: The relation describing the new name.
    """
Renames a relation (table) in the target database.
@abc.abstractmethod
def execute(
    self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> t.Tuple[AdapterResponse, agate.Table]:
    """Executes the given SQL statement and returns the results as an agate table.

    Args:
        sql: The SQL statement to execute.
        auto_begin: dbt's ``auto_begin`` flag; transaction handling is left to the implementation.
        fetch: Whether result rows should be fetched and returned.

    Returns:
        A tuple of the adapter response and the result table.
    """
Executes the given SQL statement and returns the results as an agate table.
@abc.abstractmethod
def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
    """Resolves the relation's schema to its physical schema.

    Returns:
        The physical schema name, or None if it cannot be determined.
    """
Resolves the relation's schema to its physical schema.
@abc.abstractmethod
def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
    """Resolves the relation's identifier to its physical identifier.

    Returns:
        The physical table/view name, or None if it cannot be determined.
    """
Resolves the relation's identifier to its physical identifier.
def quote(self, identifier: str) -> str:
    """Render ``identifier`` as a fully quoted column reference in the project dialect."""
    column = exp.to_column(identifier)
    return column.sql(dialect=self.project_dialect, identify=True)
Returns a quoted identifier.
def quote_as_configured(self, value: str, component_type: str) -> str:
    """Quote ``value`` only when the quote policy enables ``component_type``.

    A component type the policy has no attribute for is treated as "do not quote".
    """
    should_quote = getattr(self.quote_policy, component_type, False)
    if not should_quote:
        return value
    return self.quote(value)
Returns the value quoted according to the quote policy.
def dispatch(
    self,
    macro_name: str,
    macro_namespace: t.Optional[str] = None,
) -> t.Callable:
    """Pick the most relevant implementation of ``macro_name``.

    Any macro whose name ends in ``__<macro_name>`` is a candidate. Candidates are ranked
    by name prefix (target type, then other prefixes, then ``default``). Ties go first to
    the requested namespace, then to the first-found candidate.

    Raises:
        ConfigError: If no candidate exists.
    """
    adapter_type = self.jinja_globals["target"]["type"]
    suffix = f"__{macro_name}"

    def _relevance(key: t.Tuple[t.Optional[str], str]) -> t.Tuple[int, int]:
        """Lower scores more relevant."""
        package, candidate_name = key
        if candidate_name.startswith("default"):
            name_rank = 2
        elif candidate_name.startswith(adapter_type):
            name_rank = 0
        else:
            name_rank = 1
        package_rank = 0 if package == macro_namespace else 1
        return name_rank, package_rank

    env_globals = self.jinja_macros.build_environment(**self.jinja_globals).globals

    # None stands for the top-level Jinja globals themselves.
    if macro_namespace is not None and macro_namespace in env_globals:
        search_packages: t.List[t.Optional[str]] = [
            self.jinja_macros.root_package_name,
            macro_namespace,
        ]
    else:
        search_packages = [None]
    # dbt-provided packages are always searched as a fallback.
    search_packages += [key for key in env_globals if key.startswith("dbt")]

    found: t.Dict[t.Tuple[t.Optional[str], str], t.Callable] = {}
    for package in search_packages:
        macros = env_globals.get(package, {}) if package else env_globals
        if not isinstance(macros, dict):
            continue
        for candidate_name, candidate in macros.items():
            if candidate_name.endswith(suffix):
                found[(package, candidate_name)] = candidate

    if not found:
        raise ConfigError(f"Macro '{macro_name}', package '{macro_namespace}' was not found.")
    # min() returns the first minimal key in insertion order, matching sorted(...)[0].
    return found[min(found, key=_relevance)]
Returns a dialect-specific version of a macro with the given name.
@property
def graph(self) -> t.Any:
    """Return the ``flat_graph`` Jinja global, or an empty graph skeleton when it is unset/falsy."""
    flat_graph = self.jinja_globals.get("flat_graph")
    if flat_graph:
        return flat_graph
    empty_sections = (
        "exposures",
        "groups",
        "metrics",
        "nodes",
        "sources",
        "semantic_models",
        "saved_queries",
    )
    return AttributeDict({section: {} for section in empty_sections})
class ParsetimeAdapter(BaseAdapter):
    """Adapter available while parsing a dbt project.

    Every operation that needs the database raises ``ParsetimeAdapterCallError``.
    ``resolve_schema`` / ``resolve_identifier`` return the relation's own values.
    """

    def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
        self._raise_parsetime_adapter_call_error("get relation")

    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
        self._raise_parsetime_adapter_call_error("load relation")

    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
        self._raise_parsetime_adapter_call_error("list relation")

    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
        self._raise_parsetime_adapter_call_error("list relation")

    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
        self._raise_parsetime_adapter_call_error("get columns")

    def get_missing_columns(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> t.List[Column]:
        self._raise_parsetime_adapter_call_error("get missing columns")

    def create_schema(self, relation: BaseRelation) -> None:
        self._raise_parsetime_adapter_call_error("create schema")

    def drop_schema(self, relation: BaseRelation) -> None:
        self._raise_parsetime_adapter_call_error("drop schema")

    def drop_relation(self, relation: BaseRelation) -> None:
        self._raise_parsetime_adapter_call_error("drop relation")

    def expand_target_column_types(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> None:
        self._raise_parsetime_adapter_call_error("expand target column types")

    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
        self._raise_parsetime_adapter_call_error("rename relation")

    def execute(
        self, sql: str, auto_begin: bool = False, fetch: bool = False
    ) -> t.Tuple[AdapterResponse, agate.Table]:
        self._raise_parsetime_adapter_call_error("execute SQL")

    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
        return relation.schema

    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
        return relation.identifier

    @staticmethod
    def _raise_parsetime_adapter_call_error(action: str) -> t.NoReturn:
        """Always raises ParsetimeAdapterCallError for ``action``.

        Annotated ``NoReturn`` so type checkers know callers never fall through,
        which removes the need for dead ``raise`` statements after each call.
        """
        raise ParsetimeAdapterCallError(f"Can't {action} at parse time.")
Helper class that provides a standard way to create an ABC using inheritance.
def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
    """Rejects relation lookups, which are not allowed at parse time."""
    blocked_action = "get relation"
    self._raise_parsetime_adapter_call_error(blocked_action)
    raise  # unreachable: the helper always raises
Returns a single relation that matches the provided path.
def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
    """Rejects relation loading, which is not allowed at parse time."""
    blocked_action = "load relation"
    self._raise_parsetime_adapter_call_error(blocked_action)
    raise  # unreachable: the helper always raises
Returns a single relation that matches the provided relation if present.
def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
    """Rejects schema listing, which is not allowed at parse time."""
    blocked_action = "list relation"
    self._raise_parsetime_adapter_call_error(blocked_action)
    raise  # unreachable: the helper always raises
Gets all relations in a given schema and optionally database.
TODO: Add caching functionality to avoid repeat visits to DB
def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
    """Rejects uncached schema listing; reported with the same "list relation" action."""
    blocked_action = "list relation"
    self._raise_parsetime_adapter_call_error(blocked_action)
    raise  # unreachable: the helper always raises
Uses the engine adapter to get all relations in the schema identified by the given schema-grained relation.
def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
    """Rejects column introspection, which is not allowed at parse time."""
    blocked_action = "get columns"
    self._raise_parsetime_adapter_call_error(blocked_action)
    raise  # unreachable: the helper always raises
Returns the columns for a given table-grained relation.
def get_missing_columns(
    self, from_relation: BaseRelation, to_relation: BaseRelation
) -> t.List[Column]:
    """Rejects column comparison between relations, which is not allowed at parse time."""
    blocked_action = "get missing columns"
    self._raise_parsetime_adapter_call_error(blocked_action)
    raise  # unreachable: the helper always raises
Returns the columns in from_relation missing from to_relation.
def create_schema(self, relation: BaseRelation) -> None:
    """Rejects schema creation, which is not allowed at parse time."""
    blocked_action = "create schema"
    self._raise_parsetime_adapter_call_error(blocked_action)
Creates a schema in the target database.
def drop_schema(self, relation: BaseRelation) -> None:
    """Rejects schema removal, which is not allowed at parse time."""
    blocked_action = "drop schema"
    self._raise_parsetime_adapter_call_error(blocked_action)
Drops a schema in the target database.
def drop_relation(self, relation: BaseRelation) -> None:
    """Rejects dropping a relation, which is not allowed at parse time."""
    blocked_action = "drop relation"
    self._raise_parsetime_adapter_call_error(blocked_action)
Drops a relation (table) in the target database.
def expand_target_column_types(
    self, from_relation: BaseRelation, to_relation: BaseRelation
) -> None:
    """Rejects column type expansion, which is not allowed at parse time."""
    blocked_action = "expand target column types"
    self._raise_parsetime_adapter_call_error(blocked_action)
Expand to_relation's column types to match those of from_relation.
def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
    """Rejects renaming a relation, which is not allowed at parse time."""
    blocked_action = "rename relation"
    self._raise_parsetime_adapter_call_error(blocked_action)
Renames a relation (table) in the target database.
def execute(
    self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> t.Tuple[AdapterResponse, agate.Table]:
    """Rejects SQL execution, which is not allowed at parse time."""
    blocked_action = "execute SQL"
    self._raise_parsetime_adapter_call_error(blocked_action)
    raise  # unreachable: the helper always raises
Executes the given SQL statement and returns the results as an agate table.
Resolves the relation's schema to its physical schema.
class RuntimeAdapter(BaseAdapter):
    """Adapter that serves dbt adapter calls through a SQLMesh ``EngineAdapter``.

    Table names are normalized for the project dialect. Lookups such as
    ``load_relation``, ``get_columns_in_relation`` and ``resolve_*``, as well as SQL
    passed to ``execute``, are redirected through ``table_mapping`` when it has an entry
    for the name.
    """

    def __init__(
        self,
        engine_adapter: EngineAdapter,
        jinja_macros: JinjaMacroRegistry,
        jinja_globals: t.Optional[t.Dict[str, t.Any]] = None,
        relation_type: t.Optional[t.Type[BaseRelation]] = None,
        column_type: t.Optional[t.Type[Column]] = None,
        quote_policy: t.Optional[Policy] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        project_dialect: t.Optional[str] = None,
    ):
        """Creates the adapter.

        ``project_dialect`` defaults to the engine adapter's dialect. ``relation_type`` and
        ``column_type`` default to dbt's ``BaseRelation`` and ``Column``. The table mapping
        merges the one derived from ``snapshots`` with ``table_mapping``; explicit
        ``table_mapping`` entries win on key collisions.
        """
        from dbt.adapters.base import BaseRelation
        from dbt.adapters.base.column import Column

        super().__init__(
            jinja_macros,
            jinja_globals=jinja_globals,
            project_dialect=project_dialect or engine_adapter.dialect,
            quote_policy=quote_policy,
        )

        table_mapping = table_mapping or {}

        self.engine_adapter = engine_adapter
        self.relation_type = relation_type or BaseRelation
        self.column_type = column_type or Column
        self.table_mapping = {
            **to_table_mapping((snapshots or {}).values(), deployability_index),
            **table_mapping,
        }

    def get_relation(
        self, database: t.Optional[str], schema: str, identifier: str
    ) -> t.Optional[BaseRelation]:
        """Returns the relation at ``database.schema.identifier`` if it exists."""
        target_table = exp.table_(identifier, db=schema, catalog=database)
        # Normalize before converting to a relation; otherwise, it will be too late,
        # as quotes will have already been applied.
        target_table = self._normalize(target_table)
        return self.load_relation(self._table_to_relation(target_table))

    def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
        """Returns a relation for ``relation``'s mapped table, or None if no data object is found."""
        mapped_table = self._map_table_name(self._normalize(self._relation_to_table(relation)))

        data_object = self.engine_adapter.get_data_object(mapped_table)
        return self._data_object_to_relation(data_object) if data_object is not None else None

    def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
        """Lists the relations in ``schema``, optionally qualified by ``database``."""
        target_schema = schema_(schema, catalog=database)
        # Normalize before converting to a relation; otherwise, it will be too late,
        # as quotes will have already been applied.
        target_schema = self._normalize(target_schema)
        return self.list_relations_without_caching(self._table_to_relation(target_schema))

    def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
        """Lists the data objects in ``schema_relation``'s schema as dbt relations."""
        schema = self._normalize(self._schema(schema_relation))
        return [
            self._data_object_to_relation(do) for do in self.engine_adapter.get_data_objects(schema)
        ]

    def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
        """Returns the columns of a table-grained relation, resolved through the table mapping.

        For BigQuery, columns are built with ``column_type.create_from_field`` when both it and
        the engine adapter's ``get_bq_schema`` are available. Otherwise dbt's base ``Column``
        is used, because BigQuery's column class has a different constructor signature.
        """
        mapped_table = self._map_table_name(self._normalize(self._relation_to_table(relation)))
        column_type = self.column_type

        if self.project_dialect == "bigquery":
            # dbt.adapters.bigquery.column.BigQueryColumn has a different constructor signature
            # We need to use BigQueryColumn.create_from_field() to create the column instead
            create_from_field = getattr(self.column_type, "create_from_field", None)
            get_bq_schema = getattr(self.engine_adapter, "get_bq_schema", None)
            if callable(create_from_field) and callable(get_bq_schema):
                return [create_from_field(field) for field in get_bq_schema(mapped_table)]

            # Fall back to dbt's base Column, whose from_description() signature we rely on.
            from dbt.adapters.base.column import Column as BaseColumn

            column_type = BaseColumn

        return [
            column_type.from_description(
                name=name, raw_data_type=dtype.sql(dialect=self.project_dialect)
            )
            for name, dtype in self.engine_adapter.columns(table_name=mapped_table).items()
        ]

    def get_missing_columns(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> t.List[Column]:
        """Returns the columns of ``from_relation`` whose names do not appear in ``to_relation``."""
        target_columns = {col.name for col in self.get_columns_in_relation(to_relation)}

        return [
            col
            for col in self.get_columns_in_relation(from_relation)
            if col.name not in target_columns
        ]

    def create_schema(self, relation: BaseRelation) -> None:
        """Creates the relation's schema; does nothing if the relation has no schema."""
        if relation.schema is not None:
            self.engine_adapter.create_schema(self._normalize(self._schema(relation)))

    def drop_schema(self, relation: BaseRelation) -> None:
        """Drops the relation's schema; does nothing if the relation has no schema."""
        if relation.schema is not None:
            self.engine_adapter.drop_schema(self._normalize(self._schema(relation)))

    def drop_relation(self, relation: BaseRelation) -> None:
        """Drops the relation's table when both its schema and identifier are set."""
        if relation.schema is not None and relation.identifier is not None:
            self.engine_adapter.drop_table(self._normalize(self._relation_to_table(relation)))

    def expand_target_column_types(
        self, from_relation: BaseRelation, to_relation: BaseRelation
    ) -> None:
        """Alters ``to_relation``'s column types toward ``from_relation``'s.

        Only shared columns for which dbt's ``can_expand_to`` is true are considered.
        Destructive changes are ignored.
        """
        from_dbt_columns = {c.name: c for c in self.get_columns_in_relation(from_relation)}
        to_dbt_columns = {c.name: c for c in self.get_columns_in_relation(to_relation)}

        from_table_name = self._normalize(self._relation_to_table(from_relation))
        to_table_name = self._normalize(self._relation_to_table(to_relation))

        from_columns = self.engine_adapter.columns(from_table_name)
        to_columns = self.engine_adapter.columns(to_table_name)

        current_columns = {}
        new_columns = {}
        for column_name, from_column in from_dbt_columns.items():
            target_column = to_dbt_columns.get(column_name)
            if target_column is not None and target_column.can_expand_to(from_column):
                current_columns[column_name] = to_columns[column_name]
                new_columns[column_name] = from_columns[column_name]

        alter_expressions = t.cast(
            t.List[TableAlterOperation],
            self.engine_adapter.schema_differ.compare_columns(
                to_table_name,
                current_columns,
                new_columns,
                ignore_destructive=True,
            ),
        )

        if alter_expressions:
            self.engine_adapter.alter_table(alter_expressions)

    def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
        """Renames ``from_relation``'s table to ``to_relation``'s name."""
        old_table_name = self._normalize(self._relation_to_table(from_relation))
        new_table_name = self._normalize(self._relation_to_table(to_relation))

        self.engine_adapter.rename_table(old_table_name, new_table_name)

    def execute(
        self, sql: str, auto_begin: bool = False, fetch: bool = False
    ) -> t.Tuple[AdapterResponse, agate.Table]:
        """Executes ``sql`` after normalizing it and substituting mapped table names.

        Returns the result set as an agate table when ``fetch`` is true, otherwise an empty
        table. ``auto_begin`` wraps the call in an engine-adapter transaction.
        """
        import pandas as pd
        from dbt.adapters.base.impl import AdapterResponse

        from sqlmesh.dbt.util import pandas_to_agate, empty_table

        # mypy bug: https://github.com/python/mypy/issues/10740
        exec_func: t.Callable[..., None | pd.DataFrame] = (
            self.engine_adapter.fetchdf if fetch else self.engine_adapter.execute  # type: ignore
        )

        expression = parse_one(sql, read=self.project_dialect)
        with normalize_and_quote(
            expression, t.cast(str, self.project_dialect), self.engine_adapter.default_catalog
        ) as expression:
            expression = exp.replace_tables(
                expression, self.table_mapping, dialect=self.project_dialect, copy=False
            )

        if auto_begin:
            # TODO: This could be a bug. I think dbt leaves the transaction open while we close immediately.
            with self.engine_adapter.transaction():
                resp = exec_func(expression, quote_identifiers=False)
        else:
            resp = exec_func(expression, quote_identifiers=False)

        # TODO: Properly fill in adapter response
        if fetch:
            assert isinstance(resp, pd.DataFrame)
            return AdapterResponse("Success"), pandas_to_agate(resp)
        return AdapterResponse("Success"), empty_table()

    def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
        """Returns the schema of the relation's mapped table, or None if it is empty."""
        return self._map_table_name(self._normalize(self._relation_to_table(relation))).db or None

    def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
        """Returns the identifier of the relation's mapped table, or None if it is empty."""
        return (
            self._map_table_name(self._normalize(self._relation_to_table(relation))).name or None
        )

    def _map_table_name(self, table: exp.Table) -> exp.Table:
        """Returns the mapped table for ``table`` if ``table_mapping`` has one, else ``table``."""
        # Use the default dialect since this is the dialect used to normalize and quote keys in the
        # mapping table.
        name = table.sql(identify=True)
        physical_table_name = self.table_mapping.get(name)
        if not physical_table_name:
            return table

        logger.debug("Resolved ref '%s' to snapshot table '%s'", name, physical_table_name)

        return exp.to_table(physical_table_name, dialect=self.project_dialect)

    def _relation_to_table(self, relation: BaseRelation) -> exp.Table:
        """Parses the relation's rendered name into a sqlglot table."""
        return exp.to_table(relation.render(), dialect=self.project_dialect)

    def _data_object_to_relation(self, data_object: DataObject) -> BaseRelation:
        """Converts an engine ``DataObject`` into a dbt relation of the matching type."""
        from sqlmesh.dbt.relation import RelationType

        if data_object.type.is_unknown:
            dbt_relation_type = RelationType.External
        elif data_object.type.is_managed_table:
            dbt_relation_type = RelationType.Table
        else:
            dbt_relation_type = RelationType(data_object.type.lower())

        return self.relation_type.create(
            database=data_object.catalog,
            schema=data_object.schema_name,
            identifier=data_object.name,
            quote_policy=self.quote_policy,
            type=dbt_relation_type,
        )

    def _table_to_relation(self, table: exp.Table) -> BaseRelation:
        """Builds a dbt relation from a sqlglot table; an empty catalog becomes None."""
        return self.relation_type.create(
            database=table.catalog or None,
            schema=table.db,
            identifier=table.name,
            quote_policy=self.quote_policy,
        )

    def _schema(self, schema_relation: BaseRelation) -> exp.Table:
        """Builds a schema-only table (no ``this``) from a relation with a schema set."""
        assert schema_relation.schema is not None
        return exp.Table(
            this=None,
            db=exp.to_identifier(schema_relation.schema, quoted=self.quote_policy.schema),
            catalog=exp.to_identifier(schema_relation.database, quoted=self.quote_policy.database),
        )

    def _normalize(self, input_table: exp.Table) -> exp.Table:
        """Normalizes ``input_table`` with ``normalize_model_name`` and re-parses it."""
        normalized_name = normalize_model_name(
            input_table, self.engine_adapter.default_catalog, self.project_dialect
        )
        normalized_table = exp.to_table(normalized_name)
        if not input_table.this:
            # Schema-only input: the re-parsed name presumably comes back one level too low
            # (schema as `this`, catalog as `db`), so shift each part up one level.
            normalized_table.set("catalog", normalized_table.args.get("db"))
            normalized_table.set("db", normalized_table.this)
            normalized_table.set("this", None)
        return normalized_table
Helper class that provides a standard way to create an ABC using inheritance.
def __init__(
    self,
    engine_adapter: EngineAdapter,
    jinja_macros: JinjaMacroRegistry,
    jinja_globals: t.Optional[t.Dict[str, t.Any]] = None,
    relation_type: t.Optional[t.Type[BaseRelation]] = None,
    column_type: t.Optional[t.Type[Column]] = None,
    quote_policy: t.Optional[Policy] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    table_mapping: t.Optional[t.Dict[str, str]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    project_dialect: t.Optional[str] = None,
):
    """Creates a runtime adapter backed by ``engine_adapter``.

    The project dialect falls back to the engine adapter's dialect. Relation and column
    classes fall back to dbt's ``BaseRelation`` and ``Column``. Explicit ``table_mapping``
    entries override those derived from ``snapshots``.
    """
    from dbt.adapters.base import BaseRelation
    from dbt.adapters.base.column import Column

    dialect = project_dialect or engine_adapter.dialect
    super().__init__(
        jinja_macros,
        jinja_globals=jinja_globals,
        project_dialect=dialect,
        quote_policy=quote_policy,
    )

    self.engine_adapter = engine_adapter
    self.relation_type = relation_type or BaseRelation
    self.column_type = column_type or Column

    snapshot_mapping = to_table_mapping((snapshots or {}).values(), deployability_index)
    self.table_mapping = {**snapshot_mapping, **(table_mapping or {})}
def get_relation(
    self, database: t.Optional[str], schema: str, identifier: str
) -> t.Optional[BaseRelation]:
    """Looks up ``database.schema.identifier`` and returns the matching relation, if any."""
    # Normalization must happen while the path is still an exp.Table: once it is a
    # relation, quoting has already been applied and normalizing would be too late.
    normalized = self._normalize(exp.table_(identifier, db=schema, catalog=database))
    candidate = self._table_to_relation(normalized)
    return self.load_relation(candidate)
Returns a single relation that matches the provided path.
def load_relation(self, relation: BaseRelation) -> t.Optional[BaseRelation]:
    """Returns a relation describing ``relation``'s mapped table.

    Returns None when the engine adapter reports no data object for that table.
    """
    table = self._relation_to_table(relation)
    physical_table = self._map_table_name(self._normalize(table))
    data_object = self.engine_adapter.get_data_object(physical_table)
    if data_object is None:
        return None
    return self._data_object_to_relation(data_object)
Returns a single relation that matches the provided relation if present.
def list_relations(self, database: t.Optional[str], schema: str) -> t.List[BaseRelation]:
    """Lists every relation in ``schema``, optionally qualified by ``database``."""
    # Normalize while still an exp.Table, before quoting is baked into a relation.
    normalized_schema = self._normalize(schema_(schema, catalog=database))
    schema_relation = self._table_to_relation(normalized_schema)
    return self.list_relations_without_caching(schema_relation)
Gets all relations in a given schema and optionally database.
TODO: Add caching functionality to avoid repeat visits to DB
def list_relations_without_caching(self, schema_relation: BaseRelation) -> t.List[BaseRelation]:
    """Asks the engine adapter for the data objects in ``schema_relation``'s schema."""
    target_schema = self._normalize(self._schema(schema_relation))
    data_objects = self.engine_adapter.get_data_objects(target_schema)
    return [self._data_object_to_relation(data_object) for data_object in data_objects]
Uses the engine adapter to get all relations in the schema identified by the given schema-grained relation.
def get_columns_in_relation(self, relation: BaseRelation) -> t.List[Column]:
    """Returns the columns of a table-grained relation, resolved through the table mapping.

    For BigQuery, columns are built with ``column_type.create_from_field`` when both it and
    the engine adapter's ``get_bq_schema`` are available. Otherwise dbt's base ``Column`` is
    used, because BigQuery's column class has a different constructor signature.
    """
    mapped_table = self._map_table_name(self._normalize(self._relation_to_table(relation)))
    column_type = self.column_type

    if self.project_dialect == "bigquery":
        # dbt.adapters.bigquery.column.BigQueryColumn has a different constructor signature
        # We need to use BigQueryColumn.create_from_field() to create the column instead
        create_from_field = getattr(self.column_type, "create_from_field", None)
        get_bq_schema = getattr(self.engine_adapter, "get_bq_schema", None)
        if callable(create_from_field) and callable(get_bq_schema):
            return [create_from_field(field) for field in get_bq_schema(mapped_table)]

        # Fall back to dbt's base Column, whose from_description() signature we rely on.
        from dbt.adapters.base.column import Column as BaseColumn

        column_type = BaseColumn

    # Single construction path shared by the default and BigQuery-fallback cases.
    return [
        column_type.from_description(
            name=name, raw_data_type=dtype.sql(dialect=self.project_dialect)
        )
        for name, dtype in self.engine_adapter.columns(table_name=mapped_table).items()
    ]
Returns the columns for a given table-grained relation.
def get_missing_columns(
    self, from_relation: BaseRelation, to_relation: BaseRelation
) -> t.List[Column]:
    """Returns the columns of ``from_relation`` whose names are absent from ``to_relation``."""
    present_names = {column.name for column in self.get_columns_in_relation(to_relation)}
    source_columns = self.get_columns_in_relation(from_relation)
    return [column for column in source_columns if column.name not in present_names]
Returns the columns in from_relation missing from to_relation.
def create_schema(self, relation: BaseRelation) -> None:
    """Creates the relation's schema; a relation without a schema is ignored."""
    if relation.schema is None:
        return
    schema_table = self._normalize(self._schema(relation))
    self.engine_adapter.create_schema(schema_table)
Creates a schema in the target database.
def drop_schema(self, relation: BaseRelation) -> None:
    """Drops the relation's schema; a relation without a schema is ignored."""
    if relation.schema is None:
        return
    schema_table = self._normalize(self._schema(relation))
    self.engine_adapter.drop_schema(schema_table)
Drops a schema in the target database.
def drop_relation(self, relation: BaseRelation) -> None:
    """Drops the relation's table; does nothing unless both schema and identifier are set."""
    if relation.schema is None or relation.identifier is None:
        return
    table = self._normalize(self._relation_to_table(relation))
    self.engine_adapter.drop_table(table)
Drops a relation (table) in the target database.
def expand_target_column_types(
    self, from_relation: BaseRelation, to_relation: BaseRelation
) -> None:
    """Alters ``to_relation``'s column types toward ``from_relation``'s.

    Only columns present in both relations are considered, and only where the target dbt
    column's ``can_expand_to`` accepts the source column. Destructive changes are ignored.
    """
    source_dbt = {column.name: column for column in self.get_columns_in_relation(from_relation)}
    target_dbt = {column.name: column for column in self.get_columns_in_relation(to_relation)}

    source_table = self._normalize(self._relation_to_table(from_relation))
    target_table = self._normalize(self._relation_to_table(to_relation))

    source_types = self.engine_adapter.columns(source_table)
    target_types = self.engine_adapter.columns(target_table)

    current_columns = {}
    new_columns = {}
    for name, source_column in source_dbt.items():
        target_column = target_dbt.get(name)
        if target_column is None or not target_column.can_expand_to(source_column):
            continue
        current_columns[name] = target_types[name]
        new_columns[name] = source_types[name]

    operations = self.engine_adapter.schema_differ.compare_columns(
        target_table, current_columns, new_columns, ignore_destructive=True
    )
    if operations:
        self.engine_adapter.alter_table(t.cast(t.List[TableAlterOperation], operations))
Expand to_relation's column types to match those of from_relation.
def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
    """Renames ``from_relation``'s table to the name of ``to_relation``."""
    source, destination = (
        self._normalize(self._relation_to_table(item)) for item in (from_relation, to_relation)
    )
    self.engine_adapter.rename_table(source, destination)
Renames a relation (table) in the target database.
def execute(
    self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> t.Tuple[AdapterResponse, agate.Table]:
    """Runs ``sql`` after normalizing it and substituting mapped table names.

    With ``fetch`` the result set comes back as an agate table; otherwise an empty table is
    returned. ``auto_begin`` wraps the call in an engine-adapter transaction.
    """
    import pandas as pd
    from dbt.adapters.base.impl import AdapterResponse

    from sqlmesh.dbt.util import pandas_to_agate, empty_table

    # mypy bug: https://github.com/python/mypy/issues/10740
    runner: t.Callable[..., None | pd.DataFrame] = (
        self.engine_adapter.fetchdf if fetch else self.engine_adapter.execute  # type: ignore
    )

    parsed = parse_one(sql, read=self.project_dialect)
    with normalize_and_quote(
        parsed, t.cast(str, self.project_dialect), self.engine_adapter.default_catalog
    ) as parsed:
        parsed = exp.replace_tables(
            parsed, self.table_mapping, dialect=self.project_dialect, copy=False
        )

    if auto_begin:
        # TODO: This could be a bug. I think dbt leaves the transaction open while we close immediately.
        with self.engine_adapter.transaction():
            result = runner(parsed, quote_identifiers=False)
    else:
        result = runner(parsed, quote_identifiers=False)

    # TODO: Properly fill in adapter response
    if not fetch:
        return AdapterResponse("Success"), empty_table()
    assert isinstance(result, pd.DataFrame)
    return AdapterResponse("Success"), pandas_to_agate(result)
Executes the given SQL statement and returns the results as an agate table.
def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
    """Returns the schema of the relation's mapped table, or None when it is empty."""
    physical = self._map_table_name(self._normalize(self._relation_to_table(relation)))
    return physical.db or None
Resolves the relation's schema to its physical schema.
def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
    """Returns the identifier of the relation's mapped table, or None when it is empty."""
    physical = self._map_table_name(self._normalize(self._relation_to_table(relation)))
    return physical.name or None
Resolves the relation's identifier to its physical identifier.