Add the kind_name column to the snapshots table.
"""Add the kind_name column to the snapshots table."""

import json

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Add a ``kind_name`` column to the snapshots table and backfill it.

    The column is added with an ALTER TABLE statement, then every existing
    row is read back, its serialized snapshot is parsed as JSON, and the
    value at ``["model"]["kind"]["name"]`` is stored in the new column.
    Backfilling is done by deleting the existing rows and re-inserting them
    together with the extra ``kind_name`` value.

    Args:
        state_sync: Object exposing ``engine_adapter`` (used to run all
            statements) and ``schema`` (optional prefix for the table name).
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    # Qualify the table with the state schema only when one is configured.
    table_name = f"{schema}._snapshots" if schema else "_snapshots"

    text_type = index_text_type(adapter.dialect)

    kind_name_column = exp.ColumnDef(
        this=exp.to_column("kind_name"),
        kind=exp.DataType.build(text_type),
    )
    adapter.execute(
        exp.AlterTable(this=exp.to_table(table_name), actions=[kind_name_column])
    )

    select_query = exp.select("name", "identifier", "version", "snapshot").from_(
        table_name
    )
    rows = [
        {
            "name": name,
            "identifier": identifier,
            "version": version,
            "snapshot": raw_snapshot,
            # The kind name lives inside the serialized snapshot payload.
            "kind_name": json.loads(raw_snapshot)["model"]["kind"]["name"],
        }
        for name, identifier, version, raw_snapshot in adapter.fetchall(
            select_query,
            quote_identifiers=True,
        )
    ]

    # Nothing to backfill: leave the (empty) table as it is.
    if not rows:
        return

    # Replace the table contents: remove the old rows using an always-true
    # condition, then append the rows that carry the new column.
    adapter.delete_from(table_name, "TRUE")
    adapter.insert_append(
        table_name,
        pd.DataFrame(rows),
        columns_to_types={
            "name": exp.DataType.build(text_type),
            "identifier": exp.DataType.build(text_type),
            "version": exp.DataType.build(text_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(text_type),
        },
    )
def
migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Add a ``kind_name`` column to the snapshots table and backfill it.

    The column is added with an ALTER TABLE statement, then every existing
    row is read back, its serialized snapshot is parsed as JSON, and the
    value at ``["model"]["kind"]["name"]`` is stored in the new column.
    Backfilling is done by deleting the existing rows and re-inserting them
    together with the extra ``kind_name`` value.

    Args:
        state_sync: Object exposing ``engine_adapter`` (used to run all
            statements) and ``schema`` (optional prefix for the table name).
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    # Qualify the table with the state schema only when one is configured.
    table_name = f"{schema}._snapshots" if schema else "_snapshots"

    text_type = index_text_type(adapter.dialect)

    kind_name_column = exp.ColumnDef(
        this=exp.to_column("kind_name"),
        kind=exp.DataType.build(text_type),
    )
    adapter.execute(
        exp.AlterTable(this=exp.to_table(table_name), actions=[kind_name_column])
    )

    select_query = exp.select("name", "identifier", "version", "snapshot").from_(
        table_name
    )
    rows = [
        {
            "name": name,
            "identifier": identifier,
            "version": version,
            "snapshot": raw_snapshot,
            # The kind name lives inside the serialized snapshot payload.
            "kind_name": json.loads(raw_snapshot)["model"]["kind"]["name"],
        }
        for name, identifier, version, raw_snapshot in adapter.fetchall(
            select_query,
            quote_identifiers=True,
        )
    ]

    # Nothing to backfill: leave the (empty) table as it is.
    if not rows:
        return

    # Replace the table contents: remove the old rows using an always-true
    # condition, then append the rows that carry the new column.
    adapter.delete_from(table_name, "TRUE")
    adapter.insert_append(
        table_name,
        pd.DataFrame(rows),
        columns_to_types={
            "name": exp.DataType.build(text_type),
            "identifier": exp.DataType.build(text_type),
            "version": exp.DataType.build(text_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(text_type),
        },
    )