Add the expiration_ts column to the snapshots table.
"""Add the expiration_ts column to the snapshots table."""

import json

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.date import to_datetime, to_timestamp
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Add a ``bigint`` ``expiration_ts`` column to ``_snapshots`` and backfill it.

    The column is added with an ALTER TABLE statement. Every existing row is
    then read back, its ``snapshot`` JSON payload is parsed, and
    ``expiration_ts`` is computed from the payload's ``ttl`` relative to its
    ``updated_ts``. If at least one row was read, the table is emptied and the
    rows are re-inserted with the new column populated.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``; when
            ``schema`` is truthy the table name is qualified with it.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema_name = state_sync.schema
    base_table = "_snapshots"
    table_name = f"{schema_name}.{base_table}" if schema_name else base_table

    # Dialect-dependent type used for the text columns listed in the insert below.
    text_index_type = index_text_type(adapter.dialect)

    adapter.execute(
        exp.AlterTable(
            this=exp.to_table(table_name),
            actions=[
                exp.ColumnDef(
                    this=exp.to_column("expiration_ts"),
                    kind=exp.DataType.build("bigint"),
                )
            ],
        )
    )

    select_existing = exp.select(
        "name", "identifier", "version", "snapshot", "kind_name"
    ).from_(table_name)

    backfilled_rows = []
    for row_name, row_identifier, row_version, raw_snapshot, row_kind in adapter.fetchall(
        select_existing,
        quote_identifiers=True,
    ):
        payload = json.loads(raw_snapshot)
        base_ts = payload["updated_ts"]
        ttl_value = payload["ttl"]
        # NOTE(review): presumably ``ttl`` is a relative time expression that is
        # resolved against ``updated_ts`` -- confirm against ``to_timestamp``.
        backfilled_rows.append(
            {
                "name": row_name,
                "identifier": row_identifier,
                "version": row_version,
                "snapshot": raw_snapshot,
                "kind_name": row_kind,
                "expiration_ts": to_timestamp(
                    ttl_value, relative_base=to_datetime(base_ts)
                ),
            }
        )

    # Nothing was read, so there is nothing to rewrite.
    if not backfilled_rows:
        return

    # Replace the table contents wholesale with the backfilled rows.
    adapter.delete_from(table_name, "TRUE")

    frame = pd.DataFrame(backfilled_rows)
    column_types = {
        "name": exp.DataType.build(text_index_type),
        "identifier": exp.DataType.build(text_index_type),
        "version": exp.DataType.build(text_index_type),
        "snapshot": exp.DataType.build("text"),
        "kind_name": exp.DataType.build(text_index_type),
        "expiration_ts": exp.DataType.build("bigint"),
    }
    adapter.insert_append(table_name, frame, columns_to_types=column_types)
def
migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Add a ``bigint`` ``expiration_ts`` column to ``_snapshots`` and backfill it.

    The column is added with an ALTER TABLE statement. Every existing row is
    then read back, its ``snapshot`` JSON payload is parsed, and
    ``expiration_ts`` is computed from the payload's ``ttl`` relative to its
    ``updated_ts``. If at least one row was read, the table is emptied and the
    rows are re-inserted with the new column populated.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``; when
            ``schema`` is truthy the table name is qualified with it.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema_name = state_sync.schema
    base_table = "_snapshots"
    table_name = f"{schema_name}.{base_table}" if schema_name else base_table

    # Dialect-dependent type used for the text columns listed in the insert below.
    text_index_type = index_text_type(adapter.dialect)

    adapter.execute(
        exp.AlterTable(
            this=exp.to_table(table_name),
            actions=[
                exp.ColumnDef(
                    this=exp.to_column("expiration_ts"),
                    kind=exp.DataType.build("bigint"),
                )
            ],
        )
    )

    select_existing = exp.select(
        "name", "identifier", "version", "snapshot", "kind_name"
    ).from_(table_name)

    backfilled_rows = []
    for row_name, row_identifier, row_version, raw_snapshot, row_kind in adapter.fetchall(
        select_existing,
        quote_identifiers=True,
    ):
        payload = json.loads(raw_snapshot)
        base_ts = payload["updated_ts"]
        ttl_value = payload["ttl"]
        # NOTE(review): presumably ``ttl`` is a relative time expression that is
        # resolved against ``updated_ts`` -- confirm against ``to_timestamp``.
        backfilled_rows.append(
            {
                "name": row_name,
                "identifier": row_identifier,
                "version": row_version,
                "snapshot": raw_snapshot,
                "kind_name": row_kind,
                "expiration_ts": to_timestamp(
                    ttl_value, relative_base=to_datetime(base_ts)
                ),
            }
        )

    # Nothing was read, so there is nothing to rewrite.
    if not backfilled_rows:
        return

    # Replace the table contents wholesale with the backfilled rows.
    adapter.delete_from(table_name, "TRUE")

    frame = pd.DataFrame(backfilled_rows)
    column_types = {
        "name": exp.DataType.build(text_index_type),
        "identifier": exp.DataType.build(text_index_type),
        "version": exp.DataType.build(text_index_type),
        "snapshot": exp.DataType.build("text"),
        "kind_name": exp.DataType.build(text_index_type),
        "expiration_ts": exp.DataType.build("bigint"),
    }
    adapter.insert_append(table_name, frame, columns_to_types=column_types)