Edit on GitHub

sqlmesh.core.schema_diff

  1from __future__ import annotations
  2
  3import abc
  4import logging
  5import typing as t
  6from dataclasses import dataclass
  7from collections import defaultdict
  8from enum import Enum
  9
 10from pydantic import Field
 11from sqlglot import exp
 12from sqlglot.helper import ensure_list, seq_get
 13
 14from sqlmesh.utils import columns_to_types_to_struct
 15from sqlmesh.utils.pydantic import PydanticModel
 16from sqlmesh.utils.errors import SQLMeshError
 17
 18if t.TYPE_CHECKING:
 19    from sqlmesh.core._typing import TableName
 20
 21logger = logging.getLogger(__name__)
 22
 23
 24@dataclass(frozen=True)
 25class TableAlterOperation(abc.ABC):
 26    target_table: exp.Table
 27
 28    @property
 29    @abc.abstractmethod
 30    def is_destructive(self) -> bool:
 31        pass
 32
 33    @property
 34    @abc.abstractmethod
 35    def is_additive(self) -> bool:
 36        pass
 37
 38    @property
 39    @abc.abstractmethod
 40    def _alter_actions(self) -> t.List[exp.Expression]:
 41        pass
 42
 43    @property
 44    def expression(self) -> exp.Alter:
 45        return exp.Alter(
 46            this=self.target_table,
 47            kind="TABLE",
 48            actions=self._alter_actions,
 49        )
 50
 51
 52@dataclass(frozen=True)
 53class TableAlterColumnOperation(TableAlterOperation, abc.ABC):
 54    column_parts: t.List[TableAlterColumn]
 55    expected_table_struct: exp.DataType
 56    array_element_selector: str
 57
 58    @property
 59    def column_identifiers(self) -> t.List[exp.Identifier]:
 60        results = []
 61        for column in self.column_parts:
 62            results.append(column.identifier)
 63            if (
 64                column.is_array_of_struct
 65                and len(self.column_parts) > 1
 66                and self.array_element_selector
 67            ):
 68                results.append(exp.to_identifier(self.array_element_selector))
 69        return results
 70
 71    @property
 72    def column(self) -> t.Union[exp.Dot, exp.Identifier]:
 73        columns = self.column_identifiers
 74        if len(columns) == 1:
 75            return columns[0]
 76        return exp.Dot.build(columns)
 77
 78
 79@dataclass(frozen=True)
 80class TableAlterTypedColumnOperation(TableAlterColumnOperation, abc.ABC):
 81    column_type: exp.DataType
 82
 83    @property
 84    def column_def(self) -> exp.ColumnDef:
 85        if not self.column_type:
 86            raise SQLMeshError("Tried to access column type when it shouldn't be needed")
 87        return exp.ColumnDef(
 88            this=self.column,
 89            kind=self.column_type,
 90        )
 91
 92
 93@dataclass(frozen=True)
 94class TableAlterAddColumnOperation(TableAlterTypedColumnOperation):
 95    position: t.Optional[TableAlterColumnPosition] = None
 96    is_part_of_destructive_change: bool = False
 97
 98    @property
 99    def is_additive(self) -> bool:
