Remove dbt is_incremental macro
"""Remove dbt is_incremental macro"""

import json

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Strip the ``is_incremental`` and ``should_full_refresh`` dbt macros from stored snapshots.

    Every row of the ``_snapshots`` table (qualified with ``state_sync.schema``
    when one is set) is read and its ``snapshot`` JSON parsed. For snapshots
    whose ``node.jinja_macros.packages.dbt`` mapping is non-empty, the two
    macro entries are removed from it.

    If at least one snapshot had a non-empty ``dbt`` package, the table is
    emptied and *all* rows are written back, re-serialized through
    ``json.dumps``. This happens even when neither macro was actually present
    in that package. Otherwise the table is left untouched.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    base_name = "_snapshots"
    snapshots_table = f"{schema}.{base_name}" if schema else base_name

    # Single source of truth for the column order used by the SELECT, the
    # rebuilt rows and the insert's type mapping.
    columns = ("name", "identifier", "version", "snapshot", "kind_name")
    query = exp.select(*columns).from_(snapshots_table)

    rewritten_rows = []
    saw_dbt_package = False
    for name, identifier, version, raw_snapshot, kind_name in adapter.fetchall(
        query,
        quote_identifiers=True,
    ):
        payload = json.loads(raw_snapshot)
        packages = payload["node"].get("jinja_macros", {}).get("packages", {})
        dbt_macros = packages.get("dbt", {})

        if dbt_macros:
            saw_dbt_package = True
            # Mutates the dict nested inside `payload`, so the removal is
            # reflected when `payload` is serialized below.
            for macro_name in ("is_incremental", "should_full_refresh"):
                dbt_macros.pop(macro_name, None)

        row_values = (name, identifier, version, json.dumps(payload), kind_name)
        rewritten_rows.append(dict(zip(columns, row_values)))

    if not saw_dbt_package:
        # No snapshot carries a dbt macro package; nothing to rewrite.
        return

    # Replace the table contents wholesale: delete every row, then re-insert.
    adapter.delete_from(snapshots_table, "TRUE")

    # Text type for the non-payload columns, chosen by index_text_type for
    # the adapter's dialect; the JSON payload column is always "text".
    index_type = index_text_type(adapter.dialect)

    adapter.insert_append(
        snapshots_table,
        pd.DataFrame(rewritten_rows),
        columns_to_types={
            column: exp.DataType.build("text" if column == "snapshot" else index_type)
            for column in columns
        },
    )
def
migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Strip the ``is_incremental`` and ``should_full_refresh`` dbt macros from stored snapshots.

    Every row of the ``_snapshots`` table (qualified with ``state_sync.schema``
    when one is set) is read and its ``snapshot`` JSON parsed. For snapshots
    whose ``node.jinja_macros.packages.dbt`` mapping is non-empty, the two
    macro entries are removed from it.

    If at least one snapshot had a non-empty ``dbt`` package, the table is
    emptied and *all* rows are written back, re-serialized through
    ``json.dumps``. This happens even when neither macro was actually present
    in that package. Otherwise the table is left untouched.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    base_name = "_snapshots"
    snapshots_table = f"{schema}.{base_name}" if schema else base_name

    # Single source of truth for the column order used by the SELECT, the
    # rebuilt rows and the insert's type mapping.
    columns = ("name", "identifier", "version", "snapshot", "kind_name")
    query = exp.select(*columns).from_(snapshots_table)

    rewritten_rows = []
    saw_dbt_package = False
    for name, identifier, version, raw_snapshot, kind_name in adapter.fetchall(
        query,
        quote_identifiers=True,
    ):
        payload = json.loads(raw_snapshot)
        packages = payload["node"].get("jinja_macros", {}).get("packages", {})
        dbt_macros = packages.get("dbt", {})

        if dbt_macros:
            saw_dbt_package = True
            # Mutates the dict nested inside `payload`, so the removal is
            # reflected when `payload` is serialized below.
            for macro_name in ("is_incremental", "should_full_refresh"):
                dbt_macros.pop(macro_name, None)

        row_values = (name, identifier, version, json.dumps(payload), kind_name)
        rewritten_rows.append(dict(zip(columns, row_values)))

    if not saw_dbt_package:
        # No snapshot carries a dbt macro package; nothing to rewrite.
        return

    # Replace the table contents wholesale: delete every row, then re-insert.
    adapter.delete_from(snapshots_table, "TRUE")

    # Text type for the non-payload columns, chosen by index_text_type for
    # the adapter's dialect; the JSON payload column is always "text".
    index_type = index_text_type(adapter.dialect)

    adapter.insert_append(
        snapshots_table,
        pd.DataFrame(rewritten_rows),
        columns_to_types={
            column: exp.DataType.build("text" if column == "snapshot" else index_type)
            for column in columns
        },
    )