Edit on GitHub

When serializing some objects, like `__sqlmesh__vars__`, the order of keys in the dictionary was not deterministic, so this migration re-serializes those dictionaries with their keys sorted deterministically.

  1"""
  2When serializing some objects, like `__sqlmesh__vars__`, the order of keys in the dictionary was not deterministic,
  3so this migration re-serializes those dictionaries with their keys sorted deterministically.
  4"""
  5
  6import json
  7import logging
  8import typing as t
  9from dataclasses import dataclass
 10
 11from sqlglot import exp
 12
 13from sqlmesh.utils.migration import index_text_type, blob_text_type
 14
 15
 16logger = logging.getLogger(__name__)
 17
 18
 19KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"]
 20
 21
 22# Make sure `SqlValue` is defined so it can be used by `eval` call in the migration
 23@dataclass
 24class SqlValue:
 25    """A SQL string representing a generated SQLGlot AST."""
 26
 27    sql: str
 28
 29
 30def _dict_sort(obj: t.Any) -> str:
 31    try:
 32        if isinstance(obj, dict):
 33            obj = dict(sorted(obj.items(), key=lambda x: str(x[0])))
 34    except Exception:
 35        logger.warning("Failed to sort non-recursive dict", exc_info=True)
 36    return repr(obj)
 37
 38
 39def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
 40    pass
 41
 42
 43def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
 44    import pandas as pd
 45
 46    snapshots_table = "_snapshots"
 47    if schema:
 48        snapshots_table = f"{schema}.{snapshots_table}"
 49
 50    migration_needed = False
 51    new_snapshots = []
 52
 53    for (
 54        name,
 55        identifier,
 56        version,
 57        snapshot,
 58        kind_name,
 59        updated_ts,
 60        unpaused_ts,
 61        ttl_ms,
 62        unrestorable,
 63    ) in engine_adapter.fetchall(
 64        exp.select(
 65            "name",
 66            "identifier",
 67            "version",
 68            "snapshot",
 69            "kind_name",
 70            "updated_ts",
 71            "unpaused_ts",
 72            "ttl_ms",
 73            "unrestorable",
 74        ).from_(snapshots_table),
 75        quote_identifiers=True,
 76    ):
 77        parsed_snapshot = json.loads(snapshot)
 78        python_env = parsed_snapshot["node"].get("python_env")
 79
 80        if python_env:
 81            for key, executable in python_env.items():
 82                if key not in KEYS_TO_MAKE_DETERMINISTIC:
 83                    continue
 84                if isinstance(executable, dict) and executable.get("kind") == "value":
 85                    old_payload = executable["payload"]
 86                    try:
 87                        # Try to parse the old payload and re-serialize it deterministically
 88                        parsed_value = eval(old_payload)
 89                        new_payload = _dict_sort(parsed_value)
 90
 91                        # Only update if the representation changed
 92                        if old_payload != new_payload:
 93                            executable["payload"] = new_payload
 94                            migration_needed = True
 95                    except Exception:
 96                        # If we still can't eval it, leave it as-is
 97                        logger.warning("Exception trying to eval payload", exc_info=True)
 98
 99        new_snapshots.append(
100            {
101                "name": name,
102                "identifier": identifier,
103                "version": version,
104                "snapshot": json.dumps(parsed_snapshot),
105                "kind_name": kind_name,
106                "updated_ts": updated_ts,
107                "unpaused_ts": unpaused_ts,
108                "ttl_ms": ttl_ms,
109                "unrestorable": unrestorable,
110            }
111        )
112
113    if migration_needed and new_snapshots:
114        engine_adapter.delete_from(snapshots_table, "TRUE")
115
116        index_type = index_text_type(engine_adapter.dialect)
117        blob_type = blob_text_type(engine_adapter.dialect)
118
119        engine_adapter.insert_append(
120            snapshots_table,
121            pd.DataFrame(new_snapshots),
122            target_columns_to_types={
123                "name": exp.DataType.build(index_type),
124                "identifier": exp.DataType.build(index_type),
125                "version": exp.DataType.build(index_type),
126                "snapshot": exp.DataType.build(blob_type),
127                "kind_name": exp.DataType.build("text"),
128                "updated_ts": exp.DataType.build("bigint"),
129                "unpaused_ts": exp.DataType.build("bigint"),
130                "ttl_ms": exp.DataType.build("bigint"),
131                "unrestorable": exp.DataType.build("boolean"),
132            },
133        )
# Module logger; payloads that cannot be sorted or evaluated are reported as warnings.
logger = logging.getLogger(__name__)
# python_env entry names whose serialized value payloads get their keys sorted.
KEYS_TO_MAKE_DETERMINISTIC = [
    "__sqlmesh__vars__",
    "__sqlmesh__blueprint__vars__",
]
# `SqlValue` has to be resolvable from this module's namespace: stored payloads may
# contain `SqlValue(sql=...)` text, which `migrate_rows` rebuilds via `eval`.
@dataclass
class SqlValue:
    """Holds a SQL string that was generated from a SQLGlot AST.

    The dataclass-generated ``repr`` (``SqlValue(sql='...')``) is the textual form
    that gets evaluated and re-rendered by this migration.
    """

    sql: str

