Edit on GitHub

sqlmesh.core.schema_diff

  1from __future__ import annotations
  2
  3import abc
  4import logging
  5import typing as t
  6from dataclasses import dataclass
  7from collections import defaultdict
  8from enum import Enum
  9
 10from pydantic import Field
 11from sqlglot import exp
 12from sqlglot.helper import ensure_list, seq_get
 13
 14from sqlmesh.utils import columns_to_types_to_struct
 15from sqlmesh.utils.pydantic import PydanticModel
 16from sqlmesh.utils.errors import SQLMeshError
 17
 18if t.TYPE_CHECKING:
 19    from sqlmesh.core._typing import TableName
 20
 21logger = logging.getLogger(__name__)
 22
 23
 24@dataclass(frozen=True)
 25class TableAlterOperation(abc.ABC):
 26    target_table: exp.Table
 27
 28    @property
 29    @abc.abstractmethod
 30    def is_destructive(self) -> bool:
 31        pass
 32
 33    @property
 34    @abc.abstractmethod
 35    def is_additive(self) -> bool:
 36        pass
 37
 38    @property
 39    @abc.abstractmethod
 40    def _alter_actions(self) -> t.List[exp.Expr]:
 41        pass
 42
 43    @property
 44    def expression(self) -> exp.Alter:
 45        return exp.Alter(
 46            this=self.target_table,
 47            kind="TABLE",
 48            actions=self._alter_actions,
 49        )
 50
 51
 52@dataclass(frozen=True)
 53class TableAlterColumnOperation(TableAlterOperation, abc.ABC):
 54    column_parts: t.List[TableAlterColumn]
 55    expected_table_struct: exp.DataType
 56    array_element_selector: str
 57
 58    @property
 59    def column_identifiers(self) -> t.List[exp.Identifier]:
 60        results = []
 61        for column in self.column_parts:
 62            results.append(column.identifier)
 63            if (
 64                column.is_array_of_struct
 65                and len(self.column_parts) > 1
 66                and self.array_element_selector
 67            ):
 68                results.append(exp.to_identifier(self.array_element_selector))
 69        return results
 70
 71    @property
 72    def column(self) -> t.Union[exp.Dot, exp.Identifier]:
 73        columns = self.column_identifiers
 74        if len(columns) == 1:
 75            return columns[0]
 76        return exp.Dot.build(columns)
 77
 78
 79@dataclass(frozen=True)
 80class TableAlterTypedColumnOperation(TableAlterColumnOperation, abc.ABC):
 81    column_type: exp.DataType
 82
 83    @property
 84    def column_def(self) -> exp.ColumnDef:
 85        if not self.column_type:
 86            raise SQLMeshError("Tried to access column type when it shouldn't be needed")
 87        return exp.ColumnDef(
 88            this=self.column,
 89            kind=self.column_type,
 90        )
 91
 92
 93@dataclass(frozen=True)
 94class TableAlterAddColumnOperation(TableAlterTypedColumnOperation):
 95    position: t.Optional[TableAlterColumnPosition] = None
 96    is_part_of_destructive_change: bool = False
 97
 98    @property
 99    def is_additive(self) -> bool:
