Remove redundant attributes from dbt models.
"""Remove redundant attributes from dbt models."""

import json

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.migration import index_text_type

# Keys stripped from the ``config`` global object of each snapshot's Jinja
# macro registry. Stored as a tuple because the set is fixed and only iterated.
CONFIG_ATTRIBUTE_KEYS_TO_REMOVE = (
    "config",
    "config_call_dict",
    "depends_on",
    "dependencies",
    "metrics",
    "original_file_path",
    "packages",
    "patch_path",
    "path",
    "post-hook",
    "pre-hook",
    "raw_code",
    "refs",
    "resource_type",
    "sources",
    "sql",
    "tests",
    "unrendered_config",
)


def _strip_redundant_config_attributes(parsed_snapshot: dict) -> None:
    """Drop CONFIG_ATTRIBUTE_KEYS_TO_REMOVE from the snapshot's Jinja ``config`` global, in place.

    Args:
        parsed_snapshot: A snapshot payload decoded from JSON. It must contain
            a ``node`` entry.

    A node without a ``jinja_macros`` / ``global_objs`` entry, or whose
    ``config`` global is not a dict, is left untouched.
    """
    # Tolerate a missing or null registry instead of failing the whole
    # migration with a KeyError/TypeError on one snapshot.
    jinja_macros = parsed_snapshot["node"].get("jinja_macros") or {}
    global_objs = jinja_macros.get("global_objs") or {}
    config = global_objs.get("config")
    if isinstance(config, dict):
        for key in CONFIG_ATTRIBUTE_KEYS_TO_REMOVE:
            config.pop(key, None)


def migrate(state_sync, **kwargs):  # type: ignore
    """Strip redundant dbt attributes from every stored snapshot.

    Reads all rows of the ``_snapshots`` state table. For each row it removes
    the keys in CONFIG_ATTRIBUTE_KEYS_TO_REMOVE from the snapshot's Jinja
    ``config`` global object. It then replaces the table contents with the
    rewritten rows.

    Args:
        state_sync: Object whose ``engine_adapter`` and ``schema`` attributes
            are used to locate and rewrite the snapshots table.
        **kwargs: Unused by this migration.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"

    new_snapshots = []
    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        _strip_redundant_config_attributes(parsed_snapshot)
        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
            }
        )

    if not new_snapshots:
        return

    # The table is rewritten wholesale: clear every row, then re-insert the
    # updated payloads.
    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)
    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(index_type),
        },
    )
def
migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Strip redundant dbt attributes from every stored snapshot.

    Reads all rows of the ``_snapshots`` state table. For each row it removes
    the keys in CONFIG_ATTRIBUTE_KEYS_TO_REMOVE from the snapshot's Jinja
    ``config`` global object. It then replaces the table contents with the
    rewritten rows.

    Args:
        state_sync: Object whose ``engine_adapter`` and ``schema`` attributes
            are used to locate and rewrite the snapshots table.
        **kwargs: Unused by this migration.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"

    new_snapshots = []
    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        # Tolerate a node without a (non-null) Jinja macro registry or
        # global-objects map: treat it as "nothing to strip" rather than
        # aborting the whole migration with a KeyError/TypeError.
        jinja_macros = parsed_snapshot["node"].get("jinja_macros") or {}
        config = (jinja_macros.get("global_objs") or {}).get("config")
        if isinstance(config, dict):
            for key in CONFIG_ATTRIBUTE_KEYS_TO_REMOVE:
                config.pop(key, None)

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
            }
        )

    if not new_snapshots:
        return

    # The table is rewritten wholesale: clear every row, then re-insert the
    # updated payloads.
    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)
    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build("text"),
            "kind_name": exp.DataType.build(index_type),
        },
    )