Edit on GitHub

Add the auto restatements table.

 1"""Add the auto restatements table."""
 2
 3from sqlglot import exp
 4
 5from sqlmesh.utils.migration import index_text_type
 6
 7
 8def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
 9    auto_restatements_table = "_auto_restatements"
10    intervals_table = "_intervals"
11
12    if schema:
13        auto_restatements_table = f"{schema}.{auto_restatements_table}"
14        intervals_table = f"{schema}.{intervals_table}"
15
16    index_type = index_text_type(engine_adapter.dialect)
17
18    engine_adapter.create_state_table(
19        auto_restatements_table,
20        {
21            "snapshot_name": exp.DataType.build(index_type),
22            "snapshot_version": exp.DataType.build(index_type),
23            "next_auto_restatement_ts": exp.DataType.build("bigint"),
24        },
25        primary_key=("snapshot_name", "snapshot_version"),
26    )
27
28    alter_table_exp = exp.Alter(
29        this=exp.to_table(intervals_table),
30        kind="TABLE",
31        actions=[
32            exp.ColumnDef(
33                this=exp.to_column("is_pending_restatement"),
34                kind=exp.DataType.build("boolean"),
35            )
36        ],
37    )
38    engine_adapter.execute(alter_table_exp)
39
40
41def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
42    intervals_table = "_intervals"
43
44    if schema:
45        intervals_table = f"{schema}.{intervals_table}"
46
47    engine_adapter.update_table(
48        intervals_table,
49        {"is_pending_restatement": False},
50        where=exp.true(),
51    )
def migrate_schemas(engine_adapter, schema, **kwargs):
 9def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
10    auto_restatements_table = "_auto_restatements"
11    intervals_table = "_intervals"
12
13    if schema:
14        auto_restatements_table = f"{schema}.{auto_restatements_table}"
15        intervals_table = f"{schema}.{intervals_table}"
16
17    index_type = index_text_type(engine_adapter.dialect)
18
19    engine_adapter.create_state_table(
20        auto_restatements_table,
21        {
22            "snapshot_name": exp.DataType.build(index_type),
23            "snapshot_version": exp.DataType.build(index_type),
24            "next_auto_restatement_ts": exp.DataType.build("bigint"),
25        },
26        primary_key=("snapshot_name", "snapshot_version"),
27    )
28
29    alter_table_exp = exp.Alter(
30        this=exp.to_table(intervals_table),
31        kind="TABLE",
32        actions=[
33            exp.ColumnDef(
34                this=exp.to_column("is_pending_restatement"),
35                kind=exp.DataType.build("boolean"),
36            )
37        ],
38    )
39    engine_adapter.execute(alter_table_exp)
def migrate_rows(engine_adapter, schema, **kwargs):
42def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
43    intervals_table = "_intervals"
44
45    if schema:
46        intervals_table = f"{schema}.{intervals_table}"
47
48    engine_adapter.update_table(
49        intervals_table,
50        {"is_pending_restatement": False},
51        where=exp.true(),
52    )