Edit on GitHub

Add dev_version and fingerprint columns to the snapshots table.

  1"""Add dev_version and fingerprint columns to the snapshots table."""
  2
  3import json
  4
  5from sqlglot import exp
  6
  7from sqlmesh.utils.migration import index_text_type, blob_text_type
  8
  9
 10def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
 11    snapshots_table = "_snapshots"
 12    if schema:
 13        snapshots_table = f"{schema}.{snapshots_table}"
 14
 15    index_type = index_text_type(engine_adapter.dialect)
 16    blob_type = blob_text_type(engine_adapter.dialect)
 17
 18    add_dev_version_exp = exp.Alter(
 19        this=exp.to_table(snapshots_table),
 20        kind="TABLE",
 21        actions=[
 22            exp.ColumnDef(
 23                this=exp.to_column("dev_version"),
 24                kind=exp.DataType.build(index_type),
 25            )
 26        ],
 27    )
 28    engine_adapter.execute(add_dev_version_exp)
 29
 30    add_fingerprint_exp = exp.Alter(
 31        this=exp.to_table(snapshots_table),
 32        kind="TABLE",
 33        actions=[
 34            exp.ColumnDef(
 35                this=exp.to_column("fingerprint"),
 36                kind=exp.DataType.build(blob_type),
 37            )
 38        ],
 39    )
 40    engine_adapter.execute(add_fingerprint_exp)
 41
 42
 43def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
 44    import pandas as pd
 45
 46    snapshots_table = "_snapshots"
 47    if schema:
 48        snapshots_table = f"{schema}.{snapshots_table}"
 49
 50    index_type = index_text_type(engine_adapter.dialect)
 51    blob_type = blob_text_type(engine_adapter.dialect)
 52
 53    new_snapshots = []
 54
 55    for (
 56        name,
 57        identifier,
 58        version,
 59        snapshot,
 60        kind_name,
 61        updated_ts,
 62        unpaused_ts,
 63        ttl_ms,
 64        unrestorable,
 65        forward_only,
 66        _,
 67        _,
 68    ) in engine_adapter.fetchall(
 69        exp.select(
 70            "name",
 71            "identifier",
 72            "version",
 73            "snapshot",
 74            "kind_name",
 75            "updated_ts",
 76            "unpaused_ts",
 77            "ttl_ms",
 78            "unrestorable",
 79            "forward_only",
 80            "dev_version",
 81            "fingerprint",
 82        ).from_(snapshots_table),
 83        quote_identifiers=True,
 84    ):
 85        parsed_snapshot = json.loads(snapshot)
 86        new_snapshots.append(
 87            {
 88                "name": name,
 89                "identifier": identifier,
 90                "version": version,
 91                "snapshot": snapshot,
 92                "kind_name": kind_name,
 93                "updated_ts": updated_ts,
 94                "unpaused_ts": unpaused_ts,
 95                "ttl_ms": ttl_ms,
 96                "unrestorable": unrestorable,
 97                "forward_only": forward_only,
 98                "dev_version": parsed_snapshot.get("dev_version"),
 99                "fingerprint": json.dumps(parsed_snapshot.get("fingerprint")),
100            }
101        )
102
103    if new_snapshots:
104        engine_adapter.delete_from(snapshots_table, "TRUE")
105
106        engine_adapter.insert_append(
107            snapshots_table,
108            pd.DataFrame(new_snapshots),
109            target_columns_to_types={
110                "name": exp.DataType.build(index_type),
111                "identifier": exp.DataType.build(index_type),
112                "version": exp.DataType.build(index_type),
113                "snapshot": exp.DataType.build(blob_type),
114                "kind_name": exp.DataType.build(index_type),
115                "updated_ts": exp.DataType.build("bigint"),
116                "unpaused_ts": exp.DataType.build("bigint"),
117                "ttl_ms": exp.DataType.build("bigint"),
118                "unrestorable": exp.DataType.build("boolean"),
119                "forward_only": exp.DataType.build("boolean"),
120                "dev_version": exp.DataType.build(index_type),
121                "fingerprint": exp.DataType.build(blob_type),
122            },
123        )
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Add the ``dev_version`` and ``fingerprint`` columns to the snapshots table.

    One ``ALTER TABLE`` expression is built and executed per new column,
    ``dev_version`` first and ``fingerprint`` second.

    Args:
        engine_adapter: Object used to execute the statements; its ``dialect``
            attribute is passed to the text-type helpers to pick column types.
        schema: Optional schema name. When truthy, the ``_snapshots`` table name
            is qualified with it.
        **kwargs: Accepted but not used by this function.
    """
    table_name = f"{schema}._snapshots" if schema else "_snapshots"

    # Resolve both column types up front, before any statement is executed.
    new_columns = (
        ("dev_version", index_text_type(engine_adapter.dialect)),
        ("fingerprint", blob_text_type(engine_adapter.dialect)),
    )

    for column_name, column_type in new_columns:
        column_def = exp.ColumnDef(
            this=exp.to_column(column_name),
            kind=exp.DataType.build(column_type),
        )
        alter_exp = exp.Alter(
            this=exp.to_table(table_name),
            kind="TABLE",
            actions=[column_def],
        )
        engine_adapter.execute(alter_exp)
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Populate ``dev_version`` and ``fingerprint`` from each row's snapshot JSON.

    Every row of the snapshots table is read, its ``snapshot`` column is parsed
    as JSON, and the ``dev_version`` / ``fingerprint`` entries of that payload
    become the values of the new columns. If at least one row was read, the
    table is emptied with ``delete_from(..., "TRUE")`` and the rebuilt rows are
    appended back.

    Args:
        engine_adapter: Object used to fetch, delete and insert rows; its
            ``dialect`` attribute is passed to the text-type helpers.
        schema: Optional schema name. When truthy, the ``_snapshots`` table name
            is qualified with it.
        **kwargs: Accepted but not used by this function.
    """
    import pandas as pd

    table_name = f"{schema}._snapshots" if schema else "_snapshots"

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)

    # Columns whose fetched values are written back unchanged.
    carried_columns = (
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
        "forward_only",
    )

    # The two new columns are selected as well, but their fetched values are
    # discarded: they are recomputed from the snapshot payload below.
    query = exp.select(*carried_columns, "dev_version", "fingerprint").from_(table_name)

    rebuilt_rows = []
    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        # zip stops after the carried columns, dropping the trailing two values.
        record = dict(zip(carried_columns, row))
        payload = json.loads(record["snapshot"])
        record["dev_version"] = payload.get("dev_version")
        record["fingerprint"] = json.dumps(payload.get("fingerprint"))
        rebuilt_rows.append(record)

    if not rebuilt_rows:
        return

    engine_adapter.delete_from(table_name, "TRUE")

    frame = pd.DataFrame(rebuilt_rows)
    type_names = {
        "name": index_type,
        "identifier": index_type,
        "version": index_type,
        "snapshot": blob_type,
        "kind_name": index_type,
        "updated_ts": "bigint",
        "unpaused_ts": "bigint",
        "ttl_ms": "bigint",
        "unrestorable": "boolean",
        "forward_only": "boolean",
        "dev_version": index_type,
        "fingerprint": blob_type,
    }
    engine_adapter.insert_append(
        table_name,
        frame,
        target_columns_to_types={
            column: exp.DataType.build(type_name) for column, type_name in type_names.items()
        },
    )