Add forward_only column to the snapshots table.
"""Add forward_only column to the snapshots table."""

import json

from sqlglot import exp

from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Alter the ``_snapshots`` table to add a boolean ``forward_only`` column.

    Args:
        engine_adapter: Object whose ``execute`` method runs the built
            ALTER expression.
        schema: Optional schema name; when truthy the table is addressed
            as ``<schema>._snapshots``.
        **kwargs: Accepted but not used by this function.
    """
    target_table = "_snapshots"
    if schema:
        target_table = f"{schema}.{target_table}"

    table_ref = exp.to_table(target_table)
    forward_only_column = exp.ColumnDef(
        this=exp.to_column("forward_only"),
        kind=exp.DataType.build("boolean"),
    )

    engine_adapter.execute(
        exp.Alter(
            this=table_ref,
            kind="TABLE",
            actions=[forward_only_column],
        )
    )


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Fill in ``forward_only`` for every row of the ``_snapshots`` table.

    Each row's ``snapshot`` JSON is parsed. If it carries a non-null
    ``forward_only`` value, that value is used; otherwise the flag is set
    to whether ``change_category`` equals 3. When at least one row was
    read, the table is emptied and all rows are re-inserted with the
    computed flag.

    Args:
        engine_adapter: Object providing ``fetchall``, ``delete_from``,
            ``insert_append`` and a ``dialect`` attribute.
        schema: Optional schema name; when truthy the table is addressed
            as ``<schema>._snapshots``.
        **kwargs: Accepted but not used by this function.
    """
    import pandas as pd

    target_table = "_snapshots"
    if schema:
        target_table = f"{schema}.{target_table}"

    select_all_rows = exp.select(
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
        "forward_only",
    ).from_(target_table)

    rebuilt_rows = []
    for row in engine_adapter.fetchall(select_all_rows, quote_identifiers=True):
        # The stored forward_only value is fetched but never used; the flag
        # is always recomputed from the snapshot JSON below.
        (
            name,
            identifier,
            version,
            snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
            _stored_forward_only,
        ) = row

        payload = json.loads(snapshot)

        explicit_flag = payload.get("forward_only")
        if explicit_flag is not None:
            is_forward_only = explicit_flag
        else:
            # NOTE(review): 3 is presumably the forward-only change
            # category -- confirm against the snapshot category enum.
            is_forward_only = payload.get("change_category") == 3

        rebuilt_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                # Re-serialized from the parsed payload, so the stored text
                # may differ in formatting from what was read.
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
                "forward_only": is_forward_only,
            }
        )

    if not rebuilt_rows:
        return

    # Wipe the table first, then write every row back with the new flag.
    engine_adapter.delete_from(target_table, "TRUE")
    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)

    frame = pd.DataFrame(rebuilt_rows)
    column_type_names = (
        ("name", index_type),
        ("identifier", index_type),
        ("version", index_type),
        ("snapshot", blob_type),
        ("kind_name", index_type),
        ("updated_ts", "bigint"),
        ("unpaused_ts", "bigint"),
        ("ttl_ms", "bigint"),
        ("unrestorable", "boolean"),
        ("forward_only", "boolean"),
    )
    column_types = {
        column: exp.DataType.build(type_name)
        for column, type_name in column_type_names
    }

    engine_adapter.insert_append(
        target_table,
        frame,
        target_columns_to_types=column_types,
    )
def migrate_schemas(engine_adapter, schema, **kwargs):
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Alter the ``_snapshots`` table to add a boolean ``forward_only`` column.

    Args:
        engine_adapter: Object whose ``execute`` method runs the built
            ALTER expression.
        schema: Optional schema name; when truthy the table is addressed
            as ``<schema>._snapshots``.
        **kwargs: Accepted but not used by this function.
    """
    target_table = "_snapshots"
    if schema:
        target_table = f"{schema}.{target_table}"

    table_ref = exp.to_table(target_table)
    forward_only_column = exp.ColumnDef(
        this=exp.to_column("forward_only"),
        kind=exp.DataType.build("boolean"),
    )

    engine_adapter.execute(
        exp.Alter(
            this=table_ref,
            kind="TABLE",
            actions=[forward_only_column],
        )
    )
def migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Fill in ``forward_only`` for every row of the ``_snapshots`` table.

    Each row's ``snapshot`` JSON is parsed. If it carries a non-null
    ``forward_only`` value, that value is used; otherwise the flag is set
    to whether ``change_category`` equals 3. When at least one row was
    read, the table is emptied and all rows are re-inserted with the
    computed flag.

    Args:
        engine_adapter: Object providing ``fetchall``, ``delete_from``,
            ``insert_append`` and a ``dialect`` attribute.
        schema: Optional schema name; when truthy the table is addressed
            as ``<schema>._snapshots``.
        **kwargs: Accepted but not used by this function.
    """
    import pandas as pd

    target_table = "_snapshots"
    if schema:
        target_table = f"{schema}.{target_table}"

    select_all_rows = exp.select(
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
        "forward_only",
    ).from_(target_table)

    rebuilt_rows = []
    for row in engine_adapter.fetchall(select_all_rows, quote_identifiers=True):
        # The stored forward_only value is fetched but never used; the flag
        # is always recomputed from the snapshot JSON below.
        (
            name,
            identifier,
            version,
            snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
            _stored_forward_only,
        ) = row

        payload = json.loads(snapshot)

        explicit_flag = payload.get("forward_only")
        if explicit_flag is not None:
            is_forward_only = explicit_flag
        else:
            # NOTE(review): 3 is presumably the forward-only change
            # category -- confirm against the snapshot category enum.
            is_forward_only = payload.get("change_category") == 3

        rebuilt_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                # Re-serialized from the parsed payload, so the stored text
                # may differ in formatting from what was read.
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
                "forward_only": is_forward_only,
            }
        )

    if not rebuilt_rows:
        return

    # Wipe the table first, then write every row back with the new flag.
    engine_adapter.delete_from(target_table, "TRUE")
    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)

    frame = pd.DataFrame(rebuilt_rows)
    column_type_names = (
        ("name", index_type),
        ("identifier", index_type),
        ("version", index_type),
        ("snapshot", blob_type),
        ("kind_name", index_type),
        ("updated_ts", "bigint"),
        ("unpaused_ts", "bigint"),
        ("ttl_ms", "bigint"),
        ("unrestorable", "boolean"),
        ("forward_only", "boolean"),
    )
    column_types = {
        column: exp.DataType.build(type_name)
        for column, type_name in column_type_names
    }

    engine_adapter.insert_append(
        target_table,
        frame,
        target_columns_to_types=column_types,
    )