Creates the '_plan_dags' table if Airflow is used.
"""Creates the '_plan_dags' table if Airflow is used."""

from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Create the ``_plan_dags`` state table and an index on its ``dag_id`` column.

    When ``state_sync.schema`` is truthy, that schema is created first and the
    table name is qualified with it; otherwise the bare table name is used.

    Args:
        state_sync: Object exposing the ``engine_adapter`` and ``schema``
            attributes read by this migration.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    target_schema = state_sync.schema
    base_name = "_plan_dags"

    if target_schema:
        adapter.create_schema(target_schema)
        table_name = f"{target_schema}.{base_name}"
    else:
        table_name = base_name

    # Both key-like columns use the type that index_text_type picks for the
    # adapter's dialect; the DAG spec itself is stored as plain "text".
    key_type = index_text_type(adapter.dialect)
    columns = {
        "request_id": exp.DataType.build(key_type),
        "dag_id": exp.DataType.build(key_type),
        "dag_spec": exp.DataType.build("text"),
    }

    adapter.create_state_table(table_name, columns, primary_key=("request_id",))
    adapter.create_index(table_name, "dag_id_idx", ("dag_id",))
def
migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Create the ``_plan_dags`` state table and an index on its ``dag_id`` column.

    When ``state_sync.schema`` is truthy, that schema is created first and the
    table name is qualified with it; otherwise the bare table name is used.

    Args:
        state_sync: Object exposing the ``engine_adapter`` and ``schema``
            attributes read by this migration.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    target_schema = state_sync.schema
    base_name = "_plan_dags"

    if target_schema:
        adapter.create_schema(target_schema)
        table_name = f"{target_schema}.{base_name}"
    else:
        table_name = base_name

    # Both key-like columns use the type that index_text_type picks for the
    # adapter's dialect; the DAG spec itself is stored as plain "text".
    key_type = index_text_type(adapter.dialect)
    columns = {
        "request_id": exp.DataType.build(key_type),
        "dag_id": exp.DataType.build(key_type),
        "dag_spec": exp.DataType.build("text"),
    }

    adapter.create_state_table(table_name, columns, primary_key=("request_id",))
    adapter.create_index(table_name, "dag_id_idx", ("dag_id",))