Edit on GitHub

Trim irrelevant attributes from indirect versions.

 1"""Trim irrelevant attributes from indirect versions."""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp
 7
 8from sqlmesh.utils.migration import index_text_type
 9
10
def migrate(state_sync, **kwargs):  # type: ignore
    """Trim irrelevant attributes from indirect versions.

    Reads every row of the ``_snapshots`` table (qualified with
    ``state_sync.schema`` when one is set), reduces each entry under the
    snapshot JSON's ``indirect_versions`` to just its ``version`` and
    ``change_category`` keys, then deletes all rows from the table and
    re-inserts the rewritten ones. Nothing is deleted or inserted when the
    table yields no rows.

    Args:
        state_sync: Object providing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []

    for name, identifier, version, snapshot, kind_name, expiration_ts in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name", "expiration_ts").from_(
            snapshots_table
        ),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        for indirect_versions in parsed_snapshot["indirect_versions"].values():
            for indirect_version in indirect_versions:
                # Only keep version and change_category. The kept values are
                # collected in a separate dict so the row's own `version`
                # column (unpacked by the outer loop) is never rebound and
                # the re-inserted row keeps its original version.
                trimmed = {
                    "version": indirect_version.get("version"),
                    "change_category": indirect_version.get("change_category"),
                }
                # Mutate in place so `parsed_snapshot` reflects the trim.
                indirect_version.clear()
                indirect_version.update(trimmed)

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "expiration_ts": expiration_ts,
            }
        )

    if new_snapshots:
        # The "TRUE" condition matches every row: the table is emptied and
        # then repopulated with the rewritten rows.
        engine_adapter.delete_from(snapshots_table, "TRUE")

        index_type = index_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            snapshots_table,
            pd.DataFrame(new_snapshots),
            columns_to_types={
                "name": exp.DataType.build(index_type),
                "identifier": exp.DataType.build(index_type),
                "version": exp.DataType.build(index_type),
                "snapshot": exp.DataType.build("text"),
                "kind_name": exp.DataType.build(index_type),
                "expiration_ts": exp.DataType.build("bigint"),
            },
        )
def migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Trim irrelevant attributes from indirect versions.

    Reads every row of the ``_snapshots`` table (qualified with
    ``state_sync.schema`` when one is set), reduces each entry under the
    snapshot JSON's ``indirect_versions`` to just its ``version`` and
    ``change_category`` keys, then deletes all rows from the table and
    re-inserts the rewritten ones. Nothing is deleted or inserted when the
    table yields no rows.

    Args:
        state_sync: Object providing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []

    for name, identifier, version, snapshot, kind_name, expiration_ts in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name", "expiration_ts").from_(
            snapshots_table
        ),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        for indirect_versions in parsed_snapshot["indirect_versions"].values():
            for indirect_version in indirect_versions:
                # Only keep version and change_category. The kept values are
                # collected in a separate dict so the row's own `version`
                # column (unpacked by the outer loop) is never rebound and
                # the re-inserted row keeps its original version.
                trimmed = {
                    "version": indirect_version.get("version"),
                    "change_category": indirect_version.get("change_category"),
                }
                # Mutate in place so `parsed_snapshot` reflects the trim.
                indirect_version.clear()
                indirect_version.update(trimmed)

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "expiration_ts": expiration_ts,
            }
        )

    if new_snapshots:
        # The "TRUE" condition matches every row: the table is emptied and
        # then repopulated with the rewritten rows.
        engine_adapter.delete_from(snapshots_table, "TRUE")

        index_type = index_text_type(engine_adapter.dialect)

        engine_adapter.insert_append(
            snapshots_table,
            pd.DataFrame(new_snapshots),
            columns_to_types={
                "name": exp.DataType.build(index_type),
                "identifier": exp.DataType.build(index_type),
                "version": exp.DataType.build(index_type),
                "snapshot": exp.DataType.build("text"),
                "kind_name": exp.DataType.build(index_type),
                "expiration_ts": exp.DataType.build("bigint"),
            },
        )