sqlmesh.core.schema_diff
1from __future__ import annotations 2 3import logging 4import typing as t 5from collections import defaultdict 6from enum import Enum, auto 7 8from sqlglot import exp 9from sqlglot.helper import ensure_list, seq_get 10 11from sqlmesh.utils import columns_to_types_to_struct 12from sqlmesh.utils.pydantic import PydanticModel 13 14if t.TYPE_CHECKING: 15 from sqlmesh.core._typing import TableName 16 17logger = logging.getLogger(__name__) 18 19 20class TableAlterOperationType(Enum): 21 ADD = auto() 22 DROP = auto() 23 ALTER_TYPE = auto() 24 25 @property 26 def is_add(self) -> bool: 27 return self == TableAlterOperationType.ADD 28 29 @property 30 def is_drop(self) -> bool: 31 return self == TableAlterOperationType.DROP 32 33 @property 34 def is_alter_type(self) -> bool: 35 return self == TableAlterOperationType.ALTER_TYPE 36 37 38class TableAlterColumn(PydanticModel): 39 name: str 40 is_struct: bool 41 is_array_of_struct: bool 42 is_array_of_primitive: bool 43 quoted: bool = False 44 45 @classmethod 46 def primitive(self, name: str, quoted: bool = False) -> TableAlterColumn: 47 return self( 48 name=name, 49 is_struct=False, 50 is_array_of_struct=False, 51 is_array_of_primitive=False, 52 quoted=quoted, 53 ) 54 55 @classmethod 56 def struct(self, name: str, quoted: bool = False) -> TableAlterColumn: 57 return self( 58 name=name, 59 is_struct=True, 60 is_array_of_struct=False, 61 is_array_of_primitive=False, 62 quoted=quoted, 63 ) 64 65 @classmethod 66 def array_of_struct(self, name: str, quoted: bool = False) -> TableAlterColumn: 67 return self( 68 name=name, 69 is_struct=False, 70 is_array_of_struct=True, 71 is_array_of_primitive=False, 72 quoted=quoted, 73 ) 74 75 @classmethod 76 def array_of_primitive(self, name: str, quoted: bool = False) -> TableAlterColumn: 77 return self( 78 name=name, 79 is_struct=False, 80 is_array_of_struct=False, 81 is_array_of_primitive=True, 82 quoted=quoted, 83 ) 84 85 @classmethod 86 def from_struct_kwarg(self, struct: exp.ColumnDef) -> 
TableAlterColumn: 87 name = struct.alias_or_name 88 quoted = struct.this.quoted 89 kwarg_type = struct.args["kind"] 90 91 if kwarg_type.is_type(exp.DataType.Type.STRUCT): 92 return self.struct(name, quoted=quoted) 93 elif kwarg_type.is_type(exp.DataType.Type.ARRAY): 94 if kwarg_type.expressions[0].is_type(exp.DataType.Type.STRUCT): 95 return self.array_of_struct(name, quoted=quoted) 96 else: 97 return self.array_of_primitive(name, quoted=quoted) 98 else: 99 return self.primitive(name, quoted=quoted) 100 101 @property 102 def is_array(self) -> bool: 103 return self.is_array_of_struct or self.is_array_of_primitive 104 105 @property 106 def is_primitive(self) -> bool: 107 return not self.is_struct and not self.is_array 108 109 @property 110 def is_nested(self) -> bool: 111 return not self.is_primitive 112 113 @property 114 def identifier(self) -> exp.Identifier: 115 return exp.to_identifier(self.name, quoted=self.quoted) 116 117 118class TableAlterColumnPosition(PydanticModel): 119 is_first: bool 120 is_last: bool 121 after: t.Optional[exp.Identifier] = None 122 123 @classmethod 124 def first(self) -> TableAlterColumnPosition: 125 return self(is_first=True, is_last=False, after=None) 126 127 @classmethod 128 def last( 129 self, after: t.Optional[t.Union[str, exp.Identifier]] = None 130 ) -> TableAlterColumnPosition: 131 return self(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None) 132 133 @classmethod 134 def middle(self, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition: 135 return self(is_first=False, is_last=False, after=exp.to_identifier(after)) 136 137 @classmethod 138 def create( 139 self, 140 pos: int, 141 current_kwargs: t.List[exp.ColumnDef], 142 replacing_col: bool = False, 143 ) -> TableAlterColumnPosition: 144 is_first = pos == 0 145 is_last = pos == len(current_kwargs) - int(replacing_col) 146 after = None 147 if not is_first: 148 prior_kwarg = current_kwargs[pos - 1] 149 after, _ = 
_get_name_and_type(prior_kwarg) 150 return self(is_first=is_first, is_last=is_last, after=after) 151 152 @property 153 def column_position_node(self) -> t.Optional[exp.ColumnPosition]: 154 column = self.after if not self.is_last else None 155 position = None 156 if self.is_first: 157 position = "FIRST" 158 elif column and not self.is_last: 159 position = "AFTER" 160 return exp.ColumnPosition(this=column, position=position) 161 162 163class TableAlterOperation(PydanticModel): 164 op: TableAlterOperationType 165 columns: t.List[TableAlterColumn] 166 column_type: exp.DataType 167 expected_table_struct: exp.DataType 168 add_position: t.Optional[TableAlterColumnPosition] = None 169 current_type: t.Optional[exp.DataType] = None 170 171 @classmethod 172 def add( 173 self, 174 columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]], 175 column_type: t.Union[str, exp.DataType], 176 expected_table_struct: t.Union[str, exp.DataType], 177 position: t.Optional[TableAlterColumnPosition] = None, 178 ) -> TableAlterOperation: 179 return self( 180 op=TableAlterOperationType.ADD, 181 columns=ensure_list(columns), 182 column_type=exp.DataType.build(column_type), 183 add_position=position, 184 expected_table_struct=exp.DataType.build(expected_table_struct), 185 ) 186 187 @classmethod 188 def drop( 189 self, 190 columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]], 191 expected_table_struct: t.Union[str, exp.DataType], 192 column_type: t.Optional[t.Union[str, exp.DataType]] = None, 193 ) -> TableAlterOperation: 194 column_type = exp.DataType.build(column_type) if column_type else exp.DataType.build("INT") 195 return self( 196 op=TableAlterOperationType.DROP, 197 columns=ensure_list(columns), 198 column_type=column_type, 199 expected_table_struct=exp.DataType.build(expected_table_struct), 200 ) 201 202 @classmethod 203 def alter_type( 204 self, 205 columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]], 206 column_type: t.Union[str, exp.DataType], 207 current_type: 
t.Union[str, exp.DataType], 208 expected_table_struct: t.Union[str, exp.DataType], 209 position: t.Optional[TableAlterColumnPosition] = None, 210 ) -> TableAlterOperation: 211 return self( 212 op=TableAlterOperationType.ALTER_TYPE, 213 columns=ensure_list(columns), 214 column_type=exp.DataType.build(column_type), 215 add_position=position, 216 current_type=exp.DataType.build(current_type), 217 expected_table_struct=exp.DataType.build(expected_table_struct), 218 ) 219 220 @property 221 def is_add(self) -> bool: 222 return self.op.is_add 223 224 @property 225 def is_drop(self) -> bool: 226 return self.op.is_drop 227 228 @property 229 def is_alter_type(self) -> bool: 230 return self.op.is_alter_type 231 232 def column_identifiers(self, array_element_selector: str) -> t.List[exp.Identifier]: 233 results = [] 234 for column in self.columns: 235 results.append(column.identifier) 236 if column.is_array_of_struct and len(self.columns) > 1 and array_element_selector: 237 results.append(exp.to_identifier(array_element_selector)) 238 return results 239 240 def column(self, array_element_selector: str) -> t.Union[exp.Dot, exp.Identifier]: 241 columns = self.column_identifiers(array_element_selector) 242 if len(columns) == 1: 243 return columns[0] 244 return exp.Dot.build(columns) 245 246 def column_def(self, array_element_selector: str) -> exp.ColumnDef: 247 return exp.ColumnDef( 248 this=self.column(array_element_selector), 249 kind=self.column_type, 250 ) 251 252 def expression( 253 self, table_name: t.Union[str, exp.Table], array_element_selector: str 254 ) -> exp.AlterTable: 255 if self.is_alter_type: 256 return exp.AlterTable( 257 this=exp.to_table(table_name), 258 actions=[ 259 exp.AlterColumn( 260 this=self.column(array_element_selector), 261 dtype=self.column_type, 262 ) 263 ], 264 ) 265 elif self.is_add: 266 alter_table = exp.AlterTable(this=exp.to_table(table_name)) 267 column = self.column_def(array_element_selector) 268 alter_table.set("actions", [column]) 269 if 
self.add_position: 270 column.set("position", self.add_position.column_position_node) 271 return alter_table 272 elif self.is_drop: 273 alter_table = exp.AlterTable(this=exp.to_table(table_name)) 274 drop_column = exp.Drop(this=self.column(array_element_selector), kind="COLUMN") 275 alter_table.set("actions", [drop_column]) 276 return alter_table 277 else: 278 raise ValueError(f"Unknown operation {self.op}") 279 280 281class SchemaDiffer(PydanticModel): 282 """ 283 Compares a source schema against a target schema and returns a list of alter statements to have the source 284 match the structure of target. Some engines have constraints on the types of operations that can be performed 285 therefore the final structure may not match the target exactly but it will be as close as possible. Two potential 286 differences that can happen: 287 1. Column order can be different if the engine doesn't support positional additions. Another reason for difference 288 is if a column is just moved since we don't currently support fixing moves. 289 2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested 290 operations. As a result historical data is lost. 291 3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible 292 change. As a result historical data is lost. 293 294 Potential future improvements: 295 1. Support precision changes on columns like VARCHAR and DECIMAL. Each engine has different rules on what is allowed 296 2. Support column moves. Databricks Delta supports moves and would allow exact matches. 
297 """ 298 299 support_positional_add: bool = False 300 support_nested_operations: bool = False 301 array_element_selector: str = "" 302 compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {} 303 support_coercing_compatible_types: bool = False 304 305 _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {} 306 307 @property 308 def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]: 309 if not self._coerceable_types: 310 if not self.support_coercing_compatible_types or not self.compatible_types: 311 return {} 312 coerceable_types = defaultdict(set) 313 for source_type, target_types in self.compatible_types.items(): 314 for target_type in target_types: 315 coerceable_types[target_type].add(source_type) 316 self._coerceable_types = coerceable_types 317 return self._coerceable_types 318 319 def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool: 320 if current_type == new_type: 321 return True 322 if current_type in self.compatible_types: 323 return new_type in self.compatible_types[current_type] 324 return False 325 326 def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool: 327 if not self.support_coercing_compatible_types: 328 return False 329 if current_type in self.coerceable_types: 330 is_coerceable = new_type in self.coerceable_types[current_type] 331 if is_coerceable: 332 logger.warning( 333 f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning.", 334 ) 335 return is_coerceable 336 return False 337 338 def _get_matching_kwarg( 339 self, 340 current_kwarg: t.Union[str, exp.ColumnDef], 341 new_struct: exp.DataType, 342 current_pos: int, 343 ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]: 344 current_name = ( 345 
exp.to_identifier(current_kwarg) 346 if isinstance(current_kwarg, str) 347 else _get_name_and_type(current_kwarg)[0] 348 ) 349 # First check if we have the same column in the same position to get O(1) complexity 350 new_kwarg = seq_get(new_struct.expressions, current_pos) 351 if new_kwarg: 352 new_name, new_type = _get_name_and_type(new_kwarg) 353 if current_name.this == new_name.this: 354 return current_pos, new_kwarg 355 # If not, check if we have the same column in all positions with O(n) complexity 356 for i, new_kwarg in enumerate(new_struct.expressions): 357 new_name, new_type = _get_name_and_type(new_kwarg) 358 if current_name.this == new_name.this: 359 return i, new_kwarg 360 return None, None 361 362 def _drop_operation( 363 self, 364 columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]], 365 struct: exp.DataType, 366 pos: int, 367 root_struct: exp.DataType, 368 ) -> t.List[TableAlterOperation]: 369 columns = ensure_list(columns) 370 operations = [] 371 column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos) 372 assert column_pos is not None 373 assert column_kwarg 374 struct.expressions.pop(column_pos) 375 operations.append( 376 TableAlterOperation.drop(columns, root_struct.copy(), column_kwarg.args["kind"]) 377 ) 378 return operations 379 380 def _resolve_drop_operation( 381 self, 382 parent_columns: t.List[TableAlterColumn], 383 current_struct: exp.DataType, 384 new_struct: exp.DataType, 385 root_struct: exp.DataType, 386 ) -> t.List[TableAlterOperation]: 387 operations = [] 388 for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()): 389 new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos) 390 columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)] 391 if new_pos is None: 392 operations.extend( 393 self._drop_operation(columns, current_struct, current_pos, root_struct) 394 ) 395 return operations 396 397 def _add_operation( 398 self, 399 columns: 
t.List[TableAlterColumn], 400 new_pos: int, 401 new_kwarg: exp.ColumnDef, 402 current_struct: exp.DataType, 403 root_struct: exp.DataType, 404 ) -> t.List[TableAlterOperation]: 405 if self.support_positional_add: 406 col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions) 407 current_struct.expressions.insert(new_pos, new_kwarg) 408 else: 409 col_pos = None 410 current_struct.expressions.append(new_kwarg) 411 return [ 412 TableAlterOperation.add( 413 columns, 414 new_kwarg.args["kind"], 415 root_struct.copy(), 416 col_pos, 417 ) 418 ] 419 420 def _resolve_add_operations( 421 self, 422 parent_columns: t.List[TableAlterColumn], 423 current_struct: exp.DataType, 424 new_struct: exp.DataType, 425 root_struct: exp.DataType, 426 ) -> t.List[TableAlterOperation]: 427 operations = [] 428 for new_pos, new_kwarg in enumerate(new_struct.expressions): 429 possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos) 430 if possible_current_pos is None: 431 columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)] 432 operations.extend( 433 self._add_operation(columns, new_pos, new_kwarg, current_struct, root_struct) 434 ) 435 return operations 436 437 def _alter_operation( 438 self, 439 columns: t.List[TableAlterColumn], 440 pos: int, 441 struct: exp.DataType, 442 new_type: exp.DataType, 443 current_type: t.Union[str, exp.DataType], 444 root_struct: exp.DataType, 445 new_kwarg: exp.ColumnDef, 446 ) -> t.List[TableAlterOperation]: 447 # We don't copy on purpose here because current_type may need to be mutated inside 448 # _get_operations (struct.expressions.pop and struct.expressions.insert) 449 current_type = exp.DataType.build(current_type, copy=False) 450 if self.support_nested_operations: 451 if new_type.this == current_type.this == exp.DataType.Type.STRUCT: 452 return self._get_operations( 453 columns, 454 current_type, 455 new_type, 456 root_struct, 457 ) 458 if new_type.this == current_type.this == 
exp.DataType.Type.ARRAY: 459 new_array_type = new_type.expressions[0] 460 current_array_type = current_type.expressions[0] 461 if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT: 462 return self._get_operations( 463 columns, 464 current_array_type, 465 new_array_type, 466 root_struct, 467 ) 468 if self._is_coerceable_type(current_type, new_type): 469 return [] 470 elif self._is_compatible_type(current_type, new_type): 471 struct.expressions.pop(pos) 472 struct.expressions.insert(pos, new_kwarg) 473 col_pos = ( 474 TableAlterColumnPosition.create(pos, struct.expressions, replacing_col=True) 475 if self.support_positional_add 476 else None 477 ) 478 return [ 479 TableAlterOperation.alter_type( 480 columns, 481 new_type, 482 current_type, 483 root_struct.copy(), 484 col_pos, 485 ) 486 ] 487 else: 488 return self._drop_operation( 489 columns, root_struct, pos, root_struct 490 ) + self._add_operation(columns, pos, new_kwarg, struct, root_struct) 491 492 def _resolve_alter_operations( 493 self, 494 parent_columns: t.List[TableAlterColumn], 495 current_struct: exp.DataType, 496 new_struct: exp.DataType, 497 root_struct: exp.DataType, 498 ) -> t.List[TableAlterOperation]: 499 operations = [] 500 for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()): 501 _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos) 502 assert new_kwarg 503 _, new_type = _get_name_and_type(new_kwarg) 504 _, current_type = _get_name_and_type(current_kwarg) 505 columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)] 506 if new_type == current_type: 507 continue 508 operations.extend( 509 self._alter_operation( 510 columns, 511 current_pos, 512 current_struct, 513 new_type, 514 current_type, 515 root_struct, 516 new_kwarg, 517 ) 518 ) 519 return operations 520 521 def _get_operations( 522 self, 523 parent_columns: t.List[TableAlterColumn], 524 current_struct: exp.DataType, 525 new_struct: exp.DataType, 
526 root_struct: exp.DataType, 527 ) -> t.List[TableAlterOperation]: 528 root_struct = root_struct or current_struct 529 parent_columns = parent_columns or [] 530 operations = [] 531 operations.extend( 532 self._resolve_drop_operation(parent_columns, current_struct, new_struct, root_struct) 533 ) 534 operations.extend( 535 self._resolve_add_operations(parent_columns, current_struct, new_struct, root_struct) 536 ) 537 operations.extend( 538 self._resolve_alter_operations(parent_columns, current_struct, new_struct, root_struct) 539 ) 540 return operations 541 542 def _from_structs( 543 self, current_struct: exp.DataType, new_struct: exp.DataType 544 ) -> t.List[TableAlterOperation]: 545 return self._get_operations([], current_struct, new_struct, current_struct) 546 547 def compare_structs( 548 self, table_name: t.Union[str, exp.Table], current: exp.DataType, new: exp.DataType 549 ) -> t.List[exp.AlterTable]: 550 """ 551 Compares two schemas represented as structs. 552 553 Args: 554 current: The current schema. 555 new: The new schema. 556 557 Returns: 558 The list of table alter operations. 559 """ 560 return [ 561 op.expression(table_name, self.array_element_selector) 562 for op in self._from_structs(current, new) 563 ] 564 565 def compare_columns( 566 self, 567 table_name: TableName, 568 current: t.Dict[str, exp.DataType], 569 new: t.Dict[str, exp.DataType], 570 ) -> t.List[exp.AlterTable]: 571 """ 572 Compares two schemas represented as dictionaries of column names and types. 573 574 Args: 575 current: The current schema. 576 new: The new schema. 577 578 Returns: 579 The list of schema deltas. 580 """ 581 return self.compare_structs( 582 table_name, columns_to_types_to_struct(current), columns_to_types_to_struct(new) 583 ) 584 585 586def _get_name_and_type(struct: exp.ColumnDef) -> t.Tuple[exp.Identifier, exp.DataType]: 587 return struct.this, struct.args["kind"]
21class TableAlterOperationType(Enum): 22 ADD = auto() 23 DROP = auto() 24 ALTER_TYPE = auto() 25 26 @property 27 def is_add(self) -> bool: 28 return self == TableAlterOperationType.ADD 29 30 @property 31 def is_drop(self) -> bool: 32 return self == TableAlterOperationType.DROP 33 34 @property 35 def is_alter_type(self) -> bool: 36 return self == TableAlterOperationType.ALTER_TYPE
An enumeration.
Inherited Members
- enum.Enum
- name
- value
39class TableAlterColumn(PydanticModel): 40 name: str 41 is_struct: bool 42 is_array_of_struct: bool 43 is_array_of_primitive: bool 44 quoted: bool = False 45 46 @classmethod 47 def primitive(self, name: str, quoted: bool = False) -> TableAlterColumn: 48 return self( 49 name=name, 50 is_struct=False, 51 is_array_of_struct=False, 52 is_array_of_primitive=False, 53 quoted=quoted, 54 ) 55 56 @classmethod 57 def struct(self, name: str, quoted: bool = False) -> TableAlterColumn: 58 return self( 59 name=name, 60 is_struct=True, 61 is_array_of_struct=False, 62 is_array_of_primitive=False, 63 quoted=quoted, 64 ) 65 66 @classmethod 67 def array_of_struct(self, name: str, quoted: bool = False) -> TableAlterColumn: 68 return self( 69 name=name, 70 is_struct=False, 71 is_array_of_struct=True, 72 is_array_of_primitive=False, 73 quoted=quoted, 74 ) 75 76 @classmethod 77 def array_of_primitive(self, name: str, quoted: bool = False) -> TableAlterColumn: 78 return self( 79 name=name, 80 is_struct=False, 81 is_array_of_struct=False, 82 is_array_of_primitive=True, 83 quoted=quoted, 84 ) 85 86 @classmethod 87 def from_struct_kwarg(self, struct: exp.ColumnDef) -> TableAlterColumn: 88 name = struct.alias_or_name 89 quoted = struct.this.quoted 90 kwarg_type = struct.args["kind"] 91 92 if kwarg_type.is_type(exp.DataType.Type.STRUCT): 93 return self.struct(name, quoted=quoted) 94 elif kwarg_type.is_type(exp.DataType.Type.ARRAY): 95 if kwarg_type.expressions[0].is_type(exp.DataType.Type.STRUCT): 96 return self.array_of_struct(name, quoted=quoted) 97 else: 98 return self.array_of_primitive(name, quoted=quoted) 99 else: 100 return self.primitive(name, quoted=quoted) 101 102 @property 103 def is_array(self) -> bool: 104 return self.is_array_of_struct or self.is_array_of_primitive 105 106 @property 107 def is_primitive(self) -> bool: 108 return not self.is_struct and not self.is_array 109 110 @property 111 def is_nested(self) -> bool: 112 return not self.is_primitive 113 114 @property 115 def 
identifier(self) -> exp.Identifier: 116 return exp.to_identifier(self.name, quoted=self.quoted)
Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of classvars defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The signature for instantiating the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
and `Model.__root_validators__`
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a
RootModel
. - __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
- __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
- __pydantic_extra__: An instance attribute with the values of extra fields from validation when
model_config['extra'] == 'allow'
. - __pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
- __pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
86 @classmethod 87 def from_struct_kwarg(self, struct: exp.ColumnDef) -> TableAlterColumn: 88 name = struct.alias_or_name 89 quoted = struct.this.quoted 90 kwarg_type = struct.args["kind"] 91 92 if kwarg_type.is_type(exp.DataType.Type.STRUCT): 93 return self.struct(name, quoted=quoted) 94 elif kwarg_type.is_type(exp.DataType.Type.ARRAY): 95 if kwarg_type.expressions[0].is_type(exp.DataType.Type.STRUCT): 96 return self.array_of_struct(name, quoted=quoted) 97 else: 98 return self.array_of_primitive(name, quoted=quoted) 99 else: 100 return self.primitive(name, quoted=quoted)
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
119class TableAlterColumnPosition(PydanticModel): 120 is_first: bool 121 is_last: bool 122 after: t.Optional[exp.Identifier] = None 123 124 @classmethod 125 def first(self) -> TableAlterColumnPosition: 126 return self(is_first=True, is_last=False, after=None) 127 128 @classmethod 129 def last( 130 self, after: t.Optional[t.Union[str, exp.Identifier]] = None 131 ) -> TableAlterColumnPosition: 132 return self(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None) 133 134 @classmethod 135 def middle(self, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition: 136 return self(is_first=False, is_last=False, after=exp.to_identifier(after)) 137 138 @classmethod 139 def create( 140 self, 141 pos: int, 142 current_kwargs: t.List[exp.ColumnDef], 143 replacing_col: bool = False, 144 ) -> TableAlterColumnPosition: 145 is_first = pos == 0 146 is_last = pos == len(current_kwargs) - int(replacing_col) 147 after = None 148 if not is_first: 149 prior_kwarg = current_kwargs[pos - 1] 150 after, _ = _get_name_and_type(prior_kwarg) 151 return self(is_first=is_first, is_last=is_last, after=after) 152 153 @property 154 def column_position_node(self) -> t.Optional[exp.ColumnPosition]: 155 column = self.after if not self.is_last else None 156 position = None 157 if self.is_first: 158 position = "FIRST" 159 elif column and not self.is_last: 160 position = "AFTER" 161 return exp.ColumnPosition(this=column, position=position)
Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of classvars defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The signature for instantiating the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
and `Model.__root_validators__`
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a
RootModel
. - __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
- __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
- __pydantic_extra__: An instance attribute with the values of extra fields from validation when
model_config['extra'] == 'allow'
. - __pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
- __pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
138 @classmethod 139 def create( 140 self, 141 pos: int, 142 current_kwargs: t.List[exp.ColumnDef], 143 replacing_col: bool = False, 144 ) -> TableAlterColumnPosition: 145 is_first = pos == 0 146 is_last = pos == len(current_kwargs) - int(replacing_col) 147 after = None 148 if not is_first: 149 prior_kwarg = current_kwargs[pos - 1] 150 after, _ = _get_name_and_type(prior_kwarg) 151 return self(is_first=is_first, is_last=is_last, after=after)
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
164class TableAlterOperation(PydanticModel): 165 op: TableAlterOperationType 166 columns: t.List[TableAlterColumn] 167 column_type: exp.DataType 168 expected_table_struct: exp.DataType 169 add_position: t.Optional[TableAlterColumnPosition] = None 170 current_type: t.Optional[exp.DataType] = None 171 172 @classmethod 173 def add( 174 self, 175 columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]], 176 column_type: t.Union[str, exp.DataType], 177 expected_table_struct: t.Union[str, exp.DataType], 178 position: t.Optional[TableAlterColumnPosition] = None, 179 ) -> TableAlterOperation: 180 return self( 181 op=TableAlterOperationType.ADD, 182 columns=ensure_list(columns), 183 column_type=exp.DataType.build(column_type), 184 add_position=position, 185 expected_table_struct=exp.DataType.build(expected_table_struct), 186 ) 187 188 @classmethod 189 def drop( 190 self, 191 columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]], 192 expected_table_struct: t.Union[str, exp.DataType], 193 column_type: t.Optional[t.Union[str, exp.DataType]] = None, 194 ) -> TableAlterOperation: 195 column_type = exp.DataType.build(column_type) if column_type else exp.DataType.build("INT") 196 return self( 197 op=TableAlterOperationType.DROP, 198 columns=ensure_list(columns), 199 column_type=column_type, 200 expected_table_struct=exp.DataType.build(expected_table_struct), 201 ) 202 203 @classmethod 204 def alter_type( 205 self, 206 columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]], 207 column_type: t.Union[str, exp.DataType], 208 current_type: t.Union[str, exp.DataType], 209 expected_table_struct: t.Union[str, exp.DataType], 210 position: t.Optional[TableAlterColumnPosition] = None, 211 ) -> TableAlterOperation: 212 return self( 213 op=TableAlterOperationType.ALTER_TYPE, 214 columns=ensure_list(columns), 215 column_type=exp.DataType.build(column_type), 216 add_position=position, 217 current_type=exp.DataType.build(current_type), 218 
expected_table_struct=exp.DataType.build(expected_table_struct), 219 ) 220 221 @property 222 def is_add(self) -> bool: 223 return self.op.is_add 224 225 @property 226 def is_drop(self) -> bool: 227 return self.op.is_drop 228 229 @property 230 def is_alter_type(self) -> bool: 231 return self.op.is_alter_type 232 233 def column_identifiers(self, array_element_selector: str) -> t.List[exp.Identifier]: 234 results = [] 235 for column in self.columns: 236 results.append(column.identifier) 237 if column.is_array_of_struct and len(self.columns) > 1 and array_element_selector: 238 results.append(exp.to_identifier(array_element_selector)) 239 return results 240 241 def column(self, array_element_selector: str) -> t.Union[exp.Dot, exp.Identifier]: 242 columns = self.column_identifiers(array_element_selector) 243 if len(columns) == 1: 244 return columns[0] 245 return exp.Dot.build(columns) 246 247 def column_def(self, array_element_selector: str) -> exp.ColumnDef: 248 return exp.ColumnDef( 249 this=self.column(array_element_selector), 250 kind=self.column_type, 251 ) 252 253 def expression( 254 self, table_name: t.Union[str, exp.Table], array_element_selector: str 255 ) -> exp.AlterTable: 256 if self.is_alter_type: 257 return exp.AlterTable( 258 this=exp.to_table(table_name), 259 actions=[ 260 exp.AlterColumn( 261 this=self.column(array_element_selector), 262 dtype=self.column_type, 263 ) 264 ], 265 ) 266 elif self.is_add: 267 alter_table = exp.AlterTable(this=exp.to_table(table_name)) 268 column = self.column_def(array_element_selector) 269 alter_table.set("actions", [column]) 270 if self.add_position: 271 column.set("position", self.add_position.column_position_node) 272 return alter_table 273 elif self.is_drop: 274 alter_table = exp.AlterTable(this=exp.to_table(table_name)) 275 drop_column = exp.Drop(this=self.column(array_element_selector), kind="COLUMN") 276 alter_table.set("actions", [drop_column]) 277 return alter_table 278 else: 279 raise ValueError(f"Unknown 
operation {self.op}")
Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of classvars defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The signature for instantiating the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
and `Model.__root_validators__`
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a `RootModel`.
- __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
- __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
- __pydantic_extra__: An instance attribute with the values of extra fields from validation when model_config['extra'] == 'allow'.
- __pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
- __pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
@classmethod
def add(
    cls,
    columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
    column_type: t.Union[str, exp.DataType],
    expected_table_struct: t.Union[str, exp.DataType],
    position: t.Optional[TableAlterColumnPosition] = None,
) -> TableAlterOperation:
    """Create an ADD-column operation.

    Args:
        columns: The column, or path of nested columns, being added.
        column_type: The type of the new column.
        expected_table_struct: The expected table structure after this operation is applied.
        position: Optional position for the new column, used by engines that
            support positional ADD.

    Returns:
        The constructed ADD operation.
    """
    # NOTE: first parameter renamed from `self` to `cls` — this is a
    # classmethod, so PEP 8 mandates `cls`; callers are unaffected.
    return cls(
        op=TableAlterOperationType.ADD,
        columns=ensure_list(columns),
        column_type=exp.DataType.build(column_type),
        add_position=position,
        expected_table_struct=exp.DataType.build(expected_table_struct),
    )
@classmethod
def drop(
    cls,
    columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
    expected_table_struct: t.Union[str, exp.DataType],
    column_type: t.Optional[t.Union[str, exp.DataType]] = None,
) -> TableAlterOperation:
    """Create a DROP-column operation.

    Args:
        columns: The column, or path of nested columns, being dropped.
        expected_table_struct: The expected table structure after this operation is applied.
        column_type: The type of the dropped column. Defaults to INT — the
            rendered DROP expression never references the type, so this only
            satisfies the model's required field.

    Returns:
        The constructed DROP operation.
    """
    # NOTE: first parameter renamed from `self` to `cls` — this is a
    # classmethod, so PEP 8 mandates `cls`; callers are unaffected.
    # INT is a placeholder: expression() does not use column_type for drops.
    column_type = exp.DataType.build(column_type) if column_type else exp.DataType.build("INT")
    return cls(
        op=TableAlterOperationType.DROP,
        columns=ensure_list(columns),
        column_type=column_type,
        expected_table_struct=exp.DataType.build(expected_table_struct),
    )
@classmethod
def alter_type(
    cls,
    columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
    column_type: t.Union[str, exp.DataType],
    current_type: t.Union[str, exp.DataType],
    expected_table_struct: t.Union[str, exp.DataType],
    position: t.Optional[TableAlterColumnPosition] = None,
) -> TableAlterOperation:
    """Create an ALTER-column-type operation.

    Args:
        columns: The column, or path of nested columns, whose type changes.
        column_type: The new type of the column.
        current_type: The column's current type before the change.
        expected_table_struct: The expected table structure after this operation is applied.
        position: Optional position of the column, used by engines that
            support positional ADD.

    Returns:
        The constructed ALTER_TYPE operation.
    """
    # NOTE: first parameter renamed from `self` to `cls` — this is a
    # classmethod, so PEP 8 mandates `cls`; callers are unaffected.
    return cls(
        op=TableAlterOperationType.ALTER_TYPE,
        columns=ensure_list(columns),
        column_type=exp.DataType.build(column_type),
        add_position=position,
        current_type=exp.DataType.build(current_type),
        expected_table_struct=exp.DataType.build(expected_table_struct),
    )
def column_identifiers(self, array_element_selector: str) -> t.List[exp.Identifier]:
    """Return the identifier path for this operation's columns.

    When traversing a nested path (more than one column), an extra
    `array_element_selector` identifier is emitted right after any
    array-of-struct column, if a selector is configured.
    """
    identifiers: t.List[exp.Identifier] = []
    is_nested = len(self.columns) > 1
    for col in self.columns:
        identifiers.append(col.identifier)
        if col.is_array_of_struct and is_nested and array_element_selector:
            identifiers.append(exp.to_identifier(array_element_selector))
    return identifiers
def expression(
    self, table_name: t.Union[str, exp.Table], array_element_selector: str
) -> exp.AlterTable:
    """Render this operation as an ALTER TABLE expression for `table_name`.

    Raises:
        ValueError: If the operation type is not ADD, DROP, or ALTER_TYPE.
    """
    target = exp.to_table(table_name)
    if self.is_alter_type:
        action = exp.AlterColumn(
            this=self.column(array_element_selector),
            dtype=self.column_type,
        )
        return exp.AlterTable(this=target, actions=[action])
    if self.is_add:
        column_def = self.column_def(array_element_selector)
        alter = exp.AlterTable(this=target)
        alter.set("actions", [column_def])
        # Positional ADD: annotate the column definition with its position node.
        if self.add_position:
            column_def.set("position", self.add_position.column_position_node)
        return alter
    if self.is_drop:
        alter = exp.AlterTable(this=target)
        drop_action = exp.Drop(this=self.column(array_element_selector), kind="COLUMN")
        alter.set("actions", [drop_action])
        return alter
    raise ValueError(f"Unknown operation {self.op}")
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class SchemaDiffer(PydanticModel):
    """
    Compares a source schema against a target schema and returns a list of alter statements to have the source
    match the structure of target. Some engines have constraints on the types of operations that can be performed
    therefore the final structure may not match the target exactly but it will be as close as possible. Three potential
    differences that can happen:
    1. Column order can be different if the engine doesn't support positional additions. Another reason for difference
    is if a column is just moved since we don't currently support fixing moves.
    2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested
    operations. As a result historical data is lost.
    3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible
    change. As a result historical data is lost.

    Potential future improvements:
    1. Support precision changes on columns like VARCHAR and DECIMAL. Each engine has different rules on what is allowed
    2. Support column moves. Databricks Delta supports moves and would allow exact matches.
    """

    # Engine capability flags and configuration.
    support_positional_add: bool = False
    support_nested_operations: bool = False
    array_element_selector: str = ""
    compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
    support_coercing_compatible_types: bool = False

    # Lazily-computed inverse of `compatible_types`; see `coerceable_types`.
    _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}

    @property
    def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
        # Invert compatible_types (target -> set of source types) the first
        # time it is needed, then cache the result on the instance.
        if not self._coerceable_types:
            if not self.support_coercing_compatible_types or not self.compatible_types:
                return {}
            coerceable_types = defaultdict(set)
            for source_type, target_types in self.compatible_types.items():
                for target_type in target_types:
                    coerceable_types[target_type].add(source_type)
            self._coerceable_types = coerceable_types
        return self._coerceable_types

    def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
        # A type is compatible with itself or with any configured target type.
        if current_type == new_type:
            return True
        if current_type in self.compatible_types:
            return new_type in self.compatible_types[current_type]
        return False

    def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
        # True when the engine can implicitly coerce the new values into the
        # current type; in that case no ALTER is emitted, only a warning.
        if not self.support_coercing_compatible_types:
            return False
        if current_type in self.coerceable_types:
            is_coerceable = new_type in self.coerceable_types[current_type]
            if is_coerceable:
                logger.warning(
                    f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning.",
                )
            return is_coerceable
        return False

    def _get_matching_kwarg(
        self,
        current_kwarg: t.Union[str, exp.ColumnDef],
        new_struct: exp.DataType,
        current_pos: int,
    ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]:
        # Locate the column in `new_struct` whose name matches `current_kwarg`,
        # returning (position, definition), or (None, None) if it is absent.
        current_name = (
            exp.to_identifier(current_kwarg)
            if isinstance(current_kwarg, str)
            else _get_name_and_type(current_kwarg)[0]
        )
        # First check if we have the same column in the same position to get O(1) complexity
        new_kwarg = seq_get(new_struct.expressions, current_pos)
        if new_kwarg:
            new_name, new_type = _get_name_and_type(new_kwarg)
            if current_name.this == new_name.this:
                return current_pos, new_kwarg
        # If not, check if we have the same column in all positions with O(n) complexity
        for i, new_kwarg in enumerate(new_struct.expressions):
            new_name, new_type = _get_name_and_type(new_kwarg)
            if current_name.this == new_name.this:
                return i, new_kwarg
        return None, None

    def _drop_operation(
        self,
        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
        struct: exp.DataType,
        pos: int,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        # Remove the last column in the path from `struct` IN PLACE and record
        # a DROP operation against a snapshot of the root struct.
        columns = ensure_list(columns)
        operations = []
        column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos)
        assert column_pos is not None
        assert column_kwarg
        struct.expressions.pop(column_pos)
        operations.append(
            TableAlterOperation.drop(columns, root_struct.copy(), column_kwarg.args["kind"])
        )
        return operations

    def _resolve_drop_operation(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        # Drop every column present in `current_struct` but missing from `new_struct`.
        # Iterate over a copy since _drop_operation mutates current_struct.
        operations = []
        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
            if new_pos is None:
                operations.extend(
                    self._drop_operation(columns, current_struct, current_pos, root_struct)
                )
        return operations

    def _add_operation(
        self,
        columns: t.List[TableAlterColumn],
        new_pos: int,
        new_kwarg: exp.ColumnDef,
        current_struct: exp.DataType,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        # Insert at the exact target position when the engine supports
        # positional ADD; otherwise append the new column at the end.
        if self.support_positional_add:
            col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions)
            current_struct.expressions.insert(new_pos, new_kwarg)
        else:
            col_pos = None
            current_struct.expressions.append(new_kwarg)
        return [
            TableAlterOperation.add(
                columns,
                new_kwarg.args["kind"],
                root_struct.copy(),
                col_pos,
            )
        ]

    def _resolve_add_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        # Add every column present in `new_struct` but missing from `current_struct`.
        operations = []
        for new_pos, new_kwarg in enumerate(new_struct.expressions):
            possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos)
            if possible_current_pos is None:
                columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)]
                operations.extend(
                    self._add_operation(columns, new_pos, new_kwarg, current_struct, root_struct)
                )
        return operations

    def _alter_operation(
        self,
        columns: t.List[TableAlterColumn],
        pos: int,
        struct: exp.DataType,
        new_type: exp.DataType,
        current_type: t.Union[str, exp.DataType],
        root_struct: exp.DataType,
        new_kwarg: exp.ColumnDef,
    ) -> t.List[TableAlterOperation]:
        # We don't copy on purpose here because current_type may need to be mutated inside
        # _get_operations (struct.expressions.pop and struct.expressions.insert)
        current_type = exp.DataType.build(current_type, copy=False)
        if self.support_nested_operations:
            # Recurse into nested STRUCTs (and ARRAYs of STRUCTs) so the change
            # is expressed on the inner field instead of drop/add of the root.
            if new_type.this == current_type.this == exp.DataType.Type.STRUCT:
                return self._get_operations(
                    columns,
                    current_type,
                    new_type,
                    root_struct,
                )
            if new_type.this == current_type.this == exp.DataType.Type.ARRAY:
                new_array_type = new_type.expressions[0]
                current_array_type = current_type.expressions[0]
                if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT:
                    return self._get_operations(
                        columns,
                        current_array_type,
                        new_array_type,
                        root_struct,
                    )
        if self._is_coerceable_type(current_type, new_type):
            # No-op: the engine coerces values; a warning was logged above.
            return []
        elif self._is_compatible_type(current_type, new_type):
            # In-place replacement of the column definition plus an ALTER_TYPE op.
            struct.expressions.pop(pos)
            struct.expressions.insert(pos, new_kwarg)
            col_pos = (
                TableAlterColumnPosition.create(pos, struct.expressions, replacing_col=True)
                if self.support_positional_add
                else None
            )
            return [
                TableAlterOperation.alter_type(
                    columns,
                    new_type,
                    current_type,
                    root_struct.copy(),
                    col_pos,
                )
            ]
        else:
            # Incompatible change: represent it as drop followed by add
            # (historical data for the column is lost).
            return self._drop_operation(
                columns, root_struct, pos, root_struct
            ) + self._add_operation(columns, pos, new_kwarg, struct, root_struct)

    def _resolve_alter_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        # For columns present in both structs (drops/adds already resolved),
        # emit type alterations wherever the types differ.
        operations = []
        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
            _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
            assert new_kwarg
            _, new_type = _get_name_and_type(new_kwarg)
            _, current_type = _get_name_and_type(current_kwarg)
            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
            if new_type == current_type:
                continue
            operations.extend(
                self._alter_operation(
                    columns,
                    current_pos,
                    current_struct,
                    new_type,
                    current_type,
                    root_struct,
                    new_kwarg,
                )
            )
        return operations

    def _get_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        # Resolve drops first, then adds, then type alterations. Each phase
        # mutates `current_struct` so later phases see the updated structure.
        root_struct = root_struct or current_struct
        parent_columns = parent_columns or []
        operations = []
        operations.extend(
            self._resolve_drop_operation(parent_columns, current_struct, new_struct, root_struct)
        )
        operations.extend(
            self._resolve_add_operations(parent_columns, current_struct, new_struct, root_struct)
        )
        operations.extend(
            self._resolve_alter_operations(parent_columns, current_struct, new_struct, root_struct)
        )
        return operations

    def _from_structs(
        self, current_struct: exp.DataType, new_struct: exp.DataType
    ) -> t.List[TableAlterOperation]:
        # Entry point for struct-to-struct diffing; the current struct is the root.
        return self._get_operations([], current_struct, new_struct, current_struct)

    def compare_structs(
        self, table_name: t.Union[str, exp.Table], current: exp.DataType, new: exp.DataType
    ) -> t.List[exp.AlterTable]:
        """
        Compares two schemas represented as structs.

        Args:
            table_name: The table the resulting alter expressions should target.
            current: The current schema.
            new: The new schema.

        Returns:
            The list of table alter operations.
        """
        return [
            op.expression(table_name, self.array_element_selector)
            for op in self._from_structs(current, new)
        ]

    def compare_columns(
        self,
        table_name: TableName,
        current: t.Dict[str, exp.DataType],
        new: t.Dict[str, exp.DataType],
    ) -> t.List[exp.AlterTable]:
        """
        Compares two schemas represented as dictionaries of column names and types.

        Args:
            table_name: The table the resulting alter expressions should target.
            current: The current schema.
            new: The new schema.

        Returns:
            The list of schema deltas.
        """
        return self.compare_structs(
            table_name, columns_to_types_to_struct(current), columns_to_types_to_struct(new)
        )
