sqlmesh.core.schema_diff
1from __future__ import annotations 2 3import abc 4import logging 5import typing as t 6from dataclasses import dataclass 7from collections import defaultdict 8from enum import Enum 9 10from pydantic import Field 11from sqlglot import exp 12from sqlglot.helper import ensure_list, seq_get 13 14from sqlmesh.utils import columns_to_types_to_struct 15from sqlmesh.utils.pydantic import PydanticModel 16from sqlmesh.utils.errors import SQLMeshError 17 18if t.TYPE_CHECKING: 19 from sqlmesh.core._typing import TableName 20 21logger = logging.getLogger(__name__) 22 23 24@dataclass(frozen=True) 25class TableAlterOperation(abc.ABC): 26 target_table: exp.Table 27 28 @property 29 @abc.abstractmethod 30 def is_destructive(self) -> bool: 31 pass 32 33 @property 34 @abc.abstractmethod 35 def is_additive(self) -> bool: 36 pass 37 38 @property 39 @abc.abstractmethod 40 def _alter_actions(self) -> t.List[exp.Expression]: 41 pass 42 43 @property 44 def expression(self) -> exp.Alter: 45 return exp.Alter( 46 this=self.target_table, 47 kind="TABLE", 48 actions=self._alter_actions, 49 ) 50 51 52@dataclass(frozen=True) 53class TableAlterColumnOperation(TableAlterOperation, abc.ABC): 54 column_parts: t.List[TableAlterColumn] 55 expected_table_struct: exp.DataType 56 array_element_selector: str 57 58 @property 59 def column_identifiers(self) -> t.List[exp.Identifier]: 60 results = [] 61 for column in self.column_parts: 62 results.append(column.identifier) 63 if ( 64 column.is_array_of_struct 65 and len(self.column_parts) > 1 66 and self.array_element_selector 67 ): 68 results.append(exp.to_identifier(self.array_element_selector)) 69 return results 70 71 @property 72 def column(self) -> t.Union[exp.Dot, exp.Identifier]: 73 columns = self.column_identifiers 74 if len(columns) == 1: 75 return columns[0] 76 return exp.Dot.build(columns) 77 78 79@dataclass(frozen=True) 80class TableAlterTypedColumnOperation(TableAlterColumnOperation, abc.ABC): 81 column_type: exp.DataType 82 83 @property 84 def 
column_def(self) -> exp.ColumnDef: 85 if not self.column_type: 86 raise SQLMeshError("Tried to access column type when it shouldn't be needed") 87 return exp.ColumnDef( 88 this=self.column, 89 kind=self.column_type, 90 ) 91 92 93@dataclass(frozen=True) 94class TableAlterAddColumnOperation(TableAlterTypedColumnOperation): 95 position: t.Optional[TableAlterColumnPosition] = None 96 is_part_of_destructive_change: bool = False 97 98 @property 99 def is_additive(self) -> bool: 100 return not self.is_part_of_destructive_change 101 102 @property 103 def is_destructive(self) -> bool: 104 return self.is_part_of_destructive_change 105 106 @property 107 def _alter_actions(self) -> t.List[exp.Expression]: 108 column_def = exp.ColumnDef( 109 this=self.column, 110 kind=self.column_type, 111 ) 112 if self.position: 113 column_def.set("position", self.position.column_position_node) 114 return [column_def] 115 116 117@dataclass(frozen=True) 118class TableAlterDropColumnOperation(TableAlterColumnOperation): 119 cascade: bool = False 120 121 @property 122 def is_additive(self) -> bool: 123 return False 124 125 @property 126 def is_destructive(self) -> bool: 127 return True 128 129 @property 130 def _alter_actions(self) -> t.List[exp.Expression]: 131 return [exp.Drop(this=self.column, kind="COLUMN", cascade=self.cascade)] 132 133 134@dataclass(frozen=True) 135class TableAlterChangeColumnTypeOperation(TableAlterTypedColumnOperation): 136 current_type: exp.DataType 137 is_part_of_destructive_change: bool = False 138 139 @property 140 def is_additive(self) -> bool: 141 return not self.is_part_of_destructive_change 142 143 @property 144 def is_destructive(self) -> bool: 145 return self.is_part_of_destructive_change 146 147 @property 148 def _alter_actions(self) -> t.List[exp.Expression]: 149 return [ 150 exp.AlterColumn( 151 this=self.column, 152 dtype=self.column_type, 153 ) 154 ] 155 156 157@dataclass(frozen=True) 158class TableAlterColumn: 159 name: str 160 is_struct: bool 161 
is_array_of_struct: bool 162 is_array_of_primitive: bool 163 quoted: bool = False 164 165 @classmethod 166 def primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn: 167 return cls( 168 name=name, 169 is_struct=False, 170 is_array_of_struct=False, 171 is_array_of_primitive=False, 172 quoted=quoted, 173 ) 174 175 @classmethod 176 def struct(cls, name: str, quoted: bool = False) -> TableAlterColumn: 177 return cls( 178 name=name, 179 is_struct=True, 180 is_array_of_struct=False, 181 is_array_of_primitive=False, 182 quoted=quoted, 183 ) 184 185 @classmethod 186 def array_of_struct(cls, name: str, quoted: bool = False) -> TableAlterColumn: 187 return cls( 188 name=name, 189 is_struct=False, 190 is_array_of_struct=True, 191 is_array_of_primitive=False, 192 quoted=quoted, 193 ) 194 195 @classmethod 196 def array_of_primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn: 197 return cls( 198 name=name, 199 is_struct=False, 200 is_array_of_struct=False, 201 is_array_of_primitive=True, 202 quoted=quoted, 203 ) 204 205 @classmethod 206 def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn: 207 name = struct.alias_or_name 208 quoted = struct.this.quoted 209 kwarg_type = struct.args["kind"] 210 211 if kwarg_type.is_type(exp.DataType.Type.STRUCT): 212 return cls.struct(name, quoted=quoted) 213 if kwarg_type.is_type(exp.DataType.Type.ARRAY): 214 if kwarg_type.expressions and kwarg_type.expressions[0].is_type( 215 exp.DataType.Type.STRUCT 216 ): 217 return cls.array_of_struct(name, quoted=quoted) 218 return cls.array_of_primitive(name, quoted=quoted) 219 return cls.primitive(name, quoted=quoted) 220 221 @property 222 def is_array(self) -> bool: 223 return self.is_array_of_struct or self.is_array_of_primitive 224 225 @property 226 def is_primitive(self) -> bool: 227 return not self.is_struct and not self.is_array 228 229 @property 230 def is_nested(self) -> bool: 231 return not self.is_primitive 232 233 @property 234 def identifier(self) 
-> exp.Identifier: 235 return exp.to_identifier(self.name, quoted=self.quoted) 236 237 238@dataclass(frozen=True) 239class TableAlterColumnPosition: 240 is_first: bool 241 is_last: bool 242 after: t.Optional[exp.Identifier] = None 243 244 @classmethod 245 def first(cls) -> TableAlterColumnPosition: 246 return cls(is_first=True, is_last=False, after=None) 247 248 @classmethod 249 def last( 250 cls, after: t.Optional[t.Union[str, exp.Identifier]] = None 251 ) -> TableAlterColumnPosition: 252 return cls(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None) 253 254 @classmethod 255 def middle(cls, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition: 256 return cls(is_first=False, is_last=False, after=exp.to_identifier(after)) 257 258 @classmethod 259 def create( 260 cls, 261 pos: int, 262 current_kwargs: t.List[exp.ColumnDef], 263 replacing_col: bool = False, 264 ) -> TableAlterColumnPosition: 265 is_first = pos == 0 266 is_last = pos == len(current_kwargs) - int(replacing_col) 267 after = None 268 if not is_first: 269 prior_kwarg = current_kwargs[pos - 1] 270 after, _ = _get_name_and_type(prior_kwarg) 271 return cls(is_first=is_first, is_last=is_last, after=after) 272 273 @property 274 def column_position_node(self) -> t.Optional[exp.ColumnPosition]: 275 column = self.after if not self.is_last else None 276 position = None 277 if self.is_first: 278 position = "FIRST" 279 elif column and not self.is_last: 280 position = "AFTER" 281 return exp.ColumnPosition(this=column, position=position) 282 283 284class NestedSupport(str, Enum): 285 # Supports all nested data type operations 286 ALL = "ALL" 287 # Does not support any nested data type operations 288 NONE = "NONE" 289 # Supports nested data type operations except for those that require dropping a nested field 290 ALL_BUT_DROP = "ALL_BUT_DROP" 291 # Ignores all nested data type operations 292 IGNORE = "IGNORE" 293 294 @property 295 def is_all(self) -> bool: 296 return self == 
NestedSupport.ALL 297 298 @property 299 def is_none(self) -> bool: 300 return self == NestedSupport.NONE 301 302 @property 303 def is_all_but_drop(self) -> bool: 304 return self == NestedSupport.ALL_BUT_DROP 305 306 @property 307 def is_ignore(self) -> bool: 308 return self == NestedSupport.IGNORE 309 310 311class SchemaDiffer(PydanticModel): 312 """ 313 Compares a source schema against a target schema and returns a list of alter statements to have the source 314 match the structure of target. Some engines have constraints on the types of operations that can be performed 315 therefore the final structure may not match the target exactly but it will be as close as possible. Two potential 316 differences that can happen: 317 1. Column order can be different if the engine doesn't support positional additions. Another reason for difference 318 is if a column is just moved since we don't currently support fixing moves. 319 2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested 320 operations. As a result historical data is lost. 321 3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible 322 change. As a result historical data is lost. 323 324 Potential future improvements: 325 1. Support column moves. Databricks Delta supports moves and would allow exact matches. 326 327 Args: 328 support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a 329 specific position in the set of existing columns. 330 nested_support: How the engine for which the diff is being computed supports nested types. 331 compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data 332 type, and value is the set of types that are compatible with it. 
333 coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without 334 altering the column type. NOTE: usually callers should not specify this attribute manually and set the 335 `support_coercing_compatible_types` flag instead. Some engines are inconsistent about their type coercion rules. 336 For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted 337 into a FLOAT64 column just fine. 338 support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct 339 coercion of compatible types. 340 drop_cascade: Whether to add CASCADE modifier when dropping a column. 341 parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type, 342 but in the engine adapter specification we build it from the dialect string instead of specifying it directly. 343 Example: `exp.DataType.build("STRING", dialect=DIALECT).this` instead of the underlying `exp.DataType.Type.TEXT` 344 to which it parses. We do that because parameter default replacement will silently break if we specify type 345 directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default 346 values in a list, where the list index contains the remaining defaults given the number of parameter values 347 provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two 348 omitted parameters `(38, 9)` -> "DECIMAL(38,9)". Example: if user provides 1 parameters "DECIMAL(10)", we return 349 index 1 value for the one omitted parameters `(0,)` -> "DECIMAL(10,0)". 350 max_parameter_length: Numeric parameter values corresponding to "max". Example: `VARCHAR(max)` -> `VARCHAR(65535)`. 351 types_with_unlimited_length: Data types that accept values of any length up to system limits. 
Any explicitly 352 parameterized type can ALTER to its unlimited length version, along with different types in some engines. 353 treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it 354 concludes the change is compatible and won't result in data loss. If this flag is set to True, it will 355 flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't 356 be set outside of that context. 357 """ 358 359 support_positional_add: bool = False 360 nested_support: NestedSupport = NestedSupport.NONE 361 array_element_selector: str = "" 362 compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {} 363 coerceable_types_: t.Dict[exp.DataType, t.Set[exp.DataType]] = Field( 364 default_factory=dict, alias="coerceable_types" 365 ) 366 precision_increase_allowed_types: t.Optional[t.Set[exp.DataType.Type]] = None 367 support_coercing_compatible_types: bool = False 368 drop_cascade: bool = False 369 parameterized_type_defaults: t.Dict[ 370 exp.DataType.Type, t.List[t.Tuple[t.Union[int, float], ...]] 371 ] = {} 372 max_parameter_length: t.Dict[exp.DataType.Type, t.Union[int, float]] = {} 373 types_with_unlimited_length: t.Dict[exp.DataType.Type, t.Set[exp.DataType.Type]] = {} 374 treat_alter_data_type_as_destructive: bool = False 375 376 _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {} 377 378 @property 379 def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]: 380 if not self._coerceable_types: 381 if not self.support_coercing_compatible_types or not self.compatible_types: 382 return self.coerceable_types_ 383 coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(set) 384 coerceable_types.update(self.coerceable_types_) 385 for source_type, target_types in self.compatible_types.items(): 386 for target_type in target_types: 387 coerceable_types[target_type].add(source_type) 388 self._coerceable_types = coerceable_types 389 
return self._coerceable_types 390 391 def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool: 392 # types are identical or both types are parameterized and new has higher precision 393 # - default parameter values are automatically provided if not present 394 if current_type == new_type or ( 395 self._is_precision_increase_allowed(current_type) 396 and self._is_precision_increase(current_type, new_type) 397 ): 398 return True 399 # types are un-parameterized and compatible 400 if current_type in self.compatible_types: 401 return new_type in self.compatible_types[current_type] 402 # new type is un-parameterized and has unlimited length, current type is compatible 403 if not new_type.expressions and new_type.this in self.types_with_unlimited_length: 404 return current_type.this in self.types_with_unlimited_length[new_type.this] 405 return False 406 407 def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool: 408 if current_type in self.coerceable_types: 409 is_coerceable = new_type in self.coerceable_types[current_type] 410 if is_coerceable: 411 from sqlmesh.core.console import get_console 412 413 get_console().log_warning( 414 f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning." 
415 ) 416 417 return is_coerceable 418 return False 419 420 def _is_precision_increase_allowed(self, current_type: exp.DataType) -> bool: 421 return ( 422 self.precision_increase_allowed_types is None 423 or current_type.this in self.precision_increase_allowed_types 424 ) 425 426 def _is_precision_increase(self, current_type: exp.DataType, new_type: exp.DataType) -> bool: 427 if current_type.this == new_type.this and not current_type.is_type( 428 *exp.DataType.NESTED_TYPES 429 ): 430 current_params = self.get_type_parameters(current_type) 431 new_params = self.get_type_parameters(new_type) 432 433 if len(current_params) != len(new_params): 434 return False 435 436 return all(new >= current for current, new in zip(current_params, new_params)) 437 return False 438 439 def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]: 440 def _str_to_number(string: str, allows_max_param: bool) -> t.Union[int, float]: 441 try: 442 return int(string) 443 except ValueError: 444 try: 445 return float(string) 446 except ValueError: 447 if allows_max_param and string.upper() == "MAX": 448 return self.max_parameter_length[type.this] 449 raise ValueError(f"Could not convert '{string}' to a number") 450 451 # extract existing parameters 452 params = [ 453 _str_to_number(param.this.this, type.this in self.max_parameter_length) 454 for param in type.expressions 455 ] 456 457 # maybe get default parameter values 458 param_defaults: t.Tuple[t.Union[int, float], ...] 
= () 459 if type.this in self.parameterized_type_defaults: 460 param_defaults_list = self.parameterized_type_defaults[type.this] 461 if len(params) < len(param_defaults_list): 462 param_defaults = param_defaults_list[len(params)] 463 464 return [*params, *param_defaults] 465 466 def _get_matching_kwarg( 467 self, 468 current_kwarg: t.Union[str, exp.ColumnDef], 469 new_struct: exp.DataType, 470 current_pos: int, 471 ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]: 472 current_name = ( 473 exp.to_identifier(current_kwarg) 474 if isinstance(current_kwarg, str) 475 else _get_name_and_type(current_kwarg)[0] 476 ) 477 # First check if we have the same column in the same position to get O(1) complexity 478 new_kwarg = seq_get(new_struct.expressions, current_pos) 479 if new_kwarg: 480 new_name, new_type = _get_name_and_type(new_kwarg) 481 if current_name.this == new_name.this: 482 return current_pos, new_kwarg 483 # If not, check if we have the same column in all positions with O(n) complexity 484 for i, new_kwarg in enumerate(new_struct.expressions): 485 new_name, new_type = _get_name_and_type(new_kwarg) 486 if current_name.this == new_name.this: 487 return i, new_kwarg 488 return None, None 489 490 def _drop_operation( 491 self, 492 columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]], 493 struct: exp.DataType, 494 pos: int, 495 root_struct: exp.DataType, 496 table_name: TableName, 497 ) -> t.List[TableAlterColumnOperation]: 498 columns = ensure_list(columns) 499 operations: t.List[TableAlterColumnOperation] = [] 500 column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos) 501 if column_pos is None or not column_kwarg: 502 raise SQLMeshError( 503 f"Cannot drop column '{columns[-1].name}' from table '{table_name}' - column not found. " 504 f"This may indicate a mismatch between the expected and actual table schemas." 
505 ) 506 struct.expressions.pop(column_pos) 507 operations.append( 508 TableAlterDropColumnOperation( 509 target_table=exp.to_table(table_name), 510 column_parts=columns, 511 expected_table_struct=root_struct.copy(), 512 cascade=self.drop_cascade, 513 array_element_selector=self.array_element_selector, 514 ) 515 ) 516 return operations 517 518 def _requires_drop_alteration( 519 self, current_struct: exp.DataType, new_struct: exp.DataType 520 ) -> bool: 521 for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()): 522 new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos) 523 if new_pos is None: 524 return True 525 return False 526 527 def _resolve_drop_operation( 528 self, 529 parent_columns: t.List[TableAlterColumn], 530 current_struct: exp.DataType, 531 new_struct: exp.DataType, 532 root_struct: exp.DataType, 533 table_name: TableName, 534 ) -> t.List[TableAlterColumnOperation]: 535 operations = [] 536 for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()): 537 new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos) 538 columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)] 539 if new_pos is None: 540 operations.extend( 541 self._drop_operation( 542 columns, current_struct, current_pos, root_struct, table_name 543 ) 544 ) 545 return operations 546 547 def _add_operation( 548 self, 549 columns: t.List[TableAlterColumn], 550 new_pos: int, 551 new_kwarg: exp.ColumnDef, 552 current_struct: exp.DataType, 553 root_struct: exp.DataType, 554 table_name: TableName, 555 is_part_of_destructive_change: bool = False, 556 ) -> t.List[TableAlterColumnOperation]: 557 if self.support_positional_add: 558 col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions) 559 current_struct.expressions.insert(new_pos, new_kwarg) 560 else: 561 col_pos = None 562 current_struct.expressions.append(new_kwarg) 563 return [ 564 TableAlterAddColumnOperation( 565 
target_table=exp.to_table(table_name), 566 column_parts=columns, 567 column_type=new_kwarg.args["kind"], 568 expected_table_struct=root_struct.copy(), 569 position=col_pos, 570 is_part_of_destructive_change=is_part_of_destructive_change, 571 array_element_selector=self.array_element_selector, 572 ) 573 ] 574 575 def _resolve_add_operations( 576 self, 577 parent_columns: t.List[TableAlterColumn], 578 current_struct: exp.DataType, 579 new_struct: exp.DataType, 580 root_struct: exp.DataType, 581 table_name: TableName, 582 ) -> t.List[TableAlterColumnOperation]: 583 operations = [] 584 for new_pos, new_kwarg in enumerate(new_struct.expressions): 585 possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos) 586 if possible_current_pos is None: 587 columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)] 588 operations.extend( 589 self._add_operation( 590 columns, new_pos, new_kwarg, current_struct, root_struct, table_name 591 ) 592 ) 593 return operations 594 595 def _alter_operation( 596 self, 597 columns: t.List[TableAlterColumn], 598 pos: int, 599 struct: exp.DataType, 600 new_type: exp.DataType, 601 current_type: t.Union[str, exp.DataType], 602 root_struct: exp.DataType, 603 new_kwarg: exp.ColumnDef, 604 table_name: TableName, 605 *, 606 ignore_destructive: bool = False, 607 ignore_additive: bool = False, 608 ) -> t.List[TableAlterColumnOperation]: 609 # We don't copy on purpose here because current_type may need to be mutated inside 610 # _get_operations (struct.expressions.pop and struct.expressions.insert) 611 current_type = exp.DataType.build(current_type, copy=False) 612 if not self.nested_support.is_none: 613 if new_type.this == current_type.this == exp.DataType.Type.STRUCT: 614 if self.nested_support.is_ignore: 615 return [] 616 if self.nested_support.is_all or not self._requires_drop_alteration( 617 current_type, new_type 618 ): 619 return self._get_operations( 620 columns, 621 current_type, 622 new_type, 623 
root_struct, 624 table_name, 625 ignore_destructive=ignore_destructive, 626 ignore_additive=ignore_additive, 627 ) 628 629 if new_type.this == current_type.this == exp.DataType.Type.ARRAY: 630 # Some engines (i.e. Snowflake) don't support defining types on arrays 631 if not new_type.expressions or not current_type.expressions: 632 return [] 633 new_array_type = new_type.expressions[0] 634 current_array_type = current_type.expressions[0] 635 if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT: 636 if self.nested_support.is_ignore: 637 return [] 638 if self.nested_support.is_all or not self._requires_drop_alteration( 639 current_array_type, new_array_type 640 ): 641 return self._get_operations( 642 columns, 643 current_array_type, 644 new_array_type, 645 root_struct, 646 table_name, 647 ignore_destructive=ignore_destructive, 648 ignore_additive=ignore_additive, 649 ) 650 if self._is_coerceable_type(current_type, new_type): 651 return [] 652 if self._is_compatible_type(current_type, new_type): 653 if ignore_additive: 654 return [] 655 struct.expressions.pop(pos) 656 struct.expressions.insert(pos, new_kwarg) 657 return [ 658 TableAlterChangeColumnTypeOperation( 659 target_table=exp.to_table(table_name), 660 column_parts=columns, 661 column_type=new_type, 662 current_type=current_type, 663 expected_table_struct=root_struct.copy(), 664 array_element_selector=self.array_element_selector, 665 is_part_of_destructive_change=self.treat_alter_data_type_as_destructive, 666 ) 667 ] 668 if ignore_destructive: 669 return [] 670 return self._drop_operation( 671 columns, 672 root_struct, 673 pos, 674 root_struct, 675 table_name, 676 ) + self._add_operation( 677 columns, 678 pos, 679 new_kwarg, 680 struct, 681 root_struct, 682 table_name, 683 is_part_of_destructive_change=True, 684 ) 685 686 def _resolve_alter_operations( 687 self, 688 parent_columns: t.List[TableAlterColumn], 689 current_struct: exp.DataType, 690 new_struct: exp.DataType, 691 root_struct: 
exp.DataType, 692 table_name: TableName, 693 *, 694 ignore_destructive: bool = False, 695 ignore_additive: bool = False, 696 ) -> t.List[TableAlterColumnOperation]: 697 operations = [] 698 for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()): 699 _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos) 700 if new_kwarg is None: 701 if ignore_destructive: 702 continue 703 raise ValueError("Cannot alter a column that is being dropped") 704 _, new_type = _get_name_and_type(new_kwarg) 705 _, current_type = _get_name_and_type(current_kwarg) 706 columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)] 707 if new_type == current_type: 708 continue 709 operations.extend( 710 self._alter_operation( 711 columns, 712 current_pos, 713 current_struct, 714 new_type, 715 current_type, 716 root_struct, 717 new_kwarg, 718 table_name, 719 ignore_destructive=ignore_destructive, 720 ignore_additive=ignore_additive, 721 ) 722 ) 723 return operations 724 725 def _get_operations( 726 self, 727 parent_columns: t.List[TableAlterColumn], 728 current_struct: exp.DataType, 729 new_struct: exp.DataType, 730 root_struct: exp.DataType, 731 table_name: TableName, 732 *, 733 ignore_destructive: bool = False, 734 ignore_additive: bool = False, 735 ) -> t.List[TableAlterColumnOperation]: 736 root_struct = root_struct or current_struct 737 parent_columns = parent_columns or [] 738 operations = [] 739 if not ignore_destructive: 740 operations.extend( 741 self._resolve_drop_operation( 742 parent_columns, current_struct, new_struct, root_struct, table_name 743 ) 744 ) 745 if not ignore_additive: 746 operations.extend( 747 self._resolve_add_operations( 748 parent_columns, current_struct, new_struct, root_struct, table_name 749 ) 750 ) 751 operations.extend( 752 self._resolve_alter_operations( 753 parent_columns, 754 current_struct, 755 new_struct, 756 root_struct, 757 ignore_destructive=ignore_destructive, 758 
ignore_additive=ignore_additive, 759 table_name=table_name, 760 ) 761 ) 762 return operations 763 764 def _from_structs( 765 self, 766 current_struct: exp.DataType, 767 new_struct: exp.DataType, 768 table_name: TableName, 769 *, 770 ignore_destructive: bool = False, 771 ignore_additive: bool = False, 772 ) -> t.List[TableAlterColumnOperation]: 773 return self._get_operations( 774 [], 775 current_struct, 776 new_struct, 777 current_struct, 778 table_name=table_name, 779 ignore_destructive=ignore_destructive, 780 ignore_additive=ignore_additive, 781 ) 782 783 def _compare_structs( 784 self, 785 table_name: t.Union[str, exp.Table], 786 current: exp.DataType, 787 new: exp.DataType, 788 *, 789 ignore_destructive: bool = False, 790 ignore_additive: bool = False, 791 ) -> t.List[TableAlterColumnOperation]: 792 return self._from_structs( 793 current, 794 new, 795 table_name=table_name, 796 ignore_destructive=ignore_destructive, 797 ignore_additive=ignore_additive, 798 ) 799 800 def compare_columns( 801 self, 802 table_name: TableName, 803 current: t.Dict[str, exp.DataType], 804 new: t.Dict[str, exp.DataType], 805 *, 806 ignore_destructive: bool = False, 807 ignore_additive: bool = False, 808 ) -> t.List[TableAlterColumnOperation]: 809 return self._compare_structs( 810 table_name, 811 columns_to_types_to_struct(current), 812 columns_to_types_to_struct(new), 813 ignore_destructive=ignore_destructive, 814 ignore_additive=ignore_additive, 815 ) 816 817 818def has_drop_alteration(alter_operations: t.List[TableAlterOperation]) -> bool: 819 return any(op.is_destructive for op in alter_operations) 820 821 822def has_additive_alteration(alter_operations: t.List[TableAlterOperation]) -> bool: 823 return any(op.is_additive for op in alter_operations) 824 825 826def get_additive_changes( 827 alter_operations: t.List[TableAlterOperation], 828) -> t.List[TableAlterOperation]: 829 return [x for x in alter_operations if x.is_additive] 830 831 832def 
get_dropped_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]: 833 return [ 834 op.column.alias_or_name 835 for op in alter_expressions 836 if isinstance(op, TableAlterDropColumnOperation) 837 ] 838 839 840def get_additive_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]: 841 return [ 842 op.column.alias_or_name 843 for op in alter_expressions 844 if op.is_additive and isinstance(op, TableAlterColumnOperation) 845 ] 846 847 848def get_schema_differ( 849 dialect: str, overrides: t.Optional[t.Dict[str, t.Any]] = None 850) -> SchemaDiffer: 851 """ 852 Returns the appropriate SchemaDiffer for a given dialect without initializing the engine adapter. 853 854 Args: 855 dialect: The dialect for which to get the schema differ. 856 overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance. 857 858 Returns: 859 The SchemaDiffer instance configured for the given dialect. 860 """ 861 from sqlmesh.core.engine_adapter import ( 862 DIALECT_TO_ENGINE_ADAPTER, 863 DIALECT_ALIASES, 864 EngineAdapter, 865 ) 866 867 dialect = dialect.lower() 868 dialect = DIALECT_ALIASES.get(dialect, dialect) 869 engine_adapter_class = DIALECT_TO_ENGINE_ADAPTER.get(dialect, EngineAdapter) 870 return SchemaDiffer( 871 **{ 872 **getattr(engine_adapter_class, "SCHEMA_DIFFER_KWARGS"), 873 **(overrides or {}), 874 } 875 ) 876 877 878def _get_name_and_type(struct: exp.ColumnDef) -> t.Tuple[exp.Identifier, exp.DataType]: 879 return struct.this, struct.args["kind"]
@dataclass(frozen=True)
class TableAlterOperation(abc.ABC):
    """Abstract description of one ALTER TABLE operation on `target_table`."""

    target_table: exp.Table

    @property
    @abc.abstractmethod
    def is_destructive(self) -> bool:
        """Whether the operation is classified as destructive."""

    @property
    @abc.abstractmethod
    def is_additive(self) -> bool:
        """Whether the operation is classified as additive."""

    @property
    @abc.abstractmethod
    def _alter_actions(self) -> t.List[exp.Expression]:
        """The action expressions placed inside the ALTER statement."""

    @property
    def expression(self) -> exp.Alter:
        """The `ALTER TABLE` expression wrapping this operation's actions."""
        alter_args: t.Dict[str, t.Any] = {
            "this": self.target_table,
            "kind": "TABLE",
            "actions": self._alter_actions,
        }
        return exp.Alter(**alter_args)
