Edit on GitHub

Fix snapshots of added models with forward only parents.

 1"""Fix snapshots of added models with forward only parents."""
 2
 3import json
 4import typing as t
 5
 6from sqlglot import exp
 7
 8from sqlmesh.utils.dag import DAG
 9
10
def migrate(state_sync: t.Any, **kwargs) -> None:  # type: ignore
    """Delete snapshots of added models that have a paused forward-only upstream.

    A row is removed from the ``_snapshots`` table when its parsed JSON payload
    satisfies all of the following:

    * ``change_category`` equals ``1`` (treated here as "breaking");
    * ``previous_versions`` is missing or empty (no earlier versions exist);
    * at least one snapshot returned by ``dag.upstream`` for it has
      ``change_category`` equal to ``3`` (treated here as "forward-only") and
      a falsy ``unpaused_ts``.

    Args:
        state_sync: Object exposing ``engine_adapter`` (used for ``fetchall``
            and ``delete_from``) and ``schema`` (optional schema name used to
            qualify the snapshots table).
        **kwargs: Accepted for call compatibility; not used.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    dag: DAG[t.Tuple[str, str]] = DAG()
    snapshot_mapping: t.Dict[t.Tuple[str, str], t.Dict[str, t.Any]] = {}

    # Load every stored snapshot and record its parent edges in the DAG.
    for identifier, snapshot in engine_adapter.fetchall(
        exp.select("identifier", "snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)

        snapshot_id = (parsed_snapshot["name"], identifier)
        snapshot_mapping[snapshot_id] = parsed_snapshot

        parent_ids = [
            (parent["name"], parent["identifier"]) for parent in parsed_snapshot["parents"]
        ]
        dag.add(snapshot_id, parent_ids)

    snapshots_to_delete = set()

    for snapshot_id in dag:
        parsed_snapshot = snapshot_mapping.get(snapshot_id)
        if parsed_snapshot is None:
            # The DAG may contain parent ids that were never loaded from the table.
            continue
        if parsed_snapshot.get("change_category") != 1:
            continue
        if parsed_snapshot.get("previous_versions"):
            continue

        # Only upstream snapshots that were actually loaded can be inspected.
        known_upstream = (
            snapshot_mapping[upstream_id]
            for upstream_id in dag.upstream(snapshot_id)
            if upstream_id in snapshot_mapping
        )
        if any(
            upstream.get("change_category") == 3 and not upstream.get("unpaused_ts")
            for upstream in known_upstream
        ):
            snapshots_to_delete.add(snapshot_id)

    if snapshots_to_delete:
        # Match rows on the (name, identifier) pair with a tuple IN predicate.
        where = t.cast(exp.Tuple, exp.convert((exp.column("name"), exp.column("identifier")))).isin(
            *snapshots_to_delete
        )
        engine_adapter.delete_from(snapshots_table, where)
def migrate(state_sync: Any, **kwargs) -> None:
def _has_paused_forward_only_upstream(
    snapshot_id: t.Tuple[str, str],
    dag: "DAG[t.Tuple[str, str]]",
    snapshot_mapping: t.Dict[t.Tuple[str, str], t.Dict[str, t.Any]],
) -> bool:
    """Return True if any loaded upstream snapshot is forward-only and not unpaused."""
    for upstream_id in dag.upstream(snapshot_id):
        upstream_snapshot = snapshot_mapping.get(upstream_id)
        if upstream_snapshot is None:
            # Upstream id was referenced as a parent but never loaded from the table.
            continue
        is_forward_only = upstream_snapshot.get("change_category") == 3
        if is_forward_only and not upstream_snapshot.get("unpaused_ts"):
            return True
    return False


def migrate(state_sync: t.Any, **kwargs) -> None:  # type: ignore
    """Remove snapshots of added models whose upstream includes a paused forward-only snapshot.

    Reads every ``(identifier, snapshot)`` row from the ``_snapshots`` table
    (schema-qualified when ``state_sync.schema`` is set), builds a DAG from
    each snapshot's ``parents`` list, and deletes the rows whose payload has
    ``change_category == 1``, no ``previous_versions``, and at least one
    upstream snapshot with ``change_category == 3`` and a falsy
    ``unpaused_ts``.

    Args:
        state_sync: Object providing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for call compatibility; not used.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"

    dag: DAG[t.Tuple[str, str]] = DAG()
    snapshot_mapping: t.Dict[t.Tuple[str, str], t.Dict[str, t.Any]] = {}

    rows = engine_adapter.fetchall(
        exp.select("identifier", "snapshot").from_(snapshots_table),
        quote_identifiers=True,
    )
    for identifier, raw_snapshot in rows:
        parsed_snapshot = json.loads(raw_snapshot)
        snapshot_id = (parsed_snapshot["name"], identifier)
        snapshot_mapping[snapshot_id] = parsed_snapshot
        dag.add(
            snapshot_id,
            [(parent["name"], parent["identifier"]) for parent in parsed_snapshot["parents"]],
        )

    snapshots_to_delete = set()
    for snapshot_id in dag:
        if snapshot_id not in snapshot_mapping:
            continue
        parsed_snapshot = snapshot_mapping[snapshot_id]

        # Candidates are snapshots flagged with category 1 that have no earlier versions.
        is_breaking = parsed_snapshot.get("change_category") == 1
        has_previous_versions = bool(parsed_snapshot.get("previous_versions"))
        if not is_breaking or has_previous_versions:
            continue

        if _has_paused_forward_only_upstream(snapshot_id, dag, snapshot_mapping):
            snapshots_to_delete.add(snapshot_id)

    if not snapshots_to_delete:
        return

    # Build a ``(name, identifier) IN (...)`` predicate covering every collected id.
    key_columns = t.cast(exp.Tuple, exp.convert((exp.column("name"), exp.column("identifier"))))
    engine_adapter.delete_from(snapshots_table, key_columns.isin(*snapshots_to_delete))