Edit on GitHub

Update environments because the snapshot table info now stores the model kind name.

 1"""Change environments because snapshot table info now stores model kind name."""
 2
 3import json
 4import zlib
 5
 6import pandas as pd
 7from sqlglot import exp
 8
 9from sqlmesh.utils.migration import index_text_type
10
11
def _hash(data):  # type: ignore
    """Return the CRC32 checksum of ``data`` as a decimal string.

    The items of ``data`` are joined with ``;`` (``None`` entries contribute an
    empty string), the joined text is UTF-8 encoded, and the CRC32 of those
    bytes is returned via ``str``.
    """
    parts = ["" if item is None else item for item in data]
    payload = ";".join(parts).encode("utf-8")
    return str(zlib.crc32(payload))
14
15
def migrate(state_sync, **kwargs):  # type: ignore
    """Backfill ``kind_name`` on every snapshot entry stored in ``_environments``.

    Snapshot table infos embedded in each environment row now carry the model
    kind name. This migration reads each snapshot's kind from the ``_snapshots``
    table and rewrites every environment row with the new ``kind_name`` field.
    It also drops the obsolete ``is_materialized`` / ``is_embedded_kind`` keys.

    Args:
        state_sync: State sync object; only its ``engine_adapter`` and ``schema``
            attributes are used here.
        **kwargs: Accepted for migration-interface compatibility; unused.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    environments_table = "_environments"
    snapshots_table = "_snapshots"
    if schema:
        environments_table = f"{schema}.{environments_table}"
        snapshots_table = f"{schema}.{snapshots_table}"

    # (snapshot name, snapshot identifier) -> model kind name, taken from the
    # serialized snapshot payloads.
    snapshots_to_kind = {}
    for snapshot_name, snapshot_identifier, raw_snapshot in engine_adapter.fetchall(
        exp.select("name", "identifier", "snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed = json.loads(raw_snapshot)
        snapshots_to_kind[(snapshot_name, snapshot_identifier)] = parsed["model"]["kind"]["name"]

    # Project the environment columns explicitly rather than ``SELECT *``, so that
    # mapping values to column names does not depend on the table's physical
    # column order or column count.
    environment_columns = (
        "name",
        "snapshots",
        "start_at",
        "end_at",
        "plan_id",
        "previous_plan_id",
        "expiration_ts",
        "finalized_ts",
    )
    environments = engine_adapter.fetchall(
        exp.select(*environment_columns).from_(environments_table), quote_identifiers=True
    )

    new_environments = []
    for environment in environments:
        row = dict(zip(environment_columns, environment))

        new_snapshots = []
        for table_info in json.loads(row["snapshots"]):
            # These flags are removed in favour of the new ``kind_name`` field.
            table_info.pop("is_materialized", None)
            table_info.pop("is_embedded_kind", None)

            # Recompute the snapshot identifier from its fingerprint so it can be
            # matched against the ``identifier`` column read from the snapshots table.
            fingerprint = table_info["fingerprint"]
            identifier = _hash(
                [
                    fingerprint["data_hash"],
                    fingerprint["metadata_hash"],
                    fingerprint["parent_data_hash"],
                    fingerprint["parent_metadata_hash"],
                ]
            )

            # Snapshots with no matching row in the snapshots table default to "VIEW".
            table_info["kind_name"] = snapshots_to_kind.get(
                (table_info["name"], identifier), "VIEW"
            )
            new_snapshots.append(table_info)

        row["snapshots"] = json.dumps(new_snapshots)
        new_environments.append(row)

    if not new_environments:
        return

    # Rewrite the table wholesale: clear every row, then append the updated rows.
    # NOTE(review): delete + insert is not atomic here; confirm the caller runs
    # migrations inside a transaction, otherwise a failed insert leaves the table empty.
    engine_adapter.delete_from(environments_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        environments_table,
        pd.DataFrame(new_environments),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "snapshots": exp.DataType.build("text"),
            "start_at": exp.DataType.build("text"),
            "end_at": exp.DataType.build("text"),
            "plan_id": exp.DataType.build("text"),
            "previous_plan_id": exp.DataType.build("text"),
            "expiration_ts": exp.DataType.build("bigint"),
            "finalized_ts": exp.DataType.build("bigint"),
        },
    )
def migrate(state_sync, **kwargs):  # type: ignore
    """Backfill ``kind_name`` on every snapshot entry stored in ``_environments``.

    Snapshot table infos embedded in each environment row now carry the model
    kind name. This migration reads each snapshot's kind from the ``_snapshots``
    table and rewrites every environment row with the new ``kind_name`` field.
    It also drops the obsolete ``is_materialized`` / ``is_embedded_kind`` keys.

    Args:
        state_sync: State sync object; only its ``engine_adapter`` and ``schema``
            attributes are used here.
        **kwargs: Accepted for migration-interface compatibility; unused.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    environments_table = "_environments"
    snapshots_table = "_snapshots"
    if schema:
        environments_table = f"{schema}.{environments_table}"
        snapshots_table = f"{schema}.{snapshots_table}"

    # (snapshot name, snapshot identifier) -> model kind name, taken from the
    # serialized snapshot payloads.
    snapshots_to_kind = {}
    for snapshot_name, snapshot_identifier, raw_snapshot in engine_adapter.fetchall(
        exp.select("name", "identifier", "snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed = json.loads(raw_snapshot)
        snapshots_to_kind[(snapshot_name, snapshot_identifier)] = parsed["model"]["kind"]["name"]

    # Project the environment columns explicitly rather than ``SELECT *``, so that
    # mapping values to column names does not depend on the table's physical
    # column order or column count.
    environment_columns = (
        "name",
        "snapshots",
        "start_at",
        "end_at",
        "plan_id",
        "previous_plan_id",
        "expiration_ts",
        "finalized_ts",
    )
    environments = engine_adapter.fetchall(
        exp.select(*environment_columns).from_(environments_table), quote_identifiers=True
    )

    new_environments = []
    for environment in environments:
        row = dict(zip(environment_columns, environment))

        new_snapshots = []
        for table_info in json.loads(row["snapshots"]):
            # These flags are removed in favour of the new ``kind_name`` field.
            table_info.pop("is_materialized", None)
            table_info.pop("is_embedded_kind", None)

            # Recompute the snapshot identifier from its fingerprint so it can be
            # matched against the ``identifier`` column read from the snapshots table.
            fingerprint = table_info["fingerprint"]
            identifier = _hash(
                [
                    fingerprint["data_hash"],
                    fingerprint["metadata_hash"],
                    fingerprint["parent_data_hash"],
                    fingerprint["parent_metadata_hash"],
                ]
            )

            # Snapshots with no matching row in the snapshots table default to "VIEW".
            table_info["kind_name"] = snapshots_to_kind.get(
                (table_info["name"], identifier), "VIEW"
            )
            new_snapshots.append(table_info)

        row["snapshots"] = json.dumps(new_snapshots)
        new_environments.append(row)

    if not new_environments:
        return

    # Rewrite the table wholesale: clear every row, then append the updated rows.
    # NOTE(review): delete + insert is not atomic here; confirm the caller runs
    # migrations inside a transaction, otherwise a failed insert leaves the table empty.
    engine_adapter.delete_from(environments_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        environments_table,
        pd.DataFrame(new_environments),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "snapshots": exp.DataType.build("text"),
            "start_at": exp.DataType.build("text"),
            "end_at": exp.DataType.build("text"),
            "plan_id": exp.DataType.build("text"),
            "previous_plan_id": exp.DataType.build("text"),
            "expiration_ts": exp.DataType.build("bigint"),
            "finalized_ts": exp.DataType.build("bigint"),
        },
    )