@dataclass(frozen=True)
class TableAlterColumnOperation(TableAlterOperation, abc.ABC):
    """An alter operation aimed at a single column, addressed by its path of parts."""

    column_parts: t.List[TableAlterColumn]
    expected_table_struct: exp.DataType
    array_element_selector: str

    @property
    def column_identifiers(self) -> t.List[exp.Identifier]:
        """Identifiers for each path part, with the array element selector interleaved.

        The selector follows an array-of-struct part only when the path has more than
        one part and the selector is a non-empty string.
        """
        identifiers = []
        for part in self.column_parts:
            identifiers.append(part.identifier)
            if not part.is_array_of_struct:
                continue
            if len(self.column_parts) > 1 and self.array_element_selector:
                identifiers.append(exp.to_identifier(self.array_element_selector))
        return identifiers

    @property
    def column(self) -> t.Union[exp.Dot, exp.Identifier]:
        """A bare identifier for a single-part path, otherwise a dotted expression."""
        identifiers = self.column_identifiers
        return identifiers[0] if len(identifiers) == 1 else exp.Dot.build(identifiers)
@property
def column_identifiers(self) -> t.List[exp.Identifier]:
    """Identifiers for each path part, with the array element selector interleaved.

    The selector follows an array-of-struct part only when the path has more than
    one part and the selector is a non-empty string.
    """
    identifiers = []
    for part in self.column_parts:
        identifiers.append(part.identifier)
        if not part.is_array_of_struct:
            continue
        if len(self.column_parts) > 1 and self.array_element_selector:
            identifiers.append(exp.to_identifier(self.array_element_selector))
    return identifiers
