Edit on GitHub

sqlmesh.core.snapshot.categorizer

 1from __future__ import annotations
 2
 3import typing as t
 4
 5from sqlmesh.core.config import AutoCategorizationMode, CategorizerConfig
 6from sqlmesh.core.snapshot.definition import Snapshot, SnapshotChangeCategory
 7from sqlmesh.utils.errors import SQLMeshError
 8
 9
def categorize_change(
    new: Snapshot,
    old: Snapshot,
    config: t.Optional[CategorizerConfig] = None,
    is_breaking_change: t.Optional[t.Callable[..., t.Optional[bool]]] = None,
    **kwargs: t.Any,
) -> t.Optional[SnapshotChangeCategory]:
    """Attempts to automatically categorize a change between two snapshots.

    The categorization mode is looked up in the config by the source type of
    the new model. The outcome is decided as follows:

    * Mode is OFF (or missing for the source type): None is returned.
    * The model type changed between the snapshots: the default category is
      returned, which is BREAKING in FULL mode and None otherwise.
    * The new snapshot is not directly modified relative to the old one:
      NON_BREAKING is returned when the parent data hashes of both
      fingerprints match, None otherwise.
    * Otherwise the breaking-change check decides between BREAKING and
      NON_BREAKING; if the check is inconclusive (returns None), the default
      category is returned.

    Args:
        new: The new snapshot.
        old: The old snapshot.
        config: Configuration for the automatic categorizer of snapshot changes.
            A default CategorizerConfig is used when omitted.
        is_breaking_change: Callable that compares two models (new, old) and determines
            whether there is a breaking change between them. When omitted,
            ``new.model.is_breaking_change(old.model)`` is used instead.
        kwargs: Additional arguments to pass to is_breaking_change.

    Returns:
        The change category or None if the category can't be determined automatically.

    Raises:
        SQLMeshError: If both snapshots are of the same model type and have
            equal fingerprints, i.e. there is nothing to categorize.
    """
    old_model = old.model
    new_model = new.model

    config = config or CategorizerConfig()
    mode = config.dict().get(new_model.source_type, AutoCategorizationMode.OFF)
    if mode == AutoCategorizationMode.OFF:
        return None

    # Fallback used whenever the change can't be classified with certainty:
    # only FULL mode is allowed to assume the worst (BREAKING).
    default_category = (
        SnapshotChangeCategory.BREAKING if mode == AutoCategorizationMode.FULL else None
    )

    # Exact type identity is intended here (not isinstance): a change of the
    # model class itself is never compared field by field.
    if type(new_model) is not type(old_model):
        return default_category

    if new.fingerprint == old.fingerprint:
        raise SQLMeshError(
            f"{new} is unmodified or indirectly modified and should not be categorized"
        )

    if not new.is_directly_modified(old):
        # Fingerprints differ but not because of a direct modification; when the
        # parents' data hashes are unchanged as well the change is NON_BREAKING.
        if new.fingerprint.parent_data_hash == old.fingerprint.parent_data_hash:
            return SnapshotChangeCategory.NON_BREAKING
        return None

    # Prefer the caller-supplied comparison; fall back to the model's own check.
    if is_breaking_change is not None:
        breaking_change = is_breaking_change(new_model, old_model, **kwargs)
    else:
        breaking_change = new_model.is_breaking_change(old_model)

    if breaking_change is None:
        return default_category

    return (
        SnapshotChangeCategory.BREAKING if breaking_change else SnapshotChangeCategory.NON_BREAKING
    )
def categorize_change( new: sqlmesh.core.snapshot.definition.Snapshot, old: sqlmesh.core.snapshot.definition.Snapshot, config: Optional[sqlmesh.core.config.categorizer.CategorizerConfig] = None, is_breaking_change: Optional[Callable[..., Optional[bool]]] = None, **kwargs: Any) -> Optional[sqlmesh.core.snapshot.definition.SnapshotChangeCategory]:
def categorize_change(
    new: Snapshot,
    old: Snapshot,
    config: t.Optional[CategorizerConfig] = None,
    is_breaking_change: t.Optional[t.Callable[..., t.Optional[bool]]] = None,
    **kwargs: t.Any,
) -> t.Optional[SnapshotChangeCategory]:
    """Attempts to automatically categorize a change between two snapshots.

    The categorization mode is looked up in the config by the source type of
    the new model. The outcome is decided as follows:

    * Mode is OFF (or missing for the source type): None is returned.
    * The model type changed between the snapshots: the default category is
      returned, which is BREAKING in FULL mode and None otherwise.
    * The new snapshot is not directly modified relative to the old one:
      NON_BREAKING is returned when the parent data hashes of both
      fingerprints match, None otherwise.
    * Otherwise the breaking-change check decides between BREAKING and
      NON_BREAKING; if the check is inconclusive (returns None), the default
      category is returned.

    Args:
        new: The new snapshot.
        old: The old snapshot.
        config: Configuration for the automatic categorizer of snapshot changes.
            A default CategorizerConfig is used when omitted.
        is_breaking_change: Callable that compares two models (new, old) and determines
            whether there is a breaking change between them. When omitted,
            ``new.model.is_breaking_change(old.model)`` is used instead.
        kwargs: Additional arguments to pass to is_breaking_change.

    Returns:
        The change category or None if the category can't be determined automatically.

    Raises:
        SQLMeshError: If both snapshots are of the same model type and have
            equal fingerprints, i.e. there is nothing to categorize.
    """
    old_model = old.model
    new_model = new.model

    config = config or CategorizerConfig()
    mode = config.dict().get(new_model.source_type, AutoCategorizationMode.OFF)
    if mode == AutoCategorizationMode.OFF:
        return None

    # Fallback used whenever the change can't be classified with certainty:
    # only FULL mode is allowed to assume the worst (BREAKING).
    default_category = (
        SnapshotChangeCategory.BREAKING if mode == AutoCategorizationMode.FULL else None
    )

    # Exact type identity is intended here (not isinstance): a change of the
    # model class itself is never compared field by field.
    if type(new_model) is not type(old_model):
        return default_category

    if new.fingerprint == old.fingerprint:
        raise SQLMeshError(
            f"{new} is unmodified or indirectly modified and should not be categorized"
        )

    if not new.is_directly_modified(old):
        # Fingerprints differ but not because of a direct modification; when the
        # parents' data hashes are unchanged as well the change is NON_BREAKING.
        if new.fingerprint.parent_data_hash == old.fingerprint.parent_data_hash:
            return SnapshotChangeCategory.NON_BREAKING
        return None

    # Prefer the caller-supplied comparison; fall back to the model's own check.
    if is_breaking_change is not None:
        breaking_change = is_breaking_change(new_model, old_model, **kwargs)
    else:
        breaking_change = new_model.is_breaking_change(old_model)

    if breaking_change is None:
        return default_category

    return (
        SnapshotChangeCategory.BREAKING if breaking_change else SnapshotChangeCategory.NON_BREAKING
    )

Attempts to automatically categorize a change between two snapshots.

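Depending on the auto-categorization mode configured for the model's source type, a change is categorized as BREAKING or NON_BREAKING; for example, a change may be categorized as NON_BREAKING when new projections have been added to one or more SELECT statement(s). None is returned when auto-categorization is turned off or the category can't be determined automatically.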

Arguments:
  • new: The new snapshot.
  • old: The old snapshot.
  • config: Configuration for the automatic categorizer of snapshot changes.
  • is_breaking_change: Callable that compares two models (new, old) and determines whether there is a breaking change between them.
  • kwargs: Additional arguments to pass to is_breaking_change.
Returns:

The change category or None if the category can't be determined automatically.