sqlmesh.core.schema_diff
1from __future__ import annotations 2 3import abc 4import logging 5import typing as t 6from dataclasses import dataclass 7from collections import defaultdict 8from enum import Enum 9 10from pydantic import Field 11from sqlglot import exp 12from sqlglot.helper import ensure_list, seq_get 13 14from sqlmesh.utils import columns_to_types_to_struct 15from sqlmesh.utils.pydantic import PydanticModel 16from sqlmesh.utils.errors import SQLMeshError 17 18if t.TYPE_CHECKING: 19 from sqlmesh.core._typing import TableName 20 21logger = logging.getLogger(__name__) 22 23 24@dataclass(frozen=True) 25class TableAlterOperation(abc.ABC): 26 target_table: exp.Table 27 28 @property 29 @abc.abstractmethod 30 def is_destructive(self) -> bool: 31 pass 32 33 @property 34 @abc.abstractmethod 35 def is_additive(self) -> bool: 36 pass 37 38 @property 39 @abc.abstractmethod 40 def _alter_actions(self) -> t.List[exp.Expr]: 41 pass 42 43 @property 44 def expression(self) -> exp.Alter: 45 return exp.Alter( 46 this=self.target_table, 47 kind="TABLE", 48 actions=self._alter_actions, 49 ) 50 51 52@dataclass(frozen=True) 53class TableAlterColumnOperation(TableAlterOperation, abc.ABC): 54 column_parts: t.List[TableAlterColumn] 55 expected_table_struct: exp.DataType 56 array_element_selector: str 57 58 @property 59 def column_identifiers(self) -> t.List[exp.Identifier]: 60 results = [] 61 for column in self.column_parts: 62 results.append(column.identifier) 63 if ( 64 column.is_array_of_struct 65 and len(self.column_parts) > 1 66 and self.array_element_selector 67 ): 68 results.append(exp.to_identifier(self.array_element_selector)) 69 return results 70 71 @property 72 def column(self) -> t.Union[exp.Dot, exp.Identifier]: 73 columns = self.column_identifiers 74 if len(columns) == 1: 75 return columns[0] 76 return exp.Dot.build(columns) 77 78 79@dataclass(frozen=True) 80class TableAlterTypedColumnOperation(TableAlterColumnOperation, abc.ABC): 81 column_type: exp.DataType 82 83 @property 84 def 
column_def(self) -> exp.ColumnDef: 85 if not self.column_type: 86 raise SQLMeshError("Tried to access column type when it shouldn't be needed") 87 return exp.ColumnDef( 88 this=self.column, 89 kind=self.column_type, 90 ) 91 92 93@dataclass(frozen=True) 94class TableAlterAddColumnOperation(TableAlterTypedColumnOperation): 95 position: t.Optional[TableAlterColumnPosition] = None 96 is_part_of_destructive_change: bool = False 97 98 @property 99 def is_additive(self) -> bool: 100 return not self.is_part_of_destructive_change 101 102 @property 103 def is_destructive(self) -> bool: 104 return self.is_part_of_destructive_change 105 106 @property 107 def _alter_actions(self) -> t.List[exp.Expr]: 108 column_def = exp.ColumnDef( 109 this=self.column, 110 kind=self.column_type, 111 ) 112 if self.position: 113 column_def.set("position", self.position.column_position_node) 114 return [column_def] 115 116 117@dataclass(frozen=True) 118class TableAlterDropColumnOperation(TableAlterColumnOperation): 119 cascade: bool = False 120 121 @property 122 def is_additive(self) -> bool: 123 return False 124 125 @property 126 def is_destructive(self) -> bool: 127 return True 128 129 @property 130 def _alter_actions(self) -> t.List[exp.Expr]: 131 return [exp.Drop(this=self.column, kind="COLUMN", cascade=self.cascade)] 132 133 134@dataclass(frozen=True) 135class TableAlterChangeColumnTypeOperation(TableAlterTypedColumnOperation): 136 current_type: exp.DataType 137 is_part_of_destructive_change: bool = False 138 139 @property 140 def is_additive(self) -> bool: 141 return not self.is_part_of_destructive_change 142 143 @property 144 def is_destructive(self) -> bool: 145 return self.is_part_of_destructive_change 146 147 @property 148 def _alter_actions(self) -> t.List[exp.Expr]: 149 return [ 150 exp.AlterColumn( 151 this=self.column, 152 dtype=self.column_type, 153 ) 154 ] 155 156 157@dataclass(frozen=True) 158class TableAlterColumn: 159 name: str 160 is_struct: bool 161 is_array_of_struct: bool 
162 is_array_of_primitive: bool 163 quoted: bool = False 164 165 @classmethod 166 def primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn: 167 return cls( 168 name=name, 169 is_struct=False, 170 is_array_of_struct=False, 171 is_array_of_primitive=False, 172 quoted=quoted, 173 ) 174 175 @classmethod 176 def struct(cls, name: str, quoted: bool = False) -> TableAlterColumn: 177 return cls( 178 name=name, 179 is_struct=True, 180 is_array_of_struct=False, 181 is_array_of_primitive=False, 182 quoted=quoted, 183 ) 184 185 @classmethod 186 def array_of_struct(cls, name: str, quoted: bool = False) -> TableAlterColumn: 187 return cls( 188 name=name, 189 is_struct=False, 190 is_array_of_struct=True, 191 is_array_of_primitive=False, 192 quoted=quoted, 193 ) 194 195 @classmethod 196 def array_of_primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn: 197 return cls( 198 name=name, 199 is_struct=False, 200 is_array_of_struct=False, 201 is_array_of_primitive=True, 202 quoted=quoted, 203 ) 204 205 @classmethod 206 def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn: 207 name = struct.alias_or_name 208 quoted = struct.this.quoted 209 kwarg_type = struct.args["kind"] 210 211 if kwarg_type.is_type(exp.DataType.Type.STRUCT): 212 return cls.struct(name, quoted=quoted) 213 if kwarg_type.is_type(exp.DataType.Type.ARRAY): 214 if kwarg_type.expressions and kwarg_type.expressions[0].is_type( 215 exp.DataType.Type.STRUCT 216 ): 217 return cls.array_of_struct(name, quoted=quoted) 218 return cls.array_of_primitive(name, quoted=quoted) 219 return cls.primitive(name, quoted=quoted) 220 221 @property 222 def is_array(self) -> bool: 223 return self.is_array_of_struct or self.is_array_of_primitive 224 225 @property 226 def is_primitive(self) -> bool: 227 return not self.is_struct and not self.is_array 228 229 @property 230 def is_nested(self) -> bool: 231 return not self.is_primitive 232 233 @property 234 def identifier(self) -> exp.Identifier: 235 
return exp.to_identifier(self.name, quoted=self.quoted) 236 237 238@dataclass(frozen=True) 239class TableAlterColumnPosition: 240 is_first: bool 241 is_last: bool 242 after: t.Optional[exp.Identifier] = None 243 244 @classmethod 245 def first(cls) -> TableAlterColumnPosition: 246 return cls(is_first=True, is_last=False, after=None) 247 248 @classmethod 249 def last( 250 cls, after: t.Optional[t.Union[str, exp.Identifier]] = None 251 ) -> TableAlterColumnPosition: 252 return cls(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None) 253 254 @classmethod 255 def middle(cls, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition: 256 return cls(is_first=False, is_last=False, after=exp.to_identifier(after)) 257 258 @classmethod 259 def create( 260 cls, 261 pos: int, 262 current_kwargs: t.List[exp.ColumnDef], 263 replacing_col: bool = False, 264 ) -> TableAlterColumnPosition: 265 is_first = pos == 0 266 is_last = pos == len(current_kwargs) - int(replacing_col) 267 after = None 268 if not is_first: 269 prior_kwarg = current_kwargs[pos - 1] 270 after, _ = _get_name_and_type(prior_kwarg) 271 return cls(is_first=is_first, is_last=is_last, after=after) 272 273 @property 274 def column_position_node(self) -> t.Optional[exp.ColumnPosition]: 275 column = self.after if not self.is_last else None 276 position = None 277 if self.is_first: 278 position = "FIRST" 279 elif column and not self.is_last: 280 position = "AFTER" 281 return exp.ColumnPosition(this=column, position=position) 282 283 284class NestedSupport(str, Enum): 285 # Supports all nested data type operations 286 ALL = "ALL" 287 # Does not support any nested data type operations 288 NONE = "NONE" 289 # Supports nested data type operations except for those that require dropping a nested field 290 ALL_BUT_DROP = "ALL_BUT_DROP" 291 # Ignores all nested data type operations 292 IGNORE = "IGNORE" 293 294 @property 295 def is_all(self) -> bool: 296 return self == NestedSupport.ALL 297 298 
@property 299 def is_none(self) -> bool: 300 return self == NestedSupport.NONE 301 302 @property 303 def is_all_but_drop(self) -> bool: 304 return self == NestedSupport.ALL_BUT_DROP 305 306 @property 307 def is_ignore(self) -> bool: 308 return self == NestedSupport.IGNORE 309 310 311class SchemaDiffer(PydanticModel): 312 """ 313 Compares a source schema against a target schema and returns a list of alter statements to have the source 314 match the structure of target. Some engines have constraints on the types of operations that can be performed 315 therefore the final structure may not match the target exactly but it will be as close as possible. Two potential 316 differences that can happen: 317 1. Column order can be different if the engine doesn't support positional additions. Another reason for difference 318 is if a column is just moved since we don't currently support fixing moves. 319 2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested 320 operations. As a result historical data is lost. 321 3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible 322 change. As a result historical data is lost. 323 324 Potential future improvements: 325 1. Support column moves. Databricks Delta supports moves and would allow exact matches. 326 327 Args: 328 support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a 329 specific position in the set of existing columns. 330 nested_support: How the engine for which the diff is being computed supports nested types. 331 compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data 332 type, and value is the set of types that are compatible with it. 333 coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without 334 altering the column type. 
NOTE: usually callers should not specify this attribute manually and set the 335 `support_coercing_compatible_types` flag instead. Some engines are inconsistent about their type coercion rules. 336 For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted 337 into a FLOAT64 column just fine. 338 support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct 339 coercion of compatible types. 340 drop_cascade: Whether to add CASCADE modifier when dropping a column. 341 parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type, 342 but in the engine adapter specification we build it from the dialect string instead of specifying it directly. 343 Example: `exp.DataType.build("STRING", dialect=DIALECT).this` instead of the underlying `exp.DataType.Type.TEXT` 344 to which it parses. We do that because parameter default replacement will silently break if we specify type 345 directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default 346 values in a list, where the list index contains the remaining defaults given the number of parameter values 347 provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two 348 omitted parameters `(38, 9)` -> "DECIMAL(38,9)". Example: if user provides 1 parameters "DECIMAL(10)", we return 349 index 1 value for the one omitted parameters `(0,)` -> "DECIMAL(10,0)". 350 max_parameter_length: Numeric parameter values corresponding to "max". Example: `VARCHAR(max)` -> `VARCHAR(65535)`. 351 types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly 352 parameterized type can ALTER to its unlimited length version, along with different types in some engines. 
353 treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it 354 concludes the change is compatible and won't result in data loss. If this flag is set to True, it will 355 flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't 356 be set outside of that context. 357 """ 358 359 support_positional_add: bool = False 360 nested_support: NestedSupport = NestedSupport.NONE 361 array_element_selector: str = "" 362 compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {} 363 coerceable_types_: t.Dict[exp.DataType, t.Set[exp.DataType]] = Field( 364 default_factory=dict, alias="coerceable_types" 365 ) 366 precision_increase_allowed_types: t.Optional[t.Set[exp.DType]] = None 367 support_coercing_compatible_types: bool = False 368 drop_cascade: bool = False 369 parameterized_type_defaults: t.Dict[exp.DType, t.List[t.Tuple[t.Union[int, float], ...]]] = {} 370 max_parameter_length: t.Dict[exp.DType, t.Union[int, float]] = {} 371 types_with_unlimited_length: t.Dict[exp.DType, t.Set[exp.DType]] = {} 372 treat_alter_data_type_as_destructive: bool = False 373 374 _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {} 375 376 @property 377 def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]: 378 if not self._coerceable_types: 379 if not self.support_coercing_compatible_types or not self.compatible_types: 380 return self.coerceable_types_ 381 coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(set) 382 coerceable_types.update(self.coerceable_types_) 383 for source_type, target_types in self.compatible_types.items(): 384 for target_type in target_types: 385 coerceable_types[target_type].add(source_type) 386 self._coerceable_types = coerceable_types 387 return self._coerceable_types 388 389 def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool: 390 # types are identical or both types are 
parameterized and new has higher precision 391 # - default parameter values are automatically provided if not present 392 if current_type == new_type or ( 393 self._is_precision_increase_allowed(current_type) 394 and self._is_precision_increase(current_type, new_type) 395 ): 396 return True 397 # types are un-parameterized and compatible 398 if current_type in self.compatible_types: 399 return new_type in self.compatible_types[current_type] 400 # new type is un-parameterized and has unlimited length, current type is compatible 401 if not new_type.expressions and new_type.this in self.types_with_unlimited_length: 402 return current_type.this in self.types_with_unlimited_length[new_type.this] 403 return False 404 405 def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool: 406 if current_type in self.coerceable_types: 407 is_coerceable = new_type in self.coerceable_types[current_type] 408 if is_coerceable: 409 from sqlmesh.core.console import get_console 410 411 get_console().log_warning( 412 f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning." 
413 ) 414 415 return is_coerceable 416 return False 417 418 def _is_precision_increase_allowed(self, current_type: exp.DataType) -> bool: 419 return ( 420 self.precision_increase_allowed_types is None 421 or current_type.this in self.precision_increase_allowed_types 422 ) 423 424 def _is_precision_increase(self, current_type: exp.DataType, new_type: exp.DataType) -> bool: 425 if current_type.this == new_type.this and not current_type.is_type( 426 *exp.DataType.NESTED_TYPES 427 ): 428 current_params = self.get_type_parameters(current_type) 429 new_params = self.get_type_parameters(new_type) 430 431 if len(current_params) != len(new_params): 432 return False 433 434 return all(new >= current for current, new in zip(current_params, new_params)) 435 return False 436 437 def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]: 438 def _str_to_number(string: str, allows_max_param: bool) -> t.Union[int, float]: 439 try: 440 return int(string) 441 except ValueError: 442 try: 443 return float(string) 444 except ValueError: 445 if allows_max_param and string.upper() == "MAX": 446 return self.max_parameter_length[type.this] 447 raise ValueError(f"Could not convert '{string}' to a number") 448 449 # extract existing parameters 450 params = [ 451 _str_to_number(param.this.this, type.this in self.max_parameter_length) 452 for param in type.expressions 453 ] 454 455 # maybe get default parameter values 456 param_defaults: t.Tuple[t.Union[int, float], ...] 
= () 457 if type.this in self.parameterized_type_defaults: 458 param_defaults_list = self.parameterized_type_defaults[type.this] 459 if len(params) < len(param_defaults_list): 460 param_defaults = param_defaults_list[len(params)] 461 462 return [*params, *param_defaults] 463 464 def _get_matching_kwarg( 465 self, 466 current_kwarg: t.Union[str, exp.ColumnDef], 467 new_struct: exp.DataType, 468 current_pos: int, 469 ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]: 470 current_name = ( 471 exp.to_identifier(current_kwarg) 472 if isinstance(current_kwarg, str) 473 else _get_name_and_type(current_kwarg)[0] 474 ) 475 # First check if we have the same column in the same position to get O(1) complexity 476 new_kwarg = seq_get(new_struct.expressions, current_pos) 477 if new_kwarg: 478 new_name, new_type = _get_name_and_type(new_kwarg) 479 if current_name.this == new_name.this: 480 return current_pos, new_kwarg 481 # If not, check if we have the same column in all positions with O(n) complexity 482 for i, new_kwarg in enumerate(new_struct.expressions): 483 new_name, new_type = _get_name_and_type(new_kwarg) 484 if current_name.this == new_name.this: 485 return i, new_kwarg 486 return None, None 487 488 def _drop_operation( 489 self, 490 columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]], 491 struct: exp.DataType, 492 pos: int, 493 root_struct: exp.DataType, 494 table_name: TableName, 495 ) -> t.List[TableAlterColumnOperation]: 496 columns = ensure_list(columns) 497 operations: t.List[TableAlterColumnOperation] = [] 498 column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos) 499 if column_pos is None or not column_kwarg: 500 raise SQLMeshError( 501 f"Cannot drop column '{columns[-1].name}' from table '{table_name}' - column not found. " 502 f"This may indicate a mismatch between the expected and actual table schemas." 
503 ) 504 struct.expressions.pop(column_pos) 505 operations.append( 506 TableAlterDropColumnOperation( 507 target_table=exp.to_table(table_name), 508 column_parts=columns, 509 expected_table_struct=root_struct.copy(), 510 cascade=self.drop_cascade, 511 array_element_selector=self.array_element_selector, 512 ) 513 ) 514 return operations 515 516 def _requires_drop_alteration( 517 self, current_struct: exp.DataType, new_struct: exp.DataType 518 ) -> bool: 519 for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()): 520 new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos) 521 if new_pos is None: 522 return True 523 return False 524 525 def _resolve_drop_operation( 526 self, 527 parent_columns: t.List[TableAlterColumn], 528 current_struct: exp.DataType, 529 new_struct: exp.DataType, 530 root_struct: exp.DataType, 531 table_name: TableName, 532 ) -> t.List[TableAlterColumnOperation]: 533 operations = [] 534 for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()): 535 new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos) 536 columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)] 537 if new_pos is None: 538 operations.extend( 539 self._drop_operation( 540 columns, current_struct, current_pos, root_struct, table_name 541 ) 542 ) 543 return operations 544 545 def _add_operation( 546 self, 547 columns: t.List[TableAlterColumn], 548 new_pos: int, 549 new_kwarg: exp.ColumnDef, 550 current_struct: exp.DataType, 551 root_struct: exp.DataType, 552 table_name: TableName, 553 is_part_of_destructive_change: bool = False, 554 ) -> t.List[TableAlterColumnOperation]: 555 if self.support_positional_add: 556 col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions) 557 current_struct.expressions.insert(new_pos, new_kwarg) 558 else: 559 col_pos = None 560 current_struct.expressions.append(new_kwarg) 561 return [ 562 TableAlterAddColumnOperation( 563 
target_table=exp.to_table(table_name), 564 column_parts=columns, 565 column_type=new_kwarg.args["kind"], 566 expected_table_struct=root_struct.copy(), 567 position=col_pos, 568 is_part_of_destructive_change=is_part_of_destructive_change, 569 array_element_selector=self.array_element_selector, 570 ) 571 ] 572 573 def _resolve_add_operations( 574 self, 575 parent_columns: t.List[TableAlterColumn], 576 current_struct: exp.DataType, 577 new_struct: exp.DataType, 578 root_struct: exp.DataType, 579 table_name: TableName, 580 ) -> t.List[TableAlterColumnOperation]: 581 operations = [] 582 for new_pos, new_kwarg in enumerate(new_struct.expressions): 583 possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos) 584 if possible_current_pos is None: 585 columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)] 586 operations.extend( 587 self._add_operation( 588 columns, new_pos, new_kwarg, current_struct, root_struct, table_name 589 ) 590 ) 591 return operations 592 593 def _alter_operation( 594 self, 595 columns: t.List[TableAlterColumn], 596 pos: int, 597 struct: exp.DataType, 598 new_type: exp.DataType, 599 current_type: t.Union[str, exp.DataType], 600 root_struct: exp.DataType, 601 new_kwarg: exp.ColumnDef, 602 table_name: TableName, 603 *, 604 ignore_destructive: bool = False, 605 ignore_additive: bool = False, 606 ) -> t.List[TableAlterColumnOperation]: 607 # We don't copy on purpose here because current_type may need to be mutated inside 608 # _get_operations (struct.expressions.pop and struct.expressions.insert) 609 current_type = exp.DataType.build(current_type, copy=False) 610 if not self.nested_support.is_none: 611 if new_type.this == current_type.this == exp.DataType.Type.STRUCT: 612 if self.nested_support.is_ignore: 613 return [] 614 if self.nested_support.is_all or not self._requires_drop_alteration( 615 current_type, new_type 616 ): 617 return self._get_operations( 618 columns, 619 current_type, 620 new_type, 621 
root_struct, 622 table_name, 623 ignore_destructive=ignore_destructive, 624 ignore_additive=ignore_additive, 625 ) 626 627 if new_type.this == current_type.this == exp.DataType.Type.ARRAY: 628 # Some engines (i.e. Snowflake) don't support defining types on arrays 629 if not new_type.expressions or not current_type.expressions: 630 return [] 631 new_array_type = new_type.expressions[0] 632 current_array_type = current_type.expressions[0] 633 if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT: 634 if self.nested_support.is_ignore: 635 return [] 636 if self.nested_support.is_all or not self._requires_drop_alteration( 637 current_array_type, new_array_type 638 ): 639 return self._get_operations( 640 columns, 641 current_array_type, 642 new_array_type, 643 root_struct, 644 table_name, 645 ignore_destructive=ignore_destructive, 646 ignore_additive=ignore_additive, 647 ) 648 if self._is_coerceable_type(current_type, new_type): 649 return [] 650 if self._is_compatible_type(current_type, new_type): 651 if ignore_additive: 652 return [] 653 struct.expressions.pop(pos) 654 struct.expressions.insert(pos, new_kwarg) 655 return [ 656 TableAlterChangeColumnTypeOperation( 657 target_table=exp.to_table(table_name), 658 column_parts=columns, 659 column_type=new_type, 660 current_type=current_type, 661 expected_table_struct=root_struct.copy(), 662 array_element_selector=self.array_element_selector, 663 is_part_of_destructive_change=self.treat_alter_data_type_as_destructive, 664 ) 665 ] 666 if ignore_destructive: 667 return [] 668 return self._drop_operation( 669 columns, 670 root_struct, 671 pos, 672 root_struct, 673 table_name, 674 ) + self._add_operation( 675 columns, 676 pos, 677 new_kwarg, 678 struct, 679 root_struct, 680 table_name, 681 is_part_of_destructive_change=True, 682 ) 683 684 def _resolve_alter_operations( 685 self, 686 parent_columns: t.List[TableAlterColumn], 687 current_struct: exp.DataType, 688 new_struct: exp.DataType, 689 root_struct: 
exp.DataType, 690 table_name: TableName, 691 *, 692 ignore_destructive: bool = False, 693 ignore_additive: bool = False, 694 ) -> t.List[TableAlterColumnOperation]: 695 operations = [] 696 for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()): 697 _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos) 698 if new_kwarg is None: 699 if ignore_destructive: 700 continue 701 raise ValueError("Cannot alter a column that is being dropped") 702 _, new_type = _get_name_and_type(new_kwarg) 703 _, current_type = _get_name_and_type(current_kwarg) 704 columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)] 705 if new_type == current_type: 706 continue 707 operations.extend( 708 self._alter_operation( 709 columns, 710 current_pos, 711 current_struct, 712 new_type, 713 current_type, 714 root_struct, 715 new_kwarg, 716 table_name, 717 ignore_destructive=ignore_destructive, 718 ignore_additive=ignore_additive, 719 ) 720 ) 721 return operations 722 723 def _get_operations( 724 self, 725 parent_columns: t.List[TableAlterColumn], 726 current_struct: exp.DataType, 727 new_struct: exp.DataType, 728 root_struct: exp.DataType, 729 table_name: TableName, 730 *, 731 ignore_destructive: bool = False, 732 ignore_additive: bool = False, 733 ) -> t.List[TableAlterColumnOperation]: 734 root_struct = root_struct or current_struct 735 parent_columns = parent_columns or [] 736 operations = [] 737 if not ignore_destructive: 738 operations.extend( 739 self._resolve_drop_operation( 740 parent_columns, current_struct, new_struct, root_struct, table_name 741 ) 742 ) 743 if not ignore_additive: 744 operations.extend( 745 self._resolve_add_operations( 746 parent_columns, current_struct, new_struct, root_struct, table_name 747 ) 748 ) 749 operations.extend( 750 self._resolve_alter_operations( 751 parent_columns, 752 current_struct, 753 new_struct, 754 root_struct, 755 ignore_destructive=ignore_destructive, 756 
ignore_additive=ignore_additive, 757 table_name=table_name, 758 ) 759 ) 760 return operations 761 762 def _from_structs( 763 self, 764 current_struct: exp.DataType, 765 new_struct: exp.DataType, 766 table_name: TableName, 767 *, 768 ignore_destructive: bool = False, 769 ignore_additive: bool = False, 770 ) -> t.List[TableAlterColumnOperation]: 771 return self._get_operations( 772 [], 773 current_struct, 774 new_struct, 775 current_struct, 776 table_name=table_name, 777 ignore_destructive=ignore_destructive, 778 ignore_additive=ignore_additive, 779 ) 780 781 def _compare_structs( 782 self, 783 table_name: t.Union[str, exp.Table], 784 current: exp.DataType, 785 new: exp.DataType, 786 *, 787 ignore_destructive: bool = False, 788 ignore_additive: bool = False, 789 ) -> t.List[TableAlterColumnOperation]: 790 return self._from_structs( 791 current, 792 new, 793 table_name=table_name, 794 ignore_destructive=ignore_destructive, 795 ignore_additive=ignore_additive, 796 ) 797 798 def compare_columns( 799 self, 800 table_name: TableName, 801 current: t.Dict[str, exp.DataType], 802 new: t.Dict[str, exp.DataType], 803 *, 804 ignore_destructive: bool = False, 805 ignore_additive: bool = False, 806 ) -> t.List[TableAlterColumnOperation]: 807 return self._compare_structs( 808 table_name, 809 columns_to_types_to_struct(current), 810 columns_to_types_to_struct(new), 811 ignore_destructive=ignore_destructive, 812 ignore_additive=ignore_additive, 813 ) 814 815 816def has_drop_alteration(alter_operations: t.List[TableAlterOperation]) -> bool: 817 return any(op.is_destructive for op in alter_operations) 818 819 820def has_additive_alteration(alter_operations: t.List[TableAlterOperation]) -> bool: 821 return any(op.is_additive for op in alter_operations) 822 823 824def get_additive_changes( 825 alter_operations: t.List[TableAlterOperation], 826) -> t.List[TableAlterOperation]: 827 return [x for x in alter_operations if x.is_additive] 828 829 830def 
get_dropped_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]: 831 return [ 832 op.column.alias_or_name 833 for op in alter_expressions 834 if isinstance(op, TableAlterDropColumnOperation) 835 ] 836 837 838def get_additive_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]: 839 return [ 840 op.column.alias_or_name 841 for op in alter_expressions 842 if op.is_additive and isinstance(op, TableAlterColumnOperation) 843 ] 844 845 846def get_schema_differ( 847 dialect: str, overrides: t.Optional[t.Dict[str, t.Any]] = None 848) -> SchemaDiffer: 849 """ 850 Returns the appropriate SchemaDiffer for a given dialect without initializing the engine adapter. 851 852 Args: 853 dialect: The dialect for which to get the schema differ. 854 overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance. 855 856 Returns: 857 The SchemaDiffer instance configured for the given dialect. 858 """ 859 from sqlmesh.core.engine_adapter import ( 860 DIALECT_TO_ENGINE_ADAPTER, 861 DIALECT_ALIASES, 862 EngineAdapter, 863 ) 864 865 dialect = dialect.lower() 866 dialect = DIALECT_ALIASES.get(dialect, dialect) 867 engine_adapter_class = DIALECT_TO_ENGINE_ADAPTER.get(dialect, EngineAdapter) 868 return SchemaDiffer( 869 **{ 870 **getattr(engine_adapter_class, "SCHEMA_DIFFER_KWARGS"), 871 **(overrides or {}), 872 } 873 ) 874 875 876def _get_name_and_type(struct: exp.ColumnDef) -> t.Tuple[exp.Identifier, exp.DataType]: 877 return struct.this, struct.args["kind"]
@dataclass(frozen=True)
class TableAlterOperation(abc.ABC):
    """Abstract ALTER TABLE operation aimed at ``target_table``.

    Subclasses classify themselves as destructive and/or additive and supply the
    actions that are embedded in the generated ALTER statement.
    """

    target_table: exp.Table

    @property
    @abc.abstractmethod
    def is_destructive(self) -> bool:
        """True when the operation is classified as destructive."""
        ...

    @property
    @abc.abstractmethod
    def is_additive(self) -> bool:
        """True when the operation is classified as additive."""
        ...

    @property
    @abc.abstractmethod
    def _alter_actions(self) -> t.List[exp.Expr]:
        """Actions embedded into the ALTER TABLE expression."""
        ...

    @property
    def expression(self) -> exp.Alter:
        """The complete ``ALTER TABLE`` expression for this operation."""
        actions = self._alter_actions
        return exp.Alter(this=self.target_table, kind="TABLE", actions=actions)
@dataclass(frozen=True)
class TableAlterColumnOperation(TableAlterOperation, abc.ABC):
    """Abstract operation on a column addressed by a path of ``TableAlterColumn`` parts."""

    column_parts: t.List[TableAlterColumn]
    expected_table_struct: exp.DataType
    array_element_selector: str

    @property
    def column_identifiers(self) -> t.List[exp.Identifier]:
        """Identifier for every path part, plus the element selector after each
        array-of-struct part when the path is nested and a selector is set."""
        is_nested_path = len(self.column_parts) > 1
        identifiers: t.List[exp.Identifier] = []
        for part in self.column_parts:
            identifiers.append(part.identifier)
            wants_selector = (
                part.is_array_of_struct and is_nested_path and self.array_element_selector
            )
            if wants_selector:
                identifiers.append(exp.to_identifier(self.array_element_selector))
        return identifiers

    @property
    def column(self) -> t.Union[exp.Dot, exp.Identifier]:
        """A single identifier for a one-part path, otherwise a dotted path."""
        parts = self.column_identifiers
        return parts[0] if len(parts) == 1 else exp.Dot.build(parts)
@property
def column_identifiers(self) -> t.List[exp.Identifier]:
    """Identifier for each column path part.

    After every array-of-struct part, the configured element selector is added too,
    but only for nested (multi-part) paths and only when the selector is non-empty.
    """
    nested = len(self.column_parts) > 1
    out: t.List[exp.Identifier] = []
    for part in self.column_parts:
        out.append(part.identifier)
        if nested and part.is_array_of_struct and self.array_element_selector:
            out.append(exp.to_identifier(self.array_element_selector))
    return out
Inherited Members
@dataclass(frozen=True)
class TableAlterTypedColumnOperation(TableAlterColumnOperation, abc.ABC):
    """Column operation that also carries the column's data type."""

    column_type: exp.DataType

    @property
    def column_def(self) -> exp.ColumnDef:
        """Column definition pairing the column reference with ``column_type``.

        Raises:
            SQLMeshError: If ``column_type`` is falsy.
        """
        column_type = self.column_type
        if column_type:
            return exp.ColumnDef(this=self.column, kind=column_type)
        raise SQLMeshError("Tried to access column type when it shouldn't be needed")
@dataclass(frozen=True)
class TableAlterAddColumnOperation(TableAlterTypedColumnOperation):
    """Adds a column, optionally at an explicit position.

    When ``is_part_of_destructive_change`` is set, the operation is reported as
    destructive instead of additive.
    """

    position: t.Optional[TableAlterColumnPosition] = None
    is_part_of_destructive_change: bool = False

    @property
    def is_additive(self) -> bool:
        return not self.is_part_of_destructive_change

    @property
    def is_destructive(self) -> bool:
        return self.is_part_of_destructive_change

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        definition = exp.ColumnDef(this=self.column, kind=self.column_type)
        position = self.position
        if position:
            # Only attach a position node when a position was requested.
            definition.set("position", position.column_position_node)
        return [definition]
@dataclass(frozen=True)
class TableAlterDropColumnOperation(TableAlterColumnOperation):
    """Drops a column; always destructive and never additive."""

    cascade: bool = False

    @property
    def is_additive(self) -> bool:
        return False

    @property
    def is_destructive(self) -> bool:
        return True

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        drop = exp.Drop(this=self.column, kind="COLUMN", cascade=self.cascade)
        return [drop]
@dataclass(frozen=True)
class TableAlterChangeColumnTypeOperation(TableAlterTypedColumnOperation):
    """Changes a column's type from ``current_type`` to ``column_type``.

    Reported as additive unless ``is_part_of_destructive_change`` is set.
    """

    current_type: exp.DataType
    is_part_of_destructive_change: bool = False

    @property
    def is_additive(self) -> bool:
        return not self.is_part_of_destructive_change

    @property
    def is_destructive(self) -> bool:
        return self.is_part_of_destructive_change

    @property
    def _alter_actions(self) -> t.List[exp.Expr]:
        alter_column = exp.AlterColumn(this=self.column, dtype=self.column_type)
        return [alter_column]
@dataclass(frozen=True)
class TableAlterColumn:
    """One part of a column path, tagged with the shape of its data type.

    The named constructors set at most one of ``is_struct``,
    ``is_array_of_struct`` and ``is_array_of_primitive``; a part with none set
    is primitive.
    """

    name: str
    is_struct: bool
    is_array_of_struct: bool
    is_array_of_primitive: bool
    quoted: bool = False

    @classmethod
    def _make(
        cls,
        name: str,
        quoted: bool,
        *,
        is_struct: bool = False,
        is_array_of_struct: bool = False,
        is_array_of_primitive: bool = False,
    ) -> TableAlterColumn:
        # Shared constructor behind the public named factories.
        return cls(
            name=name,
            is_struct=is_struct,
            is_array_of_struct=is_array_of_struct,
            is_array_of_primitive=is_array_of_primitive,
            quoted=quoted,
        )

    @classmethod
    def primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
        return cls._make(name, quoted)

    @classmethod
    def struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
        return cls._make(name, quoted, is_struct=True)

    @classmethod
    def array_of_struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
        return cls._make(name, quoted, is_array_of_struct=True)

    @classmethod
    def array_of_primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
        return cls._make(name, quoted, is_array_of_primitive=True)

    @classmethod
    def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn:
        """Classify a struct field definition by its declared ``kind``."""
        name = struct.alias_or_name
        quoted = struct.this.quoted
        kind = struct.args["kind"]

        if kind.is_type(exp.DataType.Type.STRUCT):
            factory = cls.struct
        elif kind.is_type(exp.DataType.Type.ARRAY):
            element_types = kind.expressions
            element_is_struct = bool(element_types) and element_types[0].is_type(
                exp.DataType.Type.STRUCT
            )
            factory = cls.array_of_struct if element_is_struct else cls.array_of_primitive
        else:
            factory = cls.primitive
        return factory(name, quoted=quoted)

    @property
    def is_array(self) -> bool:
        return self.is_array_of_struct or self.is_array_of_primitive

    @property
    def is_primitive(self) -> bool:
        return not (self.is_struct or self.is_array)

    @property
    def is_nested(self) -> bool:
        return not self.is_primitive

    @property
    def identifier(self) -> exp.Identifier:
        return exp.to_identifier(self.name, quoted=self.quoted)
@classmethod
def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn:
    """Classify a struct field definition by its declared ``kind``.

    STRUCT maps to ``struct``; an ARRAY whose first element type is STRUCT maps
    to ``array_of_struct``; any other ARRAY maps to ``array_of_primitive``;
    everything else maps to ``primitive``.
    """
    name = struct.alias_or_name
    quoted = struct.this.quoted
    kind = struct.args["kind"]

    if kind.is_type(exp.DataType.Type.STRUCT):
        factory = cls.struct
    elif kind.is_type(exp.DataType.Type.ARRAY):
        element_types = kind.expressions
        element_is_struct = bool(element_types) and element_types[0].is_type(
            exp.DataType.Type.STRUCT
        )
        factory = cls.array_of_struct if element_is_struct else cls.array_of_primitive
    else:
        factory = cls.primitive
    return factory(name, quoted=quoted)
@dataclass(frozen=True)
class TableAlterColumnPosition:
    """Where a newly added column should be placed among its siblings."""

    is_first: bool
    is_last: bool
    after: t.Optional[exp.Identifier] = None

    @classmethod
    def first(cls) -> TableAlterColumnPosition:
        return cls(is_first=True, is_last=False, after=None)

    @classmethod
    def last(
        cls, after: t.Optional[t.Union[str, exp.Identifier]] = None
    ) -> TableAlterColumnPosition:
        anchor = None if not after else exp.to_identifier(after)
        return cls(is_first=False, is_last=True, after=anchor)

    @classmethod
    def middle(cls, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition:
        return cls(is_first=False, is_last=False, after=exp.to_identifier(after))

    @classmethod
    def create(
        cls,
        pos: int,
        current_kwargs: t.List[exp.ColumnDef],
        replacing_col: bool = False,
    ) -> TableAlterColumnPosition:
        """Describe inserting a column at index ``pos`` of ``current_kwargs``.

        The position is "last" when ``pos`` equals the number of existing entries
        (minus one when ``replacing_col`` is set); otherwise, unless it is first,
        it records the preceding entry's name as the ``after`` anchor.
        """
        is_first = pos == 0
        is_last = pos == len(current_kwargs) - int(replacing_col)
        if is_first:
            after = None
        else:
            after, _ = _get_name_and_type(current_kwargs[pos - 1])
        return cls(is_first=is_first, is_last=is_last, after=after)

    @property
    def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
        """A ``ColumnPosition`` node: FIRST, AFTER <anchor>, or neither (last)."""
        anchor = None if self.is_last else self.after
        if self.is_first:
            keyword = "FIRST"
        elif anchor:
            # anchor is only set when the position is not last
            keyword = "AFTER"
        else:
            keyword = None
        return exp.ColumnPosition(this=anchor, position=keyword)
@classmethod
def create(
    cls,
    pos: int,
    current_kwargs: t.List[exp.ColumnDef],
    replacing_col: bool = False,
) -> TableAlterColumnPosition:
    """Describe inserting a column at index ``pos`` of ``current_kwargs``.

    The position is "last" when ``pos`` equals the number of existing entries
    (minus one when ``replacing_col`` is set); otherwise, unless it is first, it
    records the preceding entry's name as the ``after`` anchor.
    """
    is_first = pos == 0
    is_last = pos == len(current_kwargs) - int(replacing_col)
    if is_first:
        after = None
    else:
        after, _ = _get_name_and_type(current_kwargs[pos - 1])
    return cls(is_first=is_first, is_last=is_last, after=after)
@property
def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
    """A ``ColumnPosition`` node: FIRST, AFTER <anchor>, or neither (last)."""
    anchor = None if self.is_last else self.after
    if self.is_first:
        keyword = "FIRST"
    elif anchor:
        # anchor is only set when the position is not last
        keyword = "AFTER"
    else:
        keyword = None
    return exp.ColumnPosition(this=anchor, position=keyword)
class NestedSupport(str, Enum):
    """How an engine supports operations on nested data types."""

    # Supports all nested data type operations
    ALL = "ALL"
    # Does not support any nested data type operations
    NONE = "NONE"
    # Supports nested data type operations except for those that require dropping a nested field
    ALL_BUT_DROP = "ALL_BUT_DROP"
    # Ignores all nested data type operations
    IGNORE = "IGNORE"

    # Enum members are singletons, so identity checks are equivalent to equality here.
    @property
    def is_all(self) -> bool:
        return self is NestedSupport.ALL

    @property
    def is_none(self) -> bool:
        return self is NestedSupport.NONE

    @property
    def is_all_but_drop(self) -> bool:
        return self is NestedSupport.ALL_BUT_DROP

    @property
    def is_ignore(self) -> bool:
        return self is NestedSupport.IGNORE
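Describes how an engine supports operations on nested data types: ALL, NONE, ALL_BUT_DROP (everything except dropping a nested field), or IGNORE.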
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class SchemaDiffer(PydanticModel):
    """
    Compares a source schema against a target schema and returns a list of alter statements that make the source
    match the structure of the target. Some engines have constraints on the types of operations that can be
    performed; therefore, the final structure may not match the target exactly, but it will be as close as possible.
    Three potential differences can occur:
    1. Column order can be different if the engine doesn't support positional additions. Another reason for difference
        is if a column is just moved since we don't currently support fixing moves.
    2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested
        operations. As a result, historical data is lost.
    3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible
        change. As a result, historical data is lost.

    Potential future improvements:
    1. Support column moves. Databricks Delta supports moves and would allow exact matches.

    Args:
        support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a
            specific position in the set of existing columns.
        nested_support: How the engine for which the diff is being computed supports nested types.
        array_element_selector: Identifier inserted after each array-of-struct part of a multi-part column path when
            building column references (e.g. `col.<selector>.field`). An empty string disables it.
        compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data
            type, and value is the set of types that are compatible with it.
        coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without
            altering the column type. NOTE: usually callers should not specify this attribute manually and should set the
            `support_coercing_compatible_types` flag instead. Some engines are inconsistent about their type coercion rules.
            For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted
            into a FLOAT64 column just fine.
        precision_increase_allowed_types: When set, only types whose `exp.DataType.Type` is in this set may be altered to a
            higher-precision version of themselves. `None` allows any type.
        support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct
            coercion of compatible types.
        drop_cascade: Whether to add CASCADE modifier when dropping a column.
        parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type,
            but in the engine adapter specification we build it from the dialect string instead of specifying it directly.
            Example: `exp.DataType.build("STRING", dialect=DIALECT).this` instead of the underlying `exp.DataType.Type.TEXT`
            to which it parses. We do that because parameter default replacement will silently break if we specify type
            directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default
            values in a list, where the list index contains the remaining defaults given the number of parameter values
            provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two
            omitted parameters `(38, 9)` -> "DECIMAL(38,9)". Example: if user provides 1 parameter "DECIMAL(10)", we return
            index 1 value for the one omitted parameter `(0,)` -> "DECIMAL(10,0)".
        max_parameter_length: Numeric parameter values corresponding to "max". Example: `VARCHAR(max)` -> `VARCHAR(65535)`.
        types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly
            parameterized type can ALTER to its unlimited length version, along with different types in some engines.
        treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it
            concludes the change is compatible and won't result in data loss. If this flag is set to True, it will
            flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't
            be set outside of that context.
    """

    support_positional_add: bool = False
    nested_support: NestedSupport = NestedSupport.NONE
    array_element_selector: str = ""
    compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
    coerceable_types_: t.Dict[exp.DataType, t.Set[exp.DataType]] = Field(
        default_factory=dict, alias="coerceable_types"
    )
    precision_increase_allowed_types: t.Optional[t.Set[exp.DType]] = None
    support_coercing_compatible_types: bool = False
    drop_cascade: bool = False
    parameterized_type_defaults: t.Dict[exp.DType, t.List[t.Tuple[t.Union[int, float], ...]]] = {}
    max_parameter_length: t.Dict[exp.DType, t.Union[int, float]] = {}
    types_with_unlimited_length: t.Dict[exp.DType, t.Set[exp.DType]] = {}
    treat_alter_data_type_as_destructive: bool = False

    # Lazily computed cache backing the `coerceable_types` property.
    _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}

    @property
    def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
        """Mapping of current type -> types that can be coerced into it without an ALTER.

        When `support_coercing_compatible_types` is set and `compatible_types` is non-empty, the
        user-provided mapping is extended with the reverse of `compatible_types` and cached.
        """
        if not self._coerceable_types:
            if not self.support_coercing_compatible_types or not self.compatible_types:
                return self.coerceable_types_
            # Copy each set: merging the compatible types below must not mutate the sets
            # owned by the user-provided `coerceable_types_` mapping.
            coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(
                set, {key: set(values) for key, values in self.coerceable_types_.items()}
            )
            for source_type, target_types in self.compatible_types.items():
                for target_type in target_types:
                    coerceable_types[target_type].add(source_type)
            self._coerceable_types = coerceable_types
        return self._coerceable_types

    def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
        """Whether `current_type` can be ALTERed to `new_type` without data loss."""
        # types are identical or both types are parameterized and new has higher precision
        # - default parameter values are automatically provided if not present
        if current_type == new_type or (
            self._is_precision_increase_allowed(current_type)
            and self._is_precision_increase(current_type, new_type)
        ):
            return True
        # types are un-parameterized and compatible
        if current_type in self.compatible_types:
            return new_type in self.compatible_types[current_type]
        # new type is un-parameterized and has unlimited length, current type is compatible
        if not new_type.expressions and new_type.this in self.types_with_unlimited_length:
            return current_type.this in self.types_with_unlimited_length[new_type.this]
        return False

    def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
        """Whether `new_type` values can be written into a `current_type` column as-is (logs a warning if so)."""
        if current_type in self.coerceable_types:
            is_coerceable = new_type in self.coerceable_types[current_type]
            if is_coerceable:
                from sqlmesh.core.console import get_console

                get_console().log_warning(
                    f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning."
                )

            return is_coerceable
        return False

    def _is_precision_increase_allowed(self, current_type: exp.DataType) -> bool:
        """Whether precision increases are permitted for `current_type` (all types when unrestricted)."""
        return (
            self.precision_increase_allowed_types is None
            or current_type.this in self.precision_increase_allowed_types
        )

    def _is_precision_increase(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
        """Whether `new_type` is the same non-nested type as `current_type` with every parameter >= the current one."""
        if current_type.this == new_type.this and not current_type.is_type(
            *exp.DataType.NESTED_TYPES
        ):
            current_params = self.get_type_parameters(current_type)
            new_params = self.get_type_parameters(new_type)

            if len(current_params) != len(new_params):
                return False

            return all(new >= current for current, new in zip(current_params, new_params))
        return False

    def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]:
        """Return the numeric parameters of `type`, padded with configured defaults.

        "MAX" is accepted (case-insensitively) for types listed in `max_parameter_length`.

        Raises:
            ValueError: If a parameter is not numeric and not an accepted "MAX".
        """

        def _str_to_number(string: str, allows_max_param: bool) -> t.Union[int, float]:
            try:
                return int(string)
            except ValueError:
                try:
                    return float(string)
                except ValueError:
                    if allows_max_param and string.upper() == "MAX":
                        return self.max_parameter_length[type.this]
                    raise ValueError(f"Could not convert '{string}' to a number")

        # extract existing parameters
        params = [
            _str_to_number(param.this.this, type.this in self.max_parameter_length)
            for param in type.expressions
        ]

        # maybe get default parameter values
        param_defaults: t.Tuple[t.Union[int, float], ...] = ()
        if type.this in self.parameterized_type_defaults:
            param_defaults_list = self.parameterized_type_defaults[type.this]
            if len(params) < len(param_defaults_list):
                param_defaults = param_defaults_list[len(params)]

        return [*params, *param_defaults]

    def _get_matching_kwarg(
        self,
        current_kwarg: t.Union[str, exp.ColumnDef],
        new_struct: exp.DataType,
        current_pos: int,
    ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]:
        """Find the field in `new_struct` with the same name as `current_kwarg`.

        Returns `(index, field)`, or `(None, None)` when no field matches.
        """
        current_name = (
            exp.to_identifier(current_kwarg)
            if isinstance(current_kwarg, str)
            else _get_name_and_type(current_kwarg)[0]
        )
        # First check if we have the same column in the same position to get O(1) complexity
        new_kwarg = seq_get(new_struct.expressions, current_pos)
        if new_kwarg and _get_name_and_type(new_kwarg)[0].this == current_name.this:
            return current_pos, new_kwarg
        # If not, check if we have the same column in all positions with O(n) complexity
        for i, new_kwarg in enumerate(new_struct.expressions):
            if _get_name_and_type(new_kwarg)[0].this == current_name.this:
                return i, new_kwarg
        return None, None

    def _drop_operation(
        self,
        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
        struct: exp.DataType,
        pos: int,
        root_struct: exp.DataType,
        table_name: TableName,
    ) -> t.List[TableAlterColumnOperation]:
        """Remove the last column of `columns` from `struct` (in place) and return the drop operation.

        Raises:
            SQLMeshError: If no field with that name exists in `struct`.
        """
        columns = ensure_list(columns)
        operations: t.List[TableAlterColumnOperation] = []
        column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos)
        if column_pos is None or not column_kwarg:
            raise SQLMeshError(
                f"Cannot drop column '{columns[-1].name}' from table '{table_name}' - column not found. "
                f"This may indicate a mismatch between the expected and actual table schemas."
            )
        struct.expressions.pop(column_pos)
        operations.append(
            TableAlterDropColumnOperation(
                target_table=exp.to_table(table_name),
                column_parts=columns,
                expected_table_struct=root_struct.copy(),
                cascade=self.drop_cascade,
                array_element_selector=self.array_element_selector,
            )
        )
        return operations

    def _requires_drop_alteration(
        self, current_struct: exp.DataType, new_struct: exp.DataType
    ) -> bool:
        """Whether any field of `current_struct` is missing from `new_struct`."""
        return any(
            self._get_matching_kwarg(current_kwarg, new_struct, current_pos)[0] is None
            for current_pos, current_kwarg in enumerate(current_struct.expressions)
        )

    def _resolve_drop_operation(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
        table_name: TableName,
    ) -> t.List[TableAlterColumnOperation]:
        """Drop every field of `current_struct` that has no same-named field in `new_struct`."""
        operations = []
        # Iterate over a copy because `_drop_operation` pops from `current_struct`.
        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
            if new_pos is None:
                columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
                operations.extend(
                    self._drop_operation(
                        columns, current_struct, current_pos, root_struct, table_name
                    )
                )
        return operations

    def _add_operation(
        self,
        columns: t.List[TableAlterColumn],
        new_pos: int,
        new_kwarg: exp.ColumnDef,
        current_struct: exp.DataType,
        root_struct: exp.DataType,
        table_name: TableName,
        is_part_of_destructive_change: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Add `new_kwarg` to `current_struct` (in place) and return the add operation.

        The field is inserted at `new_pos` when positional adds are supported, otherwise appended.
        """
        if self.support_positional_add:
            col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions)
            current_struct.expressions.insert(new_pos, new_kwarg)
        else:
            col_pos = None
            current_struct.expressions.append(new_kwarg)
        return [
            TableAlterAddColumnOperation(
                target_table=exp.to_table(table_name),
                column_parts=columns,
                column_type=new_kwarg.args["kind"],
                expected_table_struct=root_struct.copy(),
                position=col_pos,
                is_part_of_destructive_change=is_part_of_destructive_change,
                array_element_selector=self.array_element_selector,
            )
        ]

    def _resolve_add_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
        table_name: TableName,
    ) -> t.List[TableAlterColumnOperation]:
        """Add every field of `new_struct` that has no same-named field in `current_struct`."""
        operations = []
        for new_pos, new_kwarg in enumerate(new_struct.expressions):
            possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos)
            if possible_current_pos is None:
                columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)]
                operations.extend(
                    self._add_operation(
                        columns, new_pos, new_kwarg, current_struct, root_struct, table_name
                    )
                )
        return operations

    def _alter_operation(
        self,
        columns: t.List[TableAlterColumn],
        pos: int,
        struct: exp.DataType,
        new_type: exp.DataType,
        current_type: t.Union[str, exp.DataType],
        root_struct: exp.DataType,
        new_kwarg: exp.ColumnDef,
        table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Compute operations turning the field `columns[-1]` of `struct` from `current_type` into `new_type`.

        Nested STRUCT / ARRAY<STRUCT> changes are diffed recursively when `nested_support` allows it.
        Otherwise the change is skipped (coercible), emitted as a type change (compatible), or emitted
        as a drop followed by an add (incompatible, unless `ignore_destructive`).
        """
        # We don't copy on purpose here because current_type may need to be mutated inside
        # _get_operations (struct.expressions.pop and struct.expressions.insert)
        current_type = exp.DataType.build(current_type, copy=False)
        if not self.nested_support.is_none:
            if new_type.this == current_type.this == exp.DataType.Type.STRUCT:
                if self.nested_support.is_ignore:
                    return []
                if self.nested_support.is_all or not self._requires_drop_alteration(
                    current_type, new_type
                ):
                    return self._get_operations(
                        columns,
                        current_type,
                        new_type,
                        root_struct,
                        table_name,
                        ignore_destructive=ignore_destructive,
                        ignore_additive=ignore_additive,
                    )

            if new_type.this == current_type.this == exp.DataType.Type.ARRAY:
                # Some engines (i.e. Snowflake) don't support defining types on arrays
                if not new_type.expressions or not current_type.expressions:
                    return []
                new_array_type = new_type.expressions[0]
                current_array_type = current_type.expressions[0]
                if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT:
                    if self.nested_support.is_ignore:
                        return []
                    if self.nested_support.is_all or not self._requires_drop_alteration(
                        current_array_type, new_array_type
                    ):
                        return self._get_operations(
                            columns,
                            current_array_type,
                            new_array_type,
                            root_struct,
                            table_name,
                            ignore_destructive=ignore_destructive,
                            ignore_additive=ignore_additive,
                        )
        if self._is_coerceable_type(current_type, new_type):
            return []
        if self._is_compatible_type(current_type, new_type):
            if ignore_additive:
                return []
            # An earlier drop/add replacement in this struct may have moved fields (a
            # non-positional add appends), so locate the field by name instead of
            # trusting `pos`.
            field_pos, _ = self._get_matching_kwarg(columns[-1].name, struct, pos)
            if field_pos is None:
                field_pos = pos
            struct.expressions.pop(field_pos)
            struct.expressions.insert(field_pos, new_kwarg)
            return [
                TableAlterChangeColumnTypeOperation(
                    target_table=exp.to_table(table_name),
                    column_parts=columns,
                    column_type=new_type,
                    current_type=current_type,
                    expected_table_struct=root_struct.copy(),
                    array_element_selector=self.array_element_selector,
                    is_part_of_destructive_change=self.treat_alter_data_type_as_destructive,
                )
            ]
        if ignore_destructive:
            return []
        # Drop from the parent `struct` that actually contains the field; for nested paths
        # `root_struct` does not contain `columns[-1]`. At the top level they are the same.
        return self._drop_operation(
            columns,
            struct,
            pos,
            root_struct,
            table_name,
        ) + self._add_operation(
            columns,
            pos,
            new_kwarg,
            struct,
            root_struct,
            table_name,
            is_part_of_destructive_change=True,
        )

    def _resolve_alter_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
        table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Emit type-change operations for fields present in both structs whose types differ.

        Raises:
            ValueError: If a field of `current_struct` is missing from `new_struct` and
                `ignore_destructive` is not set.
        """
        operations = []
        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
            _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
            if new_kwarg is None:
                if ignore_destructive:
                    continue
                raise ValueError("Cannot alter a column that is being dropped")
            _, new_type = _get_name_and_type(new_kwarg)
            _, current_type = _get_name_and_type(current_kwarg)
            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
            if new_type == current_type:
                continue
            operations.extend(
                self._alter_operation(
                    columns,
                    current_pos,
                    current_struct,
                    new_type,
                    current_type,
                    root_struct,
                    new_kwarg,
                    table_name,
                    ignore_destructive=ignore_destructive,
                    ignore_additive=ignore_additive,
                )
            )
        return operations

    def _get_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
        table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Diff `current_struct` against `new_struct`: drops, then adds, then type changes.

        `current_struct` is mutated in place as operations are produced.
        """
        root_struct = root_struct or current_struct
        parent_columns = parent_columns or []
        operations = []
        if not ignore_destructive:
            operations.extend(
                self._resolve_drop_operation(
                    parent_columns, current_struct, new_struct, root_struct, table_name
                )
            )
        if not ignore_additive:
            operations.extend(
                self._resolve_add_operations(
                    parent_columns, current_struct, new_struct, root_struct, table_name
                )
            )
        operations.extend(
            self._resolve_alter_operations(
                parent_columns,
                current_struct,
                new_struct,
                root_struct,
                ignore_destructive=ignore_destructive,
                ignore_additive=ignore_additive,
                table_name=table_name,
            )
        )
        return operations

    def _from_structs(
        self,
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Diff two top-level structs, using `current_struct` as the root snapshot."""
        return self._get_operations(
            [],
            current_struct,
            new_struct,
            current_struct,
            table_name=table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

    def _compare_structs(
        self,
        table_name: t.Union[str, exp.Table],
        current: exp.DataType,
        new: exp.DataType,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Diff two struct types describing the current and new table schema."""
        return self._from_structs(
            current,
            new,
            table_name=table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

    def compare_columns(
        self,
        table_name: TableName,
        current: t.Dict[str, exp.DataType],
        new: t.Dict[str, exp.DataType],
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Diff two column-name -> type mappings and return the ALTER operations needed."""
        return self._compare_structs(
            table_name,
            columns_to_types_to_struct(current),
            columns_to_types_to_struct(new),
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )
Compares a source schema against a target schema and returns a list of alter statements that make the source match the structure of the target. Some engines have constraints on the types of operations that can be performed; therefore, the final structure may not match the target exactly, but it will be as close as possible. Three potential differences can occur:
- Column order can be different if the engine doesn't support positional additions. Another reason for difference is if a column is just moved since we don't currently support fixing moves.
- Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested operations. As a result historical data is lost.
- Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible change. As a result historical data is lost.
Potential future improvements:
- Support column moves. Databricks Delta supports moves and would allow exact matches.
Arguments:
- support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a specific position in the set of existing columns.
- nested_support: How the engine for which the diff is being computed supports nested types.
- compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data type, and value is the set of types that are compatible with it.
- coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without altering the column type. NOTE: usually callers should not specify this attribute manually and should set the `support_coercing_compatible_types` flag instead. Some engines are inconsistent about their type coercion rules. For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted into a FLOAT64 column just fine.
- support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct coercion of compatible types.
- drop_cascade: Whether to add CASCADE modifier when dropping a column.
- parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot `exp.DataType.Type`, but in the engine adapter specification we build it from the dialect string instead of specifying it directly. Example: `exp.DataType.build("STRING", dialect=DIALECT).this` instead of the underlying `exp.DataType.Type.TEXT` to which it parses. We do that because parameter default replacement will silently break if we specify the type directly and SQLGlot changes the dialect's mapping of type string to `exp.DataType.Type`. Dict value is default values in a list, where the list index contains the remaining defaults given the number of parameter values provided by the user. Example: if the user provides 0 parameters ("DECIMAL"), we return the index 0 values for the two omitted parameters, `(38, 9)` -> "DECIMAL(38,9)". Example: if the user provides 1 parameter ("DECIMAL(10)"), we return the index 1 value for the one omitted parameter, `(0,)` -> "DECIMAL(10,0)".
- max_parameter_length: Numeric parameter values corresponding to "max". Example: `VARCHAR(max)` -> `VARCHAR(65535)`.
- types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly parameterized type can ALTER to its unlimited length version, along with different types in some engines.
- treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it concludes the change is compatible and won't result in data loss. If this flag is set to True, it will flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't be set outside of that context.
@property
def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
    """Mapping of current type -> types that can be coerced into it without an ALTER.

    When `support_coercing_compatible_types` is set and `compatible_types` is
    non-empty, the user-provided `coerceable_types_` mapping is extended with the
    reverse of `compatible_types`. The result is cached in `_coerceable_types`.
    Otherwise `coerceable_types_` is returned unchanged.
    """
    if not self._coerceable_types:
        if not self.support_coercing_compatible_types or not self.compatible_types:
            return self.coerceable_types_
        # Copy each set: merging the compatible types below must not mutate the
        # sets owned by the user-provided `coerceable_types_` mapping.
        coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(
            set, {key: set(values) for key, values in self.coerceable_types_.items()}
        )
        for source_type, target_types in self.compatible_types.items():
            for target_type in target_types:
                coerceable_types[target_type].add(source_type)
        self._coerceable_types = coerceable_types
    return self._coerceable_types
def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]:
    """Return the numeric parameters of ``type``, padded with configured defaults.

    Each explicit parameter is parsed as an ``int`` when possible, otherwise as a
    ``float``. For types present in ``max_parameter_length``, a case-insensitive ``MAX``
    parameter is replaced by that type's configured length.

    If ``parameterized_type_defaults`` has an entry for the type and fewer explicit
    parameters were given than that entry's length, the defaults stored at index
    ``len(explicit parameters)`` are appended.

    Raises:
        ValueError: If a parameter is neither numeric nor an allowed ``MAX`` placeholder.
    """

    def _to_number(raw: str, allows_max_param: bool) -> t.Union[int, float]:
        # Try int first, then float; "MAX" is only accepted for types with a max length.
        try:
            return int(raw)
        except ValueError:
            try:
                return float(raw)
            except ValueError:
                if not (allows_max_param and raw.upper() == "MAX"):
                    raise ValueError(f"Could not convert '{raw}' to a number")
                return self.max_parameter_length[type.this]

    explicit: t.List[t.Union[int, float]] = []
    for param in type.expressions:
        explicit.append(_to_number(param.this.this, type.this in self.max_parameter_length))

    if type.this not in self.parameterized_type_defaults:
        return explicit

    defaults_by_count = self.parameterized_type_defaults[type.this]
    if len(explicit) >= len(defaults_by_count):
        return explicit

    # The entry at index N holds the defaults for the parameters omitted after N given ones.
    return explicit + list(defaults_by_count[len(explicit)])
def compare_columns(
    self,
    table_name: TableName,
    current: t.Dict[str, exp.DataType],
    new: t.Dict[str, exp.DataType],
    *,
    ignore_destructive: bool = False,
    ignore_additive: bool = False,
) -> t.List[TableAlterColumnOperation]:
    """Compare two column-name -> type mappings for ``table_name``.

    Both mappings are converted to struct types with ``columns_to_types_to_struct``.
    The comparison itself is delegated to ``self._compare_structs``.

    Args:
        table_name: The table the columns belong to.
        current: The existing columns and their types.
        new: The desired columns and their types.
        ignore_destructive: Forwarded unchanged to ``_compare_structs``.
        ignore_additive: Forwarded unchanged to ``_compare_structs``.

    Returns:
        The list of column alter operations produced by ``_compare_structs``.
    """
    current_struct = columns_to_types_to_struct(current)
    new_struct = columns_to_types_to_struct(new)
    return self._compare_structs(
        table_name,
        current_struct,
        new_struct,
        ignore_destructive=ignore_destructive,
        ignore_additive=ignore_additive,
    )
Configuration for the model; it should be a dictionary conforming to `ConfigDict` (`pydantic.config.ConfigDict`).
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Populate ``__pydantic_private__`` with the defaults of the model's private attributes.

    Written to behave like a ``BaseModel`` method. It accepts ``context`` only because
    pydantic-core passes one when calling it; the value is not used.

    If ``__pydantic_private__`` is already set (not ``None``), nothing happens. Otherwise
    each private attribute whose ``get_default()`` is not ``PydanticUndefined`` is stored
    under its name, and the resulting dict is assigned via ``object_setattr``.

    Args:
        self: The BaseModel instance.
        context: The context (unused).
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        return
    private_values = {}
    for attr_name, private_attr in self.__private_attributes__.items():
        attr_default = private_attr.get_default()
        if attr_default is PydanticUndefined:
            # Attributes without a default are left unset.
            continue
        private_values[attr_name] = attr_default
    object_setattr(self, '__pydantic_private__', private_values)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def get_schema_differ(
    dialect: str, overrides: t.Optional[t.Dict[str, t.Any]] = None
) -> SchemaDiffer:
    """
    Build the SchemaDiffer for ``dialect`` without instantiating an engine adapter.

    The dialect is lower-cased and resolved through ``DIALECT_ALIASES``. The matching
    adapter class is looked up in ``DIALECT_TO_ENGINE_ADAPTER``, falling back to
    ``EngineAdapter``. Its ``SCHEMA_DIFFER_KWARGS`` are then combined with ``overrides``,
    and the overrides win on key conflicts.

    Args:
        dialect: The dialect for which to get the schema differ.
        overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance.

    Returns:
        The SchemaDiffer instance configured for the given dialect.
    """
    # Imported locally; presumably to avoid a circular import at module load time.
    from sqlmesh.core.engine_adapter import (
        DIALECT_TO_ENGINE_ADAPTER,
        DIALECT_ALIASES,
        EngineAdapter,
    )

    lowered = dialect.lower()
    resolved_dialect = DIALECT_ALIASES.get(lowered, lowered)
    adapter_cls = DIALECT_TO_ENGINE_ADAPTER.get(resolved_dialect, EngineAdapter)

    differ_kwargs = {**getattr(adapter_cls, "SCHEMA_DIFFER_KWARGS")}
    differ_kwargs.update(overrides or {})
    return SchemaDiffer(**differ_kwargs)
Returns the appropriate SchemaDiffer for a given dialect without initializing the engine adapter.
Arguments:
- dialect: The dialect for which to get the schema differ.
- overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance.
Returns:
The SchemaDiffer instance configured for the given dialect.