Inherited Members
@dataclass(frozen=True)
class TableAlterTypedColumnOperation(TableAlterColumnOperation, abc.ABC):
    """A column operation that additionally carries the column's data type."""

    column_type: exp.DataType

    @property
    def column_def(self) -> exp.ColumnDef:
        """The definition pairing `column` with `column_type`.

        Raises:
            SQLMeshError: If `column_type` is falsy.
        """
        if self.column_type:
            return exp.ColumnDef(this=self.column, kind=self.column_type)
        raise SQLMeshError("Tried to access column type when it shouldn't be needed")
@dataclass(frozen=True)
class TableAlterAddColumnOperation(TableAlterTypedColumnOperation):
    """Adds a column, optionally at an explicit position."""

    position: t.Optional[TableAlterColumnPosition] = None
    # True when this add is half of a drop/add pair; it is then reported as destructive.
    is_part_of_destructive_change: bool = False

    @property
    def is_destructive(self) -> bool:
        """Destructive only when the add belongs to a destructive change."""
        return self.is_part_of_destructive_change

    @property
    def is_additive(self) -> bool:
        """Additive unless the add belongs to a destructive change."""
        return not self.is_part_of_destructive_change

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        """A single column definition, with a position node when one is set."""
        definition = exp.ColumnDef(this=self.column, kind=self.column_type)
        placement = self.position
        if placement:
            definition.set("position", placement.column_position_node)
        return [definition]
