Edit on GitHub

Move the gateway variable.

 1"""Move the gateway variable."""
 2
 3import ast
 4import json
 5
 6import pandas as pd
 7from sqlglot import exp
 8
 9from sqlmesh.utils.migration import index_text_type
10
11
def migrate(state_sync, **kwargs):  # type: ignore
    """Move each snapshot's ``gateway`` python-env entry into ``__sqlmesh__vars__``.

    Every row of the ``_snapshots`` table (prefixed with ``state_sync.schema``
    when one is set) is fetched and its ``snapshot`` JSON is decoded. When
    ``node.python_env`` contains a ``gateway`` key, that key is removed and a
    ``__sqlmesh__vars__`` entry of kind ``"value"`` is written whose payload is
    the ``repr`` of ``{"gateway": <literal-evaluated old payload>}``.

    If at least one snapshot was changed, ``delete_from`` is issued against the
    table with a ``TRUE`` condition and all fetched rows (changed or not) are
    appended back.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted and ignored.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    table_name = f"{schema}._snapshots" if schema else "_snapshots"

    query = exp.select(
        "name", "identifier", "version", "snapshot", "kind_name", "expiration_ts"
    ).from_(table_name)

    any_changed = False
    rewritten_rows = []

    for name, identifier, version, raw_snapshot, kind_name, expiration_ts in adapter.fetchall(
        query,
        quote_identifiers=True,
    ):
        payload = json.loads(raw_snapshot)
        env = payload["node"].get("python_env")
        # Only a non-empty python env can carry a gateway entry.
        gateway_entry = env.pop("gateway", None) if env else None
        if gateway_entry is not None:
            any_changed = True
            relocated = {"gateway": ast.literal_eval(gateway_entry["payload"])}
            env["__sqlmesh__vars__"] = {"payload": repr(relocated), "kind": "value"}

        # Unchanged rows are collected too, because the table is rewritten as a whole.
        rewritten_rows.append(
            dict(
                name=name,
                identifier=identifier,
                version=version,
                snapshot=json.dumps(payload),
                kind_name=kind_name,
                expiration_ts=expiration_ts,
            )
        )

    if not (any_changed and rewritten_rows):
        return

    adapter.delete_from(table_name, "TRUE")

    text_index_type = index_text_type(adapter.dialect)
    column_type_names = (
        ("name", text_index_type),
        ("identifier", text_index_type),
        ("version", text_index_type),
        ("snapshot", "text"),
        ("kind_name", text_index_type),
        ("expiration_ts", "bigint"),
    )

    adapter.insert_append(
        table_name,
        pd.DataFrame(rewritten_rows),
        # A separate DataType is built per column, as in the original.
        columns_to_types={
            column: exp.DataType.build(type_name) for column, type_name in column_type_names
        },
    )
def migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite stored snapshots so ``gateway`` lives under ``__sqlmesh__vars__``.

    Reads all rows from ``_snapshots`` (qualified by ``state_sync.schema`` if
    present). For a snapshot whose ``node.python_env`` has a ``gateway`` entry,
    the entry is popped and replaced by a ``__sqlmesh__vars__`` entry of kind
    ``"value"`` whose payload is ``repr({"gateway": <evaluated payload>})``.
    Nothing is written unless at least one snapshot needed the change; in that
    case ``delete_from`` is called with a ``TRUE`` condition and every fetched
    row is inserted again.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted and ignored.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    target = "_snapshots"
    if schema:
        target = f"{schema}.{target}"

    fetched = engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name", "expiration_ts").from_(
            target
        ),
        quote_identifiers=True,
    )

    changed_count = 0
    output_rows = []

    for name, identifier, version, snapshot, kind_name, expiration_ts in fetched:
        document = json.loads(snapshot)
        python_env = document["node"].get("python_env")

        if python_env:
            old_gateway = python_env.pop("gateway", None)
            if old_gateway is not None:
                changed_count += 1
                # The stored payload is a Python literal; evaluate it before re-wrapping.
                gateway_value = ast.literal_eval(old_gateway["payload"])
                python_env["__sqlmesh__vars__"] = {
                    "payload": repr({"gateway": gateway_value}),
                    "kind": "value",
                }

        output_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(document),
                "kind_name": kind_name,
                "expiration_ts": expiration_ts,
            }
        )

    # Leave the table untouched when no snapshot carried a gateway entry.
    if changed_count == 0 or not output_rows:
        return

    engine_adapter.delete_from(target, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)
    frame = pd.DataFrame(output_rows)
    column_types = {
        "name": exp.DataType.build(index_type),
        "identifier": exp.DataType.build(index_type),
        "version": exp.DataType.build(index_type),
        "snapshot": exp.DataType.build("text"),
        "kind_name": exp.DataType.build(index_type),
        "expiration_ts": exp.DataType.build("bigint"),
    }
    engine_adapter.insert_append(target, frame, columns_to_types=column_types)