Edit on GitHub

All migrations should be named _XXXX.py, they will be executed sequentially.

If a migration alters the payload of any pydantic models, you should not actually use them because the running model may not be able to load them. Make sure that these migration files are standalone.

 1"""All migrations should be named _XXXX.py, they will be executed sequentially.
 2
 3If a migration alters the payload of any pydantic models, you should not actually use them because
 4the running model may not be able to load them. Make sure that these migration files are standalone.
 5"""
 6
 7from sqlglot import exp
 8
 9from sqlmesh.utils.migration import index_text_type
10
11
12def migrate(state_sync, **kwargs):  # type: ignore
13    engine_adapter = state_sync.engine_adapter
14    schema = state_sync.schema
15    snapshots_table = "_snapshots"
16    environments_table = "_environments"
17    versions_table = "_versions"
18
19    if schema:
20        engine_adapter.create_schema(schema)
21        snapshots_table = f"{schema}.{snapshots_table}"
22        environments_table = f"{schema}.{environments_table}"
23        versions_table = f"{schema}.{versions_table}"
24
25    index_type = index_text_type(engine_adapter.dialect)
26
27    engine_adapter.create_state_table(
28        snapshots_table,
29        {
30            "name": exp.DataType.build(index_type),
31            "identifier": exp.DataType.build(index_type),
32            "version": exp.DataType.build(index_type),
33            "snapshot": exp.DataType.build("text"),
34        },
35        primary_key=("name", "identifier"),
36    )
37
38    engine_adapter.create_index(snapshots_table, "name_version_idx", ("name", "version"))
39
40    engine_adapter.create_state_table(
41        environments_table,
42        {
43            "name": exp.DataType.build(index_type),
44            "snapshots": exp.DataType.build("text"),
45            "start_at": exp.DataType.build("text"),
46            "end_at": exp.DataType.build("text"),
47            "plan_id": exp.DataType.build("text"),
48            "previous_plan_id": exp.DataType.build("text"),
49            "expiration_ts": exp.DataType.build("bigint"),
50        },
51        primary_key=("name",),
52    )
53
54    engine_adapter.create_state_table(
55        versions_table,
56        {
57            "schema_version": exp.DataType.build("int"),
58            "sqlglot_version": exp.DataType.build("text"),
59        },
60    )
def migrate(state_sync, **kwargs):
13def migrate(state_sync, **kwargs):  # type: ignore
14    engine_adapter = state_sync.engine_adapter
15    schema = state_sync.schema
16    snapshots_table = "_snapshots"
17    environments_table = "_environments"
18    versions_table = "_versions"
19
20    if schema:
21        engine_adapter.create_schema(schema)
22        snapshots_table = f"{schema}.{snapshots_table}"
23        environments_table = f"{schema}.{environments_table}"
24        versions_table = f"{schema}.{versions_table}"
25
26    index_type = index_text_type(engine_adapter.dialect)
27
28    engine_adapter.create_state_table(
29        snapshots_table,
30        {
31            "name": exp.DataType.build(index_type),
32            "identifier": exp.DataType.build(index_type),
33            "version": exp.DataType.build(index_type),
34            "snapshot": exp.DataType.build("text"),
35        },
36        primary_key=("name", "identifier"),
37    )
38
39    engine_adapter.create_index(snapshots_table, "name_version_idx", ("name", "version"))
40
41    engine_adapter.create_state_table(
42        environments_table,
43        {
44            "name": exp.DataType.build(index_type),
45            "snapshots": exp.DataType.build("text"),
46            "start_at": exp.DataType.build("text"),
47            "end_at": exp.DataType.build("text"),
48            "plan_id": exp.DataType.build("text"),
49            "previous_plan_id": exp.DataType.build("text"),
50            "expiration_ts": exp.DataType.build("bigint"),
51        },
52        primary_key=("name",),
53    )
54
55    engine_adapter.create_state_table(
56        versions_table,
57        {
58            "schema_version": exp.DataType.build("int"),
59            "sqlglot_version": exp.DataType.build("text"),
60        },
61    )