Edit on GitHub

Creates the '_plan_dags' table if Airflow is used.

 1"""Creates the '_plan_dags' table if Airflow is used."""
 2
 3from sqlglot import exp
 4
 5from sqlmesh.utils.migration import index_text_type
 6
 7
 8def migrate(state_sync, **kwargs):  # type: ignore
 9    engine_adapter = state_sync.engine_adapter
10    schema = state_sync.schema
11    plan_dags_table = "_plan_dags"
12
13    if schema:
14        engine_adapter.create_schema(schema)
15        plan_dags_table = f"{schema}.{plan_dags_table}"
16
17    index_type = index_text_type(engine_adapter.dialect)
18
19    engine_adapter.create_state_table(
20        plan_dags_table,
21        {
22            "request_id": exp.DataType.build(index_type),
23            "dag_id": exp.DataType.build(index_type),
24            "dag_spec": exp.DataType.build("text"),
25        },
26        primary_key=("request_id",),
27    )
28
29    engine_adapter.create_index(plan_dags_table, "dag_id_idx", ("dag_id",))
def migrate(state_sync, **kwargs):
 9def migrate(state_sync, **kwargs):  # type: ignore
10    engine_adapter = state_sync.engine_adapter
11    schema = state_sync.schema
12    plan_dags_table = "_plan_dags"
13
14    if schema:
15        engine_adapter.create_schema(schema)
16        plan_dags_table = f"{schema}.{plan_dags_table}"
17
18    index_type = index_text_type(engine_adapter.dialect)
19
20    engine_adapter.create_state_table(
21        plan_dags_table,
22        {
23            "request_id": exp.DataType.build(index_type),
24            "dag_id": exp.DataType.build(index_type),
25            "dag_spec": exp.DataType.build("text"),
26        },
27        primary_key=("request_id",),
28    )
29
30    engine_adapter.create_index(plan_dags_table, "dag_id_idx", ("dag_id",))