Fix snapshots of added models with forward only parents.
"""Fix snapshots of added models with forward only parents."""

import json
import typing as t

from sqlglot import exp

from sqlmesh.utils.dag import DAG


def migrate(state_sync: t.Any, **kwargs) -> None:  # type: ignore
    """Delete snapshot rows that were added under a paused forward-only parent.

    Every ``(identifier, snapshot)`` row of the ``_snapshots`` table is read,
    the JSON payload is parsed, and a DAG keyed by ``(name, identifier)`` is
    built from each payload's ``"parents"`` list.  A snapshot is then deleted
    when all of the following hold:

    * its ``change_category`` is ``1`` (the "breaking" category),
    * it has no ``previous_versions``,
    * at least one of its upstream snapshots that is present in the table has
      ``change_category`` ``3`` (the "forward only" category) and a falsy
      ``unpaused_ts``.

    Args:
        state_sync: Object exposing ``engine_adapter`` (used for ``fetchall``
            and ``delete_from``) and ``schema`` (optional prefix for the
            state table name).
        **kwargs: Accepted for interface compatibility; not used here.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    dag: DAG[t.Tuple[str, str]] = DAG()
    snapshot_mapping: t.Dict[t.Tuple[str, str], t.Dict[str, t.Any]] = {}

    for identifier, snapshot in engine_adapter.fetchall(
        exp.select("identifier", "snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)

        snapshot_id = (parsed_snapshot["name"], identifier)
        snapshot_mapping[snapshot_id] = parsed_snapshot

        parent_ids = [
            (parent["name"], parent["identifier"]) for parent in parsed_snapshot["parents"]
        ]
        dag.add(snapshot_id, parent_ids)

    snapshots_to_delete: t.Set[t.Tuple[str, str]] = set()

    for snapshot_id in dag:
        # Parent ids are registered in the DAG even when no row was fetched
        # for them, so a node may have no parsed payload.
        if snapshot_id not in snapshot_mapping:
            continue
        parsed_snapshot = snapshot_mapping[snapshot_id]

        # Only category-1 ("breaking") snapshots without previous versions
        # are candidates for deletion.
        if parsed_snapshot.get("change_category") != 1:
            continue
        if parsed_snapshot.get("previous_versions"):
            continue

        upstream_snapshots = (
            snapshot_mapping[upstream_id]
            for upstream_id in dag.upstream(snapshot_id)
            if upstream_id in snapshot_mapping
        )
        # Category 3 is "forward only"; a falsy unpaused_ts means still paused.
        if any(
            upstream.get("change_category") == 3 and not upstream.get("unpaused_ts")
            for upstream in upstream_snapshots
        ):
            snapshots_to_delete.add(snapshot_id)

    if snapshots_to_delete:
        # Single DELETE filtered by (name, identifier) IN (<collected ids>).
        where = t.cast(exp.Tuple, exp.convert((exp.column("name"), exp.column("identifier")))).isin(
            *snapshots_to_delete
        )
        engine_adapter.delete_from(snapshots_table, where)
def migrate(state_sync: Any, **kwargs) -> None:
def migrate(state_sync: t.Any, **kwargs) -> None:  # type: ignore
    """Delete snapshot rows that were added under a paused forward-only parent.

    Every ``(identifier, snapshot)`` row of the ``_snapshots`` table is read,
    the JSON payload is parsed, and a DAG keyed by ``(name, identifier)`` is
    built from each payload's ``"parents"`` list.  A snapshot is then deleted
    when all of the following hold:

    * its ``change_category`` is ``1`` (the "breaking" category),
    * it has no ``previous_versions``,
    * at least one of its upstream snapshots that is present in the table has
      ``change_category`` ``3`` (the "forward only" category) and a falsy
      ``unpaused_ts``.

    Args:
        state_sync: Object exposing ``engine_adapter`` (used for ``fetchall``
            and ``delete_from``) and ``schema`` (optional prefix for the
            state table name).
        **kwargs: Accepted for interface compatibility; not used here.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    dag: DAG[t.Tuple[str, str]] = DAG()
    snapshot_mapping: t.Dict[t.Tuple[str, str], t.Dict[str, t.Any]] = {}

    for identifier, snapshot in engine_adapter.fetchall(
        exp.select("identifier", "snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)

        snapshot_id = (parsed_snapshot["name"], identifier)
        snapshot_mapping[snapshot_id] = parsed_snapshot

        parent_ids = [
            (parent["name"], parent["identifier"]) for parent in parsed_snapshot["parents"]
        ]
        dag.add(snapshot_id, parent_ids)

    snapshots_to_delete: t.Set[t.Tuple[str, str]] = set()

    for snapshot_id in dag:
        # Parent ids are registered in the DAG even when no row was fetched
        # for them, so a node may have no parsed payload.
        if snapshot_id not in snapshot_mapping:
            continue
        parsed_snapshot = snapshot_mapping[snapshot_id]

        # Only category-1 ("breaking") snapshots without previous versions
        # are candidates for deletion.
        if parsed_snapshot.get("change_category") != 1:
            continue
        if parsed_snapshot.get("previous_versions"):
            continue

        upstream_snapshots = (
            snapshot_mapping[upstream_id]
            for upstream_id in dag.upstream(snapshot_id)
            if upstream_id in snapshot_mapping
        )
        # Category 3 is "forward only"; a falsy unpaused_ts means still paused.
        if any(
            upstream.get("change_category") == 3 and not upstream.get("unpaused_ts")
            for upstream in upstream_snapshots
        ):
            snapshots_to_delete.add(snapshot_id)

    if snapshots_to_delete:
        # Single DELETE filtered by (name, identifier) IN (<collected ids>).
        where = t.cast(exp.Tuple, exp.convert((exp.column("name"), exp.column("identifier")))).isin(
            *snapshots_to_delete
        )
        engine_adapter.delete_from(snapshots_table, where)