Fix table properties that have extra quoting due to a bug.
"""Fix table properties that have extra quoting due to a bug.

For every stored snapshot whose node has a non-empty ``table_properties``
mapping, the mapping is replaced by a single SQL tuple string of
``'<key>' = <parsed value>`` pairs. If at least one snapshot was changed, the
whole snapshots table is cleared and written back.
"""

import json

import pandas as pd
from sqlglot import exp

from sqlmesh.core import dialect as d
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite the ``table_properties`` of stored snapshots.

    Args:
        state_sync: Object providing ``engine_adapter`` (used for every read
            and write) and ``schema`` (optional prefix for the table name).
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    table_name = "_snapshots"
    if schema:
        table_name = f"{schema}.{table_name}"

    select_all = exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(
        table_name
    )

    rewritten = []
    needs_rewrite = False
    for name, identifier, version, raw, kind_name in adapter.fetchall(
        select_all,
        quote_identifiers=True,
    ):
        payload = json.loads(raw)
        node = payload["node"]
        props = node.get("table_properties")
        if props:
            needs_rewrite = True
            dialect = node.get("dialect")
            # Each value is parsed as SQL and paired with its key as a string
            # literal: 'key' = <value expression>.
            assignments = []
            for key, value in props.items():
                assignments.append(exp.Literal.string(key).eq(d.parse_one(value)))
            # The pairs are stored back as one tuple rendered in the node's dialect.
            node["table_properties"] = exp.Tuple(expressions=assignments).sql(dialect=dialect)

        # Every row is collected, changed or not, because the table is
        # replaced wholesale below.
        rewritten.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
            }
        )

    if needs_rewrite:
        # "TRUE" matches every row: the table is emptied before re-inserting.
        adapter.delete_from(table_name, "TRUE")

        key_type = index_text_type(adapter.dialect)
        column_types = {
            "name": exp.DataType.build(key_type),
            "identifier": exp.DataType.build(key_type),
            "version": exp.DataType.build(key_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(key_type),
        }

        adapter.insert_append(
            table_name,
            pd.DataFrame(rewritten),
            columns_to_types=column_types,
        )
def
migrate(state_sync, **kwargs):
def _requote_table_properties(node):
    """Replace ``node["table_properties"]`` with one SQL tuple string, in place.

    Returns True when the node had a non-empty ``table_properties`` mapping and
    was rewritten, False when it was left untouched.
    """
    properties = node.get("table_properties")
    if not properties:
        return False

    dialect = node.get("dialect")
    pairs = []
    for key, raw_value in properties.items():
        # Build 'key' = <value>, where the value is the stored text parsed as SQL.
        pairs.append(exp.Literal.string(key).eq(d.parse_one(raw_value)))
    node["table_properties"] = exp.Tuple(expressions=pairs).sql(dialect=dialect)
    return True


def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite the table properties of every stored snapshot that has any.

    All rows of the snapshots table are read. When at least one snapshot was
    rewritten, the table is emptied and all rows are inserted again; otherwise
    nothing is written.

    Args:
        state_sync: Object providing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema_name = state_sync.schema
    table = "_snapshots"
    if schema_name:
        table = f"{schema_name}.{table}"

    columns = ("name", "identifier", "version", "snapshot", "kind_name")
    query = exp.select(*columns).from_(table)

    rows = []
    any_rewritten = False
    for name, identifier, version, snapshot, kind_name in adapter.fetchall(
        query,
        quote_identifiers=True,
    ):
        parsed = json.loads(snapshot)
        if _requote_table_properties(parsed["node"]):
            any_rewritten = True
        # Keep the row even when unchanged: the table is replaced as a whole.
        rows.append(
            dict(zip(columns, (name, identifier, version, json.dumps(parsed), kind_name)))
        )

    if not any_rewritten:
        return

    # "TRUE" matches every row, so this clears the table before the re-insert.
    adapter.delete_from(table, "TRUE")

    index_type = index_text_type(adapter.dialect)

    adapter.insert_append(
        table,
        pd.DataFrame(rows),
        columns_to_types={
            # The serialized snapshot is plain text; the other columns use the
            # type returned by index_text_type for the adapter's dialect.
            column: exp.DataType.build("text" if column == "snapshot" else index_type)
            for column in columns
        },
    )