Edit on GitHub

Remove superfluous exp.Paren references from partitioned_by

 1"""Remove superfluous exp.Paren references from partitioned_by"""
 2
 3import json
 4
 5from sqlglot import exp
 6
 7from sqlmesh.utils.migration import index_text_type
 8from sqlmesh.utils.migration import blob_text_type
 9
10
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration makes no schema changes.

    Args:
        engine_adapter: Unused.
        schema: Unused.
        **kwargs: Unused.
    """
    pass
13
14
def _strip_wrapping_parens(item):
    """Return ``item`` without one pair of parentheses that encloses all of it.

    ``'(foo)'`` becomes ``'foo'``. A value such as ``'(a) + (b)'`` is returned
    unchanged: its leading ``(`` is closed before the end of the string, so the
    first and last characters are not a matching pair and removing them would
    corrupt the expression.
    """
    if not (item.startswith("(") and item.endswith(")")):
        return item

    # NOTE(review): parentheses inside string literals or quoted identifiers are
    # counted like any other parenthesis; this is a plain character scan.
    depth = 0
    last_index = len(item) - 1
    for position, char in enumerate(item):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == 0:
                # The parenthesis opened at index 0 closes here. It wraps the
                # whole value only if this is the final character.
                return item[1:-1] if position == last_index else item

    # Unbalanced parentheses: leave the value untouched rather than guess.
    return item


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Remove a superfluous wrapping paren pair from stored ``partitioned_by`` entries.

    Reads every row of the ``_snapshots`` table, parses the ``snapshot`` JSON and
    rewrites each ``node.partitioned_by`` item of the form ``'(foo)'`` to ``'foo'``.
    If at least one item changed, the table contents are replaced with the
    re-serialized rows; otherwise nothing is written.

    Args:
        engine_adapter: Adapter providing ``dialect``, ``fetchall``,
            ``delete_from`` and ``insert_append``.
        schema: Optional schema name used to qualify ``_snapshots``.
        **kwargs: Unused.
    """
    import pandas as pd

    snapshots_table = "_snapshots"
    index_type = index_text_type(engine_adapter.dialect)
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []
    updated = False

    for (
        name,
        identifier,
        version,
        snapshot,
        kind_name,
        updated_ts,
        unpaused_ts,
        ttl_ms,
        unrestorable,
    ) in engine_adapter.fetchall(
        exp.select(
            "name",
            "identifier",
            "version",
            "snapshot",
            "kind_name",
            "updated_ts",
            "unpaused_ts",
            "ttl_ms",
            "unrestorable",
        ).from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)

        if partitioned_by := parsed_snapshot["node"].get("partitioned_by"):
            # rewrite '(foo)' to 'foo'
            new_partitioned_by = [_strip_wrapping_parens(item) for item in partitioned_by]
            if new_partitioned_by != partitioned_by:
                updated = True
            parsed_snapshot["node"]["partitioned_by"] = new_partitioned_by

        # Every row is collected, changed or not, because the write below
        # replaces the table contents as a whole.
        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    if new_snapshots and updated:
        # Delete with an always-true condition, then re-insert the collected rows.
        engine_adapter.delete_from(snapshots_table, "TRUE")
        blob_type = blob_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            snapshots_table,
            pd.DataFrame(new_snapshots),
            target_columns_to_types={
                "name": exp.DataType.build(index_type),
                "identifier": exp.DataType.build(index_type),
                "version": exp.DataType.build(index_type),
                "snapshot": exp.DataType.build(blob_type),
                "kind_name": exp.DataType.build(index_type),
                "updated_ts": exp.DataType.build("bigint"),
                "unpaused_ts": exp.DataType.build("bigint"),
                "ttl_ms": exp.DataType.build("bigint"),
                "unrestorable": exp.DataType.build("boolean"),
            },
        )
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration makes no schema changes.

    Args:
        engine_adapter: Unused.
        schema: Unused.
        **kwargs: Unused.
    """
    pass
def _strip_wrapping_parens(item):
    """Return ``item`` without one pair of parentheses that encloses all of it.

    ``'(foo)'`` becomes ``'foo'``. A value such as ``'(a) + (b)'`` is returned
    unchanged: its leading ``(`` is closed before the end of the string, so the
    first and last characters are not a matching pair and removing them would
    corrupt the expression.
    """
    if not (item.startswith("(") and item.endswith(")")):
        return item

    # NOTE(review): parentheses inside string literals or quoted identifiers are
    # counted like any other parenthesis; this is a plain character scan.
    depth = 0
    last_index = len(item) - 1
    for position, char in enumerate(item):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == 0:
                # The parenthesis opened at index 0 closes here. It wraps the
                # whole value only if this is the final character.
                return item[1:-1] if position == last_index else item

    # Unbalanced parentheses: leave the value untouched rather than guess.
    return item


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Remove a superfluous wrapping paren pair from stored ``partitioned_by`` entries.

    Reads every row of the ``_snapshots`` table, parses the ``snapshot`` JSON and
    rewrites each ``node.partitioned_by`` item of the form ``'(foo)'`` to ``'foo'``.
    If at least one item changed, the table contents are replaced with the
    re-serialized rows; otherwise nothing is written.

    Args:
        engine_adapter: Adapter providing ``dialect``, ``fetchall``,
            ``delete_from`` and ``insert_append``.
        schema: Optional schema name used to qualify ``_snapshots``.
        **kwargs: Unused.
    """
    import pandas as pd

    snapshots_table = "_snapshots"
    index_type = index_text_type(engine_adapter.dialect)
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []
    updated = False

    for (
        name,
        identifier,
        version,
        snapshot,
        kind_name,
        updated_ts,
        unpaused_ts,
        ttl_ms,
        unrestorable,
    ) in engine_adapter.fetchall(
        exp.select(
            "name",
            "identifier",
            "version",
            "snapshot",
            "kind_name",
            "updated_ts",
            "unpaused_ts",
            "ttl_ms",
            "unrestorable",
        ).from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)

        if partitioned_by := parsed_snapshot["node"].get("partitioned_by"):
            # rewrite '(foo)' to 'foo'
            new_partitioned_by = [_strip_wrapping_parens(item) for item in partitioned_by]
            if new_partitioned_by != partitioned_by:
                updated = True
            parsed_snapshot["node"]["partitioned_by"] = new_partitioned_by

        # Every row is collected, changed or not, because the write below
        # replaces the table contents as a whole.
        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    if new_snapshots and updated:
        # Delete with an always-true condition, then re-insert the collected rows.
        engine_adapter.delete_from(snapshots_table, "TRUE")
        blob_type = blob_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            snapshots_table,
            pd.DataFrame(new_snapshots),
            target_columns_to_types={
                "name": exp.DataType.build(index_type),
                "identifier": exp.DataType.build(index_type),
                "version": exp.DataType.build(index_type),
                "snapshot": exp.DataType.build(blob_type),
                "kind_name": exp.DataType.build(index_type),
                "updated_ts": exp.DataType.build("bigint"),
                "unpaused_ts": exp.DataType.build("bigint"),
                "ttl_ms": exp.DataType.build("bigint"),
                "unrestorable": exp.DataType.build("boolean"),
            },
        )