"""Move the gateway variable."""

import ast
import json

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Relocate each snapshot's ``gateway`` python_env entry into ``__sqlmesh__vars__``.

    Scans every row in the ``_snapshots`` state table; for any snapshot whose
    node ``python_env`` contains a standalone ``gateway`` executable, removes it
    and folds its evaluated payload into a ``__sqlmesh__vars__`` value entry
    (``{"gateway": <value>}``). If at least one snapshot changed, the table is
    rewritten wholesale (delete all rows, re-insert).

    Args:
        state_sync: State sync object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Ignored; accepted for migration-runner compatibility.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    migration_needed = False
    new_snapshots = []

    for name, identifier, version, snapshot, kind_name, expiration_ts in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name", "expiration_ts").from_(
            snapshots_table
        ),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        python_env = parsed_snapshot["node"].get("python_env")
        if python_env:
            # Remove the legacy standalone `gateway` entry, if present.
            gateway = python_env.pop("gateway", None)
            if gateway is not None:
                migration_needed = True
                # The payload is a repr'd literal; re-evaluate it so the value
                # round-trips cleanly into the new vars dict's repr.
                sqlmesh_vars = {"gateway": ast.literal_eval(gateway["payload"])}
                python_env["__sqlmesh__vars__"] = {
                    "payload": repr(sqlmesh_vars),
                    "kind": "value",
                }

        # Every row is collected (changed or not) so the table can be
        # rewritten in one pass if any snapshot required migration.
        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "expiration_ts": expiration_ts,
            }
        )

    if migration_needed and new_snapshots:
        engine_adapter.delete_from(snapshots_table, "TRUE")

        index_type = index_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            snapshots_table,
            pd.DataFrame(new_snapshots),
            columns_to_types={
                "name": exp.DataType.build(index_type),
                "identifier": exp.DataType.build(index_type),
                "version": exp.DataType.build(index_type),
                "snapshot": exp.DataType.build("text"),
                "kind_name": exp.DataType.build(index_type),
                "expiration_ts": exp.DataType.build("bigint"),
            },
        )