100        return not self.is_part_of_destructive_change
101
102    @property
103    def is_destructive(self) -> bool:
104        return self.is_part_of_destructive_change
105
106    @property
107    def _alter_actions(self) -> t.List[exp.Expression]:
108        column_def = exp.ColumnDef(
109            this=self.column,
110            kind=self.column_type,
111        )
112        if self.position:
113            column_def.set("position", self.position.column_position_node)
114        return [column_def]
115
116
117@dataclass(frozen=True)
118class TableAlterDropColumnOperation(TableAlterColumnOperation):
119    cascade: bool = False
120
121    @property
122    def is_additive(self) -> bool:
123        return False
124
125    @property
126    def is_destructive(self) -> bool:
127        return True
128
129    @property
130    def _alter_actions(self) -> t.List[exp.Expression]:
131        return [exp.Drop(this=self.column, kind="COLUMN", cascade=self.cascade)]
132
133
134@dataclass(frozen=True)
135class TableAlterChangeColumnTypeOperation(TableAlterTypedColumnOperation):
136    current_type: exp.DataType
137    is_part_of_destructive_change: bool = False
138
139    @property
140    def is_additive(self) -> bool:
141        return not self.is_part_of_destructive_change
142
143    @property
144    def is_destructive(self) -> bool:
145        return self.is_part_of_destructive_change
146
147    @property
148    def _alter_actions(self) -> t.List[exp.Expression]:
149        return [
150            exp.AlterColumn(
151                this=self.column,
152                dtype=self.column_type,
153            )
154        ]
155
156
157@dataclass(frozen=True)
158class TableAlterColumn:
159    name: str
160    is_struct: bool
161    is_array_of_struct: bool
162    is_array_of_primitive: bool
163    quoted: bool = False
164
165    @classmethod
166    def primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
167        return cls(
168            name=name,
169            is_struct=False,
170            is_array_of_struct=False,
171            is_array_of_primitive=False,
172            quoted=quoted,
173        )
174
175    @classmethod
176    def struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
177        return cls(
178            name=name,
179            is_struct=True,
180            is_array_of_struct=False,
181            is_array_of_primitive=False,
182            quoted=quoted,
183        )
184
185    @classmethod
186    def array_of_struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
187        return cls(
188            name=name,
189            is_struct=False,
190            is_array_of_struct=True,
191            is_array_of_primitive=False,
192            quoted=quoted,
193        )
194
195    @classmethod
196    def array_of_primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
197        return cls(
198            name=name,
199            is_struct=False,
200            is_array_of_struct=False,
201            is_array_of_primitive=True,
202            quoted=quoted,
203        )
204
205    @classmethod
206    def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn:
207        name = struct.alias_or_name
208        quoted = struct.this.quoted
209        kwarg_type = struct.args["kind"]
210
211        if kwarg_type.is_type(exp.DataType.Type.STRUCT):
212            return cls.struct(name, quoted=quoted)
213        if kwarg_type.is_type(exp.DataType.Type.ARRAY):
214            if kwarg_type.expressions and kwarg_type.expressions[0].is_type(
215                exp.DataType.Type.STRUCT
216            ):
217                return cls.array_of_struct(name, quoted=quoted)
218            return cls.array_of_primitive(name, quoted=quoted)
219        return cls.primitive(name, quoted=quoted)
220
221    @property
222    def is_array(self) -> bool:
223        return self.is_array_of_struct or self.is_array_of_primitive
224
225    @property
226    def is_primitive(self) -> bool:
227        return not self.is_struct and not self.is_array
228
229    @property
230    def is_nested(self) -> bool:
231        return not self.is_primitive
232
233    @property
234    def identifier(self) -> exp.Identifier:
235        return exp.to_identifier(self.name, quoted=self.quoted)
236
237
238@dataclass(frozen=True)
239class TableAlterColumnPosition:
240    is_first: bool
241    is_last: bool
242    after: t.Optional[exp.Identifier] = None
243
244    @classmethod
245    def first(cls) -> TableAlterColumnPosition:
246        return cls(is_first=True, is_last=False, after=None)
247
248    @classmethod
249    def last(
250        cls, after: t.Optional[t.Union[str, exp.Identifier]] = None
251    ) -> TableAlterColumnPosition:
252        return cls(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None)
253
254    @classmethod
255    def middle(cls, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition:
256        return cls(is_first=False, is_last=False, after=exp.to_identifier(after))
257
258    @classmethod
259    def create(
260        cls,
261        pos: int,
262        current_kwargs: t.List[exp.ColumnDef],
263        replacing_col: bool = False,
264    ) -> TableAlterColumnPosition:
265        is_first = pos == 0
266        is_last = pos == len(current_kwargs) - int(replacing_col)
267        after = None
268        if not is_first:
269            prior_kwarg = current_kwargs[pos - 1]
270            after, _ = _get_name_and_type(prior_kwarg)
271        return cls(is_first=is_first, is_last=is_last, after=after)
272
273    @property
274    def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
275        column = self.after if not self.is_last else None
276        position = None
277        if self.is_first:
278            position = "FIRST"
279        elif column and not self.is_last:
280            position = "AFTER"
281        return exp.ColumnPosition(this=column, position=position)
282
283
284class NestedSupport(str, Enum):
285    # Supports all nested data type operations
286    ALL = "ALL"
287    # Does not support any nested data type operations
288    NONE = "NONE"
289    # Supports nested data type operations except for those that require dropping a nested field
290    ALL_BUT_DROP = "ALL_BUT_DROP"
291    # Ignores all nested data type operations
292    IGNORE = "IGNORE"
293
294    @property
295    def is_all(self) -> bool:
296        return self == NestedSupport.ALL
297
298    @property
299    def is_none(self) -> bool:
300        return self == NestedSupport.NONE
301
302    @property
303    def is_all_but_drop(self) -> bool:
304        return self == NestedSupport.ALL_BUT_DROP
305
306    @property
307    def is_ignore(self) -> bool:
308        return self == NestedSupport.IGNORE
309
310
311class SchemaDiffer(PydanticModel):
312    """
313    Compares a source schema against a target schema and returns a list of alter statements to have the source
314    match the structure of target. Some engines have constraints on the types of operations that can be performed
315    therefore the final structure may not match the target exactly but it will be as close as possible. Two potential
316    differences that can happen:
317    1. Column order can be different if the engine doesn't support positional additions. Another reason for difference
318    is if a column is just moved since we don't currently support fixing moves.
319    2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested
320    operations. As a result historical data is lost.
321    3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible
322    change. As a result historical data is lost.
323
324    Potential future improvements:
325    1. Support column moves. Databricks Delta supports moves and would allow exact matches.
326
327    Args:
328        support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a
329            specific position in the set of existing columns.
330        nested_support: How the engine for which the diff is being computed supports nested types.
331        compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data
332            type, and value is the set of types that are compatible with it.
333        coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without
334            altering the column type. NOTE: usually callers should not specify this attribute manually and set the
335            `support_coercing_compatible_types` flag instead. Some engines are inconsistent about their type coercion rules.
336            For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted
337            into a FLOAT64 column just fine.
338        support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct
339            coercion of compatible types.
340        drop_cascade: Whether to add CASCADE modifier when dropping a column.
341        parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type,
342            but in the engine adapter specification we build it from the dialect string instead of specifying it directly.
343            Example: `exp.DataType.build("STRING", dialect=DIALECT).this` instead of the underlying `exp.DataType.Type.TEXT`
344            to which it parses. We do that because parameter default replacement will silently break if we specify type
345            directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default
346            values in a list, where the list index contains the remaining defaults given the number of parameter values
347            provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two
348            omitted parameters `(38, 9)` -> "DECIMAL(38,9)". Example: if user provides 1 parameters "DECIMAL(10)", we return
349            index 1 value for the one omitted parameters `(0,)` -> "DECIMAL(10,0)".
350        max_parameter_length: Numeric parameter values corresponding to "max". Example: `VARCHAR(max)` -> `VARCHAR(65535)`.
351        types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly
352            parameterized type can ALTER to its unlimited length version, along with different types in some engines.
353        treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it
354            concludes the change is compatible and won't result in data loss. If this flag is set to True, it will
355            flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't
356            be set outside of that context.
357    """
358
359    support_positional_add: bool = False
360    nested_support: NestedSupport = NestedSupport.NONE
361    array_element_selector: str = ""
362    compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
363    coerceable_types_: t.Dict[exp.DataType, t.Set[exp.DataType]] = Field(
364        default_factory=dict, alias="coerceable_types"
365    )
366    precision_increase_allowed_types: t.Optional[t.Set[exp.DataType.Type]] = None
367    support_coercing_compatible_types: bool = False
368    drop_cascade: bool = False
369    parameterized_type_defaults: t.Dict[
370        exp.DataType.Type, t.List[t.Tuple[t.Union[int, float], ...]]
371    ] = {}
372    max_parameter_length: t.Dict[exp.DataType.Type, t.Union[int, float]] = {}
373    types_with_unlimited_length: t.Dict[exp.DataType.Type, t.Set[exp.DataType.Type]] = {}
374    treat_alter_data_type_as_destructive: bool = False
375
376    _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
377
378    @property
379    def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
380        if not self._coerceable_types:
381            if not self.support_coercing_compatible_types or not self.compatible_types:
382                return self.coerceable_types_
383            coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(set)
384            coerceable_types.update(self.coerceable_types_)
385            for source_type, target_types in self.compatible_types.items():
386                for target_type in target_types:
387                    coerceable_types[target_type].add(source_type)
388            self._coerceable_types = coerceable_types
389        return self._coerceable_types
390
391    def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
392        # types are identical or both types are parameterized and new has higher precision
393        # - default parameter values are automatically provided if not present
394        if current_type == new_type or (
395            self._is_precision_increase_allowed(current_type)
396            and self._is_precision_increase(current_type, new_type)
397        ):
398            return True
399        # types are un-parameterized and compatible
400        if current_type in self.compatible_types:
401            return new_type in self.compatible_types[current_type]
402        # new type is un-parameterized and has unlimited length, current type is compatible
403        if not new_type.expressions and new_type.this in self.types_with_unlimited_length:
404            return current_type.this in self.types_with_unlimited_length[new_type.this]
405        return False
406
407    def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
408        if current_type in self.coerceable_types:
409            is_coerceable = new_type in self.coerceable_types[current_type]
410            if is_coerceable:
411                from sqlmesh.core.console import get_console
412
413                get_console().log_warning(
414                    f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning."
415                )
416
417            return is_coerceable
418        return False
419
420    def _is_precision_increase_allowed(self, current_type: exp.DataType) -> bool:
421        return (
422            self.precision_increase_allowed_types is None
423            or current_type.this in self.precision_increase_allowed_types
424        )
425
426    def _is_precision_increase(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
427        if current_type.this == new_type.this and not current_type.is_type(
428            *exp.DataType.NESTED_TYPES
429        ):
430            current_params = self.get_type_parameters(current_type)
431            new_params = self.get_type_parameters(new_type)
432
433            if len(current_params) != len(new_params):
434                return False
435
436            return all(new >= current for current, new in zip(current_params, new_params))
437        return False
438
439    def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]:
440        def _str_to_number(string: str, allows_max_param: bool) -> t.Union[int, float]:
441            try:
442                return int(string)
443            except ValueError:
444                try:
445                    return float(string)
446                except ValueError:
447                    if allows_max_param and string.upper() == "MAX":
448                        return self.max_parameter_length[type.this]
449                    raise ValueError(f"Could not convert '{string}' to a number")
450
451        # extract existing parameters
452        params = [
453            _str_to_number(param.this.this, type.this in self.max_parameter_length)
454            for param in type.expressions
455        ]
456
457        # maybe get default parameter values
458        param_defaults: t.Tuple[t.Union[int, float], ...] = ()
459        if type.this in self.parameterized_type_defaults:
460            param_defaults_list = self.parameterized_type_defaults[type.this]
461            if len(params) < len(param_defaults_list):
462                param_defaults = param_defaults_list[len(params)]
463
464        return [*params, *param_defaults]
465
466    def _get_matching_kwarg(
467        self,
468        current_kwarg: t.Union[str, exp.ColumnDef],
469        new_struct: exp.DataType,
470        current_pos: int,
471    ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]:
472        current_name = (
473            exp.to_identifier(current_kwarg)
474            if isinstance(current_kwarg, str)
475            else _get_name_and_type(current_kwarg)[0]
476        )
477        # First check if we have the same column in the same position to get O(1) complexity
478        new_kwarg = seq_get(new_struct.expressions, current_pos)
479        if new_kwarg:
480            new_name, new_type = _get_name_and_type(new_kwarg)
481            if current_name.this == new_name.this:
482                return current_pos, new_kwarg
483        # If not, check if we have the same column in all positions with O(n) complexity
484        for i, new_kwarg in enumerate(new_struct.expressions):
485            new_name, new_type = _get_name_and_type(new_kwarg)
486            if current_name.this == new_name.this:
487                return i, new_kwarg
488        return None, None
489
490    def _drop_operation(
491        self,
492        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
493        struct: exp.DataType,
494        pos: int,
495        root_struct: exp.DataType,
496        table_name: TableName,
497    ) -> t.List[TableAlterColumnOperation]:
498        columns = ensure_list(columns)
499        operations: t.List[TableAlterColumnOperation] = []
500        column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos)
501        if column_pos is None or not column_kwarg:
502            raise SQLMeshError(
503                f"Cannot drop column '{columns[-1].name}' from table '{table_name}' - column not found. "
504                f"This may indicate a mismatch between the expected and actual table schemas."
505            )
506        struct.expressions.pop(column_pos)
507        operations.append(
508            TableAlterDropColumnOperation(
509                target_table=exp.to_table(table_name),
510                column_parts=columns,
511                expected_table_struct=root_struct.copy(),
512                cascade=self.drop_cascade,
513                array_element_selector=self.array_element_selector,
514            )
515        )
516        return operations
517
518    def _requires_drop_alteration(
519        self, current_struct: exp.DataType, new_struct: exp.DataType
520    ) -> bool:
521        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
522            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
523            if new_pos is None:
524                return True
525        return False
526
527    def _resolve_drop_operation(
528        self,
529        parent_columns: t.List[TableAlterColumn],
530        current_struct: exp.DataType,
531        new_struct: exp.DataType,
532        root_struct: exp.DataType,
533        table_name: TableName,
534    ) -> t.List[TableAlterColumnOperation]:
535        operations = []
536        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
537            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
538            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
539            if new_pos is None:
540                operations.extend(
541                    self._drop_operation(
542                        columns, current_struct, current_pos, root_struct, table_name
543                    )
544                )
545        return operations
546
547    def _add_operation(
548        self,
549        columns: t.List[TableAlterColumn],
550        new_pos: int,
551        new_kwarg: exp.ColumnDef,
552        current_struct: exp.DataType,
553        root_struct: exp.DataType,
554        table_name: TableName,
555        is_part_of_destructive_change: bool = False,
556    ) -> t.List[TableAlterColumnOperation]:
557        if self.support_positional_add:
558            col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions)
559            current_struct.expressions.insert(new_pos, new_kwarg)
560        else:
561            col_pos = None
562            current_struct.expressions.append(new_kwarg)
563        return [
564            TableAlterAddColumnOperation(
565                target_table=exp.to_table(table_name),
566                column_parts=columns,
567                column_type=new_kwarg.args["kind"],
568                expected_table_struct=root_struct.copy(),
569                position=col_pos,
570                is_part_of_destructive_change=is_part_of_destructive_change,
571                array_element_selector=self.array_element_selector,
572            )
573        ]
574
575    def _resolve_add_operations(
576        self,
577        parent_columns: t.List[TableAlterColumn],
578        current_struct: exp.DataType,
579        new_struct: exp.DataType,
580        root_struct: exp.DataType,
581        table_name: TableName,
582    ) -> t.List[TableAlterColumnOperation]:
583        operations = []
584        for new_pos, new_kwarg in enumerate(new_struct.expressions):
585            possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos)
586            if possible_current_pos is None:
587                columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)]
588                operations.extend(
589                    self._add_operation(
590                        columns, new_pos, new_kwarg, current_struct, root_struct, table_name
591                    )
592                )
593        return operations
594
595    def _alter_operation(
596        self,
597        columns: t.List[TableAlterColumn],
598        pos: int,
599        struct: exp.DataType,
600        new_type: exp.DataType,
601        current_type: t.Union[str, exp.DataType],
602        root_struct: exp.DataType,
603        new_kwarg: exp.ColumnDef,
604        table_name: TableName,
605        *,
606        ignore_destructive: bool = False,
607        ignore_additive: bool = False,
608    ) -> t.List[TableAlterColumnOperation]:
609        # We don't copy on purpose here because current_type may need to be mutated inside
610        # _get_operations (struct.expressions.pop and struct.expressions.insert)
611        current_type = exp.DataType.build(current_type, copy=False)
612        if not self.nested_support.is_none:
613            if new_type.this == current_type.this == exp.DataType.Type.STRUCT:
614                if self.nested_support.is_ignore:
615                    return []
616                if self.nested_support.is_all or not self._requires_drop_alteration(
617                    current_type, new_type
618                ):
619                    return self._get_operations(
620                        columns,
621                        current_type,
622                        new_type,
623                        root_struct,
624                        table_name,
625                        ignore_destructive=ignore_destructive,
626                        ignore_additive=ignore_additive,
627                    )
628
629            if new_type.this == current_type.this == exp.DataType.Type.ARRAY:
630                # Some engines (i.e. Snowflake) don't support defining types on arrays
631                if not new_type.expressions or not current_type.expressions:
632                    return []
633                new_array_type = new_type.expressions[0]
634                current_array_type = current_type.expressions[0]
635                if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT:
636                    if self.nested_support.is_ignore:
637                        return []
638                    if self.nested_support.is_all or not self._requires_drop_alteration(
639                        current_array_type, new_array_type
640                    ):
641                        return self._get_operations(
642                            columns,
643                            current_array_type,
644                            new_array_type,
645                            root_struct,
646                            table_name,
647                            ignore_destructive=ignore_destructive,
648                            ignore_additive=ignore_additive,
649                        )
650        if self._is_coerceable_type(current_type, new_type):
651            return []
652        if self._is_compatible_type(current_type, new_type):
653            if ignore_additive:
654                return []
655            struct.expressions.pop(pos)
656            struct.expressions.insert(pos, new_kwarg)
657            return [
658                TableAlterChangeColumnTypeOperation(
659                    target_table=exp.to_table(table_name),
660                    column_parts=columns,
661                    column_type=new_type,
662                    current_type=current_type,
663                    expected_table_struct=root_struct.copy(),
664                    array_element_selector=self.array_element_selector,
665                    is_part_of_destructive_change=self.treat_alter_data_type_as_destructive,
666                )
667            ]
668        if ignore_destructive:
669            return []
670        return self._drop_operation(
671            columns,
672            root_struct,
673            pos,
674            root_struct,
675            table_name,
676        ) + self._add_operation(
677            columns,
678            pos,
679            new_kwarg,
680            struct,
681            root_struct,
682            table_name,
683            is_part_of_destructive_change=True,
684        )
685
686    def _resolve_alter_operations(
687        self,
688        parent_columns: t.List[TableAlterColumn],
689        current_struct: exp.DataType,
690        new_struct: exp.DataType,
691        root_struct: exp.DataType,
692        table_name: TableName,
693        *,
694        ignore_destructive: bool = False,
695        ignore_additive: bool = False,
696    ) -> t.List[TableAlterColumnOperation]:
697        operations = []
698        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
699            _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
700            if new_kwarg is None:
701                if ignore_destructive:
702                    continue
703                raise ValueError("Cannot alter a column that is being dropped")
704            _, new_type = _get_name_and_type(new_kwarg)
705            _, current_type = _get_name_and_type(current_kwarg)
706            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
707            if new_type == current_type:
708                continue
709            operations.extend(
710                self._alter_operation(
711                    columns,
712                    current_pos,
713                    current_struct,
714                    new_type,
715                    current_type,
716                    root_struct,
717                    new_kwarg,
718                    table_name,
719                    ignore_destructive=ignore_destructive,
720                    ignore_additive=ignore_additive,
721                )
722            )
723        return operations
724
725    def _get_operations(
726        self,
727        parent_columns: t.List[TableAlterColumn],
728        current_struct: exp.DataType,
729        new_struct: exp.DataType,
730        root_struct: exp.DataType,
731        table_name: TableName,
732        *,
733        ignore_destructive: bool = False,
734        ignore_additive: bool = False,
735    ) -> t.List[TableAlterColumnOperation]:
736        root_struct = root_struct or current_struct
737        parent_columns = parent_columns or []
738        operations = []
739        if not ignore_destructive:
740            operations.extend(
741                self._resolve_drop_operation(
742                    parent_columns, current_struct, new_struct, root_struct, table_name
743                )
744            )
745        if not ignore_additive:
746            operations.extend(
747                self._resolve_add_operations(
748                    parent_columns, current_struct, new_struct, root_struct, table_name
749                )
750            )
751        operations.extend(
752            self._resolve_alter_operations(
753                parent_columns,
754                current_struct,
755                new_struct,
756                root_struct,
757                ignore_destructive=ignore_destructive,
758                ignore_additive=ignore_additive,
759                table_name=table_name,
760            )
761        )
762        return operations
763
764    def _from_structs(
765        self,
766        current_struct: exp.DataType,
767        new_struct: exp.DataType,
768        table_name: TableName,
769        *,
770        ignore_destructive: bool = False,
771        ignore_additive: bool = False,
772    ) -> t.List[TableAlterColumnOperation]:
773        return self._get_operations(
774            [],
775            current_struct,
776            new_struct,
777            current_struct,
778            table_name=table_name,
779            ignore_destructive=ignore_destructive,
780            ignore_additive=ignore_additive,
781        )
782
783    def _compare_structs(
784        self,
785        table_name: t.Union[str, exp.Table],
786        current: exp.DataType,
787        new: exp.DataType,
788        *,
789        ignore_destructive: bool = False,
790        ignore_additive: bool = False,
791    ) -> t.List[TableAlterColumnOperation]:
792        return self._from_structs(
793            current,
794            new,
795            table_name=table_name,
796            ignore_destructive=ignore_destructive,
797            ignore_additive=ignore_additive,
798        )
799
800    def compare_columns(
801        self,
802        table_name: TableName,
803        current: t.Dict[str, exp.DataType],
804        new: t.Dict[str, exp.DataType],
805        *,
806        ignore_destructive: bool = False,
807        ignore_additive: bool = False,
808    ) -> t.List[TableAlterColumnOperation]:
809        return self._compare_structs(
810            table_name,
811            columns_to_types_to_struct(current),
812            columns_to_types_to_struct(new),
813            ignore_destructive=ignore_destructive,
814            ignore_additive=ignore_additive,
815        )
816
817
818def has_drop_alteration(alter_operations: t.List[TableAlterOperation]) -> bool:
819    return any(op.is_destructive for op in alter_operations)
820
821
822def has_additive_alteration(alter_operations: t.List[TableAlterOperation]) -> bool:
823    return any(op.is_additive for op in alter_operations)
824
825
826def get_additive_changes(
827    alter_operations: t.List[TableAlterOperation],
828) -> t.List[TableAlterOperation]:
829    return [x for x in alter_operations if x.is_additive]
830
831
832def get_dropped_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]:
833    return [
834        op.column.alias_or_name
835        for op in alter_expressions
836        if isinstance(op, TableAlterDropColumnOperation)
837    ]
838
839
840def get_additive_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]:
841    return [
842        op.column.alias_or_name
843        for op in alter_expressions
844        if op.is_additive and isinstance(op, TableAlterColumnOperation)
845    ]
846
847
848def get_schema_differ(
849    dialect: str, overrides: t.Optional[t.Dict[str, t.Any]] = None
850) -> SchemaDiffer:
851    """
852    Returns the appropriate SchemaDiffer for a given dialect without initializing the engine adapter.
853
854    Args:
855        dialect: The dialect for which to get the schema differ.
856        overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance.
857
858    Returns:
859        The SchemaDiffer instance configured for the given dialect.
860    """
861    from sqlmesh.core.engine_adapter import (
862        DIALECT_TO_ENGINE_ADAPTER,
863        DIALECT_ALIASES,
864        EngineAdapter,
865    )
866
867    dialect = dialect.lower()
868    dialect = DIALECT_ALIASES.get(dialect, dialect)
869    engine_adapter_class = DIALECT_TO_ENGINE_ADAPTER.get(dialect, EngineAdapter)
870    return SchemaDiffer(
871        **{
872            **getattr(engine_adapter_class, "SCHEMA_DIFFER_KWARGS"),
873            **(overrides or {}),
874        }
875    )
876
877
878def _get_name_and_type(struct: exp.ColumnDef) -> t.Tuple[exp.Identifier, exp.DataType]:
879    return struct.this, struct.args["kind"]
logger = <Logger sqlmesh.core.schema_diff (WARNING)>
@dataclass(frozen=True)
class TableAlterOperation(abc.ABC):
25@dataclass(frozen=True)
26class TableAlterOperation(abc.ABC):
27    target_table: exp.Table
28
29    @property
30    @abc.abstractmethod
31    def is_destructive(self) -> bool:
32        pass
33
34    @property
35    @abc.abstractmethod
36    def is_additive(self) -> bool:
37        pass
38
39    @property
40    @abc.abstractmethod
41    def _alter_actions(self) -> t.List[exp.Expression]:
42        pass
43
44    @property
45    def expression(self) -> exp.Alter:
46        return exp.Alter(
47            this=self.target_table,
48            kind="TABLE",
49            actions=self._alter_actions,
50        )
target_table: sqlglot.expressions.Table
is_destructive: bool
29    @property
30    @abc.abstractmethod
31    def is_destructive(self) -> bool:
32        pass
is_additive: bool
34    @property
35    @abc.abstractmethod
36    def is_additive(self) -> bool:
37        pass
expression: sqlglot.expressions.Alter
44    @property
45    def expression(self) -> exp.Alter:
46        return exp.Alter(
47            this=self.target_table,
48            kind="TABLE",
49            actions=self._alter_actions,
50        )
@dataclass(frozen=True)
class TableAlterColumnOperation(TableAlterOperation, abc.ABC):
53@dataclass(frozen=True)
54class TableAlterColumnOperation(TableAlterOperation, abc.ABC):
55    column_parts: t.List[TableAlterColumn]
56    expected_table_struct: exp.DataType
57    array_element_selector: str
58
59    @property
60    def column_identifiers(self) -> t.List[exp.Identifier]:
61        results = []
62        for column in self.column_parts:
63            results.append(column.identifier)
64            if (
65                column.is_array_of_struct
66                and len(self.column_parts) > 1
67                and self.array_element_selector
68            ):
69                results.append(exp.to_identifier(self.array_element_selector))
70        return results
71
72    @property
73    def column(self) -> t.Union[exp.Dot, exp.Identifier]:
74        columns = self.column_identifiers
75        if len(columns) == 1:
76            return columns[0]
77        return exp.Dot.build(columns)
column_parts: List[TableAlterColumn]
expected_table_struct: sqlglot.expressions.DataType
array_element_selector: str
column_identifiers: List[sqlglot.expressions.Identifier]
59    @property
60    def column_identifiers(self) -> t.List[exp.Identifier]:
61        results = []
62        for column in self.column_parts:
63            results.append(column.identifier)
64            if (
65                column.is_array_of_struct
66                and len(self.column_parts) > 1
67                and self.array_element_selector
68            ):
69                results.append(exp.to_identifier(self.array_element_selector))
70        return results
column: Union[sqlglot.expressions.Dot, sqlglot.expressions.Identifier]
72    @property
73    def column(self) -> t.Union[exp.Dot, exp.Identifier]:
74        columns = self.column_identifiers
75        if len(columns) == 1:
76            return columns[0]
77        return exp.Dot.build(columns)
@dataclass(frozen=True)
class TableAlterTypedColumnOperation(TableAlterColumnOperation, abc.ABC):
80@dataclass(frozen=True)
81class TableAlterTypedColumnOperation(TableAlterColumnOperation, abc.ABC):
82    column_type: exp.DataType
83
84    @property
85    def column_def(self) -> exp.ColumnDef:
86        if not self.column_type:
87            raise SQLMeshError("Tried to access column type when it shouldn't be needed")
88        return exp.ColumnDef(
89            this=self.column,
90            kind=self.column_type,
91        )
column_type: sqlglot.expressions.DataType
column_def: sqlglot.expressions.ColumnDef
84    @property
85    def column_def(self) -> exp.ColumnDef:
86        if not self.column_type:
87            raise SQLMeshError("Tried to access column type when it shouldn't be needed")
88        return exp.ColumnDef(
89            this=self.column,
90            kind=self.column_type,
91        )
@dataclass(frozen=True)
class TableAlterAddColumnOperation(TableAlterTypedColumnOperation):
 94@dataclass(frozen=True)
 95class TableAlterAddColumnOperation(TableAlterTypedColumnOperation):
 96    position: t.Optional[TableAlterColumnPosition] = None
 97    is_part_of_destructive_change: bool = False
 98
 99    @property
