sqlmesh.core.snapshot.categorizer
1from __future__ import annotations 2 3import typing as t 4 5from sqlmesh.core.config import AutoCategorizationMode, CategorizerConfig 6from sqlmesh.core.snapshot.definition import Snapshot, SnapshotChangeCategory 7from sqlmesh.utils.errors import SQLMeshError 8 9 10def categorize_change( 11 new: Snapshot, 12 old: Snapshot, 13 config: t.Optional[CategorizerConfig] = None, 14 is_breaking_change: t.Optional[t.Callable[..., t.Optional[bool]]] = None, 15 **kwargs: t.Any, 16) -> t.Optional[SnapshotChangeCategory]: 17 """Attempts to automatically categorize a change between two snapshots. 18 19 Presently the implementation only returns the NON_BREAKING category iff 20 a new projections have been added to one or more SELECT statement(s). In 21 all other cases None is returned. 22 23 Args: 24 new: The new snapshot. 25 old: The old snapshot. 26 config: Configuration for the automatic categorizer of snapshot changes. 27 is_breaking_change: Callable that compares two models (new, old) and determines 28 whether there is a breaking change between them. 29 kwargs: Additional arguments to pass to is_breaking_change. 30 31 Returns: 32 The change category or None if the category can't be determined automatically. 33 34 """ 35 old_model = old.model 36 new_model = new.model 37 38 config = config or CategorizerConfig() 39 mode = config.dict().get(new_model.source_type, AutoCategorizationMode.OFF) 40 if mode == AutoCategorizationMode.OFF: 41 return None 42 43 default_category = ( 44 SnapshotChangeCategory.BREAKING if mode == AutoCategorizationMode.FULL else None 45 ) 46 47 if type(new_model) != type(old_model): 48 return default_category 49 50 if new.fingerprint == old.fingerprint: 51 raise SQLMeshError( 52 f"{new} is unmodified or indirectly modified and should not be categorized" 53 ) 54 55 if not new.is_directly_modified(old): 56 if new.fingerprint.parent_data_hash == old.fingerprint.parent_data_hash: 57 return SnapshotChangeCategory.NON_BREAKING 58 return None 59 60 breaking_change = ( 61 is_breaking_change(new_model, old_model, **kwargs) 62 if is_breaking_change 63 else new_model.is_breaking_change(old_model) 64 ) 65 if breaking_change is None: 66 return default_category 67 68 return ( 69 SnapshotChangeCategory.BREAKING if breaking_change else SnapshotChangeCategory.NON_BREAKING 70 )
def
categorize_change( new: sqlmesh.core.snapshot.definition.Snapshot, old: sqlmesh.core.snapshot.definition.Snapshot, config: Optional[sqlmesh.core.config.categorizer.CategorizerConfig] = None, is_breaking_change: Optional[Callable[..., Optional[bool]]] = None, **kwargs: Any) -> Optional[sqlmesh.core.snapshot.definition.SnapshotChangeCategory]:
11def categorize_change( 12 new: Snapshot, 13 old: Snapshot, 14 config: t.Optional[CategorizerConfig] = None, 15 is_breaking_change: t.Optional[t.Callable[..., t.Optional[bool]]] = None, 16 **kwargs: t.Any, 17) -> t.Optional[SnapshotChangeCategory]: 18 """Attempts to automatically categorize a change between two snapshots. 19 20 Presently the implementation only returns the NON_BREAKING category iff 21 a new projections have been added to one or more SELECT statement(s). In 22 all other cases None is returned. 23 24 Args: 25 new: The new snapshot. 26 old: The old snapshot. 27 config: Configuration for the automatic categorizer of snapshot changes. 28 is_breaking_change: Callable that compares two models (new, old) and determines 29 whether there is a breaking change between them. 30 kwargs: Additional arguments to pass to is_breaking_change. 31 32 Returns: 33 The change category or None if the category can't be determined automatically. 34 35 """ 36 old_model = old.model 37 new_model = new.model 38 39 config = config or CategorizerConfig() 40 mode = config.dict().get(new_model.source_type, AutoCategorizationMode.OFF) 41 if mode == AutoCategorizationMode.OFF: 42 return None 43 44 default_category = ( 45 SnapshotChangeCategory.BREAKING if mode == AutoCategorizationMode.FULL else None 46 ) 47 48 if type(new_model) != type(old_model): 49 return default_category 50 51 if new.fingerprint == old.fingerprint: 52 raise SQLMeshError( 53 f"{new} is unmodified or indirectly modified and should not be categorized" 54 ) 55 56 if not new.is_directly_modified(old): 57 if new.fingerprint.parent_data_hash == old.fingerprint.parent_data_hash: 58 return SnapshotChangeCategory.NON_BREAKING 59 return None 60 61 breaking_change = ( 62 is_breaking_change(new_model, old_model, **kwargs) 63 if is_breaking_change 64 else new_model.is_breaking_change(old_model) 65 ) 66 if breaking_change is None: 67 return default_category 68 69 return ( 70 SnapshotChangeCategory.BREAKING if breaking_change else SnapshotChangeCategory.NON_BREAKING 71 )
Attempts to automatically categorize a change between two snapshots.
Presently the implementation only returns the NON_BREAKING category iff a new projections have been added to one or more SELECT statement(s). In all other cases None is returned.
Arguments:
- new: The new snapshot.
- old: The old snapshot.
- config: Configuration for the automatic categorizer of snapshot changes.
- is_breaking_change: Callable that compares two models (new, old) and determines whether there is a breaking change between them.
- kwargs: Additional arguments to pass to is_breaking_change.
Returns:
The change category or None if the category can't be determined automatically.