100        return not self.is_part_of_destructive_change
101
102    @property
103    def is_destructive(self) -> bool:
104        return self.is_part_of_destructive_change
105
106    @property
107    def _alter_actions(self) -> t.List[exp.Expr]:
108        column_def = exp.ColumnDef(
109            this=self.column,
110            kind=self.column_type,
111        )
112        if self.position:
113            column_def.set("position", self.position.column_position_node)
114        return [column_def]
115
116
117@dataclass(frozen=True)
118class TableAlterDropColumnOperation(TableAlterColumnOperation):
119    cascade: bool = False
120
121    @property
122    def is_additive(self) -> bool:
123        return False
124
125    @property
126    def is_destructive(self) -> bool:
127        return True
128
129    @property
130    def _alter_actions(self) -> t.List[exp.Expr]:
131        return [exp.Drop(this=self.column, kind="COLUMN", cascade=self.cascade)]
132
133
134@dataclass(frozen=True)
135class TableAlterChangeColumnTypeOperation(TableAlterTypedColumnOperation):
136    current_type: exp.DataType
137    is_part_of_destructive_change: bool = False
138
139    @property
140    def is_additive(self) -> bool:
141        return not self.is_part_of_destructive_change
142
143    @property
144    def is_destructive(self) -> bool:
145        return self.is_part_of_destructive_change
146
147    @property
148    def _alter_actions(self) -> t.List[exp.Expr]:
149        return [
150            exp.AlterColumn(
151                this=self.column,
152                dtype=self.column_type,
153            )
154        ]
155
156
157@dataclass(frozen=True)
158class TableAlterColumn:
159    name: str
160    is_struct: bool
161    is_array_of_struct: bool
162    is_array_of_primitive: bool
163    quoted: bool = False
164
165    @classmethod
166    def primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
167        return cls(
168            name=name,
169            is_struct=False,
170            is_array_of_struct=False,
171            is_array_of_primitive=False,
172            quoted=quoted,
173        )
174
175    @classmethod
176    def struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
177        return cls(
178            name=name,
179            is_struct=True,
180            is_array_of_struct=False,
181            is_array_of_primitive=False,
182            quoted=quoted,
183        )
184
185    @classmethod
186    def array_of_struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
187        return cls(
188            name=name,
189            is_struct=False,
190            is_array_of_struct=True,
191            is_array_of_primitive=False,
192            quoted=quoted,
193        )
194
195    @classmethod
196    def array_of_primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
197        return cls(
198            name=name,
199            is_struct=False,
200            is_array_of_struct=False,
201            is_array_of_primitive=True,
202            quoted=quoted,
203        )
204
205    @classmethod
206    def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn:
207        name = struct.alias_or_name
208        quoted = struct.this.quoted
209        kwarg_type = struct.args["kind"]
210
211        if kwarg_type.is_type(exp.DataType.Type.STRUCT):
212            return cls.struct(name, quoted=quoted)
213        if kwarg_type.is_type(exp.DataType.Type.ARRAY):
214            if kwarg_type.expressions and kwarg_type.expressions[0].is_type(
215                exp.DataType.Type.STRUCT
216            ):
217                return cls.array_of_struct(name, quoted=quoted)
218            return cls.array_of_primitive(name, quoted=quoted)
219        return cls.primitive(name, quoted=quoted)
220
221    @property
222    def is_array(self) -> bool:
223        return self.is_array_of_struct or self.is_array_of_primitive
224
225    @property
226    def is_primitive(self) -> bool:
227        return not self.is_struct and not self.is_array
228
229    @property
230    def is_nested(self) -> bool:
231        return not self.is_primitive
232
233    @property
234    def identifier(self) -> exp.Identifier:
235        return exp.to_identifier(self.name, quoted=self.quoted)
236
237
238@dataclass(frozen=True)
239class TableAlterColumnPosition:
240    is_first: bool
241    is_last: bool
242    after: t.Optional[exp.Identifier] = None
243
244    @classmethod
245    def first(cls) -> TableAlterColumnPosition:
246        return cls(is_first=True, is_last=False, after=None)
247
248    @classmethod
249    def last(
250        cls, after: t.Optional[t.Union[str, exp.Identifier]] = None
251    ) -> TableAlterColumnPosition:
252        return cls(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None)
253
254    @classmethod
255    def middle(cls, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition:
256        return cls(is_first=False, is_last=False, after=exp.to_identifier(after))
257
258    @classmethod
259    def create(
260        cls,
261        pos: int,
262        current_kwargs: t.List[exp.ColumnDef],
263        replacing_col: bool = False,
264    ) -> TableAlterColumnPosition:
265        is_first = pos == 0
266        is_last = pos == len(current_kwargs) - int(replacing_col)
267        after = None
268        if not is_first:
269            prior_kwarg = current_kwargs[pos - 1]
270            after, _ = _get_name_and_type(prior_kwarg)
271        return cls(is_first=is_first, is_last=is_last, after=after)
272
273    @property
274    def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
275        column = self.after if not self.is_last else None
276        position = None
277        if self.is_first:
278            position = "FIRST"
279        elif column and not self.is_last:
280            position = "AFTER"
281        return exp.ColumnPosition(this=column, position=position)
282
283
284class NestedSupport(str, Enum):
285    # Supports all nested data type operations
286    ALL = "ALL"
287    # Does not support any nested data type operations
288    NONE = "NONE"
289    # Supports nested data type operations except for those that require dropping a nested field
290    ALL_BUT_DROP = "ALL_BUT_DROP"
291    # Ignores all nested data type operations
292    IGNORE = "IGNORE"
293
294    @property
295    def is_all(self) -> bool:
296        return self == NestedSupport.ALL
297
298    @property
299    def is_none(self) -> bool:
300        return self == NestedSupport.NONE
301
302    @property
303    def is_all_but_drop(self) -> bool:
304        return self == NestedSupport.ALL_BUT_DROP
305
306    @property
307    def is_ignore(self) -> bool:
308        return self == NestedSupport.IGNORE
309
310
311class SchemaDiffer(PydanticModel):
312    """
313    Compares a source schema against a target schema and returns a list of alter statements to have the source
314    match the structure of target. Some engines have constraints on the types of operations that can be performed
315    therefore the final structure may not match the target exactly but it will be as close as possible. Two potential
316    differences that can happen:
317    1. Column order can be different if the engine doesn't support positional additions. Another reason for difference
318    is if a column is just moved since we don't currently support fixing moves.
319    2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested
320    operations. As a result historical data is lost.
321    3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible
322    change. As a result historical data is lost.
323
324    Potential future improvements:
325    1. Support column moves. Databricks Delta supports moves and would allow exact matches.
326
327    Args:
328        support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a
329            specific position in the set of existing columns.
330        nested_support: How the engine for which the diff is being computed supports nested types.
331        compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data
332            type, and value is the set of types that are compatible with it.
333        coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without
334            altering the column type. NOTE: usually callers should not specify this attribute manually and set the
335            `support_coercing_compatible_types` flag instead. Some engines are inconsistent about their type coercion rules.
336            For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted
337            into a FLOAT64 column just fine.
338        support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct
339            coercion of compatible types.
340        drop_cascade: Whether to add CASCADE modifier when dropping a column.
341        parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type,
342            but in the engine adapter specification we build it from the dialect string instead of specifying it directly.
343            Example: `exp.DataType.build("STRING", dialect=DIALECT).this` instead of the underlying `exp.DataType.Type.TEXT`
344            to which it parses. We do that because parameter default replacement will silently break if we specify type
345            directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default
346            values in a list, where the list index contains the remaining defaults given the number of parameter values
347            provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two
348            omitted parameters `(38, 9)` -> "DECIMAL(38,9)". Example: if user provides 1 parameters "DECIMAL(10)", we return
349            index 1 value for the one omitted parameters `(0,)` -> "DECIMAL(10,0)".
350        max_parameter_length: Numeric parameter values corresponding to "max". Example: `VARCHAR(max)` -> `VARCHAR(65535)`.
351        types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly
352            parameterized type can ALTER to its unlimited length version, along with different types in some engines.
353        treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it
354            concludes the change is compatible and won't result in data loss. If this flag is set to True, it will
355            flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't
356            be set outside of that context.
357    """
358
359    support_positional_add: bool = False
360    nested_support: NestedSupport = NestedSupport.NONE
361    array_element_selector: str = ""
362    compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
363    coerceable_types_: t.Dict[exp.DataType, t.Set[exp.DataType]] = Field(
364        default_factory=dict, alias="coerceable_types"
365    )
366    precision_increase_allowed_types: t.Optional[t.Set[exp.DType]] = None
367    support_coercing_compatible_types: bool = False
368    drop_cascade: bool = False
369    parameterized_type_defaults: t.Dict[exp.DType, t.List[t.Tuple[t.Union[int, float], ...]]] = {}
370    max_parameter_length: t.Dict[exp.DType, t.Union[int, float]] = {}
371    types_with_unlimited_length: t.Dict[exp.DType, t.Set[exp.DType]] = {}
372    treat_alter_data_type_as_destructive: bool = False
373
374    _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
375
376    @property
377    def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
378        if not self._coerceable_types:
379            if not self.support_coercing_compatible_types or not self.compatible_types:
380                return self.coerceable_types_
381            coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(set)
382            coerceable_types.update(self.coerceable_types_)
383            for source_type, target_types in self.compatible_types.items():
384                for target_type in target_types:
385                    coerceable_types[target_type].add(source_type)
386            self._coerceable_types = coerceable_types
387        return self._coerceable_types
388
389    def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
390        # types are identical or both types are parameterized and new has higher precision
391        # - default parameter values are automatically provided if not present
392        if current_type == new_type or (
393            self._is_precision_increase_allowed(current_type)
394            and self._is_precision_increase(current_type, new_type)
395        ):
396            return True
397        # types are un-parameterized and compatible
398        if current_type in self.compatible_types:
399            return new_type in self.compatible_types[current_type]
400        # new type is un-parameterized and has unlimited length, current type is compatible
401        if not new_type.expressions and new_type.this in self.types_with_unlimited_length:
402            return current_type.this in self.types_with_unlimited_length[new_type.this]
403        return False
404
405    def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
406        if current_type in self.coerceable_types:
407            is_coerceable = new_type in self.coerceable_types[current_type]
408            if is_coerceable:
409                from sqlmesh.core.console import get_console
410
411                get_console().log_warning(
412                    f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning."
413                )
414
415            return is_coerceable
416        return False
417
418    def _is_precision_increase_allowed(self, current_type: exp.DataType) -> bool:
419        return (
420            self.precision_increase_allowed_types is None
421            or current_type.this in self.precision_increase_allowed_types
422        )
423
424    def _is_precision_increase(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
425        if current_type.this == new_type.this and not current_type.is_type(
426            *exp.DataType.NESTED_TYPES
427        ):
428            current_params = self.get_type_parameters(current_type)
429            new_params = self.get_type_parameters(new_type)
430
431            if len(current_params) != len(new_params):
432                return False
433
434            return all(new >= current for current, new in zip(current_params, new_params))
435        return False
436
437    def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]:
438        def _str_to_number(string: str, allows_max_param: bool) -> t.Union[int, float]:
439            try:
440                return int(string)
441            except ValueError:
442                try:
443                    return float(string)
444                except ValueError:
445                    if allows_max_param and string.upper() == "MAX":
446                        return self.max_parameter_length[type.this]
447                    raise ValueError(f"Could not convert '{string}' to a number")
448
449        # extract existing parameters
450        params = [
451            _str_to_number(param.this.this, type.this in self.max_parameter_length)
452            for param in type.expressions
453        ]
454
455        # maybe get default parameter values
456        param_defaults: t.Tuple[t.Union[int, float], ...] = ()
457        if type.this in self.parameterized_type_defaults:
458            param_defaults_list = self.parameterized_type_defaults[type.this]
459            if len(params) < len(param_defaults_list):
460                param_defaults = param_defaults_list[len(params)]
461
462        return [*params, *param_defaults]
463
464    def _get_matching_kwarg(
465        self,
466        current_kwarg: t.Union[str, exp.ColumnDef],
467        new_struct: exp.DataType,
468        current_pos: int,
469    ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]:
470        current_name = (
471            exp.to_identifier(current_kwarg)
472            if isinstance(current_kwarg, str)
473            else _get_name_and_type(current_kwarg)[0]
474        )
475        # First check if we have the same column in the same position to get O(1) complexity
476        new_kwarg = seq_get(new_struct.expressions, current_pos)
477        if new_kwarg:
478            new_name, new_type = _get_name_and_type(new_kwarg)
479            if current_name.this == new_name.this:
480                return current_pos, new_kwarg
481        # If not, check if we have the same column in all positions with O(n) complexity
482        for i, new_kwarg in enumerate(new_struct.expressions):
483            new_name, new_type = _get_name_and_type(new_kwarg)
484            if current_name.this == new_name.this:
485                return i, new_kwarg
486        return None, None
487
488    def _drop_operation(
489        self,
490        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
491        struct: exp.DataType,
492        pos: int,
493        root_struct: exp.DataType,
494        table_name: TableName,
495    ) -> t.List[TableAlterColumnOperation]:
496        columns = ensure_list(columns)
497        operations: t.List[TableAlterColumnOperation] = []
498        column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos)
499        if column_pos is None or not column_kwarg:
500            raise SQLMeshError(
501                f"Cannot drop column '{columns[-1].name}' from table '{table_name}' - column not found. "
502                f"This may indicate a mismatch between the expected and actual table schemas."
503            )
504        struct.expressions.pop(column_pos)
505        operations.append(
506            TableAlterDropColumnOperation(
507                target_table=exp.to_table(table_name),
508                column_parts=columns,
509                expected_table_struct=root_struct.copy(),
510                cascade=self.drop_cascade,
511                array_element_selector=self.array_element_selector,
512            )
513        )
514        return operations
515
516    def _requires_drop_alteration(
517        self, current_struct: exp.DataType, new_struct: exp.DataType
518    ) -> bool:
519        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
520            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
521            if new_pos is None:
522                return True
523        return False
524
525    def _resolve_drop_operation(
526        self,
527        parent_columns: t.List[TableAlterColumn],
528        current_struct: exp.DataType,
529        new_struct: exp.DataType,
530        root_struct: exp.DataType,
531        table_name: TableName,
532    ) -> t.List[TableAlterColumnOperation]:
533        operations = []
534        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
535            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
536            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
537            if new_pos is None:
538                operations.extend(
539                    self._drop_operation(
540                        columns, current_struct, current_pos, root_struct, table_name
541                    )
542                )
543        return operations
544
545    def _add_operation(
546        self,
547        columns: t.List[TableAlterColumn],
548        new_pos: int,
549        new_kwarg: exp.ColumnDef,
550        current_struct: exp.DataType,
551        root_struct: exp.DataType,
552        table_name: TableName,
553        is_part_of_destructive_change: bool = False,
554    ) -> t.List[TableAlterColumnOperation]:
555        if self.support_positional_add:
556            col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions)
557            current_struct.expressions.insert(new_pos, new_kwarg)
558        else:
559            col_pos = None
560            current_struct.expressions.append(new_kwarg)
561        return [
562            TableAlterAddColumnOperation(
563                target_table=exp.to_table(table_name),
564                column_parts=columns,
565                column_type=new_kwarg.args["kind"],
566                expected_table_struct=root_struct.copy(),
567                position=col_pos,
568                is_part_of_destructive_change=is_part_of_destructive_change,
569                array_element_selector=self.array_element_selector,
570            )
571        ]
572
573    def _resolve_add_operations(
574        self,
575        parent_columns: t.List[TableAlterColumn],
576        current_struct: exp.DataType,
577        new_struct: exp.DataType,
578        root_struct: exp.DataType,
579        table_name: TableName,
580    ) -> t.List[TableAlterColumnOperation]:
581        operations = []
582        for new_pos, new_kwarg in enumerate(new_struct.expressions):
583            possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos)
584            if possible_current_pos is None:
585                columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)]
586                operations.extend(
587                    self._add_operation(
588                        columns, new_pos, new_kwarg, current_struct, root_struct, table_name
589                    )
590                )
591        return operations
592
593    def _alter_operation(
594        self,
595        columns: t.List[TableAlterColumn],
596        pos: int,
597        struct: exp.DataType,
598        new_type: exp.DataType,
599        current_type: t.Union[str, exp.DataType],
600        root_struct: exp.DataType,
601        new_kwarg: exp.ColumnDef,
602        table_name: TableName,
603        *,
604        ignore_destructive: bool = False,
605        ignore_additive: bool = False,
606    ) -> t.List[TableAlterColumnOperation]:
607        # We don't copy on purpose here because current_type may need to be mutated inside
608        # _get_operations (struct.expressions.pop and struct.expressions.insert)
609        current_type = exp.DataType.build(current_type, copy=False)
610        if not self.nested_support.is_none:
611            if new_type.this == current_type.this == exp.DataType.Type.STRUCT:
612                if self.nested_support.is_ignore:
613                    return []
614                if self.nested_support.is_all or not self._requires_drop_alteration(
615                    current_type, new_type
616                ):
617                    return self._get_operations(
618                        columns,
619                        current_type,
620                        new_type,
621                        root_struct,
622                        table_name,
623                        ignore_destructive=ignore_destructive,
624                        ignore_additive=ignore_additive,
625                    )
626
627            if new_type.this == current_type.this == exp.DataType.Type.ARRAY:
628                # Some engines (i.e. Snowflake) don't support defining types on arrays
629                if not new_type.expressions or not current_type.expressions:
630                    return []
631                new_array_type = new_type.expressions[0]
632                current_array_type = current_type.expressions[0]
633                if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT:
634                    if self.nested_support.is_ignore:
635                        return []
636                    if self.nested_support.is_all or not self._requires_drop_alteration(
637                        current_array_type, new_array_type
638                    ):
639                        return self._get_operations(
640                            columns,
641                            current_array_type,
642                            new_array_type,
643                            root_struct,
644                            table_name,
645                            ignore_destructive=ignore_destructive,
646                            ignore_additive=ignore_additive,
647                        )
648        if self._is_coerceable_type(current_type, new_type):
649            return []
650        if self._is_compatible_type(current_type, new_type):
651            if ignore_additive:
652                return []
653            struct.expressions.pop(pos)
654            struct.expressions.insert(pos, new_kwarg)
655            return [
656                TableAlterChangeColumnTypeOperation(
657                    target_table=exp.to_table(table_name),
658                    column_parts=columns,
659                    column_type=new_type,
660                    current_type=current_type,
661                    expected_table_struct=root_struct.copy(),
662                    array_element_selector=self.array_element_selector,
663                    is_part_of_destructive_change=self.treat_alter_data_type_as_destructive,
664                )
665            ]
666        if ignore_destructive:
667            return []
668        return self._drop_operation(
669            columns,
670            root_struct,
671            pos,
672            root_struct,
673            table_name,
674        ) + self._add_operation(
675            columns,
676            pos,
677            new_kwarg,
678            struct,
679            root_struct,
680            table_name,
681            is_part_of_destructive_change=True,
682        )
683
684    def _resolve_alter_operations(
685        self,
686        parent_columns: t.List[TableAlterColumn],
687        current_struct: exp.DataType,
688        new_struct: exp.DataType,
689        root_struct: exp.DataType,
690        table_name: TableName,
691        *,
692        ignore_destructive: bool = False,
693        ignore_additive: bool = False,
694    ) -> t.List[TableAlterColumnOperation]:
695        operations = []
696        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
697            _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
698            if new_kwarg is None:
699                if ignore_destructive:
700                    continue
701                raise ValueError("Cannot alter a column that is being dropped")
702            _, new_type = _get_name_and_type(new_kwarg)
703            _, current_type = _get_name_and_type(current_kwarg)
704            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
705            if new_type == current_type:
706                continue
707            operations.extend(
708                self._alter_operation(
709                    columns,
710                    current_pos,
711                    current_struct,
712                    new_type,
713                    current_type,
714                    root_struct,
715                    new_kwarg,
716                    table_name,
717                    ignore_destructive=ignore_destructive,
718                    ignore_additive=ignore_additive,
719                )
720            )
721        return operations
722
723    def _get_operations(
724        self,
725        parent_columns: t.List[TableAlterColumn],
726        current_struct: exp.DataType,
727        new_struct: exp.DataType,
728        root_struct: exp.DataType,
729        table_name: TableName,
730        *,
731        ignore_destructive: bool = False,
732        ignore_additive: bool = False,
733    ) -> t.List[TableAlterColumnOperation]:
734        root_struct = root_struct or current_struct
735        parent_columns = parent_columns or []
736        operations = []
737        if not ignore_destructive:
738            operations.extend(
739                self._resolve_drop_operation(
740                    parent_columns, current_struct, new_struct, root_struct, table_name
741                )
742            )
743        if not ignore_additive:
744            operations.extend(
745                self._resolve_add_operations(
746                    parent_columns, current_struct, new_struct, root_struct, table_name
747                )
748            )
749        operations.extend(
750            self._resolve_alter_operations(
751                parent_columns,
752                current_struct,
753                new_struct,
754                root_struct,
755                ignore_destructive=ignore_destructive,
756                ignore_additive=ignore_additive,
757                table_name=table_name,
758            )
759        )
760        return operations
761
762    def _from_structs(
763        self,
764        current_struct: exp.DataType,
765        new_struct: exp.DataType,
766        table_name: TableName,
767        *,
768        ignore_destructive: bool = False,
769        ignore_additive: bool = False,
770    ) -> t.List[TableAlterColumnOperation]:
771        return self._get_operations(
772            [],
773            current_struct,
774            new_struct,
775            current_struct,
776            table_name=table_name,
777            ignore_destructive=ignore_destructive,
778            ignore_additive=ignore_additive,
779        )
780
781    def _compare_structs(
782        self,
783        table_name: t.Union[str, exp.Table],
784        current: exp.DataType,
785        new: exp.DataType,
786        *,
787        ignore_destructive: bool = False,
788        ignore_additive: bool = False,
789    ) -> t.List[TableAlterColumnOperation]:
790        return self._from_structs(
791            current,
792            new,
793            table_name=table_name,
794            ignore_destructive=ignore_destructive,
795            ignore_additive=ignore_additive,
796        )
797
798    def compare_columns(
799        self,
800        table_name: TableName,
801        current: t.Dict[str, exp.DataType],
802        new: t.Dict[str, exp.DataType],
803        *,
804        ignore_destructive: bool = False,
805        ignore_additive: bool = False,
806    ) -> t.List[TableAlterColumnOperation]:
807        return self._compare_structs(
808            table_name,
809            columns_to_types_to_struct(current),
810            columns_to_types_to_struct(new),
811            ignore_destructive=ignore_destructive,
812            ignore_additive=ignore_additive,
813        )
814
815
816def has_drop_alteration(alter_operations: t.List[TableAlterOperation]) -> bool:
817    return any(op.is_destructive for op in alter_operations)
818
819
820def has_additive_alteration(alter_operations: t.List[TableAlterOperation]) -> bool:
821    return any(op.is_additive for op in alter_operations)
822
823
824def get_additive_changes(
825    alter_operations: t.List[TableAlterOperation],
826) -> t.List[TableAlterOperation]:
827    return [x for x in alter_operations if x.is_additive]
828
829
830def get_dropped_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]:
831    return [
832        op.column.alias_or_name
833        for op in alter_expressions
834        if isinstance(op, TableAlterDropColumnOperation)
835    ]
836
837
838def get_additive_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]:
839    return [
840        op.column.alias_or_name
841        for op in alter_expressions
842        if op.is_additive and isinstance(op, TableAlterColumnOperation)
843    ]
844
845
846def get_schema_differ(
847    dialect: str, overrides: t.Optional[t.Dict[str, t.Any]] = None
848) -> SchemaDiffer:
849    """
850    Returns the appropriate SchemaDiffer for a given dialect without initializing the engine adapter.
851
852    Args:
853        dialect: The dialect for which to get the schema differ.
854        overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance.
855
856    Returns:
857        The SchemaDiffer instance configured for the given dialect.
858    """
859    from sqlmesh.core.engine_adapter import (
860        DIALECT_TO_ENGINE_ADAPTER,
861        DIALECT_ALIASES,
862        EngineAdapter,
863    )
864
865    dialect = dialect.lower()
866    dialect = DIALECT_ALIASES.get(dialect, dialect)
867    engine_adapter_class = DIALECT_TO_ENGINE_ADAPTER.get(dialect, EngineAdapter)
868    return SchemaDiffer(
869        **{
870            **getattr(engine_adapter_class, "SCHEMA_DIFFER_KWARGS"),
871            **(overrides or {}),
872        }
873    )
874
875
876def _get_name_and_type(struct: exp.ColumnDef) -> t.Tuple[exp.Identifier, exp.DataType]:
877    return struct.this, struct.args["kind"]
logger = <Logger sqlmesh.core.schema_diff (WARNING)>
@dataclass(frozen=True)
class TableAlterOperation(abc.ABC):
25@dataclass(frozen=True)
26class TableAlterOperation(abc.ABC):
27    target_table: exp.Table
28
29    @property
30    @abc.abstractmethod
31    def is_destructive(self) -> bool:
32        pass
33
34    @property
35    @abc.abstractmethod
36    def is_additive(self) -> bool:
37        pass
38
39    @property
40    @abc.abstractmethod
41    def _alter_actions(self) -> t.List[exp.Expr]:
42        pass
43
44    @property
45    def expression(self) -> exp.Alter:
46        return exp.Alter(
47            this=self.target_table,
48            kind="TABLE",
49            actions=self._alter_actions,
50        )
target_table: sqlglot.expressions.query.Table
is_destructive: bool
29    @property
30    @abc.abstractmethod
31    def is_destructive(self) -> bool:
32        pass
is_additive: bool
34    @property
35    @abc.abstractmethod
36    def is_additive(self) -> bool:
37        pass
expression: sqlglot.expressions.ddl.Alter
44    @property
45    def expression(self) -> exp.Alter:
46        return exp.Alter(
47            this=self.target_table,
48            kind="TABLE",
49            actions=self._alter_actions,
50        )
@dataclass(frozen=True)
class TableAlterColumnOperation(TableAlterOperation, abc.ABC):
53@dataclass(frozen=True)
54class TableAlterColumnOperation(TableAlterOperation, abc.ABC):
55    column_parts: t.List[TableAlterColumn]
56    expected_table_struct: exp.DataType
57    array_element_selector: str
58
59    @property
60    def column_identifiers(self) -> t.List[exp.Identifier]:
61        results = []
62        for column in self.column_parts:
63            results.append(column.identifier)
64            if (
65                column.is_array_of_struct
66                and len(self.column_parts) > 1
67                and self.array_element_selector
68            ):
69                results.append(exp.to_identifier(self.array_element_selector))
70        return results
71
72    @property
73    def column(self) -> t.Union[exp.Dot, exp.Identifier]:
74        columns = self.column_identifiers
75        if len(columns) == 1:
76            return columns[0]
77        return exp.Dot.build(columns)
column_parts: List[TableAlterColumn]
expected_table_struct: sqlglot.expressions.datatypes.DataType
array_element_selector: str
column_identifiers: List[sqlglot.expressions.core.Identifier]
59    @property
60    def column_identifiers(self) -> t.List[exp.Identifier]:
61        results = []
62        for column in self.column_parts:
63            results.append(column.identifier)
64            if (
65                column.is_array_of_struct
66                and len(self.column_parts) > 1
67                and self.array_element_selector
68            ):
69                results.append(exp.to_identifier(self.array_element_selector))
70        return results
column: Union[sqlglot.expressions.core.Dot, sqlglot.expressions.core.Identifier]
72    @property
73    def column(self) -> t.Union[exp.Dot, exp.Identifier]:
74        columns = self.column_identifiers
75        if len(columns) == 1:
76            return columns[0]
77        return exp.Dot.build(columns)
@dataclass(frozen=True)
class TableAlterTypedColumnOperation(TableAlterColumnOperation, abc.ABC):
80@dataclass(frozen=True)
81class TableAlterTypedColumnOperation(TableAlterColumnOperation, abc.ABC):
82    column_type: exp.DataType
83
84    @property
85    def column_def(self) -> exp.ColumnDef:
86        if not self.column_type:
87            raise SQLMeshError("Tried to access column type when it shouldn't be needed")
88        return exp.ColumnDef(
89            this=self.column,
90            kind=self.column_type,
91        )
column_type: sqlglot.expressions.datatypes.DataType
column_def: sqlglot.expressions.query.ColumnDef
84    @property
85    def column_def(self) -> exp.ColumnDef:
86        if not self.column_type:
87            raise SQLMeshError("Tried to access column type when it shouldn't be needed")
88        return exp.ColumnDef(
89            this=self.column,
90            kind=self.column_type,
91        )
@dataclass(frozen=True)
class TableAlterAddColumnOperation(TableAlterTypedColumnOperation):
 94@dataclass(frozen=True)
 95class TableAlterAddColumnOperation(TableAlterTypedColumnOperation):
 96    position: t.Optional[TableAlterColumnPosition] = None
 97    is_part_of_destructive_change: bool = False
 98
 99    @property
