Edit on GitHub

sqlmesh.migrations.v0086_check_deterministic_bug

 1import json
 2import logging
 3
 4from sqlglot import exp
 5
 6from sqlmesh.core.console import get_console
 7
 8
 9logger = logging.getLogger(__name__)
10KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"]
11
12
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration makes no schema changes.

    Args:
        engine_adapter: Unused.
        schema: Unused.
        **kwargs: Unused.
    """
    return None
15
16
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Warn when stored snapshots may have been affected by the bugged 85 migration.

    Reads the recorded schema version and, when it is 85 or later, scans the
    ``python_env`` of every stored snapshot for a ``value`` executable whose
    payload evaluates to a ``dict``. On the first such payload a single warning
    is sent to the console and the scan stops. This function only reads from
    the state tables; it does not modify any rows.

    Args:
        engine_adapter: Adapter used to query the state tables; it must provide
            ``fetchone`` and ``fetchall``.
        schema: Optional schema name used to qualify the state table names.
        **kwargs: Accepted but not used by this migration.
    """
    snapshots_table = "_snapshots"
    versions_table = "_versions"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"
        versions_table = f"{schema}.{versions_table}"

    result = engine_adapter.fetchone(
        exp.select("schema_version").from_(versions_table), quote_identifiers=True
    )
    if not result:
        # This must be the first migration, so we can skip the check since the
        # project was not exposed to the 85 migration bug.
        return
    if result[0] < 85:
        # The project was not exposed to the bugged 85 migration, so we can skip it.
        return

    warning = (
        "SQLMesh detected that it may not be able to fully migrate the state database. This should not impact "
        "the migration process, but may result in unexpected changes being reported by the next `sqlmesh plan` "
        "command. Please run `sqlmesh diff prod` after the migration has completed, before making any new "
        "changes. If any unexpected changes are reported, consider running a forward-only plan to apply these "
        "changes and avoid unnecessary backfills: sqlmesh plan prod --forward-only. "
        "See https://sqlmesh.readthedocs.io/en/stable/concepts/plans/#forward-only-plans for more details.\n"
    )

    # Only the serialized snapshot is inspected, so it is the only column fetched.
    for (snapshot,) in engine_adapter.fetchall(
        exp.select("snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        python_env = json.loads(snapshot)["node"].get("python_env")
        if not python_env:
            continue

        for key, executable in python_env.items():
            if (
                key in KEYS_TO_MAKE_DETERMINISTIC
                or not isinstance(executable, dict)
                or executable.get("kind") != "value"
            ):
                continue

            try:
                # NOTE(review): eval() executes whatever expression is stored in the
                # state database. Confirm the payload is always trusted, or consider
                # a restricted parser such as ast.literal_eval if payloads are
                # guaranteed to be plain literals.
                parsed_value = eval(executable["payload"])
                if isinstance(parsed_value, dict):
                    # One warning is enough; stop scanning after the first hit.
                    get_console().log_warning(warning)
                    return
            except Exception:
                # Best effort: a payload that cannot be evaluated is logged and skipped.
                logger.warning("Exception trying to eval payload", exc_info=True)
KEYS_TO_MAKE_DETERMINISTIC = ['__sqlmesh__vars__', '__sqlmesh__blueprint__vars__']
def migrate_schemas(engine_adapter, schema, **kwargs):
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration makes no schema changes.

    Args:
        engine_adapter: Unused.
        schema: Unused.
        **kwargs: Unused.
    """
    return None
def migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Warn when stored snapshots may have been affected by the bugged 85 migration.

    Reads the recorded schema version and, when it is 85 or later, scans the
    ``python_env`` of every stored snapshot for a ``value`` executable whose
    payload evaluates to a ``dict``. On the first such payload a single warning
    is sent to the console and the scan stops. This function only reads from
    the state tables; it does not modify any rows.

    Args:
        engine_adapter: Adapter used to query the state tables; it must provide
            ``fetchone`` and ``fetchall``.
        schema: Optional schema name used to qualify the state table names.
        **kwargs: Accepted but not used by this migration.
    """
    snapshots_table = "_snapshots"
    versions_table = "_versions"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"
        versions_table = f"{schema}.{versions_table}"

    result = engine_adapter.fetchone(
        exp.select("schema_version").from_(versions_table), quote_identifiers=True
    )
    if not result:
        # This must be the first migration, so we can skip the check since the
        # project was not exposed to the 85 migration bug.
        return
    if result[0] < 85:
        # The project was not exposed to the bugged 85 migration, so we can skip it.
        return

    warning = (
        "SQLMesh detected that it may not be able to fully migrate the state database. This should not impact "
        "the migration process, but may result in unexpected changes being reported by the next `sqlmesh plan` "
        "command. Please run `sqlmesh diff prod` after the migration has completed, before making any new "
        "changes. If any unexpected changes are reported, consider running a forward-only plan to apply these "
        "changes and avoid unnecessary backfills: sqlmesh plan prod --forward-only. "
        "See https://sqlmesh.readthedocs.io/en/stable/concepts/plans/#forward-only-plans for more details.\n"
    )

    # Only the serialized snapshot is inspected, so it is the only column fetched.
    for (snapshot,) in engine_adapter.fetchall(
        exp.select("snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        python_env = json.loads(snapshot)["node"].get("python_env")
        if not python_env:
            continue

        for key, executable in python_env.items():
            if (
                key in KEYS_TO_MAKE_DETERMINISTIC
                or not isinstance(executable, dict)
                or executable.get("kind") != "value"
            ):
                continue

            try:
                # NOTE(review): eval() executes whatever expression is stored in the
                # state database. Confirm the payload is always trusted, or consider
                # a restricted parser such as ast.literal_eval if payloads are
                # guaranteed to be plain literals.
                parsed_value = eval(executable["payload"])
                if isinstance(parsed_value, dict):
                    # One warning is enough; stop scanning after the first hit.
                    get_console().log_warning(warning)
                    return
            except Exception:
                # Best effort: a payload that cannot be evaluated is logged and skipped.
                logger.warning("Exception trying to eval payload", exc_info=True)