When serializing some objects, like __sqlmesh__vars__, the order of keys in the dictionary was not deterministic,
so this migration applies deterministic sorting to the keys of the dictionary.
"""
Re-serialize selected `python_env` payloads with their dictionary keys in a stable order.

When serializing some objects, like `__sqlmesh__vars__`, the order of keys in the dictionary was
not deterministic, so this migration applies deterministic sorting to the keys of the dictionary.
"""

import json
import logging
import typing as t
from dataclasses import dataclass

from sqlglot import exp

from sqlmesh.utils.migration import index_text_type, blob_text_type


logger = logging.getLogger(__name__)


# Only `python_env` entries stored under these keys are rewritten by `migrate_rows`.
KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"]


# `SqlValue` must exist at module level: stored payloads are passed to `eval`
# in `migrate_rows`, and any payload that mentions `SqlValue(...)` needs the name to resolve.
@dataclass
class SqlValue:
    """A SQL string representing a generated SQLGlot AST."""

    sql: str


def _dict_sort(obj: t.Any) -> str:
    """Return `repr(obj)`, first ordering the top-level keys when `obj` is a dict.

    Keys are ordered by their `str()` form. Only the outermost dictionary is
    reordered; nested dictionaries are left in their existing order. If the
    reordering fails, a warning is logged and the object is rendered as-is.
    """
    if isinstance(obj, dict):
        try:
            obj = dict(sorted(obj.items(), key=lambda item: str(item[0])))
        except Exception:
            logger.warning("Failed to sort non-recursive dict", exc_info=True)
    return repr(obj)


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration does not alter any table definitions."""
    return None


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Rewrite the snapshots table so the targeted payloads have sorted keys.

    Every row of `_snapshots` (qualified with `schema` when one is given) is
    read and its `snapshot` JSON parsed. For each `python_env` entry whose key
    is in `KEYS_TO_MAKE_DETERMINISTIC` and whose `kind` is `"value"`, the
    payload is evaluated and re-rendered through `_dict_sort`. If at least one
    payload changed, all rows are deleted and the full set of rows is inserted
    again; otherwise the table is left untouched.

    Args:
        engine_adapter: Adapter used to query, delete from and insert into the table.
        schema: Optional schema name used to qualify the snapshots table.
        **kwargs: Accepted but not used by this function.
    """
    import pandas as pd

    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"

    query = exp.select(
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    ).from_(snapshots_table)

    rewritten_rows = []
    any_payload_changed = False

    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        (
            name,
            identifier,
            version,
            raw_snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
        ) = row

        snapshot_doc = json.loads(raw_snapshot)
        python_env = snapshot_doc["node"].get("python_env") or {}

        for key, executable in python_env.items():
            if key not in KEYS_TO_MAKE_DETERMINISTIC:
                continue
            if not isinstance(executable, dict) or executable.get("kind") != "value":
                continue

            current_payload = executable["payload"]
            try:
                # NOTE(review): eval() executes whatever expression is stored in the
                # snapshot row; this presumes the state table content is trusted.
                sorted_payload = _dict_sort(eval(current_payload))
            except Exception:
                # The payload could not be evaluated; leave it exactly as stored.
                logger.warning("Exception trying to eval payload", exc_info=True)
            else:
                # Only touch the entry when the rendering actually differs.
                if current_payload != sorted_payload:
                    executable["payload"] = sorted_payload
                    any_payload_changed = True

        # Every row is collected, changed or not, because the table is replaced wholesale.
        rewritten_rows.append(
            dict(
                name=name,
                identifier=identifier,
                version=version,
                snapshot=json.dumps(snapshot_doc),
                kind_name=kind_name,
                updated_ts=updated_ts,
                unpaused_ts=unpaused_ts,
                ttl_ms=ttl_ms,
                unrestorable=unrestorable,
            )
        )

    if not any_payload_changed or not rewritten_rows:
        return

    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)
    column_type_names = (
        ("name", index_type),
        ("identifier", index_type),
        ("version", index_type),
        ("snapshot", blob_type),
        ("kind_name", "text"),
        ("updated_ts", "bigint"),
        ("unpaused_ts", "bigint"),
        ("ttl_ms", "bigint"),
        ("unrestorable", "boolean"),
    )

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(rewritten_rows),
        target_columns_to_types={
            column: exp.DataType.build(type_name) for column, type_name in column_type_names
        },
    )