100    def is_additive(self) -> bool:
101        return not self.is_part_of_destructive_change
102
103    @property
104    def is_destructive(self) -> bool:
105        return self.is_part_of_destructive_change
106
107    @property
108    def _alter_actions(self) -> t.List[exp.Expression]:
109        column_def = exp.ColumnDef(
110            this=self.column,
111            kind=self.column_type,
112        )
113        if self.position:
114            column_def.set("position", self.position.column_position_node)
115        return [column_def]
TableAlterAddColumnOperation( target_table: sqlglot.expressions.Table, column_parts: List[TableAlterColumn], expected_table_struct: sqlglot.expressions.DataType, array_element_selector: str, column_type: sqlglot.expressions.DataType, position: Optional[TableAlterColumnPosition] = None, is_part_of_destructive_change: bool = False)
position: Optional[TableAlterColumnPosition] = None
is_part_of_destructive_change: bool = False
is_additive: bool
 99    @property
100    def is_additive(self) -> bool:
101        return not self.is_part_of_destructive_change
is_destructive: bool
103    @property
104    def is_destructive(self) -> bool:
105        return self.is_part_of_destructive_change
@dataclass(frozen=True)
class TableAlterDropColumnOperation(TableAlterColumnOperation):
118@dataclass(frozen=True)
119class TableAlterDropColumnOperation(TableAlterColumnOperation):
120    cascade: bool = False
121
122    @property
123    def is_additive(self) -> bool:
124        return False
125
126    @property
127    def is_destructive(self) -> bool:
128        return True
129
130    @property
131    def _alter_actions(self) -> t.List[exp.Expression]:
132        return [exp.Drop(this=self.column, kind="COLUMN", cascade=self.cascade)]
TableAlterDropColumnOperation( target_table: sqlglot.expressions.Table, column_parts: List[TableAlterColumn], expected_table_struct: sqlglot.expressions.DataType, array_element_selector: str, cascade: bool = False)
cascade: bool = False
is_additive: bool
122    @property
123    def is_additive(self) -> bool:
124        return False
is_destructive: bool
126    @property
127    def is_destructive(self) -> bool:
128        return True
@dataclass(frozen=True)
class TableAlterChangeColumnTypeOperation(TableAlterTypedColumnOperation):
135@dataclass(frozen=True)
136class TableAlterChangeColumnTypeOperation(TableAlterTypedColumnOperation):
137    current_type: exp.DataType
138    is_part_of_destructive_change: bool = False
139
140    @property
141    def is_additive(self) -> bool:
142        return not self.is_part_of_destructive_change
143
144    @property
145    def is_destructive(self) -> bool:
146        return self.is_part_of_destructive_change
147
148    @property
149    def _alter_actions(self) -> t.List[exp.Expression]:
150        return [
151            exp.AlterColumn(
152                this=self.column,
153                dtype=self.column_type,
154            )
155        ]
TableAlterChangeColumnTypeOperation( target_table: sqlglot.expressions.Table, column_parts: List[TableAlterColumn], expected_table_struct: sqlglot.expressions.DataType, array_element_selector: str, column_type: sqlglot.expressions.DataType, current_type: sqlglot.expressions.DataType, is_part_of_destructive_change: bool = False)
current_type: sqlglot.expressions.DataType
is_part_of_destructive_change: bool = False
is_additive: bool
140    @property
141    def is_additive(self) -> bool:
142        return not self.is_part_of_destructive_change
is_destructive: bool
144    @property
145    def is_destructive(self) -> bool:
146        return self.is_part_of_destructive_change
@dataclass(frozen=True)
class TableAlterColumn:
158@dataclass(frozen=True)
159class TableAlterColumn:
160    name: str
161    is_struct: bool
162    is_array_of_struct: bool
163    is_array_of_primitive: bool
164    quoted: bool = False
165
166    @classmethod
167    def primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
168        return cls(
169            name=name,
170            is_struct=False,
171            is_array_of_struct=False,
172            is_array_of_primitive=False,
173            quoted=quoted,
174        )
175
176    @classmethod
177    def struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
178        return cls(
179            name=name,
180            is_struct=True,
181            is_array_of_struct=False,
182            is_array_of_primitive=False,
183            quoted=quoted,
184        )
185
186    @classmethod
187    def array_of_struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
188        return cls(
189            name=name,
190            is_struct=False,
191            is_array_of_struct=True,
192            is_array_of_primitive=False,
193            quoted=quoted,
194        )
195
196    @classmethod
197    def array_of_primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
198        return cls(
199            name=name,
200            is_struct=False,
201            is_array_of_struct=False,
202            is_array_of_primitive=True,
203            quoted=quoted,
204        )
205
206    @classmethod
207    def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn:
208        name = struct.alias_or_name
209        quoted = struct.this.quoted
210        kwarg_type = struct.args["kind"]
211
212        if kwarg_type.is_type(exp.DataType.Type.STRUCT):
213            return cls.struct(name, quoted=quoted)
214        if kwarg_type.is_type(exp.DataType.Type.ARRAY):
215            if kwarg_type.expressions and kwarg_type.expressions[0].is_type(
216                exp.DataType.Type.STRUCT
217            ):
218                return cls.array_of_struct(name, quoted=quoted)
219            return cls.array_of_primitive(name, quoted=quoted)
220        return cls.primitive(name, quoted=quoted)
221
222    @property
223    def is_array(self) -> bool:
224        return self.is_array_of_struct or self.is_array_of_primitive
225
226    @property
227    def is_primitive(self) -> bool:
228        return not self.is_struct and not self.is_array
229
230    @property
231    def is_nested(self) -> bool:
232        return not self.is_primitive
233
234    @property
235    def identifier(self) -> exp.Identifier:
236        return exp.to_identifier(self.name, quoted=self.quoted)
TableAlterColumn( name: str, is_struct: bool, is_array_of_struct: bool, is_array_of_primitive: bool, quoted: bool = False)
name: str
is_struct: bool
is_array_of_struct: bool
is_array_of_primitive: bool
quoted: bool = False
@classmethod
def primitive( cls, name: str, quoted: bool = False) -> TableAlterColumn:
166    @classmethod
167    def primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
168        return cls(
169            name=name,
170            is_struct=False,
171            is_array_of_struct=False,
172            is_array_of_primitive=False,
173            quoted=quoted,
174        )
@classmethod
def struct( cls, name: str, quoted: bool = False) -> TableAlterColumn:
176    @classmethod
177    def struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
178        return cls(
179            name=name,
180            is_struct=True,
181            is_array_of_struct=False,
182            is_array_of_primitive=False,
183            quoted=quoted,
184        )
@classmethod
def array_of_struct( cls, name: str, quoted: bool = False) -> TableAlterColumn:
186    @classmethod
187    def array_of_struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
188        return cls(
189            name=name,
190            is_struct=False,
191            is_array_of_struct=True,
192            is_array_of_primitive=False,
193            quoted=quoted,
194        )
@classmethod
def array_of_primitive( cls, name: str, quoted: bool = False) -> TableAlterColumn:
196    @classmethod
197    def array_of_primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
198        return cls(
199            name=name,
200            is_struct=False,
201            is_array_of_struct=False,
202            is_array_of_primitive=True,
203            quoted=quoted,
204        )
@classmethod
def from_struct_kwarg( cls, struct: sqlglot.expressions.ColumnDef) -> TableAlterColumn:
206    @classmethod
207    def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn:
208        name = struct.alias_or_name
209        quoted = struct.this.quoted
210        kwarg_type = struct.args["kind"]
211
212        if kwarg_type.is_type(exp.DataType.Type.STRUCT):
213            return cls.struct(name, quoted=quoted)
214        if kwarg_type.is_type(exp.DataType.Type.ARRAY):
215            if kwarg_type.expressions and kwarg_type.expressions[0].is_type(
216                exp.DataType.Type.STRUCT
217            ):
218                return cls.array_of_struct(name, quoted=quoted)
219            return cls.array_of_primitive(name, quoted=quoted)
220        return cls.primitive(name, quoted=quoted)
is_array: bool
222    @property
223    def is_array(self) -> bool:
224        return self.is_array_of_struct or self.is_array_of_primitive
is_primitive: bool
226    @property
227    def is_primitive(self) -> bool:
228        return not self.is_struct and not self.is_array
is_nested: bool
230    @property
231    def is_nested(self) -> bool:
232        return not self.is_primitive
identifier: sqlglot.expressions.Identifier
234    @property
235    def identifier(self) -> exp.Identifier:
236        return exp.to_identifier(self.name, quoted=self.quoted)
@dataclass(frozen=True)
class TableAlterColumnPosition:
239@dataclass(frozen=True)
240class TableAlterColumnPosition:
241    is_first: bool
242    is_last: bool
243    after: t.Optional[exp.Identifier] = None
244
245    @classmethod
246    def first(cls) -> TableAlterColumnPosition:
247        return cls(is_first=True, is_last=False, after=None)
248
249    @classmethod
250    def last(
251        cls, after: t.Optional[t.Union[str, exp.Identifier]] = None
252    ) -> TableAlterColumnPosition:
253        return cls(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None)
254
255    @classmethod
256    def middle(cls, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition:
257        return cls(is_first=False, is_last=False, after=exp.to_identifier(after))
258
259    @classmethod
260    def create(
261        cls,
262        pos: int,
263        current_kwargs: t.List[exp.ColumnDef],
264        replacing_col: bool = False,
265    ) -> TableAlterColumnPosition:
266        is_first = pos == 0
267        is_last = pos == len(current_kwargs) - int(replacing_col)
268        after = None
269        if not is_first:
270            prior_kwarg = current_kwargs[pos - 1]
271            after, _ = _get_name_and_type(prior_kwarg)
272        return cls(is_first=is_first, is_last=is_last, after=after)
273
274    @property
275    def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
276        column = self.after if not self.is_last else None
277        position = None
278        if self.is_first:
279            position = "FIRST"
280        elif column and not self.is_last:
281            position = "AFTER"
282        return exp.ColumnPosition(this=column, position=position)
TableAlterColumnPosition( is_first: bool, is_last: bool, after: Optional[sqlglot.expressions.Identifier] = None)
is_first: bool
is_last: bool
after: Optional[sqlglot.expressions.Identifier] = None
@classmethod
def first(cls) -> TableAlterColumnPosition:
245    @classmethod
246    def first(cls) -> TableAlterColumnPosition:
247        return cls(is_first=True, is_last=False, after=None)
@classmethod
def last( cls, after: Union[str, sqlglot.expressions.Identifier, NoneType] = None) -> TableAlterColumnPosition:
249    @classmethod
250    def last(
251        cls, after: t.Optional[t.Union[str, exp.Identifier]] = None
252    ) -> TableAlterColumnPosition:
253        return cls(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None)
@classmethod
def middle( cls, after: Union[str, sqlglot.expressions.Identifier]) -> TableAlterColumnPosition:
255    @classmethod
256    def middle(cls, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition:
257        return cls(is_first=False, is_last=False, after=exp.to_identifier(after))
@classmethod
def create( cls, pos: int, current_kwargs: List[sqlglot.expressions.ColumnDef], replacing_col: bool = False) -> TableAlterColumnPosition:
259    @classmethod
260    def create(
261        cls,
262        pos: int,
263        current_kwargs: t.List[exp.ColumnDef],
264        replacing_col: bool = False,
265    ) -> TableAlterColumnPosition:
266        is_first = pos == 0
267        is_last = pos == len(current_kwargs) - int(replacing_col)
268        after = None
269        if not is_first:
270            prior_kwarg = current_kwargs[pos - 1]
271            after, _ = _get_name_and_type(prior_kwarg)
272        return cls(is_first=is_first, is_last=is_last, after=after)
column_position_node: Optional[sqlglot.expressions.ColumnPosition]
274    @property
275    def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
276        column = self.after if not self.is_last else None
277        position = None
278        if self.is_first:
279            position = "FIRST"
280        elif column and not self.is_last:
281            position = "AFTER"
282        return exp.ColumnPosition(this=column, position=position)
class NestedSupport(builtins.str, enum.Enum):
285class NestedSupport(str, Enum):
286    # Supports all nested data type operations
287    ALL = "ALL"
288    # Does not support any nested data type operations
289    NONE = "NONE"
290    # Supports nested data type operations except for those that require dropping a nested field
291    ALL_BUT_DROP = "ALL_BUT_DROP"
292    # Ignores all nested data type operations
293    IGNORE = "IGNORE"
294
295    @property
296    def is_all(self) -> bool:
297        return self == NestedSupport.ALL
298
299    @property
300    def is_none(self) -> bool:
301        return self == NestedSupport.NONE
302
303    @property
304    def is_all_but_drop(self) -> bool:
305        return self == NestedSupport.ALL_BUT_DROP
306
307    @property
308    def is_ignore(self) -> bool:
309        return self == NestedSupport.IGNORE

An enumeration.

ALL = <NestedSupport.ALL: 'ALL'>
NONE = <NestedSupport.NONE: 'NONE'>
ALL_BUT_DROP = <NestedSupport.ALL_BUT_DROP: 'ALL_BUT_DROP'>
IGNORE = <NestedSupport.IGNORE: 'IGNORE'>
is_all: bool
295    @property
296    def is_all(self) -> bool:
297        return self == NestedSupport.ALL
is_none: bool
299    @property
300    def is_none(self) -> bool:
301        return self == NestedSupport.NONE
is_all_but_drop: bool
303    @property
304    def is_all_but_drop(self) -> bool:
305        return self == NestedSupport.ALL_BUT_DROP
is_ignore: bool
307    @property
308    def is_ignore(self) -> bool:
309        return self == NestedSupport.IGNORE
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class SchemaDiffer(sqlmesh.utils.pydantic.PydanticModel):
312class SchemaDiffer(PydanticModel):
313    """
314    Compares a source schema against a target schema and returns a list of alter statements to have the source
315    match the structure of target. Some engines have constraints on the types of operations that can be performed
316    therefore the final structure may not match the target exactly but it will be as close as possible. Two potential
317    differences that can happen:
318    1. Column order can be different if the engine doesn't support positional additions. Another reason for difference
319    is if a column is just moved since we don't currently support fixing moves.
320    2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested
321    operations. As a result historical data is lost.
322    3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible
323    change. As a result historical data is lost.
324
325    Potential future improvements:
326    1. Support column moves. Databricks Delta supports moves and would allow exact matches.
327
328    Args:
329        support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a
330            specific position in the set of existing columns.
331        nested_support: How the engine for which the diff is being computed supports nested types.
332        compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data
333            type, and value is the set of types that are compatible with it.
334        coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without
335            altering the column type. NOTE: usually callers should not specify this attribute manually and set the
336            `support_coercing_compatible_types` flag instead. Some engines are inconsistent about their type coercion rules.
337            For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted
338            into a FLOAT64 column just fine.
339        support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct
340            coercion of compatible types.
341        drop_cascade: Whether to add CASCADE modifier when dropping a column.
342        parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type,
343            but in the engine adapter specification we build it from the dialect string instead of specifying it directly.
344            Example: `exp.DataType.build("STRING", dialect=DIALECT).this` instead of the underlying `exp.DataType.Type.TEXT`
345            to which it parses. We do that because parameter default replacement will silently break if we specify type
346            directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default
347            values in a list, where the list index contains the remaining defaults given the number of parameter values
348            provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two
349            omitted parameters `(38, 9)` -> "DECIMAL(38,9)". Example: if user provides 1 parameters "DECIMAL(10)", we return
350            index 1 value for the one omitted parameters `(0,)` -> "DECIMAL(10,0)".
351        max_parameter_length: Numeric parameter values corresponding to "max". Example: `VARCHAR(max)` -> `VARCHAR(65535)`.
352        types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly
353            parameterized type can ALTER to its unlimited length version, along with different types in some engines.
354        treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it
355            concludes the change is compatible and won't result in data loss. If this flag is set to True, it will
356            flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't
357            be set outside of that context.
358    """
359
360    support_positional_add: bool = False
361    nested_support: NestedSupport = NestedSupport.NONE
362    array_element_selector: str = ""
363    compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
364    coerceable_types_: t.Dict[exp.DataType, t.Set[exp.DataType]] = Field(
365        default_factory=dict, alias="coerceable_types"
366    )
367    precision_increase_allowed_types: t.Optional[t.Set[exp.DataType.Type]] = None
368    support_coercing_compatible_types: bool = False
369    drop_cascade: bool = False
370    parameterized_type_defaults: t.Dict[
371        exp.DataType.Type, t.List[t.Tuple[t.Union[int, float], ...]]
372    ] = {}
373    max_parameter_length: t.Dict[exp.DataType.Type, t.Union[int, float]] = {}
374    types_with_unlimited_length: t.Dict[exp.DataType.Type, t.Set[exp.DataType.Type]] = {}
375    treat_alter_data_type_as_destructive: bool = False
376
377    _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
378
379    @property
380    def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
381        if not self._coerceable_types:
382            if not self.support_coercing_compatible_types or not self.compatible_types:
383                return self.coerceable_types_
384            coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(set)
385            coerceable_types.update(self.coerceable_types_)
386            for source_type, target_types in self.compatible_types.items():
387                for target_type in target_types:
388                    coerceable_types[target_type].add(source_type)
389            self._coerceable_types = coerceable_types
390        return self._coerceable_types
391
392    def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
393        # types are identical or both types are parameterized and new has higher precision
394        # - default parameter values are automatically provided if not present
395        if current_type == new_type or (
396            self._is_precision_increase_allowed(current_type)
397            and self._is_precision_increase(current_type, new_type)
398        ):
399            return True
400        # types are un-parameterized and compatible
401        if current_type in self.compatible_types:
402            return new_type in self.compatible_types[current_type]
403        # new type is un-parameterized and has unlimited length, current type is compatible
404        if not new_type.expressions and new_type.this in self.types_with_unlimited_length:
405            return current_type.this in self.types_with_unlimited_length[new_type.this]
406        return False
407
408    def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
409        if current_type in self.coerceable_types:
410            is_coerceable = new_type in self.coerceable_types[current_type]
411            if is_coerceable:
412                from sqlmesh.core.console import get_console
413
414                get_console().log_warning(
415                    f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning."
416                )
417
418            return is_coerceable
419        return False
420
421    def _is_precision_increase_allowed(self, current_type: exp.DataType) -> bool:
422        return (
423            self.precision_increase_allowed_types is None
424            or current_type.this in self.precision_increase_allowed_types
425        )
426
427    def _is_precision_increase(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
428        if current_type.this == new_type.this and not current_type.is_type(
429            *exp.DataType.NESTED_TYPES
430        ):
431            current_params = self.get_type_parameters(current_type)
432            new_params = self.get_type_parameters(new_type)
433
434            if len(current_params) != len(new_params):
435                return False
436
437            return all(new >= current for current, new in zip(current_params, new_params))
438        return False
439
440    def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]:
441        def _str_to_number(string: str, allows_max_param: bool) -> t.Union[int, float]:
442            try:
443                return int(string)
444            except ValueError:
445                try:
446                    return float(string)
447                except ValueError:
448                    if allows_max_param and string.upper() == "MAX":
449                        return self.max_parameter_length[type.this]
450                    raise ValueError(f"Could not convert '{string}' to a number")
451
452        # extract existing parameters
453        params = [
454            _str_to_number(param.this.this, type.this in self.max_parameter_length)
455            for param in type.expressions
456        ]
457
458        # maybe get default parameter values
459        param_defaults: t.Tuple[t.Union[int, float], ...] = ()
460        if type.this in self.parameterized_type_defaults:
461            param_defaults_list = self.parameterized_type_defaults[type.this]
462            if len(params) < len(param_defaults_list):
463                param_defaults = param_defaults_list[len(params)]
464
465        return [*params, *param_defaults]
466
467    def _get_matching_kwarg(
468        self,
469        current_kwarg: t.Union[str, exp.ColumnDef],
470        new_struct: exp.DataType,
471        current_pos: int,
472    ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]:
473        current_name = (
474            exp.to_identifier(current_kwarg)
475            if isinstance(current_kwarg, str)
476            else _get_name_and_type(current_kwarg)[0]
477        )
478        # First check if we have the same column in the same position to get O(1) complexity
479        new_kwarg = seq_get(new_struct.expressions, current_pos)
480        if new_kwarg:
481            new_name, new_type = _get_name_and_type(new_kwarg)
482            if current_name.this == new_name.this:
483                return current_pos, new_kwarg
484        # If not, check if we have the same column in all positions with O(n) complexity
485        for i, new_kwarg in enumerate(new_struct.expressions):
486            new_name, new_type = _get_name_and_type(new_kwarg)
487            if current_name.this == new_name.this:
488                return i, new_kwarg
489        return None, None
490
491    def _drop_operation(
492        self,
493        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
494        struct: exp.DataType,
495        pos: int,
496        root_struct: exp.DataType,
497        table_name: TableName,
498    ) -> t.List[TableAlterColumnOperation]:
499        columns = ensure_list(columns)
500        operations: t.List[TableAlterColumnOperation] = []
501        column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos)
502        if column_pos is None or not column_kwarg:
503            raise SQLMeshError(
504                f"Cannot drop column '{columns[-1].name}' from table '{table_name}' - column not found. "
505                f"This may indicate a mismatch between the expected and actual table schemas."
506            )
507        struct.expressions.pop(column_pos)
508        operations.append(
509            TableAlterDropColumnOperation(
510                target_table=exp.to_table(table_name),
511                column_parts=columns,
512                expected_table_struct=root_struct.copy(),
513                cascade=self.drop_cascade,
514                array_element_selector=self.array_element_selector,
515            )
516        )
517        return operations
518
519    def _requires_drop_alteration(
520        self, current_struct: exp.DataType, new_struct: exp.DataType
521    ) -> bool:
522        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
523            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
524            if new_pos is None:
525                return True
526        return False
527
528    def _resolve_drop_operation(
529        self,
530        parent_columns: t.List[TableAlterColumn],
531        current_struct: exp.DataType,
532        new_struct: exp.DataType,
533        root_struct: exp.DataType,
534        table_name: TableName,
535    ) -> t.List[TableAlterColumnOperation]:
536        operations = []
537        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
538            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
539            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
540            if new_pos is None:
541                operations.extend(
542                    self._drop_operation(
543                        columns, current_struct, current_pos, root_struct, table_name
544                    )
545                )
546        return operations
547
548    def _add_operation(
549        self,
550        columns: t.List[TableAlterColumn],
551        new_pos: int,
552        new_kwarg: exp.ColumnDef,
553        current_struct: exp.DataType,
554        root_struct: exp.DataType,
555        table_name: TableName,
556        is_part_of_destructive_change: bool = False,
557    ) -> t.List[TableAlterColumnOperation]:
558        if self.support_positional_add:
559            col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions)
560            current_struct.expressions.insert(new_pos, new_kwarg)
561        else:
562            col_pos = None
563            current_struct.expressions.append(new_kwarg)
564        return [
565            TableAlterAddColumnOperation(
566                target_table=exp.to_table(table_name),
567                column_parts=columns,
568                column_type=new_kwarg.args["kind"],
569                expected_table_struct=root_struct.copy(),
570                position=col_pos,
571                is_part_of_destructive_change=is_part_of_destructive_change,
572                array_element_selector=self.array_element_selector,
573            )
574        ]
575
576    def _resolve_add_operations(
577        self,
578        parent_columns: t.List[TableAlterColumn],
579        current_struct: exp.DataType,
580        new_struct: exp.DataType,
581        root_struct: exp.DataType,
582        table_name: TableName,
583    ) -> t.List[TableAlterColumnOperation]:
584        operations = []
585        for new_pos, new_kwarg in enumerate(new_struct.expressions):
586            possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos)
587            if possible_current_pos is None:
588                columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)]
589                operations.extend(
590                    self._add_operation(
591                        columns, new_pos, new_kwarg, current_struct, root_struct, table_name
592                    )
593                )
594        return operations
595
596    def _alter_operation(
597        self,
598        columns: t.List[TableAlterColumn],
599        pos: int,
600        struct: exp.DataType,
601        new_type: exp.DataType,
602        current_type: t.Union[str, exp.DataType],
603        root_struct: exp.DataType,
604        new_kwarg: exp.ColumnDef,
605        table_name: TableName,
606        *,
607        ignore_destructive: bool = False,
608        ignore_additive: bool = False,
609    ) -> t.List[TableAlterColumnOperation]:
610        # We don't copy on purpose here because current_type may need to be mutated inside
611        # _get_operations (struct.expressions.pop and struct.expressions.insert)
612        current_type = exp.DataType.build(current_type, copy=False)
613        if not self.nested_support.is_none:
614            if new_type.this == current_type.this == exp.DataType.Type.STRUCT:
615                if self.nested_support.is_ignore:
616                    return []
617                if self.nested_support.is_all or not self._requires_drop_alteration(
618                    current_type, new_type
619                ):
620                    return self._get_operations(
621                        columns,
622                        current_type,
623                        new_type,
624                        root_struct,
625                        table_name,
626                        ignore_destructive=ignore_destructive,
627                        ignore_additive=ignore_additive,
628                    )
629
630            if new_type.this == current_type.this == exp.DataType.Type.ARRAY:
631                # Some engines (i.e. Snowflake) don't support defining types on arrays
632                if not new_type.expressions or not current_type.expressions:
633                    return []
634                new_array_type = new_type.expressions[0]
635                current_array_type = current_type.expressions[0]
636                if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT:
637                    if self.nested_support.is_ignore:
638                        return []
639                    if self.nested_support.is_all or not self._requires_drop_alteration(
640                        current_array_type, new_array_type
641                    ):
642                        return self._get_operations(
643                            columns,
644                            current_array_type,
645                            new_array_type,
646                            root_struct,
647                            table_name,
648                            ignore_destructive=ignore_destructive,
649                            ignore_additive=ignore_additive,
650                        )
651        if self._is_coerceable_type(current_type, new_type):
652            return []
653        if self._is_compatible_type(current_type, new_type):
654            if ignore_additive:
655                return []
656            struct.expressions.pop(pos)
657            struct.expressions.insert(pos, new_kwarg)
658            return [
659                TableAlterChangeColumnTypeOperation(
660                    target_table=exp.to_table(table_name),
661                    column_parts=columns,
662                    column_type=new_type,
663                    current_type=current_type,
664                    expected_table_struct=root_struct.copy(),
665                    array_element_selector=self.array_element_selector,
666                    is_part_of_destructive_change=self.treat_alter_data_type_as_destructive,
667                )
668            ]
669        if ignore_destructive:
670            return []
671        return self._drop_operation(
672            columns,
673            root_struct,
674            pos,
675            root_struct,
676            table_name,
677        ) + self._add_operation(
678            columns,
679            pos,
680            new_kwarg,
681            struct,
682            root_struct,
683            table_name,
684            is_part_of_destructive_change=True,
685        )
686
687    def _resolve_alter_operations(
688        self,
689        parent_columns: t.List[TableAlterColumn],
690        current_struct: exp.DataType,
691        new_struct: exp.DataType,
692        root_struct: exp.DataType,
693        table_name: TableName,
694        *,
695        ignore_destructive: bool = False,
696        ignore_additive: bool = False,
697    ) -> t.List[TableAlterColumnOperation]:
698        operations = []
699        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
700            _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
701            if new_kwarg is None:
702                if ignore_destructive:
703                    continue
704                raise ValueError("Cannot alter a column that is being dropped")
705            _, new_type = _get_name_and_type(new_kwarg)
706            _, current_type = _get_name_and_type(current_kwarg)
707            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
708            if new_type == current_type:
709                continue
710            operations.extend(
711                self._alter_operation(
712                    columns,
713                    current_pos,
714                    current_struct,
715                    new_type,
716                    current_type,
717                    root_struct,
718                    new_kwarg,
719                    table_name,
720                    ignore_destructive=ignore_destructive,
721                    ignore_additive=ignore_additive,
722                )
723            )
724        return operations
725
726    def _get_operations(
727        self,
728        parent_columns: t.List[TableAlterColumn],
729        current_struct: exp.DataType,
730        new_struct: exp.DataType,
731        root_struct: exp.DataType,
732        table_name: TableName,
733        *,
734        ignore_destructive: bool = False,
735        ignore_additive: bool = False,
736    ) -> t.List[TableAlterColumnOperation]:
737        root_struct = root_struct or current_struct
738        parent_columns = parent_columns or []
739        operations = []
740        if not ignore_destructive:
741            operations.extend(
742                self._resolve_drop_operation(
743                    parent_columns, current_struct, new_struct, root_struct, table_name
744                )
745            )
746        if not ignore_additive:
747            operations.extend(
748                self._resolve_add_operations(
749                    parent_columns, current_struct, new_struct, root_struct, table_name
750                )
751            )
752        operations.extend(
753            self._resolve_alter_operations(
754                parent_columns,
755                current_struct,
756                new_struct,
757                root_struct,
758                ignore_destructive=ignore_destructive,
759                ignore_additive=ignore_additive,
760                table_name=table_name,
761            )
762        )
763        return operations
764
765    def _from_structs(
766        self,
767        current_struct: exp.DataType,
768        new_struct: exp.DataType,
769        table_name: TableName,
770        *,
771        ignore_destructive: bool = False,
772        ignore_additive: bool = False,
773    ) -> t.List[TableAlterColumnOperation]:
774        return self._get_operations(
775            [],
776            current_struct,
777            new_struct,
778            current_struct,
779            table_name=table_name,
780            ignore_destructive=ignore_destructive,
781            ignore_additive=ignore_additive,
782        )
783
784    def _compare_structs(
785        self,
786        table_name: t.Union[str, exp.Table],
787        current: exp.DataType,
788        new: exp.DataType,
789        *,
790        ignore_destructive: bool = False,
791        ignore_additive: bool = False,
792    ) -> t.List[TableAlterColumnOperation]:
793        return self._from_structs(
794            current,
795            new,
796            table_name=table_name,
797            ignore_destructive=ignore_destructive,
798            ignore_additive=ignore_additive,
799        )
800
801    def compare_columns(
802        self,
803        table_name: TableName,
804        current: t.Dict[str, exp.DataType],
805        new: t.Dict[str, exp.DataType],
806        *,
807        ignore_destructive: bool = False,
808        ignore_additive: bool = False,
809    ) -> t.List[TableAlterColumnOperation]:
810        return self._compare_structs(
811            table_name,
812            columns_to_types_to_struct(current),
813            columns_to_types_to_struct(new),
814            ignore_destructive=ignore_destructive,
815            ignore_additive=ignore_additive,
816        )