100    def is_additive(self) -> bool:
101        return not self.is_part_of_destructive_change
102
103    @property
104    def is_destructive(self) -> bool:
105        return self.is_part_of_destructive_change
106
107    @property
108    def _alter_actions(self) -> t.List[exp.Expr]:
109        column_def = exp.ColumnDef(
110            this=self.column,
111            kind=self.column_type,
112        )
113        if self.position:
114            column_def.set("position", self.position.column_position_node)
115        return [column_def]
TableAlterAddColumnOperation( target_table: sqlglot.expressions.query.Table, column_parts: List[TableAlterColumn], expected_table_struct: sqlglot.expressions.datatypes.DataType, array_element_selector: str, column_type: sqlglot.expressions.datatypes.DataType, position: Optional[TableAlterColumnPosition] = None, is_part_of_destructive_change: bool = False)
position: Optional[TableAlterColumnPosition] = None
is_part_of_destructive_change: bool = False
is_additive: bool
 99    @property
100    def is_additive(self) -> bool:
101        return not self.is_part_of_destructive_change
is_destructive: bool
103    @property
104    def is_destructive(self) -> bool:
105        return self.is_part_of_destructive_change
@dataclass(frozen=True)
class TableAlterDropColumnOperation(TableAlterColumnOperation):
118@dataclass(frozen=True)
119class TableAlterDropColumnOperation(TableAlterColumnOperation):
120    cascade: bool = False
121
122    @property
123    def is_additive(self) -> bool:
124        return False
125
126    @property
127    def is_destructive(self) -> bool:
128        return True
129
130    @property
131    def _alter_actions(self) -> t.List[exp.Expr]:
132        return [exp.Drop(this=self.column, kind="COLUMN", cascade=self.cascade)]
TableAlterDropColumnOperation( target_table: sqlglot.expressions.query.Table, column_parts: List[TableAlterColumn], expected_table_struct: sqlglot.expressions.datatypes.DataType, array_element_selector: str, cascade: bool = False)
cascade: bool = False
is_additive: bool
122    @property
123    def is_additive(self) -> bool:
124        return False
is_destructive: bool
126    @property
127    def is_destructive(self) -> bool:
128        return True
@dataclass(frozen=True)
class TableAlterChangeColumnTypeOperation(TableAlterTypedColumnOperation):
135@dataclass(frozen=True)
136class TableAlterChangeColumnTypeOperation(TableAlterTypedColumnOperation):
137    current_type: exp.DataType
138    is_part_of_destructive_change: bool = False
139
140    @property
141    def is_additive(self) -> bool:
142        return not self.is_part_of_destructive_change
143
144    @property
145    def is_destructive(self) -> bool:
146        return self.is_part_of_destructive_change
147
148    @property
149    def _alter_actions(self) -> t.List[exp.Expr]:
150        return [
151            exp.AlterColumn(
152                this=self.column,
153                dtype=self.column_type,
154            )
155        ]
TableAlterChangeColumnTypeOperation( target_table: sqlglot.expressions.query.Table, column_parts: List[TableAlterColumn], expected_table_struct: sqlglot.expressions.datatypes.DataType, array_element_selector: str, column_type: sqlglot.expressions.datatypes.DataType, current_type: sqlglot.expressions.datatypes.DataType, is_part_of_destructive_change: bool = False)
current_type: sqlglot.expressions.datatypes.DataType
is_part_of_destructive_change: bool = False
is_additive: bool
140    @property
141    def is_additive(self) -> bool:
142        return not self.is_part_of_destructive_change
is_destructive: bool
144    @property
145    def is_destructive(self) -> bool:
146        return self.is_part_of_destructive_change
@dataclass(frozen=True)
class TableAlterColumn:
158@dataclass(frozen=True)
159class TableAlterColumn:
160    name: str
161    is_struct: bool
162    is_array_of_struct: bool
163    is_array_of_primitive: bool
164    quoted: bool = False
165
166    @classmethod
167    def primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
168        return cls(
169            name=name,
170            is_struct=False,
171            is_array_of_struct=False,
172            is_array_of_primitive=False,
173            quoted=quoted,
174        )
175
176    @classmethod
177    def struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
178        return cls(
179            name=name,
180            is_struct=True,
181            is_array_of_struct=False,
182            is_array_of_primitive=False,
183            quoted=quoted,
184        )
185
186    @classmethod
187    def array_of_struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
188        return cls(
189            name=name,
190            is_struct=False,
191            is_array_of_struct=True,
192            is_array_of_primitive=False,
193            quoted=quoted,
194        )
195
196    @classmethod
197    def array_of_primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
198        return cls(
199            name=name,
200            is_struct=False,
201            is_array_of_struct=False,
202            is_array_of_primitive=True,
203            quoted=quoted,
204        )
205
206    @classmethod
207    def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn:
208        name = struct.alias_or_name
209        quoted = struct.this.quoted
210        kwarg_type = struct.args["kind"]
211
212        if kwarg_type.is_type(exp.DataType.Type.STRUCT):
213            return cls.struct(name, quoted=quoted)
214        if kwarg_type.is_type(exp.DataType.Type.ARRAY):
215            if kwarg_type.expressions and kwarg_type.expressions[0].is_type(
216                exp.DataType.Type.STRUCT
217            ):
218                return cls.array_of_struct(name, quoted=quoted)
219            return cls.array_of_primitive(name, quoted=quoted)
220        return cls.primitive(name, quoted=quoted)
221
222    @property
223    def is_array(self) -> bool:
224        return self.is_array_of_struct or self.is_array_of_primitive
225
226    @property
227    def is_primitive(self) -> bool:
228        return not self.is_struct and not self.is_array
229
230    @property
231    def is_nested(self) -> bool:
232        return not self.is_primitive
233
234    @property
235    def identifier(self) -> exp.Identifier:
236        return exp.to_identifier(self.name, quoted=self.quoted)
TableAlterColumn( name: str, is_struct: bool, is_array_of_struct: bool, is_array_of_primitive: bool, quoted: bool = False)
name: str
is_struct: bool
is_array_of_struct: bool
is_array_of_primitive: bool
quoted: bool = False
@classmethod
def primitive( cls, name: str, quoted: bool = False) -> TableAlterColumn:
166    @classmethod
167    def primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
168        return cls(
169            name=name,
170            is_struct=False,
171            is_array_of_struct=False,
172            is_array_of_primitive=False,
173            quoted=quoted,
174        )
@classmethod
def struct( cls, name: str, quoted: bool = False) -> TableAlterColumn:
176    @classmethod
177    def struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
178        return cls(
179            name=name,
180            is_struct=True,
181            is_array_of_struct=False,
182            is_array_of_primitive=False,
183            quoted=quoted,
184        )
@classmethod
def array_of_struct( cls, name: str, quoted: bool = False) -> TableAlterColumn:
186    @classmethod
187    def array_of_struct(cls, name: str, quoted: bool = False) -> TableAlterColumn:
188        return cls(
189            name=name,
190            is_struct=False,
191            is_array_of_struct=True,
192            is_array_of_primitive=False,
193            quoted=quoted,
194        )
@classmethod
def array_of_primitive( cls, name: str, quoted: bool = False) -> TableAlterColumn:
196    @classmethod
197    def array_of_primitive(cls, name: str, quoted: bool = False) -> TableAlterColumn:
198        return cls(
199            name=name,
200            is_struct=False,
201            is_array_of_struct=False,
202            is_array_of_primitive=True,
203            quoted=quoted,
204        )
@classmethod
def from_struct_kwarg( cls, struct: sqlglot.expressions.query.ColumnDef) -> TableAlterColumn:
206    @classmethod
207    def from_struct_kwarg(cls, struct: exp.ColumnDef) -> TableAlterColumn:
208        name = struct.alias_or_name
209        quoted = struct.this.quoted
210        kwarg_type = struct.args["kind"]
211
212        if kwarg_type.is_type(exp.DataType.Type.STRUCT):
213            return cls.struct(name, quoted=quoted)
214        if kwarg_type.is_type(exp.DataType.Type.ARRAY):
215            if kwarg_type.expressions and kwarg_type.expressions[0].is_type(
216                exp.DataType.Type.STRUCT
217            ):
218                return cls.array_of_struct(name, quoted=quoted)
219            return cls.array_of_primitive(name, quoted=quoted)
220        return cls.primitive(name, quoted=quoted)
is_array: bool
222    @property
223    def is_array(self) -> bool:
224        return self.is_array_of_struct or self.is_array_of_primitive
is_primitive: bool
226    @property
227    def is_primitive(self) -> bool:
228        return not self.is_struct and not self.is_array
is_nested: bool
230    @property
231    def is_nested(self) -> bool:
232        return not self.is_primitive
identifier: sqlglot.expressions.core.Identifier
234    @property
235    def identifier(self) -> exp.Identifier:
236        return exp.to_identifier(self.name, quoted=self.quoted)
@dataclass(frozen=True)
class TableAlterColumnPosition:
239@dataclass(frozen=True)
240class TableAlterColumnPosition:
241    is_first: bool
242    is_last: bool
243    after: t.Optional[exp.Identifier] = None
244
245    @classmethod
246    def first(cls) -> TableAlterColumnPosition:
247        return cls(is_first=True, is_last=False, after=None)
248
249    @classmethod
250    def last(
251        cls, after: t.Optional[t.Union[str, exp.Identifier]] = None
252    ) -> TableAlterColumnPosition:
253        return cls(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None)
254
255    @classmethod
256    def middle(cls, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition:
257        return cls(is_first=False, is_last=False, after=exp.to_identifier(after))
258
259    @classmethod
260    def create(
261        cls,
262        pos: int,
263        current_kwargs: t.List[exp.ColumnDef],
264        replacing_col: bool = False,
265    ) -> TableAlterColumnPosition:
266        is_first = pos == 0
267        is_last = pos == len(current_kwargs) - int(replacing_col)
268        after = None
269        if not is_first:
270            prior_kwarg = current_kwargs[pos - 1]
271            after, _ = _get_name_and_type(prior_kwarg)
272        return cls(is_first=is_first, is_last=is_last, after=after)
273
274    @property
275    def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
276        column = self.after if not self.is_last else None
277        position = None
278        if self.is_first:
279            position = "FIRST"
280        elif column and not self.is_last:
281            position = "AFTER"
282        return exp.ColumnPosition(this=column, position=position)
TableAlterColumnPosition( is_first: bool, is_last: bool, after: Optional[sqlglot.expressions.core.Identifier] = None)
is_first: bool
is_last: bool
after: Optional[sqlglot.expressions.core.Identifier] = None
@classmethod
def first(cls) -> TableAlterColumnPosition:
245    @classmethod
246    def first(cls) -> TableAlterColumnPosition:
247        return cls(is_first=True, is_last=False, after=None)
@classmethod
def last( cls, after: Union[str, sqlglot.expressions.core.Identifier, NoneType] = None) -> TableAlterColumnPosition:
249    @classmethod
250    def last(
251        cls, after: t.Optional[t.Union[str, exp.Identifier]] = None
252    ) -> TableAlterColumnPosition:
253        return cls(is_first=False, is_last=True, after=exp.to_identifier(after) if after else None)
@classmethod
def middle( cls, after: Union[str, sqlglot.expressions.core.Identifier]) -> TableAlterColumnPosition:
255    @classmethod
256    def middle(cls, after: t.Union[str, exp.Identifier]) -> TableAlterColumnPosition:
257        return cls(is_first=False, is_last=False, after=exp.to_identifier(after))
@classmethod
def create( cls, pos: int, current_kwargs: List[sqlglot.expressions.query.ColumnDef], replacing_col: bool = False) -> TableAlterColumnPosition:
259    @classmethod
260    def create(
261        cls,
262        pos: int,
263        current_kwargs: t.List[exp.ColumnDef],
264        replacing_col: bool = False,
265    ) -> TableAlterColumnPosition:
266        is_first = pos == 0
267        is_last = pos == len(current_kwargs) - int(replacing_col)
268        after = None
269        if not is_first:
270            prior_kwarg = current_kwargs[pos - 1]
271            after, _ = _get_name_and_type(prior_kwarg)
272        return cls(is_first=is_first, is_last=is_last, after=after)
column_position_node: Optional[sqlglot.expressions.query.ColumnPosition]
274    @property
275    def column_position_node(self) -> t.Optional[exp.ColumnPosition]:
276        column = self.after if not self.is_last else None
277        position = None
278        if self.is_first:
279            position = "FIRST"
280        elif column and not self.is_last:
281            position = "AFTER"
282        return exp.ColumnPosition(this=column, position=position)
class NestedSupport(builtins.str, enum.Enum):
285class NestedSupport(str, Enum):
286    # Supports all nested data type operations
287    ALL = "ALL"
288    # Does not support any nested data type operations
289    NONE = "NONE"
290    # Supports nested data type operations except for those that require dropping a nested field
291    ALL_BUT_DROP = "ALL_BUT_DROP"
292    # Ignores all nested data type operations
293    IGNORE = "IGNORE"
294
295    @property
296    def is_all(self) -> bool:
297        return self == NestedSupport.ALL
298
299    @property
300    def is_none(self) -> bool:
301        return self == NestedSupport.NONE
302
303    @property
304    def is_all_but_drop(self) -> bool:
305        return self == NestedSupport.ALL_BUT_DROP
306
307    @property
308    def is_ignore(self) -> bool:
309        return self == NestedSupport.IGNORE

An enumeration.

ALL = <NestedSupport.ALL: 'ALL'>
NONE = <NestedSupport.NONE: 'NONE'>
ALL_BUT_DROP = <NestedSupport.ALL_BUT_DROP: 'ALL_BUT_DROP'>
IGNORE = <NestedSupport.IGNORE: 'IGNORE'>
is_all: bool
295    @property
296    def is_all(self) -> bool:
297        return self == NestedSupport.ALL
is_none: bool
299    @property
300    def is_none(self) -> bool:
301        return self == NestedSupport.NONE
is_all_but_drop: bool
303    @property
304    def is_all_but_drop(self) -> bool:
305        return self == NestedSupport.ALL_BUT_DROP
is_ignore: bool
307    @property
308    def is_ignore(self) -> bool:
309        return self == NestedSupport.IGNORE
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class SchemaDiffer(sqlmesh.utils.pydantic.PydanticModel):
312class SchemaDiffer(PydanticModel):
313    """
314    Compares a source schema against a target schema and returns a list of alter statements to have the source
315    match the structure of target. Some engines have constraints on the types of operations that can be performed
316    therefore the final structure may not match the target exactly but it will be as close as possible. Two potential
317    differences that can happen:
318    1. Column order can be different if the engine doesn't support positional additions. Another reason for difference
319    is if a column is just moved since we don't currently support fixing moves.
320    2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested
321    operations. As a result historical data is lost.
322    3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible
323    change. As a result historical data is lost.
324
325    Potential future improvements:
326    1. Support column moves. Databricks Delta supports moves and would allow exact matches.
327
328    Args:
329        support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a
330            specific position in the set of existing columns.
331        nested_support: How the engine for which the diff is being computed supports nested types.
332        compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data
333            type, and value is the set of types that are compatible with it.
334        coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without
335            altering the column type. NOTE: usually callers should not specify this attribute manually and set the
336            `support_coercing_compatible_types` flag instead. Some engines are inconsistent about their type coercion rules.
337            For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted
338            into a FLOAT64 column just fine.
339        support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct
340            coercion of compatible types.
341        drop_cascade: Whether to add CASCADE modifier when dropping a column.
342        parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type,
343            but in the engine adapter specification we build it from the dialect string instead of specifying it directly.
344            Example: `exp.DataType.build("STRING", dialect=DIALECT).this` instead of the underlying `exp.DataType.Type.TEXT`
345            to which it parses. We do that because parameter default replacement will silently break if we specify type
346            directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default
347            values in a list, where the list index contains the remaining defaults given the number of parameter values
348            provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two
349            omitted parameters `(38, 9)` -> "DECIMAL(38,9)". Example: if user provides 1 parameters "DECIMAL(10)", we return
350            index 1 value for the one omitted parameters `(0,)` -> "DECIMAL(10,0)".
351        max_parameter_length: Numeric parameter values corresponding to "max". Example: `VARCHAR(max)` -> `VARCHAR(65535)`.
352        types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly
353            parameterized type can ALTER to its unlimited length version, along with different types in some engines.
354        treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it
355            concludes the change is compatible and won't result in data loss. If this flag is set to True, it will
356            flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't
357            be set outside of that context.
358    """
359
360    support_positional_add: bool = False
361    nested_support: NestedSupport = NestedSupport.NONE
362    array_element_selector: str = ""
363    compatible_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
364    coerceable_types_: t.Dict[exp.DataType, t.Set[exp.DataType]] = Field(
365        default_factory=dict, alias="coerceable_types"
366    )
367    precision_increase_allowed_types: t.Optional[t.Set[exp.DType]] = None
368    support_coercing_compatible_types: bool = False
369    drop_cascade: bool = False
370    parameterized_type_defaults: t.Dict[exp.DType, t.List[t.Tuple[t.Union[int, float], ...]]] = {}
371    max_parameter_length: t.Dict[exp.DType, t.Union[int, float]] = {}
372    types_with_unlimited_length: t.Dict[exp.DType, t.Set[exp.DType]] = {}
373    treat_alter_data_type_as_destructive: bool = False
374
375    _coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = {}
376
377    @property
378    def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
379        if not self._coerceable_types:
380            if not self.support_coercing_compatible_types or not self.compatible_types:
381                return self.coerceable_types_
382            coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(set)
383            coerceable_types.update(self.coerceable_types_)
384            for source_type, target_types in self.compatible_types.items():
385                for target_type in target_types:
386                    coerceable_types[target_type].add(source_type)
387            self._coerceable_types = coerceable_types
388        return self._coerceable_types
389
390    def _is_compatible_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
391        # types are identical or both types are parameterized and new has higher precision
392        # - default parameter values are automatically provided if not present
393        if current_type == new_type or (
394            self._is_precision_increase_allowed(current_type)
395            and self._is_precision_increase(current_type, new_type)
396        ):
397            return True
398        # types are un-parameterized and compatible
399        if current_type in self.compatible_types:
400            return new_type in self.compatible_types[current_type]
401        # new type is un-parameterized and has unlimited length, current type is compatible
402        if not new_type.expressions and new_type.this in self.types_with_unlimited_length:
403            return current_type.this in self.types_with_unlimited_length[new_type.this]
404        return False
405
406    def _is_coerceable_type(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
407        if current_type in self.coerceable_types:
408            is_coerceable = new_type in self.coerceable_types[current_type]
409            if is_coerceable:
410                from sqlmesh.core.console import get_console
411
412                get_console().log_warning(
413                    f"Coercing type {current_type} to {new_type} which means an alter will not be performed and therefore the resulting table structure will not match what is in the query.\nUpdate your model to cast the value to {current_type} type in order to remove this warning."
414                )
415
416            return is_coerceable
417        return False
418
419    def _is_precision_increase_allowed(self, current_type: exp.DataType) -> bool:
420        return (
421            self.precision_increase_allowed_types is None
422            or current_type.this in self.precision_increase_allowed_types
423        )
424
425    def _is_precision_increase(self, current_type: exp.DataType, new_type: exp.DataType) -> bool:
426        if current_type.this == new_type.this and not current_type.is_type(
427            *exp.DataType.NESTED_TYPES
428        ):
429            current_params = self.get_type_parameters(current_type)
430            new_params = self.get_type_parameters(new_type)
431
432            if len(current_params) != len(new_params):
433                return False
434
435            return all(new >= current for current, new in zip(current_params, new_params))
436        return False
437
438    def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]:
439        def _str_to_number(string: str, allows_max_param: bool) -> t.Union[int, float]:
440            try:
441                return int(string)
442            except ValueError:
443                try:
444                    return float(string)
445                except ValueError:
446                    if allows_max_param and string.upper() == "MAX":
447                        return self.max_parameter_length[type.this]
448                    raise ValueError(f"Could not convert '{string}' to a number")
449
450        # extract existing parameters
451        params = [
452            _str_to_number(param.this.this, type.this in self.max_parameter_length)
453            for param in type.expressions
454        ]
455
456        # maybe get default parameter values
457        param_defaults: t.Tuple[t.Union[int, float], ...] = ()
458        if type.this in self.parameterized_type_defaults:
459            param_defaults_list = self.parameterized_type_defaults[type.this]
460            if len(params) < len(param_defaults_list):
461                param_defaults = param_defaults_list[len(params)]
462
463        return [*params, *param_defaults]
464
465    def _get_matching_kwarg(
466        self,
467        current_kwarg: t.Union[str, exp.ColumnDef],
468        new_struct: exp.DataType,
469        current_pos: int,
470    ) -> t.Tuple[t.Optional[int], t.Optional[exp.ColumnDef]]:
471        current_name = (
472            exp.to_identifier(current_kwarg)
473            if isinstance(current_kwarg, str)
474            else _get_name_and_type(current_kwarg)[0]
475        )
476        # First check if we have the same column in the same position to get O(1) complexity
477        new_kwarg = seq_get(new_struct.expressions, current_pos)
478        if new_kwarg:
479            new_name, new_type = _get_name_and_type(new_kwarg)
480            if current_name.this == new_name.this:
481                return current_pos, new_kwarg
482        # If not, check if we have the same column in all positions with O(n) complexity
483        for i, new_kwarg in enumerate(new_struct.expressions):
484            new_name, new_type = _get_name_and_type(new_kwarg)
485            if current_name.this == new_name.this:
486                return i, new_kwarg
487        return None, None
488
489    def _drop_operation(
490        self,
491        columns: t.Union[TableAlterColumn, t.List[TableAlterColumn]],
492        struct: exp.DataType,
493        pos: int,
494        root_struct: exp.DataType,
495        table_name: TableName,
496    ) -> t.List[TableAlterColumnOperation]:
497        columns = ensure_list(columns)
498        operations: t.List[TableAlterColumnOperation] = []
499        column_pos, column_kwarg = self._get_matching_kwarg(columns[-1].name, struct, pos)
500        if column_pos is None or not column_kwarg:
501            raise SQLMeshError(
502                f"Cannot drop column '{columns[-1].name}' from table '{table_name}' - column not found. "
503                f"This may indicate a mismatch between the expected and actual table schemas."
504            )
505        struct.expressions.pop(column_pos)
506        operations.append(
507            TableAlterDropColumnOperation(
508                target_table=exp.to_table(table_name),
509                column_parts=columns,
510                expected_table_struct=root_struct.copy(),
511                cascade=self.drop_cascade,
512                array_element_selector=self.array_element_selector,
513            )
514        )
515        return operations
516
517    def _requires_drop_alteration(
518        self, current_struct: exp.DataType, new_struct: exp.DataType
519    ) -> bool:
520        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
521            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
522            if new_pos is None:
523                return True
524        return False
525
526    def _resolve_drop_operation(
527        self,
528        parent_columns: t.List[TableAlterColumn],
529        current_struct: exp.DataType,
530        new_struct: exp.DataType,
531        root_struct: exp.DataType,
532        table_name: TableName,
533    ) -> t.List[TableAlterColumnOperation]:
534        operations = []
535        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
536            new_pos, _ = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
537            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
538            if new_pos is None:
539                operations.extend(
540                    self._drop_operation(
541                        columns, current_struct, current_pos, root_struct, table_name
542                    )
543                )
544        return operations
545
546    def _add_operation(
547        self,
548        columns: t.List[TableAlterColumn],
549        new_pos: int,
550        new_kwarg: exp.ColumnDef,
551        current_struct: exp.DataType,
552        root_struct: exp.DataType,
553        table_name: TableName,
554        is_part_of_destructive_change: bool = False,
555    ) -> t.List[TableAlterColumnOperation]:
556        if self.support_positional_add:
557            col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions)
558            current_struct.expressions.insert(new_pos, new_kwarg)
559        else:
560            col_pos = None
561            current_struct.expressions.append(new_kwarg)
562        return [
563            TableAlterAddColumnOperation(
564                target_table=exp.to_table(table_name),
565                column_parts=columns,
566                column_type=new_kwarg.args["kind"],
567                expected_table_struct=root_struct.copy(),
568                position=col_pos,
569                is_part_of_destructive_change=is_part_of_destructive_change,
570                array_element_selector=self.array_element_selector,
571            )
572        ]
573
574    def _resolve_add_operations(
575        self,
576        parent_columns: t.List[TableAlterColumn],
577        current_struct: exp.DataType,
578        new_struct: exp.DataType,
579        root_struct: exp.DataType,
580        table_name: TableName,
581    ) -> t.List[TableAlterColumnOperation]:
582        operations = []
583        for new_pos, new_kwarg in enumerate(new_struct.expressions):
584            possible_current_pos, _ = self._get_matching_kwarg(new_kwarg, current_struct, new_pos)
585            if possible_current_pos is None:
586                columns = parent_columns + [TableAlterColumn.from_struct_kwarg(new_kwarg)]
587                operations.extend(
588                    self._add_operation(
589                        columns, new_pos, new_kwarg, current_struct, root_struct, table_name
590                    )
591                )
592        return operations
593
594    def _alter_operation(
595        self,
596        columns: t.List[TableAlterColumn],
597        pos: int,
598        struct: exp.DataType,
599        new_type: exp.DataType,
600        current_type: t.Union[str, exp.DataType],
601        root_struct: exp.DataType,
602        new_kwarg: exp.ColumnDef,
603        table_name: TableName,
604        *,
605        ignore_destructive: bool = False,
606        ignore_additive: bool = False,
607    ) -> t.List[TableAlterColumnOperation]:
608        # We don't copy on purpose here because current_type may need to be mutated inside
609        # _get_operations (struct.expressions.pop and struct.expressions.insert)
610        current_type = exp.DataType.build(current_type, copy=False)
611        if not self.nested_support.is_none:
612            if new_type.this == current_type.this == exp.DataType.Type.STRUCT:
613                if self.nested_support.is_ignore:
614                    return []
615                if self.nested_support.is_all or not self._requires_drop_alteration(
616                    current_type, new_type
617                ):
618                    return self._get_operations(
619                        columns,
620                        current_type,
621                        new_type,
622                        root_struct,
623                        table_name,
624                        ignore_destructive=ignore_destructive,
625                        ignore_additive=ignore_additive,
626                    )
627
628            if new_type.this == current_type.this == exp.DataType.Type.ARRAY:
629                # Some engines (i.e. Snowflake) don't support defining types on arrays
630                if not new_type.expressions or not current_type.expressions:
631                    return []
632                new_array_type = new_type.expressions[0]
633                current_array_type = current_type.expressions[0]
634                if new_array_type.this == current_array_type.this == exp.DataType.Type.STRUCT:
635                    if self.nested_support.is_ignore:
636                        return []
637                    if self.nested_support.is_all or not self._requires_drop_alteration(
638                        current_array_type, new_array_type
639                    ):
640                        return self._get_operations(
641                            columns,
642                            current_array_type,
643                            new_array_type,
644                            root_struct,
645                            table_name,
646                            ignore_destructive=ignore_destructive,
647                            ignore_additive=ignore_additive,
648                        )
649        if self._is_coerceable_type(current_type, new_type):
650            return []
651        if self._is_compatible_type(current_type, new_type):
652            if ignore_additive:
653                return []
654            struct.expressions.pop(pos)
655            struct.expressions.insert(pos, new_kwarg)
656            return [
657                TableAlterChangeColumnTypeOperation(
658                    target_table=exp.to_table(table_name),
659                    column_parts=columns,
660                    column_type=new_type,
661                    current_type=current_type,
662                    expected_table_struct=root_struct.copy(),
663                    array_element_selector=self.array_element_selector,
664                    is_part_of_destructive_change=self.treat_alter_data_type_as_destructive,
665                )
666            ]
667        if ignore_destructive:
668            return []
669        return self._drop_operation(
670            columns,
671            root_struct,
672            pos,
673            root_struct,
674            table_name,
675        ) + self._add_operation(
676            columns,
677            pos,
678            new_kwarg,
679            struct,
680            root_struct,
681            table_name,
682            is_part_of_destructive_change=True,
683        )
684
685    def _resolve_alter_operations(
686        self,
687        parent_columns: t.List[TableAlterColumn],
688        current_struct: exp.DataType,
689        new_struct: exp.DataType,
690        root_struct: exp.DataType,
691        table_name: TableName,
692        *,
693        ignore_destructive: bool = False,
694        ignore_additive: bool = False,
695    ) -> t.List[TableAlterColumnOperation]:
696        operations = []
697        for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
698            _, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
699            if new_kwarg is None:
700                if ignore_destructive:
701                    continue
702                raise ValueError("Cannot alter a column that is being dropped")
703            _, new_type = _get_name_and_type(new_kwarg)
704            _, current_type = _get_name_and_type(current_kwarg)
705            columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
706            if new_type == current_type:
707                continue
708            operations.extend(
709                self._alter_operation(
710                    columns,
711                    current_pos,
712                    current_struct,
713                    new_type,
714                    current_type,
715                    root_struct,
716                    new_kwarg,
717                    table_name,
718                    ignore_destructive=ignore_destructive,
719                    ignore_additive=ignore_additive,
720                )
721            )
722        return operations
723
724    def _get_operations(
725        self,
726        parent_columns: t.List[TableAlterColumn],
727        current_struct: exp.DataType,
728        new_struct: exp.DataType,
729        root_struct: exp.DataType,
730        table_name: TableName,
731        *,
732        ignore_destructive: bool = False,
733        ignore_additive: bool = False,
734    ) -> t.List[TableAlterColumnOperation]:
735        root_struct = root_struct or current_struct
736        parent_columns = parent_columns or []
737        operations = []
738        if not ignore_destructive:
739            operations.extend(
740                self._resolve_drop_operation(
741                    parent_columns, current_struct, new_struct, root_struct, table_name
742                )
743            )
744        if not ignore_additive:
745            operations.extend(
746                self._resolve_add_operations(
747                    parent_columns, current_struct, new_struct, root_struct, table_name
748                )
749            )
750        operations.extend(
751            self._resolve_alter_operations(
752                parent_columns,
753                current_struct,
754                new_struct,
755                root_struct,
756                ignore_destructive=ignore_destructive,
757                ignore_additive=ignore_additive,
758                table_name=table_name,
759            )
760        )
761        return operations
762
763    def _from_structs(
764        self,
765        current_struct: exp.DataType,
766        new_struct: exp.DataType,
767        table_name: TableName,
768        *,
769        ignore_destructive: bool = False,
770        ignore_additive: bool = False,
771    ) -> t.List[TableAlterColumnOperation]:
772        return self._get_operations(
773            [],
774            current_struct,
775            new_struct,
776            current_struct,
777            table_name=table_name,
778            ignore_destructive=ignore_destructive,
779            ignore_additive=ignore_additive,
780        )
781
782    def _compare_structs(
783        self,
784        table_name: t.Union[str, exp.Table],
785        current: exp.DataType,
786        new: exp.DataType,
787        *,
788        ignore_destructive: bool = False,
789        ignore_additive: bool = False,
790    ) -> t.List[TableAlterColumnOperation]:
791        return self._from_structs(
792            current,
793            new,
794            table_name=table_name,
795            ignore_destructive=ignore_destructive,
796            ignore_additive=ignore_additive,
797        )
798
799    def compare_columns(
800        self,
801        table_name: TableName,
802        current: t.Dict[str, exp.DataType],
803        new: t.Dict[str, exp.DataType],
804        *,
805        ignore_destructive: bool = False,
806        ignore_additive: bool = False,
807    ) -> t.List[TableAlterColumnOperation]:
808        return self._compare_structs(
809            table_name,
810            columns_to_types_to_struct(current),
811            columns_to_types_to_struct(new),
812            ignore_destructive=ignore_destructive,
813            ignore_additive=ignore_additive,
814        )

