Edit on GitHub

Join each snapshot's list of `WHEN [NOT] MATCHED` strings into a single space-separated string.

 1"""Join list of `WHEN [NOT] MATCHED` strings into a single string."""
 2
 3import json
 4
 5from sqlglot import exp
 6
 7from sqlmesh.utils.migration import index_text_type, blob_text_type
 8
 9
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration makes no schema changes.

    Args:
        engine_adapter: Accepted but not used.
        schema: Accepted but not used.
        **kwargs: Accepted but not used.
    """
12
13
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Collapse list-valued ``when_matched`` entries in stored snapshots.

    Every row of the ``_snapshots`` table (qualified with ``schema`` when one
    is given) is fetched and its ``snapshot`` JSON is parsed. When the node's
    ``kind`` holds ``when_matched`` as a list, the items are joined with a
    single space. If at least one row was read, ``delete_from`` is called on
    the table with the condition ``"TRUE"`` and every row is inserted again
    with the re-serialized JSON.

    Args:
        engine_adapter: Adapter providing ``dialect``, ``fetchall``,
            ``delete_from`` and ``insert_append``.
        schema: Optional schema name used to qualify the table name.
        **kwargs: Accepted but not used.
    """
    import pandas as pd

    index_type = index_text_type(engine_adapter.dialect)
    table = f"{schema}._snapshots" if schema else "_snapshots"

    query = exp.select(
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    ).from_(table)

    rewritten_rows = []
    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        (
            name,
            identifier,
            version,
            raw_snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
        ) = row

        payload = json.loads(raw_snapshot)
        kind = payload["node"].get("kind")
        if kind:
            when_matched = kind.get("when_matched")
            if isinstance(when_matched, list):
                # Older snapshots stored one string per clause; flatten them.
                kind["when_matched"] = " ".join(when_matched)

        rewritten_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    # Nothing was read, so leave the table untouched.
    if not rewritten_rows:
        return

    engine_adapter.delete_from(table, "TRUE")
    blob_type = blob_text_type(engine_adapter.dialect)

    # Column name -> type string, in the same order as the row dicts above.
    column_type_names = {
        "name": index_type,
        "identifier": index_type,
        "version": index_type,
        "snapshot": blob_type,
        "kind_name": index_type,
        "updated_ts": "bigint",
        "unpaused_ts": "bigint",
        "ttl_ms": "bigint",
        "unrestorable": "boolean",
    }

    engine_adapter.insert_append(
        table,
        pd.DataFrame(rewritten_rows),
        target_columns_to_types={
            column: exp.DataType.build(type_name)
            for column, type_name in column_type_names.items()
        },
    )
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration makes no schema changes.

    Args:
        engine_adapter: Accepted but not used.
        schema: Accepted but not used.
        **kwargs: Accepted but not used.
    """
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Collapse list-valued ``when_matched`` entries in stored snapshots.

    Every row of the ``_snapshots`` table (qualified with ``schema`` when one
    is given) is fetched and its ``snapshot`` JSON is parsed. When the node's
    ``kind`` holds ``when_matched`` as a list, the items are joined with a
    single space. If at least one row was read, ``delete_from`` is called on
    the table with the condition ``"TRUE"`` and every row is inserted again
    with the re-serialized JSON.

    Args:
        engine_adapter: Adapter providing ``dialect``, ``fetchall``,
            ``delete_from`` and ``insert_append``.
        schema: Optional schema name used to qualify the table name.
        **kwargs: Accepted but not used.
    """
    import pandas as pd

    index_type = index_text_type(engine_adapter.dialect)
    table = f"{schema}._snapshots" if schema else "_snapshots"

    query = exp.select(
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    ).from_(table)

    rewritten_rows = []
    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        (
            name,
            identifier,
            version,
            raw_snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
        ) = row

        payload = json.loads(raw_snapshot)
        kind = payload["node"].get("kind")
        if kind:
            when_matched = kind.get("when_matched")
            if isinstance(when_matched, list):
                # Older snapshots stored one string per clause; flatten them.
                kind["when_matched"] = " ".join(when_matched)

        rewritten_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    # Nothing was read, so leave the table untouched.
    if not rewritten_rows:
        return

    engine_adapter.delete_from(table, "TRUE")
    blob_type = blob_text_type(engine_adapter.dialect)

    # Column name -> type string, in the same order as the row dicts above.
    column_type_names = {
        "name": index_type,
        "identifier": index_type,
        "version": index_type,
        "snapshot": blob_type,
        "kind_name": index_type,
        "updated_ts": "bigint",
        "unpaused_ts": "bigint",
        "ttl_ms": "bigint",
        "unrestorable": "boolean",
    }

    engine_adapter.insert_append(
        table,
        pd.DataFrame(rewritten_rows),
        target_columns_to_types={
            column: exp.DataType.build(type_name)
            for column, type_name in column_type_names.items()
        },
    )