@dataclass(frozen=True)
class TableAlterDropColumnOperation(TableAlterColumnOperation):
    """Drops a column; always destructive and never additive."""

    # Whether the generated DROP carries the cascade flag.
    cascade: bool = False

    @property
    def is_destructive(self) -> bool:
        """Dropping a column is always destructive."""
        return True

    @property
    def is_additive(self) -> bool:
        """Dropping a column is never additive."""
        return False

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        """A single ``DROP COLUMN`` action for the targeted column."""
        drop_column = exp.Drop(this=self.column, kind="COLUMN", cascade=self.cascade)
        return [drop_column]
@dataclass(frozen=True)
class TableAlterChangeColumnTypeOperation(TableAlterTypedColumnOperation):
    """Changes a column's data type from ``current_type`` to ``column_type``."""

    current_type: exp.DataType
    # When True the type change is reported as destructive instead of additive.
    is_part_of_destructive_change: bool = False

    @property
    def is_destructive(self) -> bool:
        """Destructive only when flagged as part of a destructive change."""
        return self.is_part_of_destructive_change

    @property
    def is_additive(self) -> bool:
        """Additive unless flagged as part of a destructive change."""
        return not self.is_part_of_destructive_change

    @property
    def _alter_actions(self) -> t.List[exp.Expression]:
        """A single ``ALTER COLUMN`` action setting the new data type."""
        alter_column = exp.AlterColumn(this=self.column, dtype=self.column_type)
        return [alter_column]
