Edit on GitHub

Create a dedicated table to store snapshot intervals.

 1"""Create a dedicated table to store snapshot intervals."""
 2
 3from sqlglot import exp
 4
 5from sqlmesh.utils.migration import index_text_type
 6
 7
 8def migrate(state_sync, **kwargs):  # type: ignore
 9    engine_adapter = state_sync.engine_adapter
10    intervals_table = "_intervals"
11    if state_sync.schema:
12        intervals_table = f"{state_sync.schema}.{intervals_table}"
13
14    index_type = index_text_type(engine_adapter.dialect)
15
16    engine_adapter.create_state_table(
17        intervals_table,
18        {
19            "id": exp.DataType.build(index_type),
20            "created_ts": exp.DataType.build("bigint"),
21            "name": exp.DataType.build(index_type),
22            "identifier": exp.DataType.build(index_type),
23            "version": exp.DataType.build(index_type),
24            "start_ts": exp.DataType.build("bigint"),
25            "end_ts": exp.DataType.build("bigint"),
26            "is_dev": exp.DataType.build("boolean"),
27            "is_removed": exp.DataType.build("boolean"),
28            "is_compacted": exp.DataType.build("boolean"),
29        },
30        primary_key=("id",),
31    )
32
33    engine_adapter.create_index(
34        intervals_table, "name_version_idx", ("name", "version", "created_ts")
35    )
36    engine_adapter.create_index(
37        intervals_table, "name_identifier_idx", ("name", "identifier", "created_ts")
38    )
def migrate(state_sync, **kwargs):  # type: ignore
    """Set up the dedicated snapshot-intervals table and index it.

    Builds the ``_intervals`` table (qualified by ``state_sync.schema`` when
    one is set) with ``id`` as its primary key, then adds the
    ``name_version_idx`` and ``name_identifier_idx`` indexes.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    target = "_intervals"
    if state_sync.schema:
        target = f"{state_sync.schema}.{target}"

    text_type = index_text_type(adapter.dialect)
    make_type = exp.DataType.build

    # Columns are added one by one so the declaration order is explicit.
    schema_def = {}
    schema_def["id"] = make_type(text_type)
    schema_def["created_ts"] = make_type("bigint")
    schema_def["name"] = make_type(text_type)
    schema_def["identifier"] = make_type(text_type)
    schema_def["version"] = make_type(text_type)
    schema_def["start_ts"] = make_type("bigint")
    schema_def["end_ts"] = make_type("bigint")
    schema_def["is_dev"] = make_type("boolean")
    schema_def["is_removed"] = make_type("boolean")
    schema_def["is_compacted"] = make_type("boolean")

    adapter.create_state_table(target, schema_def, primary_key=("id",))

    # Index name -> indexed columns; dicts keep insertion order, so the
    # version index is created before the identifier index.
    indexes = {
        "name_version_idx": ("name", "version", "created_ts"),
        "name_identifier_idx": ("name", "identifier", "created_ts"),
    }
    for idx_name, idx_columns in indexes.items():
        adapter.create_index(target, idx_name, idx_columns)