Edit on GitHub

Update unrestorable snapshots.

 1"""Update unrestorable snapshots."""
 2
 3import json
 4import typing as t
 5from collections import defaultdict
 6
 7import pandas as pd
 8from sqlglot import exp
 9
10from sqlmesh.utils.migration import index_text_type
11
12
13def migrate(state_sync: t.Any, **kwargs: t.Any) -> None:  # type: ignore
14    engine_adapter = state_sync.engine_adapter
15    schema = state_sync.schema
16    snapshots_table = "_snapshots"
17    if schema:
18        snapshots_table = f"{schema}.{snapshots_table}"
19
20    new_snapshots = []
21    snapshots_by_version = defaultdict(list)
22
23    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
24        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
25        quote_identifiers=True,
26    ):
27        parsed_snapshot = json.loads(snapshot)
28        snapshots_by_version[(name, version)].append((identifier, kind_name, parsed_snapshot))
29
30    for (name, version), snapshots in snapshots_by_version.items():
31        has_forward_only = any(s["change_category"] == 3 for _, _, s in snapshots)
32        for identifier, kind_name, snapshot in snapshots:
33            if (
34                has_forward_only
35                and snapshot["change_category"] != 3
36                and not snapshot.get("unpaused_ts")
37            ):
38                snapshot["unrestorable"] = True
39            new_snapshots.append(
40                {
41                    "name": name,
42                    "identifier": identifier,
43                    "version": version,
44                    "snapshot": json.dumps(snapshot),
45                    "kind_name": kind_name,
46                }
47            )
48
49    if new_snapshots:
50        engine_adapter.delete_from(snapshots_table, "TRUE")
51
52        index_type = index_text_type(engine_adapter.dialect)
53
54        engine_adapter.insert_append(
55            snapshots_table,
56            pd.DataFrame(new_snapshots),
57            columns_to_types={
58                "name": exp.DataType.build(index_type),
59                "identifier": exp.DataType.build(index_type),
60                "version": exp.DataType.build(index_type),
61                "snapshot": exp.DataType.build("text"),
62                "kind_name": exp.DataType.build(index_type),
63            },
64        )
def migrate(state_sync: Any, **kwargs: Any) -> None:
14def migrate(state_sync: t.Any, **kwargs: t.Any) -> None:  # type: ignore
15    engine_adapter = state_sync.engine_adapter
16    schema = state_sync.schema
17    snapshots_table = "_snapshots"
18    if schema:
19        snapshots_table = f"{schema}.{snapshots_table}"
20
21    new_snapshots = []
22    snapshots_by_version = defaultdict(list)
23
24    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
25        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
26        quote_identifiers=True,
27    ):
28        parsed_snapshot = json.loads(snapshot)
29        snapshots_by_version[(name, version)].append((identifier, kind_name, parsed_snapshot))
30
31    for (name, version), snapshots in snapshots_by_version.items():
32        has_forward_only = any(s["change_category"] == 3 for _, _, s in snapshots)
33        for identifier, kind_name, snapshot in snapshots:
34            if (
35                has_forward_only
36                and snapshot["change_category"] != 3
37                and not snapshot.get("unpaused_ts")
38            ):
39                snapshot["unrestorable"] = True
40            new_snapshots.append(
41                {
42                    "name": name,
43                    "identifier": identifier,
44                    "version": version,
45                    "snapshot": json.dumps(snapshot),
46                    "kind_name": kind_name,
47                }
48            )
49
50    if new_snapshots:
51        engine_adapter.delete_from(snapshots_table, "TRUE")
52
53        index_type = index_text_type(engine_adapter.dialect)
54
55        engine_adapter.insert_append(
56            snapshots_table,
57            pd.DataFrame(new_snapshots),
58            columns_to_types={
59                "name": exp.DataType.build(index_type),
60                "identifier": exp.DataType.build(index_type),
61                "version": exp.DataType.build(index_type),
62                "snapshot": exp.DataType.build("text"),
63                "kind_name": exp.DataType.build(index_type),
64            },
65        )