Compares a source schema against a target schema and returns a list of alter statements to have the source match the structure of target. Some engines have constraints on the types of operations that can be performed therefore the final structure may not match the target exactly but it will be as close as possible. Two potential differences that can happen:

  1. Column order can be different if the engine doesn't support positional additions. Another reason for difference is if a column is just moved since we don't currently support fixing moves.
  2. Nested operations will be represented using a drop/add of the root column if the engine doesn't support nested operations. As a result historical data is lost.
  3. Column type changes will be reflected but it can be done through a drop/add if the change is not a compatible change. As a result historical data is lost.

Potential future improvements:

  1. Support column moves. Databricks Delta supports moves and would allow exact matches.
Arguments:
  • support_positional_add: Whether the engine for which the diff is being computed supports adding columns in a specific position in the set of existing columns.
  • nested_support: How the engine for which the diff is being computed supports nested types.
  • compatible_types: Types that are compatible and automatically coerced in actions like UNION ALL. Dict key is data type, and value is the set of types that are compatible with it.
  • coerceable_types: The mapping from a current type to all types that can be safely coerced to the current one without altering the column type. NOTE: usually callers should not specify this attribute manually and set the support_coercing_compatible_types flag instead. Some engines are inconsistent about their type coercion rules. For example, in BigQuery a BIGNUMERIC column can't be altered to be FLOAT64, while BIGNUMERIC values can be inserted into a FLOAT64 column just fine.
  • support_coercing_compatible_types: Whether or not the engine for which the diff is being computed supports direct coercion of compatible types.
  • drop_cascade: Whether to add CASCADE modifier when dropping a column.
  • parameterized_type_defaults: Default values for parameterized data types. Dict key is a sqlglot exp.DataType.Type, but in the engine adapter specification we build it from the dialect string instead of specifying it directly. Example: exp.DataType.build("STRING", dialect=DIALECT).this instead of the underlying exp.DataType.Type.TEXT to which it parses. We do that because parameter default replacement will silently break if we specify type directly and SQLGlot changes the dialect's mapping of type string to exp.DataType.Type. Dict value is default values in a list, where the list index contains the remaining defaults given the number of parameter values provided by the user. Example: if user provides 0 parameters "DECIMAL", we return index 0 values for the two omitted parameters (38, 9) -> "DECIMAL(38,9)". Example: if user provides 1 parameters "DECIMAL(10)", we return index 1 value for the one omitted parameters (0,) -> "DECIMAL(10,0)".
  • max_parameter_length: Numeric parameter values corresponding to "max". Example: VARCHAR(max) -> VARCHAR(65535).
  • types_with_unlimited_length: Data types that accept values of any length up to system limits. Any explicitly parameterized type can ALTER to its unlimited length version, along with different types in some engines.
  • treat_alter_data_type_as_destructive: The SchemaDiffer will only output change data type operations if it concludes the change is compatible and won't result in data loss. If this flag is set to True, it will flag these data type changes as destructive. This was added for dbt adapter support and likely shouldn't be set outside of that context.
