Edit on GitHub

Fix table properties that have extra quoting due to a bug.

 1"""Fix table properties that have extra quoting due to a bug."""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp
 7
 8from sqlmesh.core import dialect as d
 9from sqlmesh.utils.migration import index_text_type
10
11
12def migrate(state_sync, **kwargs):  # type: ignore
13    engine_adapter = state_sync.engine_adapter
14    schema = state_sync.schema
15    snapshots_table = "_snapshots"
16    if schema:
17        snapshots_table = f"{schema}.{snapshots_table}"
18
19    new_snapshots = []
20    found_table_properties = False
21    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
22        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
23        quote_identifiers=True,
24    ):
25        parsed_snapshot = json.loads(snapshot)
26        table_properties = parsed_snapshot["node"].get("table_properties")
27        if table_properties:
28            found_table_properties = True
29            dialect = parsed_snapshot["node"].get("dialect")
30            parsed_snapshot["node"]["table_properties"] = exp.Tuple(
31                expressions=[
32                    exp.Literal.string(k).eq(d.parse_one(v)) for k, v in table_properties.items()
33                ]
34            ).sql(dialect=dialect)
35
36        new_snapshots.append(
37            {
38                "name": name,
39                "identifier": identifier,
40                "version": version,
41                "snapshot": json.dumps(parsed_snapshot),
42                "kind_name": kind_name,
43            }
44        )
45
46    if found_table_properties:
47        engine_adapter.delete_from(snapshots_table, "TRUE")
48
49        index_type = index_text_type(engine_adapter.dialect)
50
51        engine_adapter.insert_append(
52            snapshots_table,
53            pd.DataFrame(new_snapshots),
54            columns_to_types={
55                "name": exp.DataType.build(index_type),
56                "identifier": exp.DataType.build(index_type),
57                "version": exp.DataType.build(index_type),
58                "snapshot": exp.DataType.build("text"),
59                "kind_name": exp.DataType.build(index_type),
60            },
61        )
def migrate(state_sync, **kwargs):
13def migrate(state_sync, **kwargs):  # type: ignore
14    engine_adapter = state_sync.engine_adapter
15    schema = state_sync.schema
16    snapshots_table = "_snapshots"
17    if schema:
18        snapshots_table = f"{schema}.{snapshots_table}"
19
20    new_snapshots = []
21    found_table_properties = False
22    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
23        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
24        quote_identifiers=True,
25    ):
26        parsed_snapshot = json.loads(snapshot)
27        table_properties = parsed_snapshot["node"].get("table_properties")
28        if table_properties:
29            found_table_properties = True
30            dialect = parsed_snapshot["node"].get("dialect")
31            parsed_snapshot["node"]["table_properties"] = exp.Tuple(
32                expressions=[
33                    exp.Literal.string(k).eq(d.parse_one(v)) for k, v in table_properties.items()
34                ]
35            ).sql(dialect=dialect)
36
37        new_snapshots.append(
38            {
39                "name": name,
40                "identifier": identifier,
41                "version": version,
42                "snapshot": json.dumps(parsed_snapshot),
43                "kind_name": kind_name,
44            }
45        )
46
47    if found_table_properties:
48        engine_adapter.delete_from(snapshots_table, "TRUE")
49
50        index_type = index_text_type(engine_adapter.dialect)
51
52        engine_adapter.insert_append(
53            snapshots_table,
54            pd.DataFrame(new_snapshots),
55            columns_to_types={
56                "name": exp.DataType.build(index_type),
57                "identifier": exp.DataType.build(index_type),
58                "version": exp.DataType.build(index_type),
59                "snapshot": exp.DataType.build("text"),
60                "kind_name": exp.DataType.build(index_type),
61            },
62        )