Compares a source schema against a target schema and returns a list of alter statements to have the source match the structure of target. Some engines have constraints on the types of operations that can be performed therefore the final structure may not match the target exactly but it will be as close as possible. Two potential differences that can happen:

  1. Column order can be different if the engine doesn't support positional additions. Another reason for difference is if a column is just moved since we don't currently support fixing moves.
  2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested operations. As a result historical data is lost.
  3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible change. As a result historical data is lost.

Potential future improvements:

  1. Support column moves. Databricks Delta supports moves and would allow exact matches.
Arguments:
  • support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a specific position in the set of existing columns.
  • nested_support: How the engine for which the diff is being computed supports nested types.
  • compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data type, and value is the set of types that are compatible with it.
  • coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without altering the column type. NOTE: usually callers should not specify this attribute manually and set the support_coercing_compatible_types flag instead. Some engines are inconsistent about their type coercion rules. For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted into a FLOAT64 column just fine.
  • support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct coercion of compatible types.
  • drop_cascade: Whether to add CASCADE modifier when dropping a column.
  • parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type, but in the engine adapter specification we build it from the dialect string instead of specifying it directly. Example: exp.DataType.build("STRING", dialect=DIALECT).this instead of the underlying exp.DataType.Type.TEXT to which it parses. We do that because parameter default replacement will silently break if we specify type directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default values in a list, where the list index contains the remaining defaults given the number of parameter values provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two omitted parameters (38, 9) -> "DECIMAL(38,9)". Example: if user provides 1 parameters "DECIMAL(10)", we return index 1 value for the one omitted parameters (0,) -> "DECIMAL(10,0)".
  • max_parameter_length: Numeric parameter values corresponding to "max". Example: VARCHAR(max) -> VARCHAR(65535).
  • types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly parameterized type can ALTER to its unlimited length version, along with different types in some engines.
  • treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it concludes the change is compatible and won't result in data loss. If this flag is set to True, it will flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't be set outside of that context.
