"""Generate mapping schema data types using the corresponding model's dialect."""

import json

import pandas as pd
from sqlglot import exp, parse_one

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite every stored snapshot so its mapping_schema types are rendered
    in the snapshot node's own SQL dialect.

    Reads all rows from the ``_snapshots`` state table, converts the type
    strings inside each node's ``mapping_schema``, then replaces the table
    contents with the updated rows.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []
    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        node = parsed_snapshot["node"]

        mapping_schema = node.get("mapping_schema")
        if mapping_schema:
            # Re-render each type string using the node's dialect.
            node["mapping_schema"] = _convert_schema_types(mapping_schema, node["dialect"])

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
            }
        )

    if new_snapshots:
        # Full rewrite: clear the table, then re-insert the converted rows.
        engine_adapter.delete_from(snapshots_table, "TRUE")

        index_type = index_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            snapshots_table,
            pd.DataFrame(new_snapshots),
            columns_to_types={
                "name": exp.DataType.build(index_type),
                "identifier": exp.DataType.build(index_type),
                "version": exp.DataType.build(index_type),
                "snapshot": exp.DataType.build("text"),
                "kind_name": exp.DataType.build(index_type),
            },
        )


def _convert_schema_types(schema, dialect):  # type: ignore
    """Recursively re-render type strings in *schema* using *dialect*.

    Nested dicts (multi-part table names) are converted in place; leaf values
    are SQL type strings that get parsed and re-emitted in the target dialect.
    Returns the (mutated) *schema* for convenience.
    """
    if not schema:
        return schema

    for k, v in schema.items():
        if isinstance(v, dict):
            # In-place mutation; the return value is intentionally ignored.
            _convert_schema_types(v, dialect)
        else:
            schema[k] = parse_one(v).sql(dialect=dialect)

    return schema
def migrate(state_sync, **kwargs):  # type: ignore
    """Convert each stored snapshot's mapping_schema types into the dialect
    declared on the snapshot's node, rewriting the ``_snapshots`` table.
    """
    adapter = state_sync.engine_adapter
    table = "_snapshots"
    if state_sync.schema:
        table = f"{state_sync.schema}.{table}"

    query = exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(table)

    updated = []
    for name, identifier, version, raw, kind_name in adapter.fetchall(
        query, quote_identifiers=True
    ):
        payload = json.loads(raw)
        node = payload["node"]

        if node.get("mapping_schema"):
            node["mapping_schema"] = _convert_schema_types(node["mapping_schema"], node["dialect"])

        updated.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
            }
        )

    if not updated:
        return

    # Replace the table contents wholesale with the converted rows.
    adapter.delete_from(table, "TRUE")

    text_type = index_text_type(adapter.dialect)
    adapter.insert_append(
        table,
        pd.DataFrame(updated),
        columns_to_types={
            "name": exp.DataType.build(text_type),
            "identifier": exp.DataType.build(text_type),
            "version": exp.DataType.build(text_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(text_type),
        },
    )