"""Normalize intervals and fix missing change category."""

import json
import zlib

import pandas as pd
from sqlglot import exp

from sqlmesh.utils import random_id
from sqlmesh.utils.date import now_timestamp
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Move intervals out of snapshot payloads and backfill ``change_category``.

    For every row in the snapshots table:

    * if the payload has no ``change_category``, recompute the version hash
      from the fingerprint's ``data_hash``/``parent_data_hash`` and set the
      category (4 when the recomputed hash equals the stored version, else 5);
    * pop ``intervals`` and ``dev_intervals`` out of the payload, collecting
      them as rows destined for the dedicated intervals table.

    If anything changed, the snapshots table is emptied and rewritten from the
    updated payloads, and the extracted interval rows are appended to the
    intervals table.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    intervals_table = "_intervals"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"
        intervals_table = f"{schema}.{intervals_table}"

    migration_required = False
    new_snapshots = []
    new_intervals = []

    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)

        if not parsed_snapshot.get("change_category"):
            fingerprint = parsed_snapshot.get("fingerprint")
            # NOTE(review): this rebinds the row's ``version`` to the hash
            # recomputed from the fingerprint, so when they differ (category 5)
            # the rewritten snapshot and interval rows carry the recomputed
            # value rather than the stored one — confirm this is intended.
            version = _hash(
                [
                    fingerprint["data_hash"],
                    fingerprint["parent_data_hash"],
                ]
            )
            parsed_snapshot["change_category"] = (
                4 if version == parsed_snapshot.get("version") else 5
            )
            migration_required = True

        def _add_interval(start_ts: int, end_ts: int, is_dev: bool) -> None:
            # Captures the current row's name/identifier/version; the closure
            # is only called within this same loop iteration, so the late
            # binding is safe.
            new_intervals.append(
                {
                    "id": random_id(),
                    "created_ts": now_timestamp(),
                    "name": name,
                    "identifier": identifier,
                    "version": version,
                    "start_ts": start_ts,
                    "end_ts": end_ts,
                    "is_dev": is_dev,
                    "is_removed": False,
                    "is_compacted": True,
                }
            )

        # Intervals are removed from the payload (pop) so the rewritten
        # snapshot JSON no longer stores them inline.
        for interval in parsed_snapshot.pop("intervals", []):
            _add_interval(interval[0], interval[1], False)
            migration_required = True

        for interval in parsed_snapshot.pop("dev_intervals", []):
            _add_interval(interval[0], interval[1], True)
            migration_required = True

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
            }
        )

    if migration_required:
        index_type = index_text_type(engine_adapter.dialect)

        # Full rewrite: drop every snapshot row, then re-insert the updated set.
        engine_adapter.delete_from(snapshots_table, "TRUE")
        engine_adapter.insert_append(
            snapshots_table,
            pd.DataFrame(new_snapshots),
            columns_to_types={
                "name": exp.DataType.build(index_type),
                "identifier": exp.DataType.build(index_type),
                "version": exp.DataType.build(index_type),
                "snapshot": exp.DataType.build("text"),
                "kind_name": exp.DataType.build(index_type),
            },
        )

        if new_intervals:
            engine_adapter.insert_append(
                intervals_table,
                pd.DataFrame(new_intervals),
                columns_to_types={
                    "id": exp.DataType.build(index_type),
                    "created_ts": exp.DataType.build("bigint"),
                    "name": exp.DataType.build(index_type),
                    "identifier": exp.DataType.build(index_type),
                    "version": exp.DataType.build(index_type),
                    "start_ts": exp.DataType.build("bigint"),
                    "end_ts": exp.DataType.build("bigint"),
                    "is_dev": exp.DataType.build("boolean"),
                    "is_removed": exp.DataType.build("boolean"),
                    "is_compacted": exp.DataType.build("boolean"),
                },
            )


def _hash(data):  # type: ignore
    """Return the CRC32 checksum (as a decimal string) of the ``;``-joined
    items of *data*, with ``None`` items treated as empty strings."""
    return str(zlib.crc32(";".join("" if d is None else d for d in data).encode("utf-8")))
def migrate(state_sync, **kwargs):  # type: ignore
    """Backfill ``change_category`` on legacy snapshots and relocate the
    intervals stored inside each snapshot payload into the intervals table.

    Snapshots are rewritten wholesale (delete + re-insert) only when at least
    one payload actually changed.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"
    intervals_table = f"{schema}._intervals" if schema else "_intervals"

    dirty = False
    snapshot_rows = []
    interval_rows = []

    query = exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(
        snapshots_table
    )
    for name, identifier, version, raw, kind_name in adapter.fetchall(
        query, quote_identifiers=True
    ):
        payload = json.loads(raw)

        if not payload.get("change_category"):
            fingerprint = payload.get("fingerprint")
            # Rebind ``version`` to the hash recomputed from the fingerprint;
            # the rows appended below use this recomputed value.
            version = _hash([fingerprint["data_hash"], fingerprint["parent_data_hash"]])
            payload["change_category"] = 4 if version == payload.get("version") else 5
            dirty = True

        # Production intervals first, then dev intervals — both are popped so
        # the rewritten payload no longer embeds them.
        for is_dev, key in ((False, "intervals"), (True, "dev_intervals")):
            for interval in payload.pop(key, []):
                interval_rows.append(
                    {
                        "id": random_id(),
                        "created_ts": now_timestamp(),
                        "name": name,
                        "identifier": identifier,
                        "version": version,
                        "start_ts": interval[0],
                        "end_ts": interval[1],
                        "is_dev": is_dev,
                        "is_removed": False,
                        "is_compacted": True,
                    }
                )
                dirty = True

        snapshot_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
            }
        )

    if not dirty:
        return

    index_type = index_text_type(adapter.dialect)

    # Full rewrite of the snapshots table from the updated payloads.
    adapter.delete_from(snapshots_table, "TRUE")
    adapter.insert_append(
        snapshots_table,
        pd.DataFrame(snapshot_rows),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(index_type),
        },
    )

    if interval_rows:
        adapter.insert_append(
            intervals_table,
            pd.DataFrame(interval_rows),
            columns_to_types={
                "id": exp.DataType.build(index_type),
                "created_ts": exp.DataType.build("bigint"),
                "name": exp.DataType.build(index_type),
                "identifier": exp.DataType.build(index_type),
                "version": exp.DataType.build(index_type),
                "start_ts": exp.DataType.build("bigint"),
                "end_ts": exp.DataType.build("bigint"),
                "is_dev": exp.DataType.build("boolean"),
                "is_removed": exp.DataType.build("boolean"),
                "is_compacted": exp.DataType.build("boolean"),
            },
        )