support_positional_add: bool
nested_support: NestedSupport
array_element_selector: str
compatible_types: Dict[sqlglot.expressions.datatypes.DataType, Set[sqlglot.expressions.datatypes.DataType]]
coerceable_types_: Dict[sqlglot.expressions.datatypes.DataType, Set[sqlglot.expressions.datatypes.DataType]]
precision_increase_allowed_types: Optional[Set[sqlglot.expressions.datatypes.DType]]
support_coercing_compatible_types: bool
drop_cascade: bool
parameterized_type_defaults: Dict[sqlglot.expressions.datatypes.DType, List[Tuple[Union[int, float], ...]]]
max_parameter_length: Dict[sqlglot.expressions.datatypes.DType, Union[int, float]]
types_with_unlimited_length: Dict[sqlglot.expressions.datatypes.DType, Set[sqlglot.expressions.datatypes.DType]]
treat_alter_data_type_as_destructive: bool
coerceable_types: Dict[sqlglot.expressions.datatypes.DataType, Set[sqlglot.expressions.datatypes.DataType]]
377    @property
378    def coerceable_types(self) -> t.Dict[exp.DataType, t.Set[exp.DataType]]:
379        if not self._coerceable_types:
380            if not self.support_coercing_compatible_types or not self.compatible_types:
381                return self.coerceable_types_
382            coerceable_types: t.Dict[exp.DataType, t.Set[exp.DataType]] = defaultdict(set)
383            coerceable_types.update(self.coerceable_types_)
384            for source_type, target_types in self.compatible_types.items():
385                for target_type in target_types:
386                    coerceable_types[target_type].add(source_type)
387            self._coerceable_types = coerceable_types
388        return self._coerceable_types
def get_type_parameters( self, type: sqlglot.expressions.datatypes.DataType) -> List[Union[int, float]]:
438    def get_type_parameters(self, type: exp.DataType) -> t.List[t.Union[int, float]]:
439        def _str_to_number(string: str, allows_max_param: bool) -> t.Union[int, float]:
440            try:
441                return int(string)
442            except ValueError:
443                try:
444                    return float(string)
445                except ValueError:
446                    if allows_max_param and string.upper() == "MAX":
447                        return self.max_parameter_length[type.this]
448                    raise ValueError(f"Could not convert '{string}' to a number")
449
450        # extract existing parameters
451        params = [
452            _str_to_number(param.this.this, type.this in self.max_parameter_length)
453            for param in type.expressions
454        ]
455
456        # maybe get default parameter values
457        param_defaults: t.Tuple[t.Union[int, float], ...] = ()
458        if type.this in self.parameterized_type_defaults:
459            param_defaults_list = self.parameterized_type_defaults[type.this]
460            if len(params) < len(param_defaults_list):
461                param_defaults = param_defaults_list[len(params)]
462
463        return [*params, *param_defaults]
def compare_columns( self, table_name: Union[str, sqlglot.expressions.query.Table], current: Dict[str, sqlglot.expressions.datatypes.DataType], new: Dict[str, sqlglot.expressions.datatypes.DataType], *, ignore_destructive: bool = False, ignore_additive: bool = False) -> List[TableAlterColumnOperation]:
799    def compare_columns(
800        self,
801        table_name: TableName,
802        current: t.Dict[str, exp.DataType],
803        new: t.Dict[str, exp.DataType],
804        *,
805        ignore_destructive: bool = False,
806        ignore_additive: bool = False,
807    ) -> t.List[TableAlterColumnOperation]:
808        return self._compare_structs(
809            table_name,
810            columns_to_types_to_struct(current),
811            columns_to_types_to_struct(new),
812            ignore_destructive=ignore_destructive,
813            ignore_additive=ignore_additive,
814        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
def has_drop_alteration( alter_operations: List[TableAlterOperation]) -> bool:
817def has_drop_alteration(alter_operations: t.List[TableAlterOperation]) -> bool:
818    return any(op.is_destructive for op in alter_operations)
def has_additive_alteration( alter_operations: List[TableAlterOperation]) -> bool:
821def has_additive_alteration(alter_operations: t.List[TableAlterOperation]) -> bool:
822    return any(op.is_additive for op in alter_operations)
def get_additive_changes( alter_operations: List[TableAlterOperation]) -> List[TableAlterOperation]:
825def get_additive_changes(
826    alter_operations: t.List[TableAlterOperation],
827) -> t.List[TableAlterOperation]:
828    return [x for x in alter_operations if x.is_additive]
def get_dropped_column_names( alter_expressions: List[TableAlterOperation]) -> List[str]:
831def get_dropped_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]:
832    return [
833        op.column.alias_or_name
834        for op in alter_expressions
835        if isinstance(op, TableAlterDropColumnOperation)
836    ]
def get_additive_column_names( alter_expressions: List[TableAlterOperation]) -> List[str]:
839def get_additive_column_names(alter_expressions: t.List[TableAlterOperation]) -> t.List[str]:
840    return [
841        op.column.alias_or_name
842        for op in alter_expressions
843        if op.is_additive and isinstance(op, TableAlterColumnOperation)
844    ]
def get_schema_differ( dialect: str, overrides: Optional[Dict[str, Any]] = None) -> SchemaDiffer:
847def get_schema_differ(
848    dialect: str, overrides: t.Optional[t.Dict[str, t.Any]] = None
849) -> SchemaDiffer:
850    """
851    Returns the appropriate SchemaDiffer for a given dialect without initializing the engine adapter.
852
853    Args:
854        dialect: The dialect for which to get the schema differ.
855        overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance.
856
857    Returns:
858        The SchemaDiffer instance configured for the given dialect.
859    """
860    from sqlmesh.core.engine_adapter import (
861        DIALECT_TO_ENGINE_ADAPTER,
862        DIALECT_ALIASES,
863        EngineAdapter,
864    )
865
866    dialect = dialect.lower()
867    dialect = DIALECT_ALIASES.get(dialect, dialect)
868    engine_adapter_class = DIALECT_TO_ENGINE_ADAPTER.get(dialect, EngineAdapter)
869    return SchemaDiffer(
870        **{
871            **getattr(engine_adapter_class, "SCHEMA_DIFFER_KWARGS"),
872            **(overrides or {}),
873        }
874    )

Returns the appropriate SchemaDiffer for a given dialect without initializing the engine adapter.

Arguments:
  • dialect: The dialect for which to get the schema differ.
  • overrides: Optional dictionary of overrides to apply to the SchemaDiffer instance.
Returns:

The SchemaDiffer instance configured for the given dialect.