
sqlmesh.core.schema_diff

from __future__ import annotations

import logging
import typing as t
from collections import defaultdict
from enum import Enum, auto

from sqlglot import exp
from sqlglot.helper import ensure_list, seq_get

from sqlmesh.utils import columns_to_types_to_struct
from sqlmesh.utils.pydantic import PydanticModel

if t.TYPE_CHECKING:
    from sqlmesh.core._typing import TableName

logger = logging.getLogger(__name__)


class TableAlterOperationType(Enum):
    ADD = auto()
    DROP = auto()
    ALTER_TYPE = auto()

    @property
    def is_add(self) -> bool:
        return self == TableAlterOperationType.ADD

    @property
    def is_drop(self) -> bool:
        return self == TableAlterOperationType.DROP

    @property
    def is_alter_type(self) -> bool:
        return self == TableAlterOperationType.ALTER_TYPE


class TableAlterColumn(PydanticModel):
    name: str
    is_struct: bool
    is_array_of_struct: bool
    is_array_of_primitive: bool
    quoted: bool = False

    @classmethod
    def primitive(self, name: str, quoted: bool = False) -> TableAlterColumn:
        return self(
            name=name,
            is_struct=False,
            is_array_of_struct=False,
            is_array_of_primitive=False,
            quoted=quoted,
        )

    @classmethod
    def struct(self, name: str, quoted: bool = False) -> TableAlterColumn:
        return self(
            name=name,
            is_struct=True,
            is_array_of_struct=False,
            is_array_of_primitive=False,
            quoted=quoted,
        )

    @classmethod
    def array_of_struct(self, name: str, quoted: bool = False) -> TableAlterColumn:
        return self(
            name=name,
            is_struct=False,
            is_array_of_struct=True,
            is_array_of_primitive=False,
            quoted=quoted,
        )

    @classmethod
    def array_of_primitive(self, name: str, quoted: bool = False) -> TableAlterColumn:
        return self(
            name=name,
            is_struct=False,
            is_array_of_struct=False,
            is_array_of_primitive=True,
            quoted=quoted,
        )

    @classmethod
    def from_struct_kwarg(self, struct: exp.ColumnDef) -> TableAlterColumn:
        name = struct.alias_or_name
        quoted = struct.this.quoted
        kwarg_type = struct.args["kind"]

        if kwarg_type.is_type(exp.DataType.Type.STRUCT):
            return self.struct(name, quoted=quoted)
        elif kwarg_type.is_type(exp.DataType.Type.ARRAY):
            if kwarg_type.expressions[0].is_type(exp.DataType.Type.STRUCT):
                return self.array_of_struct(name, quoted=quoted)
            else:
                return self.array_of_primitive(name, quoted=quoted)
        else:
            return self.primitive(name, quoted=quoted)

    @property
    def is_array(self) -> bool:
        return self.is_array_of_struct or self.is_array_of_primitive

    @property
    def is_primitive(self) -> bool:
        return not self.is_struct and not self.is_array

    @property
    def is_nested(self) -> bool:
        return not self.is_primitive

    @property
    def identifier(self) -> exp.Identifier:
        return exp.to_identifier(self.name, quoted=self.quoted)

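# Illustrative usage (not part of the module source): the factory classmethods
# above tag a column with how it nests inside the table struct, which later
# drives how nested drops/adds are expressed. The column name is made up.
#
#     >>> col = TableAlterColumn.array_of_struct("infos")
#     >>> col.is_nested, col.is_array, col.is_primitive
#     (True, True, False)
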
class TableAlterColumnPosition(PydanticModel):
    is_first: bool
    is_last: bool
    after: t.Optional[exp.Identifier] = None

    @classmethod
    def first(self) -> TableAlterColumnPosition:
        return self(is_first=True, is_last=False, after=None)

    @classmethod
    def last(
        self, after: t.Optional[t.Union[str, exp.Identifier]] = None
    ) -> TableAlterColumnPosition:
        return self(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None)

    @classmethod
    def middle(self, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition:
        return self(is_first=False, is_last=False, after=exp.to_identifier(after))

    @classmethod
    def create(
        self,
        pos: int,
        current_kwargs: t.List[exp.ColumnDef],
        replacing_col: bool = False,
    ) -> TableAlterColumnPosition:
        is_first = pos == 0
        is_last = pos == len(current_kwargs) - int(replacing_col)
        after = None
        if not is_first:
            prior_kwarg = current_kwargs[pos - 1]
            after, _ = _get_name_and_type(prior_kwarg)
        return self(is_first=is_first, is_last=is_last, after=after)

    @property
    def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
        column = self.after if not self.is_last else None
        position = None
        if self.is_first:
            position = "FIRST"
        elif column and not self.is_last:
            position = "AFTER"
        return exp.ColumnPosition(this=column, position=position)

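# Illustrative usage (not part of the module source): positions describe where
# an added column should land when the engine supports positional ADDs. The
# "id" column is made up.
#
#     >>> TableAlterColumnPosition.first().is_first
#     True
#     >>> TableAlterColumnPosition.middle(after="id").after.name
#     'id'
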
class TableAlterOperation(PydanticModel):
    op: TableAlterOperationType
    columns: t.List[TableAlterColumn]
    column_type: exp.DataType
    expected_table_struct: exp.DataType
    add_position: t.Optional[TableAlterColumnPosition] = None
    current_type: t.Optional[exp.DataType] = None

    @classmethod
    def add(
        self,
        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
        column_type: t.Union[str, exp.DataType],
        expected_table_struct: t.Union[str, exp.DataType],
        position: t.Optional[TableAlterColumnPosition] = None,
    ) -> TableAlterOperation:
        return self(
            op=TableAlterOperationType.ADD,
            columns=ensure_list(columns),
            column_type=exp.DataType.build(column_type),
            add_position=position,
            expected_table_struct=exp.DataType.build(expected_table_struct),
        )

    @classmethod
    def drop(
        self,
        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
        expected_table_struct: t.Union[str, exp.DataType],
        column_type: t.Optional[t.Union[str, exp.DataType]] = None,
    ) -> TableAlterOperation:
        column_type = exp.DataType.build(column_type) if column_type else exp.DataType.build("INT")
        return self(
            op=TableAlterOperationType.DROP,
            columns=ensure_list(columns),
            column_type=column_type,
            expected_table_struct=exp.DataType.build(expected_table_struct),
        )

    @classmethod
    def alter_type(
        self,
        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
        column_type: t.Union[str, exp.DataType],
        current_type: t.Union[str, exp.DataType],
        expected_table_struct: t.Union[str, exp.DataType],
        position: t.Optional[TableAlterColumnPosition] = None,
    ) -> TableAlterOperation:
        return self(
            op=TableAlterOperationType.ALTER_TYPE,
            columns=ensure_list(columns),
            column_type=exp.DataType.build(column_type),
            add_position=position,
            current_type=exp.DataType.build(current_type),
            expected_table_struct=exp.DataType.build(expected_table_struct),
        )

    @property
    def is_add(self) -> bool:
        return self.op.is_add

    @property
    def is_drop(self) -> bool:
        return self.op.is_drop

    @property
    def is_alter_type(self) -> bool:
        return self.op.is_alter_type

    def column_identifiers(self, array_element_selector: str) -> t.List[exp.Identifier]:
        results = []
        for column in self.columns:
            results.append(column.identifier)
            if column.is_array_of_struct and len(self.columns) > 1 and array_element_selector:
                results.append(exp.to_identifier(array_element_selector))
        return results

    def column(self, array_element_selector: str) -> t.Union[exp.Dot, exp.Identifier]:
        columns = self.column_identifiers(array_element_selector)
        if len(columns) == 1:
            return columns[0]
        return exp.Dot.build(columns)

    def column_def(self, array_element_selector: str) -> exp.ColumnDef:
        return exp.ColumnDef(
            this=self.column(array_element_selector),
            kind=self.column_type,
        )

    def expression(
        self, table_name: t.Union[str, exp.Table], array_element_selector: str
    ) -> exp.AlterTable:
        if self.is_alter_type:
            return exp.AlterTable(
                this=exp.to_table(table_name),
                actions=[
                    exp.AlterColumn(
                        this=self.column(array_element_selector),
                        dtype=self.column_type,
                    )
                ],
            )
        elif self.is_add:
            alter_table = exp.AlterTable(this=exp.to_table(table_name))
            column = self.column_def(array_element_selector)
            alter_table.set("actions", [column])
            if self.add_position:
                column.set("position", self.add_position.column_position_node)
            return alter_table
        elif self.is_drop:
            alter_table = exp.AlterTable(this=exp.to_table(table_name))
            drop_column = exp.Drop(this=self.column(array_element_selector), kind="COLUMN")
            alter_table.set("actions", [drop_column])
            return alter_table
        else:
            raise ValueError(f"Unknown operation {self.op}")

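# Illustrative usage (not part of the module source): an ADD operation rendered
# as an ALTER statement. The table name, column name, and struct below are made
# up, and the exact SQL text depends on the sqlglot version/dialect.
#
#     >>> op = TableAlterOperation.add(
#     ...     TableAlterColumn.primitive("ds"),
#     ...     "TEXT",
#     ...     expected_table_struct="STRUCT<id INT, ds TEXT>",
#     ... )
#     >>> op.expression("db.example", array_element_selector="").sql()
#     'ALTER TABLE db.example ADD COLUMN ds TEXT'
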
class SchemaDiffer(PydanticModel):
    """
    Compares a source schema against a target schema and returns a list of alter statements that make the source
    match the structure of the target. Some engines have constraints on the types of operations that can be
    performed, so the final structure may not match the target exactly, but it will be as close as possible.
    Potential differences:
    1. Column order can differ if the engine doesn't support positional additions. It can also differ if a column
    was simply moved, since we don't currently support fixing moves.
    2. Nested operations will be represented as a drop/add of the root column if the engine doesn't support nested
    operations. As a result, historical data is lost.
    3. Column type changes will be reflected, but they may be done through a drop/add if the change is not a
    compatible change. As a result, historical data is lost.

    Potential future improvements:
    1. Support precision changes on columns like VARCHAR and DECIMAL. Each engine has different rules on what is allowed.
    2. Support column moves. Databricks Delta supports moves and would allow exact matches.
    """

    support_positional_add: bool = False
    support_nested_operations: bool = False
    array_element_selector: str = ""
    compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
    support_coercing_compatible_types: bool = False

    _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}

    @property
    def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
        if not self._coerceable_types:
            if not self.support_coercing_compatible_types or not self.compatible_types:
                return {}
            coerceable_types = defaultdict(set)
            for source_type, target_types in self.compatible_types.items():
                for target_type in target_types:
                    coerceable_types[target_type].add(source_type)
            self._coerceable_types = coerceable_types
        return self._coerceable_types

    def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
        if current_type == new_type:
            return True
        if current_type in self.compatible_types:
            return new_type in self.compatible_types[current_type]
        return False

    def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
        if not self.support_coercing_compatible_types:
            return False
        if current_type in self.coerceable_types:
            is_coerceable = new_type in self.coerceable_types[current_type]
            if is_coerceable:
                logger.warning(
                    f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning.",
                )
            return is_coerceable
        return False

    def _get_matching_kwarg(
        self,
        current_kwarg: t.Union[str, exp.ColumnDef],
        new_struct: exp.DataType,
        current_pos: int,
    ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]:
        current_name = (
            exp.to_identifier(current_kwarg)
            if isinstance(current_kwarg, str)
            else _get_name_and_type(current_kwarg)[0]
        )
        # First check if we have the same column in the same position to get O(1) complexity
        new_kwarg = seq_get(new_struct.expressions, current_pos)
        if new_kwarg:
            new_name, new_type = _get_name_and_type(new_kwarg)
            if current_name.this == new_name.this:
                return current_pos, new_kwarg
        # If not, check if we have the same column in all positions with O(n) complexity
        for i, new_kwarg in enumerate(new_struct.expressions):
            new_name, new_type = _get_name_and_type(new_kwarg)
            if current_name.this == new_name.this:
                return i, new_kwarg
        return None, None

    def _drop_operation(
        self,
        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
        struct: exp.DataType,
        pos: int,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        columns = ensure_list(columns)
        operations = []
        column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos)
        assert column_pos is not None
        assert column_kwarg
        struct.expressions.pop(column_pos)
        operations.append(
            TableAlterOperation.drop(columns, root_struct.copy(), column_kwarg.args["kind"])
        )
        return operations

    def _resolve_drop_operation(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        operations = []
        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
            if new_pos is None:
                operations.extend(
                    self._drop_operation(columns, current_struct, current_pos, root_struct)
                )
        return operations

    def _add_operation(
        self,
        columns: t.List[TableAlterColumn],
        new_pos: int,
        new_kwarg: exp.ColumnDef,
        current_struct: exp.DataType,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        if self.support_positional_add:
            col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions)
            current_struct.expressions.insert(new_pos, new_kwarg)
        else:
            col_pos = None
            current_struct.expressions.append(new_kwarg)
        return [
            TableAlterOperation.add(
                columns,
                new_kwarg.args["kind"],
                root_struct.copy(),
                col_pos,
            )
        ]

    def _resolve_add_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        operations = []
        for new_pos, new_kwarg in enumerate(new_struct.expressions):
            possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos)
            if possible_current_pos is None:
                columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)]
                operations.extend(
                    self._add_operation(columns, new_pos, new_kwarg, current_struct, root_struct)
                )
        return operations

    def _alter_operation(
        self,
        columns: t.List[TableAlterColumn],
        pos: int,
        struct: exp.DataType,
        new_type: exp.DataType,
        current_type: t.Union[str, exp.DataType],
        root_struct: exp.DataType,
        new_kwarg: exp.ColumnDef,
    ) -> t.List[TableAlterOperation]:
        # We don't copy on purpose here because current_type may need to be mutated inside
        # _get_operations (struct.expressions.pop and struct.expressions.insert)
        current_type = exp.DataType.build(current_type, copy=False)
        if self.support_nested_operations:
            if new_type.this == current_type.this == exp.DataType.Type.STRUCT:
                return self._get_operations(
                    columns,
                    current_type,
                    new_type,
                    root_struct,
                )
            if new_type.this == current_type.this == exp.DataType.Type.ARRAY:
                new_array_type = new_type.expressions[0]
                current_array_type = current_type.expressions[0]
                if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT:
                    return self._get_operations(
                        columns,
                        current_array_type,
                        new_array_type,
                        root_struct,
                    )
        if self._is_coerceable_type(current_type, new_type):
            return []
        elif self._is_compatible_type(current_type, new_type):
            struct.expressions.pop(pos)
            struct.expressions.insert(pos, new_kwarg)
            col_pos = (
                TableAlterColumnPosition.create(pos, struct.expressions, replacing_col=True)
                if self.support_positional_add
                else None
            )
            return [
                TableAlterOperation.alter_type(
                    columns,
                    new_type,
                    current_type,
                    root_struct.copy(),
                    col_pos,
                )
            ]
        else:
            return self._drop_operation(
                columns, root_struct, pos, root_struct
            ) + self._add_operation(columns, pos, new_kwarg, struct, root_struct)

    def _resolve_alter_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        operations = []
        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
            _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
            assert new_kwarg
            _, new_type = _get_name_and_type(new_kwarg)
            _, current_type = _get_name_and_type(current_kwarg)
            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
            if new_type == current_type:
                continue
            operations.extend(
                self._alter_operation(
                    columns,
                    current_pos,
                    current_struct,
                    new_type,
                    current_type,
                    root_struct,
                    new_kwarg,
                )
            )
        return operations

    def _get_operations(
        self,
        parent_columns: t.List[TableAlterColumn],
        current_struct: exp.DataType,
        new_struct: exp.DataType,
        root_struct: exp.DataType,
    ) -> t.List[TableAlterOperation]:
        root_struct = root_struct or current_struct
        parent_columns = parent_columns or []
        operations = []
        operations.extend(
            self._resolve_drop_operation(parent_columns, current_struct, new_struct, root_struct)
        )
        operations.extend(
            self._resolve_add_operations(parent_columns, current_struct, new_struct, root_struct)
        )
        operations.extend(
            self._resolve_alter_operations(parent_columns, current_struct, new_struct, root_struct)
        )
        return operations

    def _from_structs(
        self, current_struct: exp.DataType, new_struct: exp.DataType
    ) -> t.List[TableAlterOperation]:
        return self._get_operations([], current_struct, new_struct, current_struct)

    def compare_structs(
        self, table_name: t.Union[str, exp.Table], current: exp.DataType, new: exp.DataType
    ) -> t.List[exp.AlterTable]:
        """
        Compares two schemas represented as structs.

        Args:
            current: The current schema.
            new: The new schema.

        Returns:
            The list of table alter operations.
        """
        return [
            op.expression(table_name, self.array_element_selector)
            for op in self._from_structs(current, new)
        ]

    def compare_columns(
        self,
        table_name: TableName,
        current: t.Dict[str, exp.DataType],
        new: t.Dict[str, exp.DataType],
    ) -> t.List[exp.AlterTable]:
        """
        Compares two schemas represented as dictionaries of column names and types.

        Args:
            current: The current schema.
            new: The new schema.

        Returns:
            The list of schema deltas.
        """
        return self.compare_structs(
            table_name, columns_to_types_to_struct(current), columns_to_types_to_struct(new)
        )


def _get_name_and_type(struct: exp.ColumnDef) -> t.Tuple[exp.Identifier, exp.DataType]:
    return struct.this, struct.args["kind"]
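A minimal usage sketch of SchemaDiffer.compare_columns. The flag values, the
compatible_types mapping, and the table/column names below are assumptions made
up for illustration; they do not correspond to any particular engine's
configuration.

    from sqlglot import exp

    from sqlmesh.core.schema_diff import SchemaDiffer

    # Treat INT -> BIGINT as a compatible, in-place type change.
    differ = SchemaDiffer(
        support_positional_add=False,
        support_nested_operations=False,
        compatible_types={exp.DataType.build("INT"): {exp.DataType.build("BIGINT")}},
    )

    current = {
        "id": exp.DataType.build("INT"),
        "name": exp.DataType.build("TEXT"),
    }
    new = {
        "id": exp.DataType.build("BIGINT"),  # compatible widening -> ALTER COLUMN
        "name": exp.DataType.build("TEXT"),  # unchanged -> no operation
        "ds": exp.DataType.build("TEXT"),    # missing in current -> ADD COLUMN
    }

    # compare_columns returns sqlglot AlterTable expressions; render them with
    # .sql() for the target dialect.
    for alter_expr in differ.compare_columns("db.example_table", current, new):
        print(alter_expr.sql())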
491            ) + self._add_operation(columns, pos, new_kwarg, struct, root_struct)
492
493    def _resolve_alter_operations(
494        self,
495        parent_columns: t.List[TableAlterColumn],
496        current_struct: exp.DataType,
497        new_struct: exp.DataType,
498        root_struct: exp.DataType,
499    ) -> t.List[TableAlterOperation]:
500        operations = []
501        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
502            _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
503            assert new_kwarg
504            _, new_type = _get_name_and_type(new_kwarg)
505            _, current_type = _get_name_and_type(current_kwarg)
506            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
507            if new_type == current_type:
508                continue
509            operations.extend(
510                self._alter_operation(
511                    columns,
512                    current_pos,
513                    current_struct,
514                    new_type,
515                    current_type,
516                    root_struct,
517                    new_kwarg,
518                )
519            )
520        return operations
521
522    def _get_operations(
523        self,
524        parent_columns: t.List[TableAlterColumn],
525        current_struct: exp.DataType,
526        new_struct: exp.DataType,
527        root_struct: exp.DataType,
528    ) -> t.List[TableAlterOperation]:
529        root_struct = root_struct or current_struct
530        parent_columns = parent_columns or []
531        operations = []
532        operations.extend(
533            self._resolve_drop_operation(parent_columns, current_struct, new_struct, root_struct)
534        )
535        operations.extend(
536            self._resolve_add_operations(parent_columns, current_struct, new_struct, root_struct)
537        )
538        operations.extend(
539            self._resolve_alter_operations(parent_columns, current_struct, new_struct, root_struct)
540        )
541        return operations
542
543    def _from_structs(
544        self, current_struct: exp.DataType, new_struct: exp.DataType
545    ) -> t.List[TableAlterOperation]:
546        return self._get_operations([], current_struct, new_struct, current_struct)
547
548    def compare_structs(
549        self, table_name: t.Union[str, exp.Table], current: exp.DataType, new: exp.DataType
550    ) -> t.List[exp.AlterTable]:
551        """
552        Compares two schemas represented as structs.
553
554        Args:
555            current: The current schema.
556            new: The new schema.
557
558        Returns:
559            The list of table alter operations.
560        """
561        return [
562            op.expression(table_name, self.array_element_selector)
563            for op in self._from_structs(current, new)
564        ]
565
566    def compare_columns(
567        self,
568        table_name: TableName,
569        current: t.Dict[str, exp.DataType],
570        new: t.Dict[str, exp.DataType],
571    ) -> t.List[exp.AlterTable]:
572        """
573        Compares two schemas represented as dictionaries of column names and types.
574
575        Args:
576            current: The current schema.
577            new: The new schema.
578
579        Returns:
580            The list of schema deltas.
581        """
582        return self.compare_structs(
583            table_name, columns_to_types_to_struct(current), columns_to_types_to_struct(new)
584        )

Compares a source schema against a target schema and returns a list of ALTER statements that make the source match the structure of the target. Some engines constrain which operations can be performed, so the final structure may not match the target exactly, but it will be as close as possible. Three potential differences can occur:

  1. Column order can differ if the engine doesn't support positional additions. Order can also differ when a column is merely moved, since fixing moves is not currently supported.
  2. Nested operations are represented as a drop/add of the root column if the engine doesn't support nested operations. As a result, historical data is lost.
  3. Column type changes are applied, but through a drop/add if the change is not a compatible one. As a result, historical data is lost.

Potential future improvements:

  1. Support precision changes on columns like VARCHAR and DECIMAL. Each engine has different rules on what is allowed.
  2. Support column moves. Databricks Delta supports moves and would allow exact matches.
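
For orientation, here is a minimal usage sketch (not taken from the source). It assumes SchemaDiffer can be instantiated with its default field values and uses a made-up table name; compare_columns and sqlglot's exp.DataType.build are the only pieces taken directly from the code above.

    from sqlglot import exp

    from sqlmesh.core.schema_diff import SchemaDiffer

    # Assumption: SchemaDiffer accepts no arguments and falls back to its
    # default engine capabilities (e.g. no positional adds).
    differ = SchemaDiffer()

    current = {
        "id": exp.DataType.build("INT"),
        "name": exp.DataType.build("TEXT"),
    }
    new = {
        "id": exp.DataType.build("INT"),
        "name": exp.DataType.build("TEXT"),
        "ds": exp.DataType.build("DATE"),
    }

    # "db.example_table" is a hypothetical table name. Adding `ds` should
    # produce a single ALTER TABLE ... ADD COLUMN statement.
    for alter in differ.compare_columns("db.example_table", current, new):
        print(alter.sql())

Because positional additions are off by default, the new column is appended at the end of the table rather than inserted at a specific position, which is the first difference listed above.
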
def compare_structs( self, table_name: Union[str, sqlglot.expressions.Table], current: sqlglot.expressions.DataType, new: sqlglot.expressions.DataType) -> List[sqlglot.expressions.AlterTable]:
548    def compare_structs(
549        self, table_name: t.Union[str, exp.Table], current: exp.DataType, new: exp.DataType
550    ) -> t.List[exp.AlterTable]:
551        """
552        Compares two schemas represented as structs.
553
554        Args:
555            current: The current schema.
556            new: The new schema.
557
558        Returns:
559            The list of table alter operations.
560        """
561        return [
562            op.expression(table_name, self.array_element_selector)
563            for op in self._from_structs(current, new)
564        ]

Compares two schemas represented as structs.

Arguments:
  • current: The current schema.
  • new: The new schema.

Returns:
  The list of table alter operations.
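
compare_structs can also be called directly with struct-typed DataType expressions, which makes it easy to see how nested columns are handled. The sketch below assumes support_nested_operations is an ordinary pydantic field that can be set at construction time; the source above only shows it being read as self.support_nested_operations. Copies of the current struct are passed because compare_structs mutates it in place.

    from sqlglot import exp

    from sqlmesh.core.schema_diff import SchemaDiffer

    current = exp.DataType.build("STRUCT<id INT, info STRUCT<a INT>>")
    new = exp.DataType.build("STRUCT<id INT, info STRUCT<a INT, b INT>>")

    # Without nested-operation support the whole `info` column is dropped and
    # re-added (losing history); with it, only the nested field `b` is added.
    drop_add = SchemaDiffer().compare_structs(
        "db.example_table", current.copy(), new
    )
    # Assumption: support_nested_operations is a constructor-settable field.
    nested = SchemaDiffer(support_nested_operations=True).compare_structs(
        "db.example_table", current.copy(), new
    )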

def compare_columns( self, table_name: TableName, current: Dict[str, sqlglot.expressions.DataType], new: Dict[str, sqlglot.expressions.DataType]) -> List[sqlglot.expressions.AlterTable]:
566    def compare_columns(
567        self,
568        table_name: TableName,
569        current: t.Dict[str, exp.DataType],
570        new: t.Dict[str, exp.DataType],
571    ) -> t.List[exp.AlterTable]:
572        """
573        Compares two schemas represented as dictionaries of column names and types.
574
575        Args:
576            current: The current schema.
577            new: The new schema.
578
579        Returns:
580            The list of schema deltas.
581        """
582        return self.compare_structs(
583            table_name, columns_to_types_to_struct(current), columns_to_types_to_struct(new)
584        )

Compares two schemas represented as dictionaries of column names and types.

Arguments:
  • current: The current schema.
  • new: The new schema.

Returns:
  The list of schema deltas.
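
As the source shows, compare_columns simply packs each columns-to-types mapping into a single struct via columns_to_types_to_struct and delegates to compare_structs. A short sketch of that conversion (the column names are illustrative):

    from sqlglot import exp

    from sqlmesh.utils import columns_to_types_to_struct

    columns = {"id": exp.DataType.build("INT"), "ds": exp.DataType.build("DATE")}

    # The mapping becomes one STRUCT DataType, the representation that
    # compare_structs and the internal _get_operations logic work with.
    struct = columns_to_types_to_struct(columns)
    assert struct.is_type(exp.DataType.Type.STRUCT)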

def model_post_init(self: pydantic.main.BaseModel, _ModelMetaclass__context: Any) -> None:
102                    def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
103                        """We need to both initialize private attributes and call the user-defined model_post_init
104                        method.
105                        """
106                        init_private_attributes(self, __context)
107                        original_model_post_init(self, __context)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields