Edit on GitHub

Trim irrelevant attributes from the plan DAGs state.

 1"""Trim irrelevant attributes from the plan DAGs state."""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp
 7
 8from sqlmesh.utils.migration import index_text_type
 9
10
def _trim_snapshot(snapshot):  # type: ignore
    """Remove plan-DAG attributes that are no longer needed from one snapshot dict.

    Mutates ``snapshot`` in place:
    * drops ``node["hash_raw_query"]`` if present;
    * reduces every record in each ``indirect_versions`` list to exactly the
      ``version`` and ``change_category`` keys (in that order), with a missing
      key stored as ``None``.
    """
    snapshot["node"].pop("hash_raw_query", None)
    for records in snapshot.get("indirect_versions", {}).values():
        for record in records:
            # Capture the two keys worth keeping before wiping the record, then
            # re-insert them so the serialized key order stays version, change_category.
            kept = {key: record.get(key) for key in ("version", "change_category")}
            record.clear()
            record.update(kept)


def migrate(state_sync, **kwargs):  # type: ignore
    """Trim obsolete attributes from every stored plan DAG spec.

    Reads all rows of the ``_plan_dags`` table (prefixed with
    ``state_sync.schema`` when one is set), parses each JSON ``dag_spec``,
    trims every entry of its ``new_snapshots`` list, and then replaces the
    table's contents with the re-serialized specs. Nothing is written when the
    table has no rows.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``; used to
            locate and rewrite the plan DAGs table.
        **kwargs: Accepted for the migration entry-point signature; unused here.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    table = f"{schema}._plan_dags" if schema else "_plan_dags"

    rows = []
    query = exp.select("request_id", "dag_id", "dag_spec").from_(table)
    for request_id, dag_id, raw_spec in adapter.fetchall(query, quote_identifiers=True):
        spec = json.loads(raw_spec)
        for snapshot in spec.get("new_snapshots", []):
            _trim_snapshot(snapshot)
        rows.append(
            {
                "request_id": request_id,
                "dag_id": dag_id,
                "dag_spec": json.dumps(spec),
            }
        )

    if not rows:
        return

    # Full rewrite: clear the table, then append the trimmed specs back.
    adapter.delete_from(table, "TRUE")
    text_index = index_text_type(adapter.dialect)
    adapter.insert_append(
        table,
        pd.DataFrame(rows),
        columns_to_types={
            "request_id": exp.DataType.build(text_index),
            "dag_id": exp.DataType.build(text_index),
            "dag_spec": exp.DataType.build("text"),
        },
    )
def _trim_snapshot(snapshot):  # type: ignore
    """Remove plan-DAG attributes that are no longer needed from one snapshot dict.

    Mutates ``snapshot`` in place:
    * drops ``node["hash_raw_query"]`` if present;
    * reduces every record in each ``indirect_versions`` list to exactly the
      ``version`` and ``change_category`` keys (in that order), with a missing
      key stored as ``None``.
    """
    snapshot["node"].pop("hash_raw_query", None)
    for records in snapshot.get("indirect_versions", {}).values():
        for record in records:
            # Capture the two keys worth keeping before wiping the record, then
            # re-insert them so the serialized key order stays version, change_category.
            kept = {key: record.get(key) for key in ("version", "change_category")}
            record.clear()
            record.update(kept)


def migrate(state_sync, **kwargs):  # type: ignore
    """Trim obsolete attributes from every stored plan DAG spec.

    Reads all rows of the ``_plan_dags`` table (prefixed with
    ``state_sync.schema`` when one is set), parses each JSON ``dag_spec``,
    trims every entry of its ``new_snapshots`` list, and then replaces the
    table's contents with the re-serialized specs. Nothing is written when the
    table has no rows.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``; used to
            locate and rewrite the plan DAGs table.
        **kwargs: Accepted for the migration entry-point signature; unused here.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    table = f"{schema}._plan_dags" if schema else "_plan_dags"

    rows = []
    query = exp.select("request_id", "dag_id", "dag_spec").from_(table)
    for request_id, dag_id, raw_spec in adapter.fetchall(query, quote_identifiers=True):
        spec = json.loads(raw_spec)
        for snapshot in spec.get("new_snapshots", []):
            _trim_snapshot(snapshot)
        rows.append(
            {
                "request_id": request_id,
                "dag_id": dag_id,
                "dag_spec": json.dumps(spec),
            }
        )

    if not rows:
        return

    # Full rewrite: clear the table, then append the trimmed specs back.
    adapter.delete_from(table, "TRUE")
    text_index = index_text_type(adapter.dialect)
    adapter.insert_append(
        table,
        pd.DataFrame(rows),
        columns_to_types={
            "request_id": exp.DataType.build(text_index),
            "dag_id": exp.DataType.build(text_index),
            "dag_spec": exp.DataType.build("text"),
        },
    )