Trim irrelevant attributes from the plan DAGs state.
"""Trim irrelevant attributes from the plan DAGs state."""

import json

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite every stored plan DAG spec without the attributes that are no longer kept.

    Each row of the ``_plan_dags`` table has its JSON ``dag_spec`` parsed. For
    every entry under ``new_snapshots`` the ``hash_raw_query`` key is dropped
    from the entry's ``node``, and every indirect version entry is reduced to
    its ``version`` and ``change_category`` keys. If at least one row was read,
    the table is emptied and the rewritten rows are inserted back.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``. The
            table name is qualified with ``schema`` when it is truthy.
        **kwargs: Accepted but not used.
    """
    adapter = state_sync.engine_adapter
    schema_name = state_sync.schema
    table_name = f"{schema_name}._plan_dags" if schema_name else "_plan_dags"

    kept_keys = ("version", "change_category")
    select_rows = exp.select("request_id", "dag_id", "dag_spec").from_(table_name)
    rewritten_rows = []

    for request_id, dag_id, raw_spec in adapter.fetchall(select_rows, quote_identifiers=True):
        spec = json.loads(raw_spec)

        for snapshot in spec.get("new_snapshots", []):
            snapshot["node"].pop("hash_raw_query", None)

            for entries in snapshot.get("indirect_versions", {}).values():
                for entry in entries:
                    # Mutate the entry in place; a key that was absent is
                    # written back as None, which is what .get() yields.
                    trimmed = {key: entry.get(key) for key in kept_keys}
                    entry.clear()
                    entry.update(trimmed)

        rewritten_rows.append(
            {
                "request_id": request_id,
                "dag_id": dag_id,
                "dag_spec": json.dumps(spec),
            }
        )

    # Nothing was read, so the table is left untouched.
    if not rewritten_rows:
        return

    # Replace the table contents wholesale: delete every row, then re-insert.
    adapter.delete_from(table_name, "TRUE")

    key_type = index_text_type(adapter.dialect)

    adapter.insert_append(
        table_name,
        pd.DataFrame(rewritten_rows),
        columns_to_types={
            "request_id": exp.DataType.build(key_type),
            "dag_id": exp.DataType.build(key_type),
            "dag_spec": exp.DataType.build("text"),
        },
    )
def
migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite every stored plan DAG spec without the attributes that are no longer kept.

    Each row of the ``_plan_dags`` table has its JSON ``dag_spec`` parsed. For
    every entry under ``new_snapshots`` the ``hash_raw_query`` key is dropped
    from the entry's ``node``, and every indirect version entry is reduced to
    its ``version`` and ``change_category`` keys. If at least one row was read,
    the table is emptied and the rewritten rows are inserted back.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``. The
            table name is qualified with ``schema`` when it is truthy.
        **kwargs: Accepted but not used.
    """
    adapter = state_sync.engine_adapter
    schema_name = state_sync.schema
    table_name = f"{schema_name}._plan_dags" if schema_name else "_plan_dags"

    kept_keys = ("version", "change_category")
    select_rows = exp.select("request_id", "dag_id", "dag_spec").from_(table_name)
    rewritten_rows = []

    for request_id, dag_id, raw_spec in adapter.fetchall(select_rows, quote_identifiers=True):
        spec = json.loads(raw_spec)

        for snapshot in spec.get("new_snapshots", []):
            snapshot["node"].pop("hash_raw_query", None)

            for entries in snapshot.get("indirect_versions", {}).values():
                for entry in entries:
                    # Mutate the entry in place; a key that was absent is
                    # written back as None, which is what .get() yields.
                    trimmed = {key: entry.get(key) for key in kept_keys}
                    entry.clear()
                    entry.update(trimmed)

        rewritten_rows.append(
            {
                "request_id": request_id,
                "dag_id": dag_id,
                "dag_spec": json.dumps(spec),
            }
        )

    # Nothing was read, so the table is left untouched.
    if not rewritten_rows:
        return

    # Replace the table contents wholesale: delete every row, then re-insert.
    adapter.delete_from(table_name, "TRUE")

    key_type = index_text_type(adapter.dialect)

    adapter.insert_append(
        table_name,
        pd.DataFrame(rewritten_rows),
        columns_to_types={
            "request_id": exp.DataType.build(key_type),
            "dag_id": exp.DataType.build(key_type),
            "dag_spec": exp.DataType.build("text"),
        },
    )