Edit on GitHub

Move project attr from snapshot to model.

 1"""Move project attr from snapshot to model."""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp
 7
 8from sqlmesh.utils.migration import index_text_type
 9
10
def migrate(state_sync, **kwargs):  # type: ignore
    """Relocate each stored snapshot's ``project`` value onto its ``node``.

    Every row of the ``_snapshots`` table (schema-qualified when
    ``state_sync.schema`` is truthy) is read, its ``snapshot`` JSON payload is
    decoded, the top-level ``"project"`` entry is popped (an empty string is
    used when the entry is absent) and stored as ``payload["node"]["project"]``.
    The table contents are then removed with a ``TRUE`` condition and the
    rewritten rows are appended back.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema

    base_table = "_snapshots"
    table = f"{schema}.{base_table}" if schema else base_table

    # One tuple drives the SELECT list, the row dict keys and the column types,
    # so all three stay in the same order.
    column_names = ("name", "identifier", "version", "snapshot", "kind_name")

    rows = adapter.fetchall(
        exp.select(*column_names).from_(table),
        quote_identifiers=True,
    )

    migrated = []
    for name, identifier, version, raw_snapshot, kind_name in rows:
        payload = json.loads(raw_snapshot)

        # Pop first, then attach to the node, mirroring right-hand-side-first
        # evaluation of the equivalent single assignment.
        project = payload.pop("project", "")
        payload["node"]["project"] = project

        values = (name, identifier, version, json.dumps(payload), kind_name)
        migrated.append(dict(zip(column_names, values)))

    # The table is cleared unconditionally, even when there is nothing to
    # write back.
    adapter.delete_from(table, "TRUE")

    text_index_type = index_text_type(adapter.dialect)

    if not migrated:
        return

    # "snapshot" holds the JSON document and is plain text; every other column
    # uses the dialect-specific indexable text type.
    column_types = {
        column: exp.DataType.build("text" if column == "snapshot" else text_index_type)
        for column in column_names
    }

    adapter.insert_append(
        table,
        pd.DataFrame(migrated),
        columns_to_types=column_types,
    )
def migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Relocate each stored snapshot's ``project`` value onto its ``node``.

    Every row of the ``_snapshots`` table (schema-qualified when
    ``state_sync.schema`` is truthy) is read, its ``snapshot`` JSON payload is
    decoded, the top-level ``"project"`` entry is popped (an empty string is
    used when the entry is absent) and stored as ``payload["node"]["project"]``.
    The table contents are then removed with a ``TRUE`` condition and the
    rewritten rows are appended back.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema

    base_table = "_snapshots"
    table = f"{schema}.{base_table}" if schema else base_table

    # One tuple drives the SELECT list, the row dict keys and the column types,
    # so all three stay in the same order.
    column_names = ("name", "identifier", "version", "snapshot", "kind_name")

    rows = adapter.fetchall(
        exp.select(*column_names).from_(table),
        quote_identifiers=True,
    )

    migrated = []
    for name, identifier, version, raw_snapshot, kind_name in rows:
        payload = json.loads(raw_snapshot)

        # Pop first, then attach to the node, mirroring right-hand-side-first
        # evaluation of the equivalent single assignment.
        project = payload.pop("project", "")
        payload["node"]["project"] = project

        values = (name, identifier, version, json.dumps(payload), kind_name)
        migrated.append(dict(zip(column_names, values)))

    # The table is cleared unconditionally, even when there is nothing to
    # write back.
    adapter.delete_from(table, "TRUE")

    text_index_type = index_text_type(adapter.dialect)

    if not migrated:
        return

    # "snapshot" holds the JSON document and is plain text; every other column
    # uses the dialect-specific indexable text type.
    column_types = {
        column: exp.DataType.build("text" if column == "snapshot" else text_index_type)
        for column in column_names
    }

    adapter.insert_append(
        table,
        pd.DataFrame(migrated),
        columns_to_types=column_types,
    )