Remove validate_query from existing snapshots.
"""Remove validate_query from existing snapshots."""

import json

from sqlglot import exp

from sqlmesh.utils.migration import index_text_type
from sqlmesh.utils.migration import blob_text_type


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration performs no schema-level work."""


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Strip the ``validate_query`` key from every stored snapshot's node.

    Every row of the ``_snapshots`` table (qualified with ``schema`` when one
    is given) is read, its ``snapshot`` JSON payload is parsed, the
    ``validate_query`` entry is dropped from the payload's ``node`` mapping if
    present, and the payload is serialized again. When at least one row was
    read, the table contents are deleted and the rewritten rows are appended
    back.

    Args:
        engine_adapter: Adapter used to query, delete from and append to the
            snapshots table; its ``dialect`` selects the text column types.
        schema: Optional schema name used to qualify the table name.
        **kwargs: Accepted and ignored.
    """
    import pandas as pd

    index_type = index_text_type(engine_adapter.dialect)
    table_name = f"{schema}._snapshots" if schema else "_snapshots"

    query = exp.select(
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    ).from_(table_name)

    rewritten_rows = []
    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        (
            name,
            identifier,
            version,
            raw_snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
        ) = row

        payload = json.loads(raw_snapshot)
        # A missing key is fine: pop() with a default never raises here.
        payload["node"].pop("validate_query", None)

        rewritten_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    # Nothing was read, so leave the table untouched.
    if not rewritten_rows:
        return

    # Clear the table with an always-true condition before re-inserting.
    engine_adapter.delete_from(table_name, "TRUE")
    blob_type = blob_text_type(engine_adapter.dialect)

    frame = pd.DataFrame(rewritten_rows)
    build_type = exp.DataType.build
    column_types = {
        "name": build_type(index_type),
        "identifier": build_type(index_type),
        "version": build_type(index_type),
        "snapshot": build_type(blob_type),
        "kind_name": build_type(index_type),
        "updated_ts": build_type("bigint"),
        "unpaused_ts": build_type("bigint"),
        "ttl_ms": build_type("bigint"),
        "unrestorable": build_type("boolean"),
    }

    engine_adapter.insert_append(
        table_name,
        frame,
        target_columns_to_types=column_types,
    )
def
migrate_schemas(engine_adapter, schema, **kwargs):
def
migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Strip the ``validate_query`` key from every stored snapshot's node.

    Every row of the ``_snapshots`` table (qualified with ``schema`` when one
    is given) is read, its ``snapshot`` JSON payload is parsed, the
    ``validate_query`` entry is dropped from the payload's ``node`` mapping if
    present, and the payload is serialized again. When at least one row was
    read, the table contents are deleted and the rewritten rows are appended
    back.

    Args:
        engine_adapter: Adapter used to query, delete from and append to the
            snapshots table; its ``dialect`` selects the text column types.
        schema: Optional schema name used to qualify the table name.
        **kwargs: Accepted and ignored.
    """
    import pandas as pd

    index_type = index_text_type(engine_adapter.dialect)
    table_name = f"{schema}._snapshots" if schema else "_snapshots"

    query = exp.select(
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    ).from_(table_name)

    rewritten_rows = []
    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        (
            name,
            identifier,
            version,
            raw_snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
        ) = row

        payload = json.loads(raw_snapshot)
        # A missing key is fine: pop() with a default never raises here.
        payload["node"].pop("validate_query", None)

        rewritten_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    # Nothing was read, so leave the table untouched.
    if not rewritten_rows:
        return

    # Clear the table with an always-true condition before re-inserting.
    engine_adapter.delete_from(table_name, "TRUE")
    blob_type = blob_text_type(engine_adapter.dialect)

    frame = pd.DataFrame(rewritten_rows)
    build_type = exp.DataType.build
    column_types = {
        "name": build_type(index_type),
        "identifier": build_type(index_type),
        "version": build_type(index_type),
        "snapshot": build_type(blob_type),
        "kind_name": build_type(index_type),
        "updated_ts": build_type("bigint"),
        "unpaused_ts": build_type("bigint"),
        "ttl_ms": build_type("bigint"),
        "unrestorable": build_type("boolean"),
    }

    engine_adapter.insert_append(
        table_name,
        frame,
        target_columns_to_types=column_types,
    )