Remove superfluous exp.Paren references from partitioned_by
"""Remove superfluous exp.Paren references from partitioned_by"""

import json

from sqlglot import exp

from sqlmesh.utils.migration import index_text_type
from sqlmesh.utils.migration import blob_text_type


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """No-op: this migration does not change any table schema."""
    pass


def _strip_wrapping_parens(expression):
    """Return ``expression`` without a parenthesis pair that wraps all of it.

    The text is only unwrapped when the leading ``(`` is closed by the final
    ``)``. A value such as ``(a) + (b)`` starts and ends with parentheses that
    do not belong together, so it is returned unchanged instead of being cut
    down to ``a) + (b``.

    Parentheses inside single-quoted, double-quoted or backtick-quoted spans
    are ignored while matching. If the text is unbalanced or ends inside a
    quoted span it is returned unchanged, because an extra pair of
    parentheses is harmless while a wrong cut corrupts the expression.
    """
    if len(expression) < 2 or expression[0] != "(" or expression[-1] != ")":
        return expression

    depth = 0
    open_quote = None
    last_index = len(expression) - 1
    for position, char in enumerate(expression):
        if open_quote is not None:
            if char == open_quote:
                open_quote = None
        elif char in ("'", '"', "`"):
            open_quote = char
        elif char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == 0 and position < last_index:
                # The leading parenthesis closed early, so it does not wrap
                # the whole expression.
                return expression

    if depth != 0 or open_quote is not None:
        return expression
    return expression[1:-1]


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Strip superfluous wrapping parentheses from stored ``partitioned_by`` entries.

    Every row of the ``_snapshots`` table is read and its ``snapshot`` JSON
    payload is decoded. Entries of ``node.partitioned_by`` of the form
    ``(foo)`` are rewritten to ``foo``. If at least one entry changed, the
    table contents are deleted and every row is inserted again; otherwise the
    table is left untouched.

    Args:
        engine_adapter: Object providing ``dialect``, ``fetchall``,
            ``delete_from`` and ``insert_append``.
        schema: Optional schema name used to qualify the table name.
        **kwargs: Accepted and ignored.
    """
    import pandas as pd

    snapshots_table = "_snapshots"
    index_type = index_text_type(engine_adapter.dialect)
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []
    updated = False

    for (
        name,
        identifier,
        version,
        snapshot,
        kind_name,
        updated_ts,
        unpaused_ts,
        ttl_ms,
        unrestorable,
    ) in engine_adapter.fetchall(
        exp.select(
            "name",
            "identifier",
            "version",
            "snapshot",
            "kind_name",
            "updated_ts",
            "unpaused_ts",
            "ttl_ms",
            "unrestorable",
        ).from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)

        if partitioned_by := parsed_snapshot["node"].get("partitioned_by"):
            # rewrite '(foo)' to 'foo'
            new_partitioned_by = [_strip_wrapping_parens(item) for item in partitioned_by]
            if new_partitioned_by != partitioned_by:
                updated = True
            parsed_snapshot["node"]["partitioned_by"] = new_partitioned_by

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    # Nothing was rewritten, so the stored rows are already in the target form.
    if not updated:
        return

    # The table is rewritten wholesale: every row is removed and re-inserted,
    # including rows whose payload did not change.
    engine_adapter.delete_from(snapshots_table, "TRUE")
    blob_type = blob_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        target_columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build(blob_type),
            "kind_name": exp.DataType.build(index_type),
            "updated_ts": exp.DataType.build("bigint"),
            "unpaused_ts": exp.DataType.build("bigint"),
            "ttl_ms": exp.DataType.build("bigint"),
            "unrestorable": exp.DataType.build("boolean"),
        },
    )
def
migrate_schemas(engine_adapter, schema, **kwargs):
def
migrate_rows(engine_adapter, schema, **kwargs):
def _strip_wrapping_parens(expression):
    """Return ``expression`` without a parenthesis pair that wraps all of it.

    The text is only unwrapped when the leading ``(`` is closed by the final
    ``)``. A value such as ``(a) + (b)`` starts and ends with parentheses that
    do not belong together, so it is returned unchanged instead of being cut
    down to ``a) + (b``.

    Parentheses inside single-quoted, double-quoted or backtick-quoted spans
    are ignored while matching. If the text is unbalanced or ends inside a
    quoted span it is returned unchanged, because an extra pair of
    parentheses is harmless while a wrong cut corrupts the expression.
    """
    if len(expression) < 2 or expression[0] != "(" or expression[-1] != ")":
        return expression

    depth = 0
    open_quote = None
    last_index = len(expression) - 1
    for position, char in enumerate(expression):
        if open_quote is not None:
            if char == open_quote:
                open_quote = None
        elif char in ("'", '"', "`"):
            open_quote = char
        elif char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == 0 and position < last_index:
                # The leading parenthesis closed early, so it does not wrap
                # the whole expression.
                return expression

    if depth != 0 or open_quote is not None:
        return expression
    return expression[1:-1]


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Strip superfluous wrapping parentheses from stored ``partitioned_by`` entries.

    Every row of the ``_snapshots`` table is read and its ``snapshot`` JSON
    payload is decoded. Entries of ``node.partitioned_by`` of the form
    ``(foo)`` are rewritten to ``foo``. If at least one entry changed, the
    table contents are deleted and every row is inserted again; otherwise the
    table is left untouched.

    Args:
        engine_adapter: Object providing ``dialect``, ``fetchall``,
            ``delete_from`` and ``insert_append``.
        schema: Optional schema name used to qualify the table name.
        **kwargs: Accepted and ignored.
    """
    import pandas as pd

    snapshots_table = "_snapshots"
    index_type = index_text_type(engine_adapter.dialect)
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []
    updated = False

    for (
        name,
        identifier,
        version,
        snapshot,
        kind_name,
        updated_ts,
        unpaused_ts,
        ttl_ms,
        unrestorable,
    ) in engine_adapter.fetchall(
        exp.select(
            "name",
            "identifier",
            "version",
            "snapshot",
            "kind_name",
            "updated_ts",
            "unpaused_ts",
            "ttl_ms",
            "unrestorable",
        ).from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)

        if partitioned_by := parsed_snapshot["node"].get("partitioned_by"):
            # rewrite '(foo)' to 'foo'
            new_partitioned_by = [_strip_wrapping_parens(item) for item in partitioned_by]
            if new_partitioned_by != partitioned_by:
                updated = True
            parsed_snapshot["node"]["partitioned_by"] = new_partitioned_by

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    # Nothing was rewritten, so the stored rows are already in the target form.
    if not updated:
        return

    # The table is rewritten wholesale: every row is removed and re-inserted,
    # including rows whose payload did not change.
    engine_adapter.delete_from(snapshots_table, "TRUE")
    blob_type = blob_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        target_columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build(blob_type),
            "kind_name": exp.DataType.build(index_type),
            "updated_ts": exp.DataType.build("bigint"),
            "unpaused_ts": exp.DataType.build("bigint"),
            "ttl_ms": exp.DataType.build("bigint"),
            "unrestorable": exp.DataType.build("boolean"),
        },
    )