Edit on GitHub

Add the kind_name column to the snapshots table.

 1"""Add the kind_name column to the snapshots table."""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp
 7
 8from sqlmesh.utils.migration import index_text_type
 9
10
def migrate(state_sync, **kwargs):  # type: ignore
    """Add a ``kind_name`` column to the snapshots table and backfill it.

    The column is first added through an ALTER TABLE expression. Every
    existing row is then read back, its serialized snapshot JSON is parsed to
    extract ``model.kind.name``, and the table contents are replaced with rows
    that carry the extracted value in the new column.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for signature compatibility; not used here.
    """
    adapter = state_sync.engine_adapter
    schema_name = state_sync.schema
    table_name = f"{schema_name}._snapshots" if schema_name else "_snapshots"

    index_type = index_text_type(adapter.dialect)

    def indexed_text():
        # A fresh DataType node is built on every use rather than shared.
        return exp.DataType.build(index_type)

    # Step 1: extend the table with the new column.
    target_table = exp.to_table(table_name)
    kind_name_column = exp.ColumnDef(
        this=exp.to_column("kind_name"),
        kind=indexed_text(),
    )
    adapter.execute(exp.AlterTable(this=target_table, actions=[kind_name_column]))

    # Step 2: read the current rows and derive kind_name from the stored JSON.
    select_query = exp.select("name", "identifier", "version", "snapshot").from_(table_name)
    backfilled_rows = [
        {
            "name": name,
            "identifier": identifier,
            "version": version,
            "snapshot": payload,
            "kind_name": json.loads(payload)["model"]["kind"]["name"],
        }
        for name, identifier, version, payload in adapter.fetchall(
            select_query,
            quote_identifiers=True,
        )
    ]

    # Nothing to backfill: leave the (empty) table as it is.
    if not backfilled_rows:
        return

    # Step 3: swap the old rows for the backfilled ones.
    adapter.delete_from(table_name, "TRUE")

    frame = pd.DataFrame(backfilled_rows)
    column_types = {
        "name": indexed_text(),
        "identifier": indexed_text(),
        "version": indexed_text(),
        "snapshot": exp.DataType.build("text"),
        "kind_name": indexed_text(),
    }
    adapter.insert_append(table_name, frame, columns_to_types=column_types)
def migrate(state_sync, **kwargs):  # type: ignore
    """Add a ``kind_name`` column to the snapshots table and backfill it.

    The column is first added through an ALTER TABLE expression. Every
    existing row is then read back, its serialized snapshot JSON is parsed to
    extract ``model.kind.name``, and the table contents are replaced with rows
    that carry the extracted value in the new column.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for signature compatibility; not used here.
    """
    adapter = state_sync.engine_adapter
    schema_name = state_sync.schema
    table_name = f"{schema_name}._snapshots" if schema_name else "_snapshots"

    index_type = index_text_type(adapter.dialect)

    def indexed_text():
        # A fresh DataType node is built on every use rather than shared.
        return exp.DataType.build(index_type)

    # Step 1: extend the table with the new column.
    target_table = exp.to_table(table_name)
    kind_name_column = exp.ColumnDef(
        this=exp.to_column("kind_name"),
        kind=indexed_text(),
    )
    adapter.execute(exp.AlterTable(this=target_table, actions=[kind_name_column]))

    # Step 2: read the current rows and derive kind_name from the stored JSON.
    select_query = exp.select("name", "identifier", "version", "snapshot").from_(table_name)
    backfilled_rows = [
        {
            "name": name,
            "identifier": identifier,
            "version": version,
            "snapshot": payload,
            "kind_name": json.loads(payload)["model"]["kind"]["name"],
        }
        for name, identifier, version, payload in adapter.fetchall(
            select_query,
            quote_identifiers=True,
        )
    ]

    # Nothing to backfill: leave the (empty) table as it is.
    if not backfilled_rows:
        return

    # Step 3: swap the old rows for the backfilled ones.
    adapter.delete_from(table_name, "TRUE")

    frame = pd.DataFrame(backfilled_rows)
    column_types = {
        "name": indexed_text(),
        "identifier": indexed_text(),
        "version": indexed_text(),
        "snapshot": exp.DataType.build("text"),
        "kind_name": indexed_text(),
    }
    adapter.insert_append(table_name, frame, columns_to_types=column_types)