Compares a source schema against a target schema and returns a list of alter statements to have the source match the structure of target. Some engines have constraints on the types of operations that can be performed therefore the final structure may not match the target exactly but it will be as close as possible. Three potential differences that can happen:
- Column order can be different if the engine doesn't support positional additions. Another reason for difference is if a column is just moved since we don't currently support fixing moves.
- Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested operations. As a result historical data is lost.
- Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible change. As a result historical data is lost.
Potential future improvements:
- Support precision changes on columns like VARCHAR and DECIMAL. Each engine has different rules on what is allowed
- Support column moves. Databricks Delta supports moves and would allow exact matches.
548 def compare_structs( 549 self, table_name: t.Union[str, exp.Table], current: exp.DataType, new: exp.DataType 550 ) -> t.List[exp.AlterTable]: 551 """ 552 Compares two schemas represented as structs. 553 554 Args: 555 current: The current schema. 556 new: The new schema. 557 558 Returns: 559 The list of table alter operations. 560 """ 561 return [ 562 op.expression(table_name, self.array_element_selector) 563 for op in self._from_structs(current, new) 564 ]
Compares two schemas represented as structs.
Arguments:
- current: The current schema.
- new: The new schema.
Returns:
The list of table alter operations.
566 def compare_columns( 567 self, 568 table_name: TableName, 569 current: t.Dict[str, exp.DataType], 570 new: t.Dict[str, exp.DataType], 571 ) -> t.List[exp.AlterTable]: 572 """ 573 Compares two schemas represented as dictionaries of column names and types. 574 575 Args: 576 current: The current schema. 577 new: The new schema. 578 579 Returns: 580 The list of schema deltas. 581 """ 582 return self.compare_structs( 583 table_name, columns_to_types_to_struct(current), columns_to_types_to_struct(new) 584 )
Compares two schemas represented as dictionaries of column names and types.
Arguments:
- current: The current schema.
- new: The new schema.
Returns:
The list of schema deltas.
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    # Pydantic-internal wrapper: private attributes are initialized first so
    # that the user-defined post-init hook can rely on them being present.
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
Override this method to perform additional initialization after `__init__` and `model_construct`.
This is useful if you want to do some validation that requires the entire model to be initialized.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs