Edit on GitHub

Remove validate_query from existing snapshots.

 1"""Remove validate_query from existing snapshots."""
 2
 3import json
 4
 5from sqlglot import exp
 6
 7from sqlmesh.utils.migration import index_text_type
 8from sqlmesh.utils.migration import blob_text_type
 9
10
11def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
12    pass
13
14
15def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
16    import pandas as pd
17
18    snapshots_table = "_snapshots"
19    index_type = index_text_type(engine_adapter.dialect)
20    if schema:
21        snapshots_table = f"{schema}.{snapshots_table}"
22
23    new_snapshots = []
24
25    for (
26        name,
27        identifier,
28        version,
29        snapshot,
30        kind_name,
31        updated_ts,
32        unpaused_ts,
33        ttl_ms,
34        unrestorable,
35    ) in engine_adapter.fetchall(
36        exp.select(
37            "name",
38            "identifier",
39            "version",
40            "snapshot",
41            "kind_name",
42            "updated_ts",
43            "unpaused_ts",
44            "ttl_ms",
45            "unrestorable",
46        ).from_(snapshots_table),
47        quote_identifiers=True,
48    ):
49        parsed_snapshot = json.loads(snapshot)
50
51        parsed_snapshot["node"].pop("validate_query", None)
52
53        new_snapshots.append(
54            {
55                "name": name,
56                "identifier": identifier,
57                "version": version,
58                "snapshot": json.dumps(parsed_snapshot),
59                "kind_name": kind_name,
60                "updated_ts": updated_ts,
61                "unpaused_ts": unpaused_ts,
62                "ttl_ms": ttl_ms,
63                "unrestorable": unrestorable,
64            }
65        )
66
67    if new_snapshots:
68        engine_adapter.delete_from(snapshots_table, "TRUE")
69        blob_type = blob_text_type(engine_adapter.dialect)
70
71        engine_adapter.insert_append(
72            snapshots_table,
73            pd.DataFrame(new_snapshots),
74            target_columns_to_types={
75                "name": exp.DataType.build(index_type),
76                "identifier": exp.DataType.build(index_type),
77                "version": exp.DataType.build(index_type),
78                "snapshot": exp.DataType.build(blob_type),
79                "kind_name": exp.DataType.build(index_type),
80                "updated_ts": exp.DataType.build("bigint"),
81                "unpaused_ts": exp.DataType.build("bigint"),
82                "ttl_ms": exp.DataType.build("bigint"),
83                "unrestorable": exp.DataType.build("boolean"),
84            },
85        )
def migrate_schemas(engine_adapter, schema, **kwargs):
12def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
13    pass
def migrate_rows(engine_adapter, schema, **kwargs):
16def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
17    import pandas as pd
18
19    snapshots_table = "_snapshots"
20    index_type = index_text_type(engine_adapter.dialect)
21    if schema:
22        snapshots_table = f"{schema}.{snapshots_table}"
23
24    new_snapshots = []
25
26    for (
27        name,
28        identifier,
29        version,
30        snapshot,
31        kind_name,
32        updated_ts,
33        unpaused_ts,
34        ttl_ms,
35        unrestorable,
36    ) in engine_adapter.fetchall(
37        exp.select(
38            "name",
39            "identifier",
40            "version",
41            "snapshot",
42            "kind_name",
43            "updated_ts",
44            "unpaused_ts",
45            "ttl_ms",
46            "unrestorable",
47        ).from_(snapshots_table),
48        quote_identifiers=True,
49    ):
50        parsed_snapshot = json.loads(snapshot)
51
52        parsed_snapshot["node"].pop("validate_query", None)
53
54        new_snapshots.append(
55            {
56                "name": name,
57                "identifier": identifier,
58                "version": version,
59                "snapshot": json.dumps(parsed_snapshot),
60                "kind_name": kind_name,
61                "updated_ts": updated_ts,
62                "unpaused_ts": unpaused_ts,
63                "ttl_ms": ttl_ms,
64                "unrestorable": unrestorable,
65            }
66        )
67
68    if new_snapshots:
69        engine_adapter.delete_from(snapshots_table, "TRUE")
70        blob_type = blob_text_type(engine_adapter.dialect)
71
72        engine_adapter.insert_append(
73            snapshots_table,
74            pd.DataFrame(new_snapshots),
75            target_columns_to_types={
76                "name": exp.DataType.build(index_type),
77                "identifier": exp.DataType.build(index_type),
78                "version": exp.DataType.build(index_type),
79                "snapshot": exp.DataType.build(blob_type),
80                "kind_name": exp.DataType.build(index_type),
81                "updated_ts": exp.DataType.build("bigint"),
82                "unpaused_ts": exp.DataType.build("bigint"),
83                "ttl_ms": exp.DataType.build("bigint"),
84                "unrestorable": exp.DataType.build("boolean"),
85            },
86        )