Update unrestorable snapshots.
"""Update unrestorable snapshots."""

import json
import typing as t
from collections import defaultdict

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync: t.Any, **kwargs: t.Any) -> None:  # type: ignore
    """Mark unrestorable snapshots and rewrite the ``_snapshots`` table.

    Every row of the snapshots table is read and grouped by ``(name, version)``.
    When a group contains at least one snapshot whose ``change_category`` is 3
    (the forward-only category, going by the naming used in this migration),
    every other snapshot in that group without a truthy ``unpaused_ts`` gets
    ``"unrestorable": True`` written into its JSON payload.

    If the table held any rows, all of them are deleted and re-inserted with
    the re-serialized payloads. An empty table is left untouched.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for interface compatibility; not used here.
    """
    adapter = state_sync.engine_adapter
    schema_name = state_sync.schema
    table_name = f"{schema_name}._snapshots" if schema_name else "_snapshots"

    # Bucket the parsed payloads so each (name, version) group can be inspected as a whole.
    query = exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(table_name)
    grouped = defaultdict(list)
    for row in adapter.fetchall(query, quote_identifiers=True):
        row_name, row_identifier, row_version, raw_payload, row_kind = row
        payload = json.loads(raw_payload)
        grouped[(row_name, row_version)].append((row_identifier, row_kind, payload))

    rewritten_rows = []
    for (model_name, model_version), members in grouped.items():
        group_has_forward_only = any(entry["change_category"] == 3 for _, _, entry in members)
        for member_identifier, member_kind, payload in members:
            is_unrestorable = (
                group_has_forward_only
                and payload["change_category"] != 3
                and not payload.get("unpaused_ts")
            )
            if is_unrestorable:
                payload["unrestorable"] = True
            # Every row is re-emitted, flagged or not, because the table is replaced wholesale.
            rewritten_rows.append(
                {
                    "name": model_name,
                    "identifier": member_identifier,
                    "version": model_version,
                    "snapshot": json.dumps(payload),
                    "kind_name": member_kind,
                }
            )

    if not rewritten_rows:
        return

    # Replace the table contents: wipe everything, then append the rewritten rows.
    adapter.delete_from(table_name, "TRUE")

    text_index_type = index_text_type(adapter.dialect)
    frame = pd.DataFrame(rewritten_rows)
    column_types = {
        column: exp.DataType.build("text" if column == "snapshot" else text_index_type)
        for column in ("name", "identifier", "version", "snapshot", "kind_name")
    }
    adapter.insert_append(table_name, frame, columns_to_types=column_types)
def
migrate(state_sync: Any, **kwargs: Any) -> None:
def migrate(state_sync: t.Any, **kwargs: t.Any) -> None:  # type: ignore
    """Mark unrestorable snapshots and rewrite the ``_snapshots`` table.

    Every row of the snapshots table is read and grouped by ``(name, version)``.
    When a group contains at least one snapshot whose ``change_category`` is 3
    (the forward-only category, going by the naming used in this migration),
    every other snapshot in that group without a truthy ``unpaused_ts`` gets
    ``"unrestorable": True`` written into its JSON payload.

    If the table held any rows, all of them are deleted and re-inserted with
    the re-serialized payloads. An empty table is left untouched.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for interface compatibility; not used here.
    """
    adapter = state_sync.engine_adapter
    schema_name = state_sync.schema
    table_name = f"{schema_name}._snapshots" if schema_name else "_snapshots"

    # Bucket the parsed payloads so each (name, version) group can be inspected as a whole.
    query = exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(table_name)
    grouped = defaultdict(list)
    for row in adapter.fetchall(query, quote_identifiers=True):
        row_name, row_identifier, row_version, raw_payload, row_kind = row
        payload = json.loads(raw_payload)
        grouped[(row_name, row_version)].append((row_identifier, row_kind, payload))

    rewritten_rows = []
    for (model_name, model_version), members in grouped.items():
        group_has_forward_only = any(entry["change_category"] == 3 for _, _, entry in members)
        for member_identifier, member_kind, payload in members:
            is_unrestorable = (
                group_has_forward_only
                and payload["change_category"] != 3
                and not payload.get("unpaused_ts")
            )
            if is_unrestorable:
                payload["unrestorable"] = True
            # Every row is re-emitted, flagged or not, because the table is replaced wholesale.
            rewritten_rows.append(
                {
                    "name": model_name,
                    "identifier": member_identifier,
                    "version": model_version,
                    "snapshot": json.dumps(payload),
                    "kind_name": member_kind,
                }
            )

    if not rewritten_rows:
        return

    # Replace the table contents: wipe everything, then append the rewritten rows.
    adapter.delete_from(table_name, "TRUE")

    text_index_type = index_text_type(adapter.dialect)
    frame = pd.DataFrame(rewritten_rows)
    column_types = {
        column: exp.DataType.build("text" if column == "snapshot" else text_index_type)
        for column in ("name", "identifier", "version", "snapshot", "kind_name")
    }
    adapter.insert_append(table_name, frame, columns_to_types=column_types)