Replace snapshot model field with node.
"""Replace snapshot model field with node."""

import json

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Rename the ``model`` key to ``node`` inside every stored snapshot payload.

    Every row of the ``_snapshots`` table (schema-qualified when
    ``state_sync.schema`` is set) is read, the JSON held in its ``snapshot``
    column is rewritten, and the table contents are then replaced with the
    rewritten rows.

    Payloads that carry no ``model`` key (for example rows that were already
    migrated) skip the rename instead of raising ``KeyError``, so running the
    migration a second time does not fail.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for interface compatibility; not used here.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []

    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        # Only rename when the legacy key is present; an unconditional pop()
        # would raise KeyError on payloads that have no "model" entry.
        if "model" in parsed_snapshot:
            parsed_snapshot["node"] = parsed_snapshot.pop("model")

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
            }
        )

    # Nothing was read, so there is nothing to rewrite; leave the table alone.
    if not new_snapshots:
        return

    # Replace the table contents wholesale: remove every row, then append the
    # rewritten ones.
    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(index_type),
        },
    )
def migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Rename the ``model`` key to ``node`` inside every stored snapshot payload.

    Every row of the ``_snapshots`` table (schema-qualified when
    ``state_sync.schema`` is set) is read, the JSON held in its ``snapshot``
    column is rewritten, and the table contents are then replaced with the
    rewritten rows.

    Payloads that carry no ``model`` key (for example rows that were already
    migrated) skip the rename instead of raising ``KeyError``, so running the
    migration a second time does not fail.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for interface compatibility; not used here.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []

    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        # Only rename when the legacy key is present; an unconditional pop()
        # would raise KeyError on payloads that have no "model" entry.
        if "model" in parsed_snapshot:
            parsed_snapshot["node"] = parsed_snapshot.pop("model")

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
            }
        )

    # Nothing was read, so there is nothing to rewrite; leave the table alone.
    if not new_snapshots:
        return

    # Replace the table contents wholesale: remove every row, then append the
    # rewritten ones.
    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(index_type),
        },
    )