Edit on GitHub

Remove the `disable_restatement` setting from the kind of external and embedded models.

 1"""Remove disable restatement from external and embedded models."""
 2
 3import json
 4
 5from sqlglot import exp
 6from sqlmesh.utils.migration import index_text_type, blob_text_type
 7
 8
 9def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
10    pass
11
12
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Drop ``disable_restatement`` from EMBEDDED and EXTERNAL snapshot kinds.

    Every row of the ``_snapshots`` table is fetched and its JSON ``snapshot``
    payload is decoded and re-encoded. For rows whose ``kind_name`` is
    ``EMBEDDED`` or ``EXTERNAL`` the ``disable_restatement`` key is removed
    from the payload's ``node.kind`` mapping when that mapping is non-empty.
    If at least one row was fetched, the table is emptied and all rows are
    written back.

    Args:
        engine_adapter: Adapter used to read from and write to the table.
        schema: Optional schema name that qualifies the table name.
        **kwargs: Unused.
    """
    import pandas as pd

    table = f"{schema}._snapshots" if schema else "_snapshots"

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)
    column_type_names = (
        ("name", index_type),
        ("identifier", index_type),
        ("version", index_type),
        ("snapshot", blob_type),
        ("kind_name", index_type),
        ("updated_ts", "bigint"),
        ("unpaused_ts", "bigint"),
        ("ttl_ms", "bigint"),
        ("unrestorable", "boolean"),
    )
    # One DataType instance is built per column, in column order.
    snapshots_columns_to_types = {
        column: exp.DataType.build(type_name) for column, type_name in column_type_names
    }

    query = exp.select(*snapshots_columns_to_types).from_(table)

    rewritten_rows = []
    for (
        row_name,
        row_identifier,
        row_version,
        raw_snapshot,
        row_kind_name,
        row_updated_ts,
        row_unpaused_ts,
        row_ttl_ms,
        row_unrestorable,
    ) in engine_adapter.fetchall(query, quote_identifiers=True):
        payload = json.loads(raw_snapshot)
        kind = payload["node"].get("kind")

        is_target_kind = row_kind_name in ("EMBEDDED", "EXTERNAL")
        if kind and is_target_kind:
            kind.pop("disable_restatement", None)

        # Values are listed in the same order as the column mapping's keys.
        row_values = (
            row_name,
            row_identifier,
            row_version,
            json.dumps(payload),
            row_kind_name,
            row_updated_ts,
            row_unpaused_ts,
            row_ttl_ms,
            row_unrestorable,
        )
        rewritten_rows.append(dict(zip(snapshots_columns_to_types, row_values)))

    if not rewritten_rows:
        return

    # Replace the table contents wholesale: delete every row, then re-insert.
    engine_adapter.delete_from(table, "TRUE")
    engine_adapter.insert_append(
        table,
        pd.DataFrame(rewritten_rows),
        target_columns_to_types=snapshots_columns_to_types,
    )
def migrate_schemas(engine_adapter, schema, **kwargs):
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """No-op: this function performs no schema changes.

    Args:
        engine_adapter: Unused.
        schema: Unused.
        **kwargs: Unused.
    """
def migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Drop ``disable_restatement`` from EMBEDDED and EXTERNAL snapshot kinds.

    Every row of the ``_snapshots`` table is fetched and its JSON ``snapshot``
    payload is decoded and re-encoded. For rows whose ``kind_name`` is
    ``EMBEDDED`` or ``EXTERNAL`` the ``disable_restatement`` key is removed
    from the payload's ``node.kind`` mapping when that mapping is non-empty.
    If at least one row was fetched, the table is emptied and all rows are
    written back.

    Args:
        engine_adapter: Adapter used to read from and write to the table.
        schema: Optional schema name that qualifies the table name.
        **kwargs: Unused.
    """
    import pandas as pd

    table = f"{schema}._snapshots" if schema else "_snapshots"

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)
    column_type_names = (
        ("name", index_type),
        ("identifier", index_type),
        ("version", index_type),
        ("snapshot", blob_type),
        ("kind_name", index_type),
        ("updated_ts", "bigint"),
        ("unpaused_ts", "bigint"),
        ("ttl_ms", "bigint"),
        ("unrestorable", "boolean"),
    )
    # One DataType instance is built per column, in column order.
    snapshots_columns_to_types = {
        column: exp.DataType.build(type_name) for column, type_name in column_type_names
    }

    query = exp.select(*snapshots_columns_to_types).from_(table)

    rewritten_rows = []
    for (
        row_name,
        row_identifier,
        row_version,
        raw_snapshot,
        row_kind_name,
        row_updated_ts,
        row_unpaused_ts,
        row_ttl_ms,
        row_unrestorable,
    ) in engine_adapter.fetchall(query, quote_identifiers=True):
        payload = json.loads(raw_snapshot)
        kind = payload["node"].get("kind")

        is_target_kind = row_kind_name in ("EMBEDDED", "EXTERNAL")
        if kind and is_target_kind:
            kind.pop("disable_restatement", None)

        # Values are listed in the same order as the column mapping's keys.
        row_values = (
            row_name,
            row_identifier,
            row_version,
            json.dumps(payload),
            row_kind_name,
            row_updated_ts,
            row_unpaused_ts,
            row_ttl_ms,
            row_unrestorable,
        )
        rewritten_rows.append(dict(zip(snapshots_columns_to_types, row_values)))

    if not rewritten_rows:
        return

    # Replace the table contents wholesale: delete every row, then re-insert.
    engine_adapter.delete_from(table, "TRUE")
    engine_adapter.insert_append(
        table,
        pd.DataFrame(rewritten_rows),
        target_columns_to_types=snapshots_columns_to_types,
    )