Remove dbt target fields from snapshots outside of limited list of approved fields
1"""Remove dbt target fields from snapshots outside of limited list of approved fields""" 2 3import json 4 5import pandas as pd 6from sqlglot import exp 7 8from sqlmesh.utils.migration import index_text_type 9 10 11def migrate(state_sync, **kwargs): # type: ignore 12 engine_adapter = state_sync.engine_adapter 13 schema = state_sync.schema 14 snapshots_table = "_snapshots" 15 if schema: 16 snapshots_table = f"{schema}.{snapshots_table}" 17 18 new_snapshots = [] 19 found_dbt_target = False 20 for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall( 21 exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table), 22 quote_identifiers=True, 23 ): 24 parsed_snapshot = json.loads(snapshot) 25 node = parsed_snapshot["node"] 26 dbt_target = node.get("jinja_macros", {}).get("global_objs", {}).get("target", {}) 27 # Double check that `target_name` exists as a field since we know that all dbt targets have `target_name` 28 # We do this in case someone has a target macro defined that is not related to dbt 29 if dbt_target and dbt_target.get("target_name"): 30 found_dbt_target = True 31 node["jinja_macros"]["global_objs"]["target"] = { 32 "type": dbt_target.get("type", "None"), 33 "name": dbt_target.get("name", "None"), 34 "schema": dbt_target.get("schema", "None"), 35 "database": dbt_target.get("database", "None"), 36 "target_name": dbt_target["target_name"], 37 } 38 39 new_snapshots.append( 40 { 41 "name": name, 42 "identifier": identifier, 43 "version": version, 44 "snapshot": json.dumps(parsed_snapshot), 45 "kind_name": kind_name, 46 } 47 ) 48 49 if found_dbt_target: 50 engine_adapter.delete_from(snapshots_table, "TRUE") 51 52 index_type = index_text_type(engine_adapter.dialect) 53 54 engine_adapter.insert_append( 55 snapshots_table, 56 pd.DataFrame(new_snapshots), 57 columns_to_types={ 58 "name": exp.DataType.build(index_type), 59 "identifier": exp.DataType.build(index_type), 60 "version": exp.DataType.build(index_type), 61 "snapshot": exp.DataType.build("text"), 62 "kind_name": exp.DataType.build(index_type), 63 }, 64 )
def
migrate(state_sync, **kwargs):
12def migrate(state_sync, **kwargs): # type: ignore 13 engine_adapter = state_sync.engine_adapter 14 schema = state_sync.schema 15 snapshots_table = "_snapshots" 16 if schema: 17 snapshots_table = f"{schema}.{snapshots_table}" 18 19 new_snapshots = [] 20 found_dbt_target = False 21 for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall( 22 exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table), 23 quote_identifiers=True, 24 ): 25 parsed_snapshot = json.loads(snapshot) 26 node = parsed_snapshot["node"] 27 dbt_target = node.get("jinja_macros", {}).get("global_objs", {}).get("target", {}) 28 # Double check that `target_name` exists as a field since we know that all dbt targets have `target_name` 29 # We do this in case someone has a target macro defined that is not related to dbt 30 if dbt_target and dbt_target.get("target_name"): 31 found_dbt_target = True 32 node["jinja_macros"]["global_objs"]["target"] = { 33 "type": dbt_target.get("type", "None"), 34 "name": dbt_target.get("name", "None"), 35 "schema": dbt_target.get("schema", "None"), 36 "database": dbt_target.get("database", "None"), 37 "target_name": dbt_target["target_name"], 38 } 39 40 new_snapshots.append( 41 { 42 "name": name, 43 "identifier": identifier, 44 "version": version, 45 "snapshot": json.dumps(parsed_snapshot), 46 "kind_name": kind_name, 47 } 48 ) 49 50 if found_dbt_target: 51 engine_adapter.delete_from(snapshots_table, "TRUE") 52 53 index_type = index_text_type(engine_adapter.dialect) 54 55 engine_adapter.insert_append( 56 snapshots_table, 57 pd.DataFrame(new_snapshots), 58 columns_to_types={ 59 "name": exp.DataType.build(index_type), 60 "identifier": exp.DataType.build(index_type), 61 "version": exp.DataType.build(index_type), 62 "snapshot": exp.DataType.build("text"), 63 "kind_name": exp.DataType.build(index_type), 64 }, 65 )