"""Trim irrelevant attributes from indirect versions."""

import json

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite stored snapshots so each indirect-version entry keeps only
    ``version`` and ``change_category``.

    Reads every row from the ``_snapshots`` state table, trims the
    ``indirect_versions`` payload inside each serialized snapshot, then
    replaces the table contents with the rewritten rows.

    Args:
        state_sync: State sync object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Ignored; accepted for migration-runner compatibility.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []

    for name, identifier, version, snapshot, kind_name, expiration_ts in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name", "expiration_ts").from_(
            snapshots_table
        ),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        for indirect_versions in parsed_snapshot["indirect_versions"].values():
            for indirect_version in indirect_versions:
                # Only keep version and change_category. Use a distinct local
                # name: the original code assigned to `version`, clobbering the
                # row-level `version` unpacked above and writing a wrong value
                # into new_snapshots below.
                indirect_snapshot_version = indirect_version.get("version")
                change_category = indirect_version.get("change_category")
                indirect_version.clear()
                indirect_version["version"] = indirect_snapshot_version
                indirect_version["change_category"] = change_category

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "expiration_ts": expiration_ts,
            }
        )

    if new_snapshots:
        # Full rewrite: wipe the table, then re-insert the trimmed rows.
        engine_adapter.delete_from(snapshots_table, "TRUE")

        index_type = index_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            snapshots_table,
            pd.DataFrame(new_snapshots),
            columns_to_types={
                "name": exp.DataType.build(index_type),
                "identifier": exp.DataType.build(index_type),
                "version": exp.DataType.build(index_type),
                "snapshot": exp.DataType.build("text"),
                "kind_name": exp.DataType.build(index_type),
                "expiration_ts": exp.DataType.build("bigint"),
            },
        )
def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite stored snapshots so each indirect-version entry keeps only
    ``version`` and ``change_category``.

    Reads every row from the ``_snapshots`` state table, trims the
    ``indirect_versions`` payload inside each serialized snapshot, then
    replaces the table contents with the rewritten rows.

    Args:
        state_sync: State sync object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Ignored; accepted for migration-runner compatibility.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []

    for name, identifier, version, snapshot, kind_name, expiration_ts in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name", "expiration_ts").from_(
            snapshots_table
        ),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        for indirect_versions in parsed_snapshot["indirect_versions"].values():
            for indirect_version in indirect_versions:
                # Only keep version and change_category. Use a distinct local
                # name: the original code assigned to `version`, clobbering the
                # row-level `version` unpacked above and writing a wrong value
                # into new_snapshots below.
                indirect_snapshot_version = indirect_version.get("version")
                change_category = indirect_version.get("change_category")
                indirect_version.clear()
                indirect_version["version"] = indirect_snapshot_version
                indirect_version["change_category"] = change_category

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "expiration_ts": expiration_ts,
            }
        )

    if new_snapshots:
        # Full rewrite: wipe the table, then re-insert the trimmed rows.
        engine_adapter.delete_from(snapshots_table, "TRUE")

        index_type = index_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            snapshots_table,
            pd.DataFrame(new_snapshots),
            columns_to_types={
                "name": exp.DataType.build(index_type),
                "identifier": exp.DataType.build(index_type),
                "version": exp.DataType.build(index_type),
                "snapshot": exp.DataType.build("text"),
                "kind_name": exp.DataType.build(index_type),
                "expiration_ts": exp.DataType.build("bigint"),
            },
        )