@dataclass(frozen=True)
class TableAlterColumn:
    """One part of a column path, tagged with the shape of its data type."""

    name: str
    is_struct: bool
    is_array_of_struct: bool
    is_array_of_primitive: bool
    quoted: bool = False

    @classmethod
    def _of_shape(cls, name: str, quoted: bool, shape: t.Optional[str]) -> TableAlterColumn:
        """Build a column with every shape flag False except the one named by ``shape``."""
        flags = dict.fromkeys(("is_struct", "is_array_of_struct", "is_array_of_primitive"), False)
        if shape is not None:
            flags[shape] = True
        return cls(name=name, quoted=quoted, **flags)

    @classmethod
    def primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
        """Column that is neither a struct nor an array."""
        return cls._of_shape(name, quoted, None)

    @classmethod
    def struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
        """Column whose type is a struct."""
        return cls._of_shape(name, quoted, "is_struct")

    @classmethod
    def array_of_struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
        """Column whose type is an array with struct elements."""
        return cls._of_shape(name, quoted, "is_array_of_struct")

    @classmethod
    def array_of_primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
        """Column whose type is an array with non-struct elements."""
        return cls._of_shape(name, quoted, "is_array_of_primitive")

    @classmethod
    def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn:
        """Classify a struct field definition by the kind of its data type."""
        column_name = struct.alias_or_name
        is_quoted = struct.this.quoted
        data_type = struct.args["kind"]

        if data_type.is_type(exp.DataType.Type.STRUCT):
            factory = cls.struct
        elif not data_type.is_type(exp.DataType.Type.ARRAY):
            factory = cls.primitive
        elif data_type.expressions and data_type.expressions[0].is_type(
            exp.DataType.Type.STRUCT
        ):
            # An array counts as "of struct" only when its first type argument is a struct.
            factory = cls.array_of_struct
        else:
            factory = cls.array_of_primitive
        return factory(column_name, quoted=is_quoted)

    @property
    def is_array(self) -> bool:
        """True for either kind of array column."""
        return self.is_array_of_struct or self.is_array_of_primitive

    @property
    def is_primitive(self) -> bool:
        """True when the column is neither a struct nor an array."""
        return not (self.is_struct or self.is_array)

    @property
    def is_nested(self) -> bool:
        """True when the column is a struct or an array."""
        return not self.is_primitive

    @property
    def identifier(self) -> exp.Identifier:
        """The column name as an identifier, preserving its quoting."""
        return exp.to_identifier(self.name, quoted=self.quoted)
@classmethod
def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn:
    """Classify a struct field definition by the kind of its data type.

    The result is a struct, array-of-struct, array-of-primitive or primitive
    column carrying the field's name and quoting.
    """
    column_name = struct.alias_or_name
    is_quoted = struct.this.quoted
    data_type = struct.args["kind"]

    if data_type.is_type(exp.DataType.Type.STRUCT):
        factory = cls.struct
    elif not data_type.is_type(exp.DataType.Type.ARRAY):
        factory = cls.primitive
    elif data_type.expressions and data_type.expressions[0].is_type(exp.DataType.Type.STRUCT):
        # An array counts as "of struct" only when its first type argument is a struct.
        factory = cls.array_of_struct
    else:
        factory = cls.array_of_primitive
    return factory(column_name, quoted=is_quoted)
