Include environment in plan dag spec.
"""Include environment in plan dag spec."""

import json

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Nest the environment-related fields of each stored plan DAG spec.

    Every row of the ``_plan_dags`` table (schema-qualified when
    ``state_sync.schema`` is set) holds a JSON document in ``dag_spec``.
    For each row the top-level keys ``environment_naming_info``,
    ``promoted_snapshots``, ``start``, ``end``, ``plan_id``,
    ``previous_plan_id`` and ``environment_expiration_ts`` are removed and
    re-emitted under a single ``environment`` key. When at least one row
    was read, the table is cleared and the rewritten rows are appended.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for signature compatibility; unused here.

    Raises:
        KeyError: If a stored spec lacks ``environment_naming_info``,
            ``start`` or ``plan_id``; these are popped without a default.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    plan_dags_table = "_plan_dags"
    if schema:
        plan_dags_table = f"{schema}.{plan_dags_table}"

    new_specs = []

    for request_id, dag_id, dag_spec in engine_adapter.fetchall(
        exp.select("request_id", "dag_id", "dag_spec").from_(plan_dags_table),
        quote_identifiers=True,
    ):
        parsed_dag_spec = json.loads(dag_spec)

        environment_naming_info = parsed_dag_spec.pop("environment_naming_info")
        promoted_snapshots = parsed_dag_spec.pop("promoted_snapshots", [])
        start = parsed_dag_spec.pop("start")
        # Keep the stored end bound (None when the key is absent) so it can
        # be carried over; it used to be dropped and "end_at" was filled
        # with the start value instead.
        end = parsed_dag_spec.pop("end", None)
        plan_id = parsed_dag_spec.pop("plan_id")
        previous_plan_id = parsed_dag_spec.pop("previous_plan_id", None)
        expiration_ts = parsed_dag_spec.pop("environment_expiration_ts", None)

        # The naming info is unpacked first, so the explicit keys below win
        # on any name collision.
        parsed_dag_spec["environment"] = {
            **environment_naming_info,
            "snapshots": promoted_snapshots,
            "start_at": start,
            "end_at": end,
            "plan_id": plan_id,
            "previous_plan_id": previous_plan_id,
            "expiration_ts": expiration_ts,
        }

        new_specs.append(
            {
                "request_id": request_id,
                "dag_id": dag_id,
                "dag_spec": json.dumps(parsed_dag_spec),
            }
        )

    if new_specs:
        # Rewrite in place: remove the existing rows (the "TRUE" predicate
        # presumably matches every row -- confirm against the adapter),
        # then append the converted ones.
        engine_adapter.delete_from(plan_dags_table, "TRUE")

        index_type = index_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            plan_dags_table,
            pd.DataFrame(new_specs),
            columns_to_types={
                "request_id": exp.DataType.build(index_type),
                "dag_id": exp.DataType.build(index_type),
                "dag_spec": exp.DataType.build("text"),
            },
        )
def
migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Nest the environment-related fields of each stored plan DAG spec.

    Every row of the ``_plan_dags`` table (schema-qualified when
    ``state_sync.schema`` is set) holds a JSON document in ``dag_spec``.
    For each row the top-level keys ``environment_naming_info``,
    ``promoted_snapshots``, ``start``, ``end``, ``plan_id``,
    ``previous_plan_id`` and ``environment_expiration_ts`` are removed and
    re-emitted under a single ``environment`` key. When at least one row
    was read, the table is cleared and the rewritten rows are appended.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for signature compatibility; unused here.

    Raises:
        KeyError: If a stored spec lacks ``environment_naming_info``,
            ``start`` or ``plan_id``; these are popped without a default.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    plan_dags_table = "_plan_dags"
    if schema:
        plan_dags_table = f"{schema}.{plan_dags_table}"

    new_specs = []

    for request_id, dag_id, dag_spec in engine_adapter.fetchall(
        exp.select("request_id", "dag_id", "dag_spec").from_(plan_dags_table),
        quote_identifiers=True,
    ):
        parsed_dag_spec = json.loads(dag_spec)

        environment_naming_info = parsed_dag_spec.pop("environment_naming_info")
        promoted_snapshots = parsed_dag_spec.pop("promoted_snapshots", [])
        start = parsed_dag_spec.pop("start")
        # Keep the stored end bound (None when the key is absent) so it can
        # be carried over; it used to be dropped and "end_at" was filled
        # with the start value instead.
        end = parsed_dag_spec.pop("end", None)
        plan_id = parsed_dag_spec.pop("plan_id")
        previous_plan_id = parsed_dag_spec.pop("previous_plan_id", None)
        expiration_ts = parsed_dag_spec.pop("environment_expiration_ts", None)

        # The naming info is unpacked first, so the explicit keys below win
        # on any name collision.
        parsed_dag_spec["environment"] = {
            **environment_naming_info,
            "snapshots": promoted_snapshots,
            "start_at": start,
            "end_at": end,
            "plan_id": plan_id,
            "previous_plan_id": previous_plan_id,
            "expiration_ts": expiration_ts,
        }

        new_specs.append(
            {
                "request_id": request_id,
                "dag_id": dag_id,
                "dag_spec": json.dumps(parsed_dag_spec),
            }
        )

    if new_specs:
        # Rewrite in place: remove the existing rows (the "TRUE" predicate
        # presumably matches every row -- confirm against the adapter),
        # then append the converted ones.
        engine_adapter.delete_from(plan_dags_table, "TRUE")

        index_type = index_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            plan_dags_table,
            pd.DataFrame(new_specs),
            columns_to_types={
                "request_id": exp.DataType.build(index_type),
                "dag_id": exp.DataType.build(index_type),
                "dag_spec": exp.DataType.build("text"),
            },
        )