support_positional_add: bool
nested_support: NestedSupport
array_element_selector: str
compatible_types: Dict[sqlglot.expressions.DataType, Set[sqlglot.expressions.DataType]]
coerceable_types_: Dict[sqlglot.expressions.DataType, Set[sqlglot.expressions.DataType]]
precision_increase_allowed_types: Optional[Set[sqlglot.expressions.DataType.Type]]
support_coercing_compatible_types: bool
drop_cascade: bool
parameterized_type_defaults: Dict[sqlglot.expressions.DataType.Type, List[Tuple[Union[int, float], ...]]]
max_parameter_length: Dict[sqlglot.expressions.DataType.Type, Union[int, float]]
types_with_unlimited_length: Dict[sqlglot.expressions.DataType.Type, Set[sqlglot.expressions.DataType.Type]]
treat_alter_data_type_as_destructive: bool
coerceable_types: Dict[sqlglot.expressions.DataType, Set[sqlglot.expressions.DataType]]
379    @property
380    def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
381        if not self._coerceable_types:
382            if not self.support_coercing_compatible_types or not self.compatible_types:
383                return self.coerceable_types_
384            coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(set)
385            coerceable_types.update(self.coerceable_types_)
386            for source_type, target_types in self.compatible_types.items():
387                for target_type in target_types:
388                    coerceable_types[target_type].add(source_type)
389            self._coerceable_types = coerceable_types
390        return self._coerceable_types
def get_type_parameters(self, type: sqlglot.expressions.DataType) -> List[Union[int, float]]:
440    def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]:
441        def _str_to_number(string: str, allows_max_param: bool) -> t.Union[int, float]:
442            try:
443                return int(string)
444            except ValueError:
445                try:
446                    return float(string)
447                except ValueError:
448                    if allows_max_param and string.upper() == "MAX":
449                        return self.max_parameter_length[type.this]
450                    raise ValueError(f"Could not convert '{string}' to a number")
451
452        # extract existing parameters
453        params = [
454            _str_to_number(param.this.this, type.this in self.max_parameter_length)
455            for param in type.expressions
456        ]
457
458        # maybe get default parameter values
459        param_defaults: t.Tuple[t.Union[int, float], ...] = ()
460        if type.this in self.parameterized_type_defaults:
461            param_defaults_list = self.parameterized_type_defaults[type.this]
462            if len(params) < len(param_defaults_list):
463                param_defaults = param_defaults_list[len(params)]
464
465        return [*params, *param_defaults]
def compare_columns( self, table_name: Union[str, sqlglot.expressions.Table], current: Dict[str, sqlglot.expressions.DataType], new: Dict[str, sqlglot.expressions.DataType], *, ignore_destructive: bool = False, ignore_additive: bool = False) -> List[TableAlterColumnOperation]:
801    def compare_columns(
802        self,
803        table_name: TableName,
804        current: t.Dict[str, exp.DataType],
805        new: t.Dict[str, exp.DataType],
806        *,
807        ignore_destructive: bool = False,
808        ignore_additive: bool = False,
809    ) -> t.List[TableAlterColumnOperation]:
810        return self._compare_structs(
811            table_name,
812            columns_to_types_to_struct(current),
813            columns_to_types_to_struct(new),
814            ignore_destructive=ignore_destructive,
815            ignore_additive=ignore_additive,
816        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.Expression'>: <function _expression_encoder>, <class 'sqlglot.expressions.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
def has_drop_alteration( alter_operations: List[TableAlterOperation]) -> bool:
819def has_drop_alteration(alter_operations: t.List[TableAlterOperation]) -> bool:
820    return any(op.is_destructive for op in alter_operations)
def has_additive_alteration( alter_operations: List[TableAlterOperation]) -> bool:
823def has_additive_alteration(alter_operations: t.List[TableAlterOperation]) -> bool:
824    return any(op.is_additive for op in alter_operations)
def get_additive_changes( alter_operations: List[TableAlterOperation]) -> List[TableAlterOperation]:
827def get_additive_changes(
828    alter_operations: t.List[TableAlterOperation],
829) -> t.List[TableAlterOperation]:
830    return [x for x in alter_operations if x.is_additive]
def get_dropped_column_names( alter_expressions: List[TableAlterOperation]) -> List[str]:
833def get_dropped_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]:
834    return [
835        op.column.alias_or_name
836        for op in alter_expressions
837        if isinstance(op, TableAlterDropColumnOperation)
838    ]
def get_additive_column_names( alter_expressions: List[TableAlterOperation]) -> List[str]:
841def get_additive_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]:
842    return [
843        op.column.alias_or_name
844        for op in alter_expressions
845        if op.is_additive and isinstance(op, TableAlterColumnOperation)
846    ]
def get_schema_differ( dialect: str, overrides: Optional[Dict[str, Any]] = None) -> SchemaDiffer:
849def get_schema_differ(
850    dialect: str, overrides: t.Optional[t.Dict[str, t.Any]] = None
851) -> SchemaDiffer:
852    """
853    Returns the appropriate SchemaDiffer for a given dialect without initializing the engine adapter.
854
855    Args:
856        dialect: The dialect for which to get the schema differ.
857        overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance.
858
859    Returns:
860        The SchemaDiffer instance configured for the given dialect.
861    """
862    from sqlmesh.core.engine_adapter import (
863        DIALECT_TO_ENGINE_ADAPTER,
864        DIALECT_ALIASES,
865        EngineAdapter,
866    )
867
868    dialect = dialect.lower()
869    dialect = DIALECT_ALIASES.get(dialect, dialect)
870    engine_adapter_class = DIALECT_TO_ENGINE_ADAPTER.get(dialect, EngineAdapter)
871    return SchemaDiffer(
872        **{
873            **getattr(engine_adapter_class, "SCHEMA_DIFFER_KWARGS"),
874            **(overrides or {}),
875        }
876    )

Returns the appropriate SchemaDiffer for a given dialect without initializing the engine adapter.

Arguments:
  • dialect: The dialect for which to get the schema differ.
  • overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance.
Returns:

The SchemaDiffer instance configured for the given dialect.