@dataclass(frozen=True)
class TableAlterColumnPosition:
    """Where a newly added column is placed relative to the existing columns."""

    is_first: bool
    is_last: bool
    # Column the new column follows, when there is one.
    after: t.Optional[exp.Identifier] = None

    @classmethod
    def first(cls) -> TableAlterColumnPosition:
        """Position at the very start."""
        return cls(is_first=True, is_last=False, after=None)

    @classmethod
    def last(
        cls, after: t.Optional[t.Union[str, exp.Identifier]] = None
    ) -> TableAlterColumnPosition:
        """Position at the very end, optionally recording the preceding column."""
        anchor = exp.to_identifier(after) if after else None
        return cls(is_first=False, is_last=True, after=anchor)

    @classmethod
    def middle(cls, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition:
        """Position directly after the column ``after``."""
        anchor = exp.to_identifier(after)
        return cls(is_first=False, is_last=False, after=anchor)

    @classmethod
    def create(
        cls,
        pos: int,
        current_kwargs: t.List[exp.ColumnDef],
        replacing_col: bool = False,
    ) -> TableAlterColumnPosition:
        """Derive the position for inserting at index ``pos`` of ``current_kwargs``.

        When ``replacing_col`` is set, the last slot is one earlier because the
        replaced column is still counted in ``current_kwargs``.
        """
        at_start = pos == 0
        at_end = pos == len(current_kwargs) - int(replacing_col)
        anchor = None
        if not at_start:
            # The new column follows whatever currently sits just before ``pos``.
            anchor, _ = _get_name_and_type(current_kwargs[pos - 1])
        return cls(is_first=at_start, is_last=at_end, after=anchor)

    @property
    def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
        """Position node: FIRST, AFTER <column>, or no keyword when appended last."""
        # A last-position column needs no anchor, so the anchor is dropped.
        if self.is_last:
            anchor = None
        else:
            anchor = self.after

        if self.is_first:
            keyword = "FIRST"
        elif anchor:
            keyword = "AFTER"
        else:
            keyword = None
        return exp.ColumnPosition(this=anchor, position=keyword)
@classmethod
def create(
    cls,
    pos: int,
    current_kwargs: t.List[exp.ColumnDef],
    replacing_col: bool = False,
) -> TableAlterColumnPosition:
    """Derive the position for inserting at index ``pos`` of ``current_kwargs``.

    When ``replacing_col`` is set, the last slot is one earlier because the
    replaced column is still counted in ``current_kwargs``.
    """
    at_start = pos == 0
    at_end = pos == len(current_kwargs) - int(replacing_col)
    anchor = None
    if not at_start:
        # The new column follows whatever currently sits just before ``pos``.
        anchor, _ = _get_name_and_type(current_kwargs[pos - 1])
    return cls(is_first=at_start, is_last=at_end, after=anchor)
@property
def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
    """Position node: FIRST, AFTER <column>, or no keyword when appended last."""
    # A last-position column needs no anchor, so the anchor is dropped.
    if self.is_last:
        anchor = None
    else:
        anchor = self.after

    if self.is_first:
        keyword = "FIRST"
    elif anchor:
        keyword = "AFTER"
    else:
        keyword = None
    return exp.ColumnPosition(this=anchor, position=keyword)
class NestedSupport(str, Enum):
    """How an engine handles operations on nested data types."""

    # Every nested data type operation is supported
    ALL = "ALL"
    # No nested data type operation is supported
    NONE = "NONE"
    # Nested data type operations are supported unless they require dropping a nested field
    ALL_BUT_DROP = "ALL_BUT_DROP"
    # Nested data type operations are ignored altogether
    IGNORE = "IGNORE"

    @property
    def is_all(self) -> bool:
        """True for ``ALL``."""
        return self is NestedSupport.ALL

    @property
    def is_none(self) -> bool:
        """True for ``NONE``."""
        return self is NestedSupport.NONE

    @property
    def is_all_but_drop(self) -> bool:
        """True for ``ALL_BUT_DROP``."""
        return self is NestedSupport.ALL_BUT_DROP

    @property
    def is_ignore(self) -> bool:
        """True for ``IGNORE``."""
        return self is NestedSupport.IGNORE
An enumeration.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class SchemaDiffer(PydanticModel):
    """
    Compares a source schema against a target schema and returns a list of alter statements to have the source
    match the structure of target. Some engines have constraints on the types of operations that can be performed
    therefore the final structure may not match the target exactly but it will be as close as possible. Three
    potential differences that can happen:
        1. Column order can be different if the engine doesn't support positional additions. Another reason for difference
        is if a column is just moved since we don't currently support fixing moves.
        2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested
        operations. As a result historical data is lost.
        3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible
        change. As a result historical data is lost.

    Potential future improvements:
        1. Support column moves. Databricks Delta supports moves and would allow exact matches.

    Args:
        support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a
            specific position in the set of existing columns.
        nested_support: How the engine for which the diff is being computed supports nested types.
        array_element_selector: Identifier inserted after an array-of-struct column when building the path to a
            nested column. An empty string means no selector is inserted.
        compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data
            type, and value is the set of types that are compatible with it.
        coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without
            altering the column type. NOTE: usually callers should not specify this attribute manually and set the
            `support_coercing_compatible_types` flag instead. Some engines are inconsistent about their type coercion rules.
            For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted
            into a FLOAT64 column just fine.
        precision_increase_allowed_types: Types for which a change that only increases parameter values is treated as
            compatible. `None` allows it for every type.
        support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct
            coercion of compatible types.
        drop_cascade: Whether to add CASCADE modifier when dropping a column.
        parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type,
            but in the engine adapter specification we build it from the dialect string instead of specifying it directly.
            Example: `exp.DataType.build("STRING", dialect=DIALECT).this` instead of the underlying `exp.DataType.Type.TEXT`
            to which it parses. We do that because parameter default replacement will silently break if we specify type
            directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default
            values in a list, where the list index contains the remaining defaults given the number of parameter values
            provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two
            omitted parameters `(38, 9)` -> "DECIMAL(38,9)". Example: if user provides 1 parameters "DECIMAL(10)", we return
            index 1 value for the one omitted parameters `(0,)` -> "DECIMAL(10,0)".
        max_parameter_length: Numeric parameter values corresponding to "max". Example: `VARCHAR(max)` -> `VARCHAR(65535)`.
        types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly
            parameterized type can ALTER to its unlimited length version, along with different types in some engines.
        treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it
            concludes the change is compatible and won't result in data loss. If this flag is set to True, it will
            flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't
            be set outside of that context.
    """

    support_positional_add: bool = False
    nested_support: NestedSupport = NestedSupport.NONE
    array_element_selector: str = ""
    compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
    coerceable_types_: t.Dict[exp.DataType, t.Set[exp.DataType]] = Field(
        default_factory=dict, alias="coerceable_types"
    )
    precision_increase_allowed_types: t.Optional[t.Set[exp.DataType.Type]] = None
    support_coercing_compatible_types: bool = False
    drop_cascade: bool = False
    parameterized_type_defaults: t.Dict[
        exp.DataType.Type, t.List[t.Tuple[t.Union[int, float], ...]]
    ] = {}
    max_parameter_length: t.Dict[exp.DataType.Type, t.Union[int, float]] = {}
    types_with_unlimited_length: t.Dict[exp.DataType.Type, t.Set[exp.DataType.Type]] = {}
    treat_alter_data_type_as_destructive: bool = False

    # Lazily built result of `coerceable_types`; empty until first computed.
    _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}

    @property
    def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
        """Mapping from a current type to the types that can be coerced to it.

        When `support_coercing_compatible_types` is set and `compatible_types` is
        non-empty, the explicit `coerceable_types_` mapping is merged with the
        inverse of `compatible_types` and the result is cached. Otherwise the
        explicit mapping is returned as is.
        """
        if not self._coerceable_types:
            if not self.support_coercing_compatible_types or not self.compatible_types:
                return self.coerceable_types_
            coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(set)
            # Copy every set so the merge below never mutates the sets stored on the
            # public `coerceable_types_` field.
            for current_type, source_types in self.coerceable_types_.items():
                coerceable_types[current_type] = set(source_types)
            for source_type, target_types in self.compatible_types.items():
                for target_type in target_types:
                    coerceable_types[target_type].add(source_type)
            self._coerceable_types = coerceable_types
        return self._coerceable_types

    def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
        """Return True when a column of `current_type` can be altered to `new_type`."""
        # types are identical or both types are parameterized and new has higher precision
        # - default parameter values are automatically provided if not present
        if current_type == new_type or (
            self._is_precision_increase_allowed(current_type)
            and self._is_precision_increase(current_type, new_type)
        ):
            return True
        # types are un-parameterized and compatible
        if current_type in self.compatible_types:
            return new_type in self.compatible_types[current_type]
        # new type is un-parameterized and has unlimited length, current type is compatible
        if not new_type.expressions and new_type.this in self.types_with_unlimited_length:
            return current_type.this in self.types_with_unlimited_length[new_type.this]
        return False

    def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
        """Return True when `new_type` is coerceable to `current_type`, warning on the console if so."""
        if current_type in self.coerceable_types:
            is_coerceable = new_type in self.coerceable_types[current_type]
            if is_coerceable:
                from sqlmesh.core.console import get_console

                get_console().log_warning(
                    f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning."
                )

            return is_coerceable
        return False

    def _is_precision_increase_allowed(self, current_type: exp.DataType) -> bool:
        """Return True when precision increases are permitted for `current_type`."""
        return (
            self.precision_increase_allowed_types is None
            or current_type.this in self.precision_increase_allowed_types
        )

    def _is_precision_increase(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
        """Return True when both types share a non-nested kind and no parameter of `new_type` is smaller."""
        if current_type.this == new_type.this and not current_type.is_type(
            *exp.DataType.NESTED_TYPES
        ):
            current_params = self.get_type_parameters(current_type)
            new_params = self.get_type_parameters(new_type)

            if len(current_params) != len(new_params):
                return False

            return all(new >= current for current, new in zip(current_params, new_params))
        return False

    def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]:
        """Return the numeric parameters of `type`, padded with configured defaults.

        Raises:
            ValueError: If a parameter is not numeric and is not an allowed "MAX".
        """

        def _str_to_number(string: str, allows_max_param: bool) -> t.Union[int, float]:
            try:
                return int(string)
            except ValueError:
                try:
                    return float(string)
                except ValueError:
                    if allows_max_param and string.upper() == "MAX":
                        return self.max_parameter_length[type.this]
                    raise ValueError(f"Could not convert '{string}' to a number")

        # extract existing parameters
        params = [
            _str_to_number(param.this.this, type.this in self.max_parameter_length)
            for param in type.expressions
        ]

        # maybe get default parameter values
        param_defaults: t.Tuple[t.Union[int, float], ...] = ()
        if type.this in self.parameterized_type_defaults:
            param_defaults_list = self.parameterized_type_defaults[type.this]
            if len(params) < len(param_defaults_list):
                param_defaults = param_defaults_list[len(params)]

        return [*params, *param_defaults]

    def _get_matching_kwarg(
        self,
        current_kwarg: t.Union[str, exp.ColumnDef],
        new_struct: exp.DataType,
        current_pos: int,
    ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]:
        """Find the field of `new_struct` named like `current_kwarg`.

        Returns:
            The field's position and definition, or `(None, None)` when absent.
        """
        current_name = (
            exp.to_identifier(current_kwarg)
            if isinstance(current_kwarg, str)
            else _get_name_and_type(current_kwarg)[0]
        )
        # First check if we have the same column in the same position to get O(1) complexity
        new_kwarg = seq_get(new_struct.expressions, current_pos)
        if new_kwarg:
            new_name, new_type = _get_name_and_type(new_kwarg)
            if current_name.this == new_name.this:
                return current_pos, new_kwarg
        # If not, check if we have the same column in all positions with O(n) complexity
        for i, new_kwarg in enumerate(new_struct.expressions):
            new_name, new_type = _get_name_and_type(new_kwarg)
            if current_name.this == new_name.this:
                return i, new_kwarg
        return None, None

    def _drop_operation(
        self,
        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
        struct: exp.DataType,
        pos: int,
        root_struct: exp.DataType,
        table_name: TableName,
    ) -> t.List[TableAlterColumnOperation]:
        """Remove the last column of `columns` from `struct` in place and emit the drop.

        Raises:
            SQLMeshError: If the column is not a field of `struct`.
        """
        columns = ensure_list(columns)
        operations: t.List[TableAlterColumnOperation] = []
        column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos)
        if column_pos is None or not column_kwarg:
            raise SQLMeshError(
                f"Cannot drop column '{columns[-1].name}' from table '{table_name}' - column not found. "
                f"This may indicate a mismatch between the expected and actual table schemas."
            )
        struct.expressions.pop(column_pos)
        operations.append(
            TableAlterDropColumnOperation(
                target_table=exp.to_table(table_name),
                column_parts=columns,
                # Snapshot of the table structure after this drop has been applied.
                expected_table_struct=root_struct.copy(),
                cascade=self.drop_cascade,
                array_element_selector=self.array_element_selector,
            )
        )
        return operations

    def _requires_drop_alteration(
        self, current_struct: exp.DataType, new_struct: exp.DataType
    ) -> bool:
        """Return True when some field of `current_struct` is missing from `new_struct`."""
        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
            if new_pos is None:
                return True
        return False

    def _resolve_drop_operation(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
        table_name: TableName,
    ) -> t.List[TableAlterColumnOperation]:
        """Emit a drop for every field of `current_struct` that is absent from `new_struct`."""
        operations = []
        # Iterate over a copy because `_drop_operation` pops from `current_struct.expressions`.
        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
            if new_pos is None:
                operations.extend(
                    self._drop_operation(
                        columns, current_struct, current_pos, root_struct, table_name
                    )
                )
        return operations

    def _add_operation(
        self,
        columns: t.List[TableAlterColumn],
        new_pos: int,
        new_kwarg: exp.ColumnDef,
        current_struct: exp.DataType,
        root_struct: exp.DataType,
        table_name: TableName,
        is_part_of_destructive_change: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Insert `new_kwarg` into `current_struct` in place and emit the add.

        The field goes to `new_pos` when positional adds are supported and is
        appended at the end otherwise.
        """
        if self.support_positional_add:
            col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions)
            current_struct.expressions.insert(new_pos, new_kwarg)
        else:
            col_pos = None
            current_struct.expressions.append(new_kwarg)
        return [
            TableAlterAddColumnOperation(
                target_table=exp.to_table(table_name),
                column_parts=columns,
                column_type=new_kwarg.args["kind"],
                # Snapshot of the table structure after this add has been applied.
                expected_table_struct=root_struct.copy(),
                position=col_pos,
                is_part_of_destructive_change=is_part_of_destructive_change,
                array_element_selector=self.array_element_selector,
            )
        ]

    def _resolve_add_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
        table_name: TableName,
    ) -> t.List[TableAlterColumnOperation]:
        """Emit an add for every field of `new_struct` that is absent from `current_struct`."""
        operations = []
        for new_pos, new_kwarg in enumerate(new_struct.expressions):
            possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos)
            if possible_current_pos is None:
                columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)]
                operations.extend(
                    self._add_operation(
                        columns, new_pos, new_kwarg, current_struct, root_struct, table_name
                    )
                )
        return operations

    def _alter_operation(
        self,
        columns: t.List[TableAlterColumn],
        pos: int,
        struct: exp.DataType,
        new_type: exp.DataType,
        current_type: t.Union[str, exp.DataType],
        root_struct: exp.DataType,
        new_kwarg: exp.ColumnDef,
        table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Resolve a type change of one column into operations.

        Structs and arrays of structs are diffed recursively when nested support
        allows it. Otherwise a coerceable change yields nothing, a compatible
        change yields a type alteration, and anything else yields a drop
        followed by an add.
        """
        # We don't copy on purpose here because current_type may need to be mutated inside
        # _get_operations (struct.expressions.pop and struct.expressions.insert)
        current_type = exp.DataType.build(current_type, copy=False)
        if not self.nested_support.is_none:
            if new_type.this == current_type.this == exp.DataType.Type.STRUCT:
                if self.nested_support.is_ignore:
                    return []
                if self.nested_support.is_all or not self._requires_drop_alteration(
                    current_type, new_type
                ):
                    return self._get_operations(
                        columns,
                        current_type,
                        new_type,
                        root_struct,
                        table_name,
                        ignore_destructive=ignore_destructive,
                        ignore_additive=ignore_additive,
                    )

            if new_type.this == current_type.this == exp.DataType.Type.ARRAY:
                # Some engines (i.e. Snowflake) don't support defining types on arrays
                if not new_type.expressions or not current_type.expressions:
                    return []
                new_array_type = new_type.expressions[0]
                current_array_type = current_type.expressions[0]
                if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT:
                    if self.nested_support.is_ignore:
                        return []
                    if self.nested_support.is_all or not self._requires_drop_alteration(
                        current_array_type, new_array_type
                    ):
                        return self._get_operations(
                            columns,
                            current_array_type,
                            new_array_type,
                            root_struct,
                            table_name,
                            ignore_destructive=ignore_destructive,
                            ignore_additive=ignore_additive,
                        )
        if self._is_coerceable_type(current_type, new_type):
            return []
        if self._is_compatible_type(current_type, new_type):
            if ignore_additive:
                return []
            # Replace the field in place so later snapshots reflect the new type.
            struct.expressions.pop(pos)
            struct.expressions.insert(pos, new_kwarg)
            return [
                TableAlterChangeColumnTypeOperation(
                    target_table=exp.to_table(table_name),
                    column_parts=columns,
                    column_type=new_type,
                    current_type=current_type,
                    expected_table_struct=root_struct.copy(),
                    array_element_selector=self.array_element_selector,
                    is_part_of_destructive_change=self.treat_alter_data_type_as_destructive,
                )
            ]
        if ignore_destructive:
            return []
        # NOTE(review): the drop searches `root_struct` for `columns[-1]` while the add below
        # inserts into `struct`; for a nested column these are different structs — confirm
        # this is intended.
        return self._drop_operation(
            columns,
            root_struct,
            pos,
            root_struct,
            table_name,
        ) + self._add_operation(
            columns,
            pos,
            new_kwarg,
            struct,
            root_struct,
            table_name,
            is_part_of_destructive_change=True,
        )

    def _resolve_alter_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
        table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Emit operations for every field present in both structs whose type differs.

        Raises:
            ValueError: If a current field has no counterpart in `new_struct` and
                destructive changes are not being ignored.
        """
        operations = []
        # Iterate over a copy because `_alter_operation` may mutate `current_struct.expressions`.
        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
            _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
            if new_kwarg is None:
                if ignore_destructive:
                    continue
                raise ValueError("Cannot alter a column that is being dropped")
            _, new_type = _get_name_and_type(new_kwarg)
            _, current_type = _get_name_and_type(current_kwarg)
            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
            if new_type == current_type:
                continue
            operations.extend(
                self._alter_operation(
                    columns,
                    current_pos,
                    current_struct,
                    new_type,
                    current_type,
                    root_struct,
                    new_kwarg,
                    table_name,
                    ignore_destructive=ignore_destructive,
                    ignore_additive=ignore_additive,
                )
            )
        return operations

    def _get_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
        table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Diff two structs: drops first, then adds, then type alterations.

        Drops are skipped when `ignore_destructive` is set and adds are skipped
        when `ignore_additive` is set.
        """
        root_struct = root_struct or current_struct
        parent_columns = parent_columns or []
        operations = []
        if not ignore_destructive:
            operations.extend(
                self._resolve_drop_operation(
                    parent_columns, current_struct, new_struct, root_struct, table_name
                )
            )
        if not ignore_additive:
            operations.extend(
                self._resolve_add_operations(
                    parent_columns, current_struct, new_struct, root_struct, table_name
                )
            )
        operations.extend(
            self._resolve_alter_operations(
                parent_columns,
                current_struct,
                new_struct,
                root_struct,
                ignore_destructive=ignore_destructive,
                ignore_additive=ignore_additive,
                table_name=table_name,
            )
        )
        return operations

    def _from_structs(
        self,
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        table_name: TableName,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Diff two top-level structs, using `current_struct` as the root struct."""
        return self._get_operations(
            [],
            current_struct,
            new_struct,
            current_struct,
            table_name=table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

    def _compare_structs(
        self,
        table_name: t.Union[str, exp.Table],
        current: exp.DataType,
        new: exp.DataType,
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Return the operations that bring struct `current` towards struct `new`."""
        return self._from_structs(
            current,
            new,
            table_name=table_name,
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )

    def compare_columns(
        self,
        table_name: TableName,
        current: t.Dict[str, exp.DataType],
        new: t.Dict[str, exp.DataType],
        *,
        ignore_destructive: bool = False,
        ignore_additive: bool = False,
    ) -> t.List[TableAlterColumnOperation]:
        """Return the operations that bring the `current` columns towards the `new` columns.

        Both column-to-type mappings are converted to structs before being compared.
        """
        return self._compare_structs(
            table_name,
            columns_to_types_to_struct(current),
            columns_to_types_to_struct(new),
            ignore_destructive=ignore_destructive,
            ignore_additive=ignore_additive,
        )
Compares a source schema against a target schema and returns a list of alter statements to have the source match the structure of target. Some engines have constraints on the types of operations that can be performed therefore the final structure may not match the target exactly but it will be as close as possible. Three potential differences that can happen:
- Column order can be different if the engine doesn't support positional additions. Another reason for difference is if a column is just moved since we don't currently support fixing moves.
- Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested operations. As a result historical data is lost.
- Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible change. As a result historical data is lost.
Potential future improvements:
- Support column moves. Databricks Delta supports moves and would allow exact matches.
Arguments:
- support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a specific position in the set of existing columns.
- nested_support: How the engine for which the diff is being computed supports nested types.
- compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data type, and value is the set of types that are compatible with it.
- coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without altering the column type. NOTE: usually callers should not specify this attribute manually and set the `support_coercing_compatible_types` flag instead. Some engines are inconsistent about their type coercion rules. For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted into a FLOAT64 column just fine.
- support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct coercion of compatible types.
- drop_cascade: Whether to add CASCADE modifier when dropping a column.
- parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type, but in the engine adapter specification we build it from the dialect string instead of specifying it directly. Example: `exp.DataType.build("STRING", dialect=DIALECT).this` instead of the underlying `exp.DataType.Type.TEXT` to which it parses. We do that because parameter default replacement will silently break if we specify type directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default values in a list, where the list index contains the remaining defaults given the number of parameter values provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two omitted parameters `(38, 9)` -> "DECIMAL(38,9)". Example: if user provides 1 parameters "DECIMAL(10)", we return index 1 value for the one omitted parameters `(0,)` -> "DECIMAL(10,0)".
- max_parameter_length: Numeric parameter values corresponding to "max". Example: `VARCHAR(max)` -> `VARCHAR(65535)`.
- types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly parameterized type can ALTER to its unlimited length version, along with different types in some engines.
- treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it concludes the change is compatible and won't result in data loss. If this flag is set to True, it will flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't be set outside of that context.
@property
def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
    """Mapping from a current type to the types that can be coerced to it.

    When `support_coercing_compatible_types` is set and `compatible_types` is
    non-empty, the explicit `coerceable_types_` mapping is merged with the
    inverse of `compatible_types` and the result is cached in
    `_coerceable_types`. Otherwise the explicit mapping is returned as is.
    """
    if not self._coerceable_types:
        if not self.support_coercing_compatible_types or not self.compatible_types:
            return self.coerceable_types_
        coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(set)
        # Copy every set so the merge below never mutates the sets stored on the
        # public `coerceable_types_` field.
        for current_type, source_types in self.coerceable_types_.items():
            coerceable_types[current_type] = set(source_types)
        for source_type, target_types in self.compatible_types.items():
            for target_type in target_types:
                coerceable_types[target_type].add(source_type)
        self._coerceable_types = coerceable_types
    return self._coerceable_types
def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]:
    """Return the numeric parameters of ``type``, padded with configured defaults.

    Each entry of ``type.expressions`` is converted to an ``int`` when possible,
    otherwise to a ``float``. The literal "max" (case-insensitive) is replaced by
    ``self.max_parameter_length[type.this]`` when ``type.this`` has an entry there.

    If ``type.this`` has an entry in ``self.parameterized_type_defaults`` and
    fewer parameters were supplied than that entry has elements, the element at
    index ``len(supplied parameters)`` is appended to the result.

    Args:
        type: The data type whose parameters should be extracted.

    Returns:
        The explicit parameter values followed by any applicable defaults.

    Raises:
        ValueError: If a parameter is neither numeric nor an allowed "max".
    """
    type_key = type.this
    max_allowed = type_key in self.max_parameter_length

    def _parse_parameter(raw: str) -> t.Union[int, float]:
        try:
            return int(raw)
        except ValueError:
            try:
                return float(raw)
            except ValueError:
                # Not a number: the only other accepted spelling is "max".
                if not (max_allowed and raw.upper() == "MAX"):
                    raise ValueError(f"Could not convert '{raw}' to a number")
                return self.max_parameter_length[type_key]

    # Parameters explicitly present on the type.
    provided = [_parse_parameter(expression.this.this) for expression in type.expressions]

    # Defaults for the parameters that were left out, if any are configured.
    remaining: t.Tuple[t.Union[int, float], ...] = ()
    if type_key in self.parameterized_type_defaults:
        defaults_by_count = self.parameterized_type_defaults[type_key]
        if len(defaults_by_count) > len(provided):
            remaining = defaults_by_count[len(provided)]

    provided.extend(remaining)
    return provided
def compare_columns(
    self,
    table_name: TableName,
    current: t.Dict[str, exp.DataType],
    new: t.Dict[str, exp.DataType],
    *,
    ignore_destructive: bool = False,
    ignore_additive: bool = False,
) -> t.List[TableAlterColumnOperation]:
    """Compare two column-name-to-type mappings for the given table.

    Both mappings are converted with ``columns_to_types_to_struct`` and the
    comparison is delegated to ``_compare_structs``.

    Args:
        table_name: The table the columns belong to.
        current: The existing column name to data type mapping.
        new: The desired column name to data type mapping.
        ignore_destructive: Forwarded unchanged to ``_compare_structs``.
        ignore_additive: Forwarded unchanged to ``_compare_structs``.

    Returns:
        The list of operations produced by ``_compare_structs``.
    """
    current_struct = columns_to_types_to_struct(current)
    new_struct = columns_to_types_to_struct(new)
    return self._compare_structs(
        table_name,
        current_struct,
        new_struct,
        ignore_destructive=ignore_destructive,
        ignore_additive=ignore_additive,
    )
Configuration for the model. It should be a dictionary conforming to pydantic.config.ConfigDict.
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance.

    This function is meant to behave like a BaseModel method. It does nothing
    when ``__pydantic_private__`` is already set to a non-None value; otherwise
    it collects the default of every entry in ``__private_attributes__``
    (skipping those whose default is ``PydanticUndefined``) and stores the
    result as ``__pydantic_private__``.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        # Already initialised; leave the existing values untouched.
        return

    resolved_defaults = (
        (attr_name, attr.get_default())
        for attr_name, attr in self.__private_attributes__.items()
    )
    private_values = {
        attr_name: value
        for attr_name, value in resolved_defaults
        if value is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', private_values)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def get_schema_differ(
    dialect: str, overrides: t.Optional[t.Dict[str, t.Any]] = None
) -> SchemaDiffer:
    """
    Builds the SchemaDiffer for a dialect without initializing the engine adapter.

    The dialect name is lower-cased and resolved through DIALECT_ALIASES. The matching
    engine adapter class (EngineAdapter when the dialect is not registered) supplies
    SCHEMA_DIFFER_KWARGS, and any overrides take precedence over those values.

    Args:
        dialect: The dialect for which to get the schema differ.
        overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance.

    Returns:
        The SchemaDiffer instance configured for the given dialect.
    """
    from sqlmesh.core.engine_adapter import (
        DIALECT_TO_ENGINE_ADAPTER,
        DIALECT_ALIASES,
        EngineAdapter,
    )

    normalized = dialect.lower()
    resolved_dialect = DIALECT_ALIASES.get(normalized, normalized)
    adapter_cls = DIALECT_TO_ENGINE_ADAPTER.get(resolved_dialect, EngineAdapter)

    # Start from the adapter's defaults; caller-supplied overrides win on conflicts.
    differ_kwargs = {**getattr(adapter_cls, "SCHEMA_DIFFER_KWARGS")}
    if overrides:
        differ_kwargs.update(overrides)
    return SchemaDiffer(**differ_kwargs)
Returns the appropriate SchemaDiffer for a given dialect without initializing the engine adapter.
Arguments:
- dialect: The dialect for which to get the schema differ.
- overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance.
Returns:
The SchemaDiffer instance configured for the given dialect.