Replace 'dbt_name' with 'dbt_node_info' in the snapshot definition
"""Replace 'dbt_name' with 'dbt_node_info' in the snapshot definition"""

import json
from sqlglot import exp
from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration only rewrites rows, not table schemas."""
    pass


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Rewrite stored snapshots so 'dbt_name' becomes 'dbt_node_info'.

    Every row of the ``_snapshots`` table (qualified with ``schema`` when one
    is given) is read and its ``snapshot`` JSON is parsed. When the parsed
    node carries a truthy ``dbt_name``, that key is removed and a
    ``dbt_node_info`` mapping is added with the old value as ``unique_id``.

    If at least one row was changed, the table is emptied and all rows
    (changed or not) are inserted back; otherwise the table is left alone.

    Args:
        engine_adapter: Adapter used to query, delete from and append to the
            snapshots table; its ``dialect`` selects the text column types.
        schema: Optional schema name used to qualify the snapshots table.
        **kwargs: Accepted but not used by this migration.
    """
    import pandas as pd

    table_name = f"{schema}._snapshots" if schema else "_snapshots"

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)

    # Column order here drives both the SELECT and the re-inserted records.
    columns = (
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
        "forward_only",
        "dev_version",
        "fingerprint",
    )
    query = exp.select(*columns).from_(table_name)

    records = []
    changed = False

    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        record = dict(zip(columns, row))

        payload = json.loads(record["snapshot"])
        node = payload["node"]
        legacy_name = node.get("dbt_name")
        if legacy_name:
            del node["dbt_name"]
            node["dbt_node_info"] = {
                "unique_id": legacy_name,
                # these will get populated as metadata-only changes on the next plan
                "name": "",
                "fqn": "",
            }
            changed = True

        # Every row is re-serialized because the whole table gets replaced.
        record["snapshot"] = json.dumps(payload)
        records.append(record)

    if not (changed and records):
        return

    engine_adapter.delete_from(table_name, "TRUE")

    frame = pd.DataFrame(records)
    type_names = {
        "name": index_type,
        "identifier": index_type,
        "version": index_type,
        "snapshot": blob_type,
        "kind_name": index_type,
        "updated_ts": "bigint",
        "unpaused_ts": "bigint",
        "ttl_ms": "bigint",
        "unrestorable": "boolean",
        "forward_only": "boolean",
        "dev_version": index_type,
        "fingerprint": blob_type,
    }
    engine_adapter.insert_append(
        table_name,
        frame,
        target_columns_to_types={
            column: exp.DataType.build(type_name)
            for column, type_name in type_names.items()
        },
    )
def
migrate_schemas(engine_adapter, schema, **kwargs):
def
migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Rewrite stored snapshots so 'dbt_name' becomes 'dbt_node_info'.

    Every row of the ``_snapshots`` table (qualified with ``schema`` when one
    is given) is read and its ``snapshot`` JSON is parsed. When the parsed
    node carries a truthy ``dbt_name``, that key is removed and a
    ``dbt_node_info`` mapping is added with the old value as ``unique_id``.

    If at least one row was changed, the table is emptied and all rows
    (changed or not) are inserted back; otherwise the table is left alone.

    Args:
        engine_adapter: Adapter used to query, delete from and append to the
            snapshots table; its ``dialect`` selects the text column types.
        schema: Optional schema name used to qualify the snapshots table.
        **kwargs: Accepted but not used by this migration.
    """
    import pandas as pd

    table_name = f"{schema}._snapshots" if schema else "_snapshots"

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)

    # Column order here drives both the SELECT and the re-inserted records.
    columns = (
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
        "forward_only",
        "dev_version",
        "fingerprint",
    )
    query = exp.select(*columns).from_(table_name)

    records = []
    changed = False

    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        record = dict(zip(columns, row))

        payload = json.loads(record["snapshot"])
        node = payload["node"]
        legacy_name = node.get("dbt_name")
        if legacy_name:
            del node["dbt_name"]
            node["dbt_node_info"] = {
                "unique_id": legacy_name,
                # these will get populated as metadata-only changes on the next plan
                "name": "",
                "fqn": "",
            }
            changed = True

        # Every row is re-serialized because the whole table gets replaced.
        record["snapshot"] = json.dumps(payload)
        records.append(record)

    if not (changed and records):
        return

    engine_adapter.delete_from(table_name, "TRUE")

    frame = pd.DataFrame(records)
    type_names = {
        "name": index_type,
        "identifier": index_type,
        "version": index_type,
        "snapshot": blob_type,
        "kind_name": index_type,
        "updated_ts": "bigint",
        "unpaused_ts": "bigint",
        "ttl_ms": "bigint",
        "unrestorable": "boolean",
        "forward_only": "boolean",
        "dev_version": index_type,
        "fingerprint": blob_type,
    }
    engine_adapter.insert_append(
        table_name,
        frame,
        target_columns_to_types={
            column: exp.DataType.build(type_name)
            for column, type_name in type_names.items()
        },
    )