Remove dbt is_incremental macro
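
This state migration walks every stored snapshot, removes the is_incremental and should_full_refresh entries from the snapshot's dbt jinja macro package, and rewrites the _snapshots state table only when at least one snapshot was modified. The full migration script is shown below.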

 1"""Remove dbt is_incremental macro"""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp
 7
 8from sqlmesh.utils.migration import index_text_type
 9
10
11def migrate(state_sync, **kwargs):  # type: ignore
12    engine_adapter = state_sync.engine_adapter
13    schema = state_sync.schema
14    snapshots_table = "_snapshots"
15    if schema:
16        snapshots_table = f"{schema}.{snapshots_table}"
17
18    new_snapshots = []
19    found_dbt_package = False
20    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
21        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
22        quote_identifiers=True,
23    ):
24        parsed_snapshot = json.loads(snapshot)
25        node = parsed_snapshot["node"]
26        dbt_package = node.get("jinja_macros", {}).get("packages", {}).get("dbt", {})
27
28        if dbt_package:
29            found_dbt_package = True
30            dbt_package.pop("is_incremental", None)
31            dbt_package.pop("should_full_refresh", None)
32
33        new_snapshots.append(
34            {
35                "name": name,
36                "identifier": identifier,
37                "version": version,
38                "snapshot": json.dumps(parsed_snapshot),
39                "kind_name": kind_name,
40            }
41        )
42
43    if found_dbt_package:
44        engine_adapter.delete_from(snapshots_table, "TRUE")
45
46        index_type = index_text_type(engine_adapter.dialect)
47
48        engine_adapter.insert_append(
49            snapshots_table,
50            pd.DataFrame(new_snapshots),
51            columns_to_types={
52                "name": exp.DataType.build(index_type),
53                "identifier": exp.DataType.build(index_type),
54                "version": exp.DataType.build(index_type),
55                "snapshot": exp.DataType.build("text"),
56                "kind_name": exp.DataType.build(index_type),
57            },
58        )