Add dev_version and fingerprint columns to the snapshots table.
1"""Add dev_version and fingerprint columns to the snapshots table.""" 2 3import json 4 5from sqlglot import exp 6 7from sqlmesh.utils.migration import index_text_type, blob_text_type 8 9 10def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore 11 snapshots_table = "_snapshots" 12 if schema: 13 snapshots_table = f"{schema}.{snapshots_table}" 14 15 index_type = index_text_type(engine_adapter.dialect) 16 blob_type = blob_text_type(engine_adapter.dialect) 17 18 add_dev_version_exp = exp.Alter( 19 this=exp.to_table(snapshots_table), 20 kind="TABLE", 21 actions=[ 22 exp.ColumnDef( 23 this=exp.to_column("dev_version"), 24 kind=exp.DataType.build(index_type), 25 ) 26 ], 27 ) 28 engine_adapter.execute(add_dev_version_exp) 29 30 add_fingerprint_exp = exp.Alter( 31 this=exp.to_table(snapshots_table), 32 kind="TABLE", 33 actions=[ 34 exp.ColumnDef( 35 this=exp.to_column("fingerprint"), 36 kind=exp.DataType.build(blob_type), 37 ) 38 ], 39 ) 40 engine_adapter.execute(add_fingerprint_exp) 41 42 43def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore 44 import pandas as pd 45 46 snapshots_table = "_snapshots" 47 if schema: 48 snapshots_table = f"{schema}.{snapshots_table}" 49 50 index_type = index_text_type(engine_adapter.dialect) 51 blob_type = blob_text_type(engine_adapter.dialect) 52 53 new_snapshots = [] 54 55 for ( 56 name, 57 identifier, 58 version, 59 snapshot, 60 kind_name, 61 updated_ts, 62 unpaused_ts, 63 ttl_ms, 64 unrestorable, 65 forward_only, 66 _, 67 _, 68 ) in engine_adapter.fetchall( 69 exp.select( 70 "name", 71 "identifier", 72 "version", 73 "snapshot", 74 "kind_name", 75 "updated_ts", 76 "unpaused_ts", 77 "ttl_ms", 78 "unrestorable", 79 "forward_only", 80 "dev_version", 81 "fingerprint", 82 ).from_(snapshots_table), 83 quote_identifiers=True, 84 ): 85 parsed_snapshot = json.loads(snapshot) 86 new_snapshots.append( 87 { 88 "name": name, 89 "identifier": identifier, 90 "version": version, 91 "snapshot": snapshot, 92 "kind_name": kind_name, 93 "updated_ts": updated_ts, 94 "unpaused_ts": unpaused_ts, 95 "ttl_ms": ttl_ms, 96 "unrestorable": unrestorable, 97 "forward_only": forward_only, 98 "dev_version": parsed_snapshot.get("dev_version"), 99 "fingerprint": json.dumps(parsed_snapshot.get("fingerprint")), 100 } 101 ) 102 103 if new_snapshots: 104 engine_adapter.delete_from(snapshots_table, "TRUE") 105 106 engine_adapter.insert_append( 107 snapshots_table, 108 pd.DataFrame(new_snapshots), 109 target_columns_to_types={ 110 "name": exp.DataType.build(index_type), 111 "identifier": exp.DataType.build(index_type), 112 "version": exp.DataType.build(index_type), 113 "snapshot": exp.DataType.build(blob_type), 114 "kind_name": exp.DataType.build(index_type), 115 "updated_ts": exp.DataType.build("bigint"), 116 "unpaused_ts": exp.DataType.build("bigint"), 117 "ttl_ms": exp.DataType.build("bigint"), 118 "unrestorable": exp.DataType.build("boolean"), 119 "forward_only": exp.DataType.build("boolean"), 120 "dev_version": exp.DataType.build(index_type), 121 "fingerprint": exp.DataType.build(blob_type), 122 }, 123 )
def
migrate_schemas(engine_adapter, schema, **kwargs):
11def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore 12 snapshots_table = "_snapshots" 13 if schema: 14 snapshots_table = f"{schema}.{snapshots_table}" 15 16 index_type = index_text_type(engine_adapter.dialect) 17 blob_type = blob_text_type(engine_adapter.dialect) 18 19 add_dev_version_exp = exp.Alter( 20 this=exp.to_table(snapshots_table), 21 kind="TABLE", 22 actions=[ 23 exp.ColumnDef( 24 this=exp.to_column("dev_version"), 25 kind=exp.DataType.build(index_type), 26 ) 27 ], 28 ) 29 engine_adapter.execute(add_dev_version_exp) 30 31 add_fingerprint_exp = exp.Alter( 32 this=exp.to_table(snapshots_table), 33 kind="TABLE", 34 actions=[ 35 exp.ColumnDef( 36 this=exp.to_column("fingerprint"), 37 kind=exp.DataType.build(blob_type), 38 ) 39 ], 40 ) 41 engine_adapter.execute(add_fingerprint_exp)
def
migrate_rows(engine_adapter, schema, **kwargs):
44def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore 45 import pandas as pd 46 47 snapshots_table = "_snapshots" 48 if schema: 49 snapshots_table = f"{schema}.{snapshots_table}" 50 51 index_type = index_text_type(engine_adapter.dialect) 52 blob_type = blob_text_type(engine_adapter.dialect) 53 54 new_snapshots = [] 55 56 for ( 57 name, 58 identifier, 59 version, 60 snapshot, 61 kind_name, 62 updated_ts, 63 unpaused_ts, 64 ttl_ms, 65 unrestorable, 66 forward_only, 67 _, 68 _, 69 ) in engine_adapter.fetchall( 70 exp.select( 71 "name", 72 "identifier", 73 "version", 74 "snapshot", 75 "kind_name", 76 "updated_ts", 77 "unpaused_ts", 78 "ttl_ms", 79 "unrestorable", 80 "forward_only", 81 "dev_version", 82 "fingerprint", 83 ).from_(snapshots_table), 84 quote_identifiers=True, 85 ): 86 parsed_snapshot = json.loads(snapshot) 87 new_snapshots.append( 88 { 89 "name": name, 90 "identifier": identifier, 91 "version": version, 92 "snapshot": snapshot, 93 "kind_name": kind_name, 94 "updated_ts": updated_ts, 95 "unpaused_ts": unpaused_ts, 96 "ttl_ms": ttl_ms, 97 "unrestorable": unrestorable, 98 "forward_only": forward_only, 99 "dev_version": parsed_snapshot.get("dev_version"), 100 "fingerprint": json.dumps(parsed_snapshot.get("fingerprint")), 101 } 102 ) 103 104 if new_snapshots: 105 engine_adapter.delete_from(snapshots_table, "TRUE") 106 107 engine_adapter.insert_append( 108 snapshots_table, 109 pd.DataFrame(new_snapshots), 110 target_columns_to_types={ 111 "name": exp.DataType.build(index_type), 112 "identifier": exp.DataType.build(index_type), 113 "version": exp.DataType.build(index_type), 114 "snapshot": exp.DataType.build(blob_type), 115 "kind_name": exp.DataType.build(index_type), 116 "updated_ts": exp.DataType.build("bigint"), 117 "unpaused_ts": exp.DataType.build("bigint"), 118 "ttl_ms": exp.DataType.build("bigint"), 119 "unrestorable": exp.DataType.build("boolean"), 120 "forward_only": exp.DataType.build("boolean"), 121 "dev_version": exp.DataType.build(index_type), 122 "fingerprint": exp.DataType.build(blob_type), 123 }, 124 )