sqlmesh.migrations.v0086_check_deterministic_bug
"""Warn when the state may hold snapshots affected by the bugged 85 migration.

This migration does not change the state. It only inspects the stored
snapshots and logs a console warning when one of them carries a dict-valued
entry in its ``python_env``.
"""

import json
import logging

from sqlglot import exp

from sqlmesh.core.console import get_console


logger = logging.getLogger(__name__)
# ``python_env`` keys that are skipped when scanning for dict-valued entries.
KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"]


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration makes no schema changes."""
    pass


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Warn if the state may contain snapshots affected by the bugged 85 migration.

    No rows are modified. The stored snapshots are scanned and, as soon as one
    is found whose ``python_env`` holds a ``"value"`` executable (under a key
    not listed in ``KEYS_TO_MAKE_DETERMINISTIC``) whose payload evaluates to a
    ``dict``, a single warning is logged to the console and the function
    returns.

    Args:
        engine_adapter: Adapter used to query the state tables; only its
            ``fetchone`` and ``fetchall`` methods are called here.
        schema: Optional schema name used to qualify the ``_snapshots`` and
            ``_versions`` tables; when falsy the bare table names are used.
        **kwargs: Accepted and ignored.
    """
    snapshots_table = "_snapshots"
    versions_table = "_versions"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"
        versions_table = f"{schema}.{versions_table}"

    result = engine_adapter.fetchone(
        exp.select("schema_version").from_(versions_table), quote_identifiers=True
    )
    if not result:
        # No recorded version: this must be the first migration, so the project
        # was never exposed to the 85 migration bug and the check can be skipped.
        return
    schema_version = result[0]
    if schema_version < 85:
        # The project was not exposed to the bugged 85 migration, so we can skip it.
        return

    warning = (
        "SQLMesh detected that it may not be able to fully migrate the state database. This should not impact "
        "the migration process, but may result in unexpected changes being reported by the next `sqlmesh plan` "
        "command. Please run `sqlmesh diff prod` after the migration has completed, before making any new "
        "changes. If any unexpected changes are reported, consider running a forward-only plan to apply these "
        "changes and avoid unnecessary backfills: sqlmesh plan prod --forward-only. \n"
        "See https://sqlmesh.readthedocs.io/en/stable/concepts/plans/#forward-only-plans for more details.\n"
    )

    # Only the serialized snapshot is inspected, so it is the only column fetched.
    for (snapshot,) in engine_adapter.fetchall(
        exp.select("snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        python_env = parsed_snapshot["node"].get("python_env")
        if not python_env:
            continue

        for key, executable in python_env.items():
            if (
                key in KEYS_TO_MAKE_DETERMINISTIC
                or not isinstance(executable, dict)
                or executable.get("kind") != "value"
            ):
                continue

            try:
                # NOTE(review): eval() runs whatever expression is stored in the
                # state payload. If payloads are always plain literals,
                # ast.literal_eval would be a safer drop-in -- confirm before changing.
                parsed_value = eval(executable["payload"])
            except Exception:
                # Best effort: an unparsable payload is logged and skipped.
                logger.warning("Exception trying to eval payload", exc_info=True)
                continue

            if isinstance(parsed_value, dict):
                # One warning is enough; stop at the first dict-valued entry.
                get_console().log_warning(warning)
                return
logger =
<Logger sqlmesh.migrations.v0086_check_deterministic_bug (WARNING)>
KEYS_TO_MAKE_DETERMINISTIC =
['__sqlmesh__vars__', '__sqlmesh__blueprint__vars__']
def
migrate_schemas(engine_adapter, schema, **kwargs):
def
migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Warn if the state may contain snapshots affected by the bugged 85 migration.

    No rows are modified. The stored snapshots are scanned and, as soon as one
    is found whose ``python_env`` holds a ``"value"`` executable (under a key
    not listed in ``KEYS_TO_MAKE_DETERMINISTIC``) whose payload evaluates to a
    ``dict``, a single warning is logged to the console and the function
    returns.

    Args:
        engine_adapter: Adapter used to query the state tables; only its
            ``fetchone`` and ``fetchall`` methods are called here.
        schema: Optional schema name used to qualify the ``_snapshots`` and
            ``_versions`` tables; when falsy the bare table names are used.
        **kwargs: Accepted and ignored.
    """
    snapshots_table = "_snapshots"
    versions_table = "_versions"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"
        versions_table = f"{schema}.{versions_table}"

    result = engine_adapter.fetchone(
        exp.select("schema_version").from_(versions_table), quote_identifiers=True
    )
    if not result:
        # No recorded version: this must be the first migration, so the project
        # was never exposed to the 85 migration bug and the check can be skipped.
        return
    schema_version = result[0]
    if schema_version < 85:
        # The project was not exposed to the bugged 85 migration, so we can skip it.
        return

    warning = (
        "SQLMesh detected that it may not be able to fully migrate the state database. This should not impact "
        "the migration process, but may result in unexpected changes being reported by the next `sqlmesh plan` "
        "command. Please run `sqlmesh diff prod` after the migration has completed, before making any new "
        "changes. If any unexpected changes are reported, consider running a forward-only plan to apply these "
        "changes and avoid unnecessary backfills: sqlmesh plan prod --forward-only. \n"
        "See https://sqlmesh.readthedocs.io/en/stable/concepts/plans/#forward-only-plans for more details.\n"
    )

    # Only the serialized snapshot is inspected, so it is the only column fetched.
    for (snapshot,) in engine_adapter.fetchall(
        exp.select("snapshot").from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        python_env = parsed_snapshot["node"].get("python_env")
        if not python_env:
            continue

        for key, executable in python_env.items():
            if (
                key in KEYS_TO_MAKE_DETERMINISTIC
                or not isinstance(executable, dict)
                or executable.get("kind") != "value"
            ):
                continue

            try:
                # NOTE(review): eval() runs whatever expression is stored in the
                # state payload. If payloads are always plain literals,
                # ast.literal_eval would be a safer drop-in -- confirm before changing.
                parsed_value = eval(executable["payload"])
            except Exception:
                # Best effort: an unparsable payload is logged and skipped.
                logger.warning("Exception trying to eval payload", exc_info=True)
                continue

            if isinstance(parsed_value, dict):
                # One warning is enough; stop at the first dict-valued entry.
                get_console().log_warning(warning)
                return