Normalizes blueprint variables, so `Customer_Field` is stored as `customer_field` in the `python_env`:
MODEL ( ... blueprints ( Customer_Field := 1 ) );
SELECT @customer_field AS col
"""
Normalizes blueprint variables, so Customer_Field is stored as customer_field in the `python_env`:

MODEL (
  ...
  blueprints (
    Customer_Field := 1
  )
);

SELECT
  @customer_field AS col
"""

import json
import logging
from dataclasses import dataclass

from sqlglot import exp
from sqlmesh.core.console import get_console
from sqlmesh.utils.migration import index_text_type, blob_text_type


logger = logging.getLogger(__name__)


SQLMESH_BLUEPRINT_VARS = "__sqlmesh__blueprint__vars__"


# Make sure `SqlValue` is defined so it can be used by `eval` call in the migration
@dataclass
class SqlValue:
    """A SQL string representing a generated SQLGlot AST."""

    sql: str


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """No-op: this migration does not alter any table schema."""
    pass


def _lowercase_blueprint_vars(blueprint_vars, node):  # type: ignore
    """Rename mixed-case blueprint variable names to lowercase, in place.

    Args:
        blueprint_vars: Mutable mapping of blueprint variable names to values.
        node: The parsed `node` dict of the snapshot. Only `node["name"]` is
            read, and only when a collision warning has to be emitted.

    Returns:
        True if at least one variable was renamed, False otherwise.
    """
    renamed = False

    # Iterate over a copy, because keys are removed and inserted while looping.
    for var, value in dict(blueprint_vars).items():
        lowercase_var = var.lower()
        if var == lowercase_var:
            continue

        if lowercase_var in blueprint_vars:
            # Renaming would overwrite another variable, so keep the original
            # name and tell the user why the state could not be fully migrated.
            get_console().log_warning(
                "SQLMesh is unable to fully migrate the state database, because the "
                f"model '{node['name']}' contains two blueprint variables ('{var}' and "
                f"'{lowercase_var}') that resolve to the same value ('{lowercase_var}'). "
                "This may result in unexpected changes being reported by the next "
                "`sqlmesh plan` command. If this happens, consider renaming either variable, "
                "so that the lowercase version of their names are different."
            )
            continue

        del blueprint_vars[var]
        blueprint_vars[lowercase_var] = value
        renamed = True

    return renamed


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Lowercase the blueprint variable names stored in every snapshot.

    Reads all rows of the `_snapshots` table (qualified with `schema` when it
    is truthy), parses each `snapshot` JSON document and lowercases the keys
    of the blueprint variables found under
    `node["python_env"][SQLMESH_BLUEPRINT_VARS]["payload"]`. If at least one
    snapshot was changed, all rows of the table are deleted and re-inserted
    with the re-serialized snapshots; otherwise the table is left untouched.

    Args:
        engine_adapter: Adapter used to fetch, delete and insert the rows.
        schema: Optional schema name that qualifies the `_snapshots` table.
        **kwargs: Accepted for interface compatibility; not used here.
    """
    import pandas as pd

    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"

    # Single definition of the selected columns; the order also determines the
    # key order of the rows that are written back.
    columns = (
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    )

    migration_needed = False
    new_snapshots = []

    rows = engine_adapter.fetchall(
        exp.select(*columns).from_(snapshots_table),
        quote_identifiers=True,
    )
    for row in rows:
        record = dict(zip(columns, row))
        parsed_snapshot = json.loads(record["snapshot"])
        node = parsed_snapshot["node"]
        python_env = node.get("python_env") or {}

        if blueprint_vars_executable := python_env.get(SQLMESH_BLUEPRINT_VARS):
            # NOTE(review): `eval` runs the payload read from the state table.
            # This presumably is safe because the payload was written by
            # SQLMesh itself; confirm the state database is trusted input.
            blueprint_vars = eval(blueprint_vars_executable["payload"])

            if _lowercase_blueprint_vars(blueprint_vars, node):
                migration_needed = True
                blueprint_vars_executable["payload"] = repr(blueprint_vars)

        # Every row is collected (changed or not), because the table is
        # rewritten as a whole below.
        record["snapshot"] = json.dumps(parsed_snapshot)
        new_snapshots.append(record)

    if not (migration_needed and new_snapshots):
        return

    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        target_columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build(blob_type),
            "kind_name": exp.DataType.build("text"),
            "updated_ts": exp.DataType.build("bigint"),
            "unpaused_ts": exp.DataType.build("bigint"),
            "ttl_ms": exp.DataType.build("bigint"),
            "unrestorable": exp.DataType.build("boolean"),
        },
    )
logger =
<Logger sqlmesh.migrations.v0087_normalize_blueprint_variables (WARNING)>
SQLMESH_BLUEPRINT_VARS =
'__sqlmesh__blueprint__vars__'
@dataclass
class
SqlValue:
@dataclass
class SqlValue:
    """A SQL string representing a generated SQLGlot AST.

    The class is a plain dataclass, so its generated `repr` has the form
    `SqlValue(sql='...')`, which evaluates back to an equal instance as long
    as the name `SqlValue` is in scope.
    """

    # SQL text of the value.
    sql: str
A SQL string representing a generated SQLGlot AST.
def
migrate_schemas(engine_adapter, schema, **kwargs):
def
migrate_rows(engine_adapter, schema, **kwargs):
def _lowercase_blueprint_vars(blueprint_vars, node):  # type: ignore
    """Rename mixed-case blueprint variable names to lowercase, in place.

    Args:
        blueprint_vars: Mutable mapping of blueprint variable names to values.
        node: The parsed `node` dict of the snapshot. Only `node["name"]` is
            read, and only when a collision warning has to be emitted.

    Returns:
        True if at least one variable was renamed, False otherwise.
    """
    renamed = False

    # Iterate over a copy, because keys are removed and inserted while looping.
    for var, value in dict(blueprint_vars).items():
        lowercase_var = var.lower()
        if var == lowercase_var:
            continue

        if lowercase_var in blueprint_vars:
            # Renaming would overwrite another variable, so keep the original
            # name and tell the user why the state could not be fully migrated.
            get_console().log_warning(
                "SQLMesh is unable to fully migrate the state database, because the "
                f"model '{node['name']}' contains two blueprint variables ('{var}' and "
                f"'{lowercase_var}') that resolve to the same value ('{lowercase_var}'). "
                "This may result in unexpected changes being reported by the next "
                "`sqlmesh plan` command. If this happens, consider renaming either variable, "
                "so that the lowercase version of their names are different."
            )
            continue

        del blueprint_vars[var]
        blueprint_vars[lowercase_var] = value
        renamed = True

    return renamed


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Lowercase the blueprint variable names stored in every snapshot.

    Reads all rows of the `_snapshots` table (qualified with `schema` when it
    is truthy), parses each `snapshot` JSON document and lowercases the keys
    of the blueprint variables found under
    `node["python_env"][SQLMESH_BLUEPRINT_VARS]["payload"]`. If at least one
    snapshot was changed, all rows of the table are deleted and re-inserted
    with the re-serialized snapshots; otherwise the table is left untouched.

    Args:
        engine_adapter: Adapter used to fetch, delete and insert the rows.
        schema: Optional schema name that qualifies the `_snapshots` table.
        **kwargs: Accepted for interface compatibility; not used here.
    """
    import pandas as pd

    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"

    # Single definition of the selected columns; the order also determines the
    # key order of the rows that are written back.
    columns = (
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    )

    migration_needed = False
    new_snapshots = []

    rows = engine_adapter.fetchall(
        exp.select(*columns).from_(snapshots_table),
        quote_identifiers=True,
    )
    for row in rows:
        record = dict(zip(columns, row))
        parsed_snapshot = json.loads(record["snapshot"])
        node = parsed_snapshot["node"]
        python_env = node.get("python_env") or {}

        if blueprint_vars_executable := python_env.get(SQLMESH_BLUEPRINT_VARS):
            # NOTE(review): `eval` runs the payload read from the state table.
            # This presumably is safe because the payload was written by
            # SQLMesh itself; confirm the state database is trusted input.
            blueprint_vars = eval(blueprint_vars_executable["payload"])

            if _lowercase_blueprint_vars(blueprint_vars, node):
                migration_needed = True
                blueprint_vars_executable["payload"] = repr(blueprint_vars)

        # Every row is collected (changed or not), because the table is
        # rewritten as a whole below.
        record["snapshot"] = json.dumps(parsed_snapshot)
        new_snapshots.append(record)

    if not (migration_needed and new_snapshots):
        return

    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        target_columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build(blob_type),
            "kind_name": exp.DataType.build("text"),
            "updated_ts": exp.DataType.build("bigint"),
            "unpaused_ts": exp.DataType.build("bigint"),
            "ttl_ms": exp.DataType.build("bigint"),
            "unrestorable": exp.DataType.build("boolean"),
        },
    )