Edit on GitHub

Remove dbt target fields from snapshots, except for a limited list of approved fields

 1"""Remove dbt target fields from snapshots outside of limited list of approved fields"""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp
 7
 8from sqlmesh.utils.migration import index_text_type
 9
10
def migrate(state_sync, **kwargs):  # type: ignore
    """Trim the dbt ``target`` global stored in each snapshot to the approved fields.

    Every row of the ``_snapshots`` table (schema-qualified when
    ``state_sync.schema`` is set) is read and its ``snapshot`` JSON parsed.
    When ``node.jinja_macros.global_objs.target`` is a mapping with a truthy
    ``target_name``, it is replaced by a mapping containing only ``type``,
    ``name``, ``schema``, ``database`` and ``target_name``.

    The table is rewritten (all rows deleted, then all rows re-inserted) only
    if at least one snapshot contained such a target; otherwise nothing is
    written.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for interface compatibility; unused here.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []
    found_dbt_target = False
    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        node = parsed_snapshot["node"]

        # Walk the nesting defensively: a key may be absent, explicitly null,
        # or (for a user-defined `target` unrelated to dbt) not a mapping at
        # all. None of those cases should abort the migration.
        jinja_macros = node.get("jinja_macros")
        global_objs = jinja_macros.get("global_objs") if isinstance(jinja_macros, dict) else None
        dbt_target = global_objs.get("target") if isinstance(global_objs, dict) else None

        # Double check that `target_name` exists as a field since we know that all dbt targets have `target_name`
        # We do this in case someone has a target macro defined that is not related to dbt
        if isinstance(dbt_target, dict) and dbt_target.get("target_name"):
            found_dbt_target = True
            # `global_objs` is the dict object living inside `parsed_snapshot`,
            # so this assignment updates the snapshot that gets re-serialized.
            # Missing fields fall back to the literal string "None".
            global_objs["target"] = {
                "type": dbt_target.get("type", "None"),
                "name": dbt_target.get("name", "None"),
                "schema": dbt_target.get("schema", "None"),
                "database": dbt_target.get("database", "None"),
                "target_name": dbt_target["target_name"],
            }

        # Every row is collected (changed or not) because the table is fully
        # replaced below when any change was made.
        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
            }
        )

    if not found_dbt_target:
        # Nothing was modified; leave the table untouched.
        return

    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(index_type),
        },
    )
def migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Trim the dbt ``target`` global stored in each snapshot to the approved fields.

    Every row of the ``_snapshots`` table (schema-qualified when
    ``state_sync.schema`` is set) is read and its ``snapshot`` JSON parsed.
    When ``node.jinja_macros.global_objs.target`` is a mapping with a truthy
    ``target_name``, it is replaced by a mapping containing only ``type``,
    ``name``, ``schema``, ``database`` and ``target_name``.

    The table is rewritten (all rows deleted, then all rows re-inserted) only
    if at least one snapshot contained such a target; otherwise nothing is
    written.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for interface compatibility; unused here.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    new_snapshots = []
    found_dbt_target = False
    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        node = parsed_snapshot["node"]

        # Walk the nesting defensively: a key may be absent, explicitly null,
        # or (for a user-defined `target` unrelated to dbt) not a mapping at
        # all. None of those cases should abort the migration.
        jinja_macros = node.get("jinja_macros")
        global_objs = jinja_macros.get("global_objs") if isinstance(jinja_macros, dict) else None
        dbt_target = global_objs.get("target") if isinstance(global_objs, dict) else None

        # Double check that `target_name` exists as a field since we know that all dbt targets have `target_name`
        # We do this in case someone has a target macro defined that is not related to dbt
        if isinstance(dbt_target, dict) and dbt_target.get("target_name"):
            found_dbt_target = True
            # `global_objs` is the dict object living inside `parsed_snapshot`,
            # so this assignment updates the snapshot that gets re-serialized.
            # Missing fields fall back to the literal string "None".
            global_objs["target"] = {
                "type": dbt_target.get("type", "None"),
                "name": dbt_target.get("name", "None"),
                "schema": dbt_target.get("schema", "None"),
                "database": dbt_target.get("database", "None"),
                "target_name": dbt_target["target_name"],
            }

        # Every row is collected (changed or not) because the table is fully
        # replaced below when any change was made.
        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
            }
        )

    if not found_dbt_target:
        # Nothing was modified; leave the table untouched.
        return

    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(index_type),
        },
    )