Edit on GitHub

Add forward_only column to the snapshots table.

  1"""Add forward_only column to the snapshots table."""
  2
  3import json
  4
  5from sqlglot import exp
  6
  7from sqlmesh.utils.migration import index_text_type, blob_text_type
  8
  9
 10def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
 11    snapshots_table = "_snapshots"
 12    if schema:
 13        snapshots_table = f"{schema}.{snapshots_table}"
 14
 15    alter_table_exp = exp.Alter(
 16        this=exp.to_table(snapshots_table),
 17        kind="TABLE",
 18        actions=[
 19            exp.ColumnDef(
 20                this=exp.to_column("forward_only"),
 21                kind=exp.DataType.build("boolean"),
 22            )
 23        ],
 24    )
 25    engine_adapter.execute(alter_table_exp)
 26
 27
 28def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
 29    import pandas as pd
 30
 31    snapshots_table = "_snapshots"
 32    if schema:
 33        snapshots_table = f"{schema}.{snapshots_table}"
 34
 35    new_snapshots = []
 36
 37    for (
 38        name,
 39        identifier,
 40        version,
 41        snapshot,
 42        kind_name,
 43        updated_ts,
 44        unpaused_ts,
 45        ttl_ms,
 46        unrestorable,
 47        forward_only,
 48    ) in engine_adapter.fetchall(
 49        exp.select(
 50            "name",
 51            "identifier",
 52            "version",
 53            "snapshot",
 54            "kind_name",
 55            "updated_ts",
 56            "unpaused_ts",
 57            "ttl_ms",
 58            "unrestorable",
 59            "forward_only",
 60        ).from_(snapshots_table),
 61        quote_identifiers=True,
 62    ):
 63        parsed_snapshot = json.loads(snapshot)
 64
 65        forward_only = parsed_snapshot.get("forward_only")
 66        if forward_only is None:
 67            forward_only = parsed_snapshot.get("change_category") == 3
 68
 69        new_snapshots.append(
 70            {
 71                "name": name,
 72                "identifier": identifier,
 73                "version": version,
 74                "snapshot": json.dumps(parsed_snapshot),
 75                "kind_name": kind_name,
 76                "updated_ts": updated_ts,
 77                "unpaused_ts": unpaused_ts,
 78                "ttl_ms": ttl_ms,
 79                "unrestorable": unrestorable,
 80                "forward_only": forward_only,
 81            }
 82        )
 83
 84    if new_snapshots:
 85        engine_adapter.delete_from(snapshots_table, "TRUE")
 86        index_type = index_text_type(engine_adapter.dialect)
 87        blob_type = blob_text_type(engine_adapter.dialect)
 88
 89        engine_adapter.insert_append(
 90            snapshots_table,
 91            pd.DataFrame(new_snapshots),
 92            target_columns_to_types={
 93                "name": exp.DataType.build(index_type),
 94                "identifier": exp.DataType.build(index_type),
 95                "version": exp.DataType.build(index_type),
 96                "snapshot": exp.DataType.build(blob_type),
 97                "kind_name": exp.DataType.build(index_type),
 98                "updated_ts": exp.DataType.build("bigint"),
 99                "unpaused_ts": exp.DataType.build("bigint"),
100                "ttl_ms": exp.DataType.build("bigint"),
101                "unrestorable": exp.DataType.build("boolean"),
102                "forward_only": exp.DataType.build("boolean"),
103            },
104        )
def migrate_schemas(engine_adapter, schema, **kwargs):
11def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
12    snapshots_table = "_snapshots"
13    if schema:
14        snapshots_table = f"{schema}.{snapshots_table}"
15
16    alter_table_exp = exp.Alter(
17        this=exp.to_table(snapshots_table),
18        kind="TABLE",
19        actions=[
20            exp.ColumnDef(
21                this=exp.to_column("forward_only"),
22                kind=exp.DataType.build("boolean"),
23            )
24        ],
25    )
26    engine_adapter.execute(alter_table_exp)
def migrate_rows(engine_adapter, schema, **kwargs):
 29def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
 30    import pandas as pd
 31
 32    snapshots_table = "_snapshots"
 33    if schema:
 34        snapshots_table = f"{schema}.{snapshots_table}"
 35
 36    new_snapshots = []
 37
 38    for (
 39        name,
 40        identifier,
 41        version,
 42        snapshot,
 43        kind_name,
 44        updated_ts,
 45        unpaused_ts,
 46        ttl_ms,
 47        unrestorable,
 48        forward_only,
 49    ) in engine_adapter.fetchall(
 50        exp.select(
 51            "name",
 52            "identifier",
 53            "version",
 54            "snapshot",
 55            "kind_name",
 56            "updated_ts",
 57            "unpaused_ts",
 58            "ttl_ms",
 59            "unrestorable",
 60            "forward_only",
 61        ).from_(snapshots_table),
 62        quote_identifiers=True,
 63    ):
 64        parsed_snapshot = json.loads(snapshot)
 65
 66        forward_only = parsed_snapshot.get("forward_only")
 67        if forward_only is None:
 68            forward_only = parsed_snapshot.get("change_category") == 3
 69
 70        new_snapshots.append(
 71            {
 72                "name": name,
 73                "identifier": identifier,
 74                "version": version,
 75                "snapshot": json.dumps(parsed_snapshot),
 76                "kind_name": kind_name,
 77                "updated_ts": updated_ts,
 78                "unpaused_ts": unpaused_ts,
 79                "ttl_ms": ttl_ms,
 80                "unrestorable": unrestorable,
 81                "forward_only": forward_only,
 82            }
 83        )
 84
 85    if new_snapshots:
 86        engine_adapter.delete_from(snapshots_table, "TRUE")
 87        index_type = index_text_type(engine_adapter.dialect)
 88        blob_type = blob_text_type(engine_adapter.dialect)
 89
 90        engine_adapter.insert_append(
 91            snapshots_table,
 92            pd.DataFrame(new_snapshots),
 93            target_columns_to_types={
 94                "name": exp.DataType.build(index_type),
 95                "identifier": exp.DataType.build(index_type),
 96                "version": exp.DataType.build(index_type),
 97                "snapshot": exp.DataType.build(blob_type),
 98                "kind_name": exp.DataType.build(index_type),
 99                "updated_ts": exp.DataType.build("bigint"),
100                "unpaused_ts": exp.DataType.build("bigint"),
101                "ttl_ms": exp.DataType.build("bigint"),
102                "unrestorable": exp.DataType.build("boolean"),
103                "forward_only": exp.DataType.build("boolean"),
104            },
105        )