Change environments because snapshot table info now stores model kind name.
"""Change environments because snapshot table info now stores model kind name."""

import json
import zlib

import pandas as pd
from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def _hash(data):  # type: ignore
    """Return the CRC32 of ``data`` joined with ``";"``, as a decimal string.

    ``None`` items are hashed as empty strings. ``migrate`` uses this to recompute a
    snapshot's identifier from its fingerprint so the entry can be matched against the
    ``identifier`` column of the snapshots table.
    """
    return str(zlib.crc32(";".join("" if d is None else d for d in data).encode("utf-8")))


def migrate(state_sync, **kwargs):  # type: ignore
    """Store the model kind name on every snapshot entry in the environments table.

    For each snapshot listed in an environment's ``snapshots`` JSON, the obsolete
    ``is_materialized`` and ``is_embedded_kind`` flags are dropped and ``kind_name`` is set
    to the model kind name of the matching row in the snapshots table, or ``"VIEW"`` when no
    matching row exists. If any environments exist, the environments table is then
    rewritten in full.

    Args:
        state_sync: Provides the ``engine_adapter`` used for all queries and the optional
            ``schema`` that qualifies the state table names.
        **kwargs: Ignored.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    environments_table = "_environments"
    snapshots_table = "_snapshots"
    if schema:
        environments_table = f"{schema}.{environments_table}"
        snapshots_table = f"{schema}.{snapshots_table}"

    # (snapshot name, snapshot identifier) -> model kind name.
    snapshots_to_kind = {}
    for snapshot_name, identifier, payload in engine_adapter.fetchall(
        exp.select("name", "identifier", "snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        kind_name = json.loads(payload)["model"]["kind"]["name"]
        snapshots_to_kind[(snapshot_name, identifier)] = kind_name

    # Read the columns by name, in the order they are written back below, instead of
    # ``SELECT *``: rows are mapped positionally, so this keeps the mapping independent of
    # the table's physical column order.
    environment_columns = (
        "name",
        "snapshots",
        "start_at",
        "end_at",
        "plan_id",
        "previous_plan_id",
        "expiration_ts",
        "finalized_ts",
    )
    environments = engine_adapter.fetchall(
        exp.select(*environment_columns).from_(environments_table), quote_identifiers=True
    )

    new_environments = []
    for row in environments:
        environment = dict(zip(environment_columns, row))

        new_snapshots = []
        for snapshot in json.loads(environment["snapshots"]):
            snapshot.pop("is_materialized", None)
            snapshot.pop("is_embedded_kind", None)

            fingerprint = snapshot["fingerprint"]
            identifier = _hash(
                [
                    fingerprint["data_hash"],
                    fingerprint["metadata_hash"],
                    fingerprint["parent_data_hash"],
                    fingerprint["parent_metadata_hash"],
                ]
            )
            # Snapshots without a matching row in the snapshots table default to VIEW.
            snapshot["kind_name"] = snapshots_to_kind.get((snapshot["name"], identifier), "VIEW")
            new_snapshots.append(snapshot)

        environment["snapshots"] = json.dumps(new_snapshots)
        new_environments.append(environment)

    if not new_environments:
        return

    # NOTE(review): the delete and insert below are not wrapped in a transaction here;
    # confirm the caller runs migrations transactionally, otherwise a failed insert would
    # leave the environments table empty.
    engine_adapter.delete_from(environments_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        environments_table,
        pd.DataFrame(new_environments),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "snapshots": exp.DataType.build("text"),
            "start_at": exp.DataType.build("text"),
            "end_at": exp.DataType.build("text"),
            "plan_id": exp.DataType.build("text"),
            "previous_plan_id": exp.DataType.build("text"),
            "expiration_ts": exp.DataType.build("bigint"),
            "finalized_ts": exp.DataType.build("bigint"),
        },
    )
def migrate(state_sync, **kwargs):  # type: ignore
    """Store the model kind name on every snapshot entry in the environments table.

    For each snapshot listed in an environment's ``snapshots`` JSON, the obsolete
    ``is_materialized`` and ``is_embedded_kind`` flags are dropped and ``kind_name`` is set
    to the model kind name of the matching row in the snapshots table, or ``"VIEW"`` when no
    matching row exists. If any environments exist, the environments table is then
    rewritten in full.

    Args:
        state_sync: Provides the ``engine_adapter`` used for all queries and the optional
            ``schema`` that qualifies the state table names.
        **kwargs: Ignored.
    """
    engine_adapter = state_sync.engine_adapter
    schema = state_sync.schema
    environments_table = "_environments"
    snapshots_table = "_snapshots"
    if schema:
        environments_table = f"{schema}.{environments_table}"
        snapshots_table = f"{schema}.{snapshots_table}"

    # (snapshot name, snapshot identifier) -> model kind name.
    snapshots_to_kind = {}
    for snapshot_name, identifier, payload in engine_adapter.fetchall(
        exp.select("name", "identifier", "snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        kind_name = json.loads(payload)["model"]["kind"]["name"]
        snapshots_to_kind[(snapshot_name, identifier)] = kind_name

    # Read the columns by name, in the order they are written back below, instead of
    # ``SELECT *``: rows are mapped positionally, so this keeps the mapping independent of
    # the table's physical column order.
    environment_columns = (
        "name",
        "snapshots",
        "start_at",
        "end_at",
        "plan_id",
        "previous_plan_id",
        "expiration_ts",
        "finalized_ts",
    )
    environments = engine_adapter.fetchall(
        exp.select(*environment_columns).from_(environments_table), quote_identifiers=True
    )

    new_environments = []
    for row in environments:
        environment = dict(zip(environment_columns, row))

        new_snapshots = []
        for snapshot in json.loads(environment["snapshots"]):
            snapshot.pop("is_materialized", None)
            snapshot.pop("is_embedded_kind", None)

            fingerprint = snapshot["fingerprint"]
            identifier = _hash(
                [
                    fingerprint["data_hash"],
                    fingerprint["metadata_hash"],
                    fingerprint["parent_data_hash"],
                    fingerprint["parent_metadata_hash"],
                ]
            )
            # Snapshots without a matching row in the snapshots table default to VIEW.
            snapshot["kind_name"] = snapshots_to_kind.get((snapshot["name"], identifier), "VIEW")
            new_snapshots.append(snapshot)

        environment["snapshots"] = json.dumps(new_snapshots)
        new_environments.append(environment)

    if not new_environments:
        return

    # NOTE(review): the delete and insert below are not wrapped in a transaction here;
    # confirm the caller runs migrations transactionally, otherwise a failed insert would
    # leave the environments table empty.
    engine_adapter.delete_from(environments_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        environments_table,
        pd.DataFrame(new_environments),
        columns_to_types={
            "name": exp.DataType.build(index_type),
            "snapshots": exp.DataType.build("text"),
            "start_at": exp.DataType.build("text"),
            "end_at": exp.DataType.build("text"),
            "plan_id": exp.DataType.build("text"),
            "previous_plan_id": exp.DataType.build("text"),
            "expiration_ts": exp.DataType.build("bigint"),
            "finalized_ts": exp.DataType.build("bigint"),
        },
    )