logger =
<Logger sqlmesh.migrations.v0085_deterministic_repr (WARNING)>
KEYS_TO_MAKE_DETERMINISTIC =
['__sqlmesh__vars__', '__sqlmesh__blueprint__vars__']
@dataclass
class
SqlValue:
# Kept as a plain dataclass: the generated `__repr__` renders an instance as
# `SqlValue(sql=...)`, and this migration passes stored payload text to `eval`,
# so the name must be defined in this module for such payloads to resolve.
@dataclass
class SqlValue:
    """A SQL string representing a generated SQLGlot AST."""

    # The SQL text itself.
    sql: str
A SQL string representing a generated SQLGlot AST.
def
migrate_schemas(engine_adapter, schema, **kwargs):
def
migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Rewrite the snapshots table so the targeted payloads have sorted keys.

    Every row of `_snapshots` (qualified with `schema` when one is given) is
    read and its `snapshot` JSON parsed. For each `python_env` entry whose key
    is in `KEYS_TO_MAKE_DETERMINISTIC` and whose `kind` is `"value"`, the
    payload is evaluated and re-rendered through `_dict_sort`. If at least one
    payload changed, all rows are deleted and the full set of rows is inserted
    again; otherwise the table is left untouched.

    Args:
        engine_adapter: Adapter used to query, delete from and insert into the table.
        schema: Optional schema name used to qualify the snapshots table.
        **kwargs: Accepted but not used by this function.
    """
    import pandas as pd

    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"

    query = exp.select(
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    ).from_(snapshots_table)

    rewritten_rows = []
    any_payload_changed = False

    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        (
            name,
            identifier,
            version,
            raw_snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
        ) = row

        snapshot_doc = json.loads(raw_snapshot)
        python_env = snapshot_doc["node"].get("python_env") or {}

        for key, executable in python_env.items():
            if key not in KEYS_TO_MAKE_DETERMINISTIC:
                continue
            if not isinstance(executable, dict) or executable.get("kind") != "value":
                continue

            current_payload = executable["payload"]
            try:
                # NOTE(review): eval() executes whatever expression is stored in the
                # snapshot row; this presumes the state table content is trusted.
                sorted_payload = _dict_sort(eval(current_payload))
            except Exception:
                # The payload could not be evaluated; leave it exactly as stored.
                logger.warning("Exception trying to eval payload", exc_info=True)
            else:
                # Only touch the entry when the rendering actually differs.
                if current_payload != sorted_payload:
                    executable["payload"] = sorted_payload
                    any_payload_changed = True

        # Every row is collected, changed or not, because the table is replaced wholesale.
        rewritten_rows.append(
            dict(
                name=name,
                identifier=identifier,
                version=version,
                snapshot=json.dumps(snapshot_doc),
                kind_name=kind_name,
                updated_ts=updated_ts,
                unpaused_ts=unpaused_ts,
                ttl_ms=ttl_ms,
                unrestorable=unrestorable,
            )
        )

    if not any_payload_changed or not rewritten_rows:
        return

    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)
    column_type_names = (
        ("name", index_type),
        ("identifier", index_type),
        ("version", index_type),
        ("snapshot", blob_type),
        ("kind_name", "text"),
        ("updated_ts", "bigint"),
        ("unpaused_ts", "bigint"),
        ("ttl_ms", "bigint"),
        ("unrestorable", "boolean"),
    )

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(rewritten_rows),
        target_columns_to_types={
            column: exp.DataType.build(type_name) for column, type_name in column_type_names
        },
    )