Edit on GitHub

Include environment in plan dag spec.

 1"""Include environment in plan dag spec."""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp
 7
 8from sqlmesh.utils.migration import index_text_type
 9
10
def migrate(state_sync, **kwargs):  # type: ignore
    """Nest the flat environment fields of each plan DAG spec under ``environment``.

    Every row of the ``_plan_dags`` table (schema-qualified when
    ``state_sync.schema`` is set) has its JSON ``dag_spec`` rewritten: the
    keys ``environment_naming_info``, ``promoted_snapshots``, ``start``,
    ``end``, ``plan_id``, ``previous_plan_id`` and
    ``environment_expiration_ts`` are removed from the top level and
    re-emitted inside a single ``environment`` object. All other keys of the
    spec are kept as they are.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.

    Raises:
        KeyError: If a stored spec lacks ``environment_naming_info``,
            ``start`` or ``plan_id``; these are popped without a default.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    plan_dags_table = "_plan_dags"
    if state_sync.schema:
        plan_dags_table = f"{schema}.{plan_dags_table}"

    new_specs = []

    for request_id, dag_id, dag_spec in engine_adapter.fetchall(
        exp.select("request_id", "dag_id", "dag_spec").from_(plan_dags_table),
        quote_identifiers=True,
    ):
        parsed_dag_spec = json.loads(dag_spec)

        # Pop (rather than read) so the flat keys do not survive next to the
        # nested ``environment`` object in the rewritten spec.
        environment_naming_info = parsed_dag_spec.pop("environment_naming_info")
        promoted_snapshots = parsed_dag_spec.pop("promoted_snapshots", [])
        start = parsed_dag_spec.pop("start")
        # Keep the popped end so it can be carried over; it may be absent,
        # in which case ``end_at`` is stored as null.
        end = parsed_dag_spec.pop("end", None)
        plan_id = parsed_dag_spec.pop("plan_id")
        previous_plan_id = parsed_dag_spec.pop("previous_plan_id", None)
        expiration_ts = parsed_dag_spec.pop("environment_expiration_ts", None)

        parsed_dag_spec["environment"] = {
            **environment_naming_info,
            "snapshots": promoted_snapshots,
            "start_at": start,
            "end_at": end,
            "plan_id": plan_id,
            "previous_plan_id": previous_plan_id,
            "expiration_ts": expiration_ts,
        }

        new_specs.append(
            {
                "request_id": request_id,
                "dag_id": dag_id,
                "dag_spec": json.dumps(parsed_dag_spec),
            }
        )

    # Nothing to rewrite when the table is empty; skip the delete/insert.
    if new_specs:
        # The "TRUE" condition matches every row, so the table is emptied
        # before the rewritten specs are inserted.
        engine_adapter.delete_from(plan_dags_table, "TRUE")

        index_type = index_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            plan_dags_table,
            pd.DataFrame(new_specs),
            columns_to_types={
                "request_id": exp.DataType.build(index_type),
                "dag_id": exp.DataType.build(index_type),
                "dag_spec": exp.DataType.build("text"),
            },
        )
def migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Nest the flat environment fields of each plan DAG spec under ``environment``.

    Every row of the ``_plan_dags`` table (schema-qualified when
    ``state_sync.schema`` is set) has its JSON ``dag_spec`` rewritten: the
    keys ``environment_naming_info``, ``promoted_snapshots``, ``start``,
    ``end``, ``plan_id``, ``previous_plan_id`` and
    ``environment_expiration_ts`` are removed from the top level and
    re-emitted inside a single ``environment`` object. All other keys of the
    spec are kept as they are.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.

    Raises:
        KeyError: If a stored spec lacks ``environment_naming_info``,
            ``start`` or ``plan_id``; these are popped without a default.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    plan_dags_table = "_plan_dags"
    if state_sync.schema:
        plan_dags_table = f"{schema}.{plan_dags_table}"

    new_specs = []

    for request_id, dag_id, dag_spec in engine_adapter.fetchall(
        exp.select("request_id", "dag_id", "dag_spec").from_(plan_dags_table),
        quote_identifiers=True,
    ):
        parsed_dag_spec = json.loads(dag_spec)

        # Pop (rather than read) so the flat keys do not survive next to the
        # nested ``environment`` object in the rewritten spec.
        environment_naming_info = parsed_dag_spec.pop("environment_naming_info")
        promoted_snapshots = parsed_dag_spec.pop("promoted_snapshots", [])
        start = parsed_dag_spec.pop("start")
        # Keep the popped end so it can be carried over; it may be absent,
        # in which case ``end_at`` is stored as null.
        end = parsed_dag_spec.pop("end", None)
        plan_id = parsed_dag_spec.pop("plan_id")
        previous_plan_id = parsed_dag_spec.pop("previous_plan_id", None)
        expiration_ts = parsed_dag_spec.pop("environment_expiration_ts", None)

        parsed_dag_spec["environment"] = {
            **environment_naming_info,
            "snapshots": promoted_snapshots,
            "start_at": start,
            "end_at": end,
            "plan_id": plan_id,
            "previous_plan_id": previous_plan_id,
            "expiration_ts": expiration_ts,
        }

        new_specs.append(
            {
                "request_id": request_id,
                "dag_id": dag_id,
                "dag_spec": json.dumps(parsed_dag_spec),
            }
        )

    # Nothing to rewrite when the table is empty; skip the delete/insert.
    if new_specs:
        # The "TRUE" condition matches every row, so the table is emptied
        # before the rewritten specs are inserted.
        engine_adapter.delete_from(plan_dags_table, "TRUE")

        index_type = index_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            plan_dags_table,
            pd.DataFrame(new_specs),
            columns_to_types={
                "request_id": exp.DataType.build(index_type),
                "dag_id": exp.DataType.build(index_type),
                "dag_spec": exp.DataType.build("text"),
            },
        )