Edit on GitHub

Add the expiration_ts column to the snapshots table.

 1"""Add the expiration_ts column to the snapshots table."""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp
 7
 8from sqlmesh.utils.date import to_datetime, to_timestamp
 9from sqlmesh.utils.migration import index_text_type
10
11
def migrate(state_sync, **kwargs):  # type: ignore
    """Add an ``expiration_ts`` column to the snapshots table and fill it in.

    The column is added with an ALTER TABLE statement. Every existing row is
    then read back, an ``expiration_ts`` value is derived from the ``ttl`` and
    ``updated_ts`` fields of the row's JSON ``snapshot`` payload, and the table
    contents are replaced with the same rows plus the new value.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``. All
            statements are issued through the adapter.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    # Qualify the table name only when a schema is configured.
    table_name = f"{schema}._snapshots" if schema else "_snapshots"

    text_index_type = index_text_type(adapter.dialect)

    new_column = exp.ColumnDef(
        this=exp.to_column("expiration_ts"),
        kind=exp.DataType.build("bigint"),
    )
    adapter.execute(exp.AlterTable(this=exp.to_table(table_name), actions=[new_column]))

    def with_expiration(row):
        """Turn one fetched row into an insertable record with ``expiration_ts``."""
        name, identifier, version, snapshot, kind_name = row
        payload = json.loads(snapshot)
        updated_ts = payload["updated_ts"]
        ttl = payload["ttl"]
        return {
            "name": name,
            "identifier": identifier,
            "version": version,
            # The raw JSON string is written back unchanged.
            "snapshot": snapshot,
            "kind_name": kind_name,
            # NOTE(review): presumably resolves the TTL relative to the time
            # the snapshot was last updated; confirm against to_timestamp.
            "expiration_ts": to_timestamp(ttl, relative_base=to_datetime(updated_ts)),
        }

    query = exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(
        table_name
    )
    records = [with_expiration(row) for row in adapter.fetchall(query, quote_identifiers=True)]

    if not records:
        # Nothing to rewrite; the table only needed the new column.
        return

    # Replace the table contents: remove the existing rows, then append the
    # rebuilt ones that carry expiration_ts.
    adapter.delete_from(table_name, "TRUE")

    frame = pd.DataFrame(records)
    sql_types = {
        "name": text_index_type,
        "identifier": text_index_type,
        "version": text_index_type,
        "snapshot": "text",
        "kind_name": text_index_type,
        "expiration_ts": "bigint",
    }
    adapter.insert_append(
        table_name,
        frame,
        columns_to_types={
            column: exp.DataType.build(sql_type) for column, sql_type in sql_types.items()
        },
    )
def migrate(state_sync, **kwargs):  # type: ignore
    """Add the expiration_ts column to the snapshots table and populate it.

    Steps, in order:
      1. ALTER the snapshots table to add a ``bigint`` ``expiration_ts`` column.
      2. Read ``name``, ``identifier``, ``version``, ``snapshot`` and
         ``kind_name`` for every row.
      3. Compute ``expiration_ts`` for each row from the ``ttl`` and
         ``updated_ts`` fields of its JSON ``snapshot`` payload.
      4. If any rows were read, delete the table's rows and append the
         rebuilt ones.

    Args:
        state_sync: Provides ``engine_adapter`` (used to run every statement)
            and ``schema`` (optional prefix for the table name).
        **kwargs: Ignored.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots" if not schema else f"{schema}._snapshots"

    index_type = index_text_type(engine_adapter.dialect)

    engine_adapter.execute(
        exp.AlterTable(
            this=exp.to_table(snapshots_table),
            actions=[
                exp.ColumnDef(
                    this=exp.to_column("expiration_ts"),
                    kind=exp.DataType.build("bigint"),
                )
            ],
        )
    )

    existing_columns = ("name", "identifier", "version", "snapshot", "kind_name")
    select_existing = exp.select(*existing_columns).from_(snapshots_table)

    rebuilt_rows = []
    for row in engine_adapter.fetchall(select_existing, quote_identifiers=True):
        # Unpack explicitly so a row of unexpected width fails here.
        name, identifier, version, snapshot, kind_name = row

        snapshot_fields = json.loads(snapshot)
        updated_ts = snapshot_fields["updated_ts"]
        ttl = snapshot_fields["ttl"]

        record = dict(zip(existing_columns, (name, identifier, version, snapshot, kind_name)))
        # NOTE(review): to_timestamp is given the TTL with updated_ts as its
        # relative base; exact TTL semantics live in sqlmesh.utils.date.
        record["expiration_ts"] = to_timestamp(ttl, relative_base=to_datetime(updated_ts))
        rebuilt_rows.append(record)

    if not rebuilt_rows:
        return

    # The rows are removed first and then re-inserted with the extra column.
    engine_adapter.delete_from(snapshots_table, "TRUE")

    rebuilt_frame = pd.DataFrame(rebuilt_rows)
    column_types = {column: exp.DataType.build(index_type) for column in existing_columns}
    # The snapshot payload is stored as plain text rather than the index type.
    column_types["snapshot"] = exp.DataType.build("text")
    column_types["expiration_ts"] = exp.DataType.build("bigint")

    engine_adapter.insert_append(
        snapshots_table,
        rebuilt_frame,
        columns_to_types=column_types,
    )