A SQL string representing a generated SQLGlot AST.

SqlValue(sql: str)
sql: str
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """No-op: this migration changes snapshot rows only, never table schemas."""
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Re-serialize variable payloads in stored snapshots with sorted keys.

    Every row of the ``_snapshots`` table (prefixed with ``schema`` when one is
    given) is read. Inside each snapshot's ``node.python_env``, entries whose name
    is in ``KEYS_TO_MAKE_DETERMINISTIC`` and whose ``kind`` is ``"value"`` have
    their ``payload`` string evaluated and re-rendered with ``_dict_sort``.
    Payloads that cannot be evaluated are logged and left untouched.

    The table is rewritten only if at least one payload actually changed: all rows
    are then deleted and re-inserted (changed or not) in one ``insert_append``
    call. Otherwise nothing is written.
    """
    import pandas as pd

    columns = (
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    )
    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"

    payload_changed = False
    rewritten_rows = []

    rows = engine_adapter.fetchall(
        exp.select(*columns).from_(snapshots_table),
        quote_identifiers=True,
    )
    for (
        name,
        identifier,
        version,
        snapshot,
        kind_name,
        updated_ts,
        unpaused_ts,
        ttl_ms,
        unrestorable,
    ) in rows:
        parsed_snapshot = json.loads(snapshot)
        # A missing or empty python_env simply means there is nothing to rewrite.
        python_env = parsed_snapshot["node"].get("python_env") or {}

        for key, executable in python_env.items():
            is_target_value = (
                key in KEYS_TO_MAKE_DETERMINISTIC
                and isinstance(executable, dict)
                and executable.get("kind") == "value"
            )
            if not is_target_value:
                continue

            old_payload = executable["payload"]
            try:
                # NOTE(review): `eval` runs the stored payload text in this frame
                # (module globals supply `SqlValue`). Presumably acceptable because
                # the text comes from SQLMesh's own state table — confirm.
                new_payload = _dict_sort(eval(old_payload))
                if old_payload != new_payload:
                    executable["payload"] = new_payload
                    payload_changed = True
            except Exception:
                # A payload that cannot be evaluated is kept verbatim.
                logger.warning("Exception trying to eval payload", exc_info=True)

        rewritten_rows.append(
            dict(
                zip(
                    columns,
                    (
                        name,
                        identifier,
                        version,
                        json.dumps(parsed_snapshot),
                        kind_name,
                        updated_ts,
                        unpaused_ts,
                        ttl_ms,
                        unrestorable,
                    ),
                )
            )
        )

    if not (payload_changed and rewritten_rows):
        # Nothing changed: leave the table exactly as it is.
        return

    # Replace the table contents wholesale with the rewritten rows.
    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)
    column_type_names = {
        "name": index_type,
        "identifier": index_type,
        "version": index_type,
        "snapshot": blob_type,
        "kind_name": "text",
        "updated_ts": "bigint",
        "unpaused_ts": "bigint",
        "ttl_ms": "bigint",
        "unrestorable": "boolean",
    }
    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(rewritten_rows),
        target_columns_to_types={
            column: exp.DataType.build(type_name)
            for column, type_name in column_type_names.items()
        },
    )