Edit on GitHub

Normalizes blueprint variable names to lowercase, so `Customer_Field` is stored as `customer_field` in the `python_env`:

    MODEL (
      ...
      blueprints (
        Customer_Field := 1
      )
    );

    SELECT
      @customer_field AS col

  1"""
  2Normalizes blueprint variables, so Customer_Field is stored as customer_field in the `python_env`:
  3
  4MODEL (
  5  ...
  6  blueprints (
  7    Customer_Field := 1
  8  )
  9);
 10
 11SELECT
 12  @customer_field AS col
 13"""
 14
 15import json
 16import logging
 17from dataclasses import dataclass
 18
 19from sqlglot import exp
 20from sqlmesh.core.console import get_console
 21from sqlmesh.utils.migration import index_text_type, blob_text_type
 22
 23
 24logger = logging.getLogger(__name__)
 25
 26
 27SQLMESH_BLUEPRINT_VARS = "__sqlmesh__blueprint__vars__"
 28
 29
 30# Make sure `SqlValue` is defined so it can be used by `eval` call in the migration
 31@dataclass
 32class SqlValue:
 33    """A SQL string representing a generated SQLGlot AST."""
 34
 35    sql: str
 36
 37
 38def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
 39    pass
 40
 41
 42def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
 43    import pandas as pd
 44
 45    snapshots_table = "_snapshots"
 46    if schema:
 47        snapshots_table = f"{schema}.{snapshots_table}"
 48
 49    migration_needed = False
 50    new_snapshots = []
 51
 52    for (
 53        name,
 54        identifier,
 55        version,
 56        snapshot,
 57        kind_name,
 58        updated_ts,
 59        unpaused_ts,
 60        ttl_ms,
 61        unrestorable,
 62    ) in engine_adapter.fetchall(
 63        exp.select(
 64            "name",
 65            "identifier",
 66            "version",
 67            "snapshot",
 68            "kind_name",
 69            "updated_ts",
 70            "unpaused_ts",
 71            "ttl_ms",
 72            "unrestorable",
 73        ).from_(snapshots_table),
 74        quote_identifiers=True,
 75    ):
 76        parsed_snapshot = json.loads(snapshot)
 77        node = parsed_snapshot["node"]
 78        python_env = node.get("python_env") or {}
 79
 80        migrate_snapshot = False
 81
 82        if blueprint_vars_executable := python_env.get(SQLMESH_BLUEPRINT_VARS):
 83            blueprint_vars = eval(blueprint_vars_executable["payload"])
 84
 85            for var, value in dict(blueprint_vars).items():
 86                lowercase_var = var.lower()
 87                if var != lowercase_var:
 88                    if lowercase_var in blueprint_vars:
 89                        get_console().log_warning(
 90                            "SQLMesh is unable to fully migrate the state database, because the "
 91                            f"model '{node['name']}' contains two blueprint variables ('{var}' and "
 92                            f"'{lowercase_var}') that resolve to the same value ('{lowercase_var}'). "
 93                            "This may result in unexpected changes being reported by the next "
 94                            "`sqlmesh plan` command. If this happens, consider renaming either variable, "
 95                            "so that the lowercase version of their names are different."
 96                        )
 97                    else:
 98                        del blueprint_vars[var]
 99                        blueprint_vars[lowercase_var] = value
100                        migrate_snapshot = True
101
102            if migrate_snapshot:
103                migration_needed = True
104                blueprint_vars_executable["payload"] = repr(blueprint_vars)
105
106        new_snapshots.append(
107            {
108                "name": name,
109                "identifier": identifier,
110                "version": version,
111                "snapshot": json.dumps(parsed_snapshot),
112                "kind_name": kind_name,
113                "updated_ts": updated_ts,
114                "unpaused_ts": unpaused_ts,
115                "ttl_ms": ttl_ms,
116                "unrestorable": unrestorable,
117            }
118        )
119
120    if migration_needed and new_snapshots:
121        engine_adapter.delete_from(snapshots_table, "TRUE")
122
123        index_type = index_text_type(engine_adapter.dialect)
124        blob_type = blob_text_type(engine_adapter.dialect)
125
126        engine_adapter.insert_append(
127            snapshots_table,
128            pd.DataFrame(new_snapshots),
129            target_columns_to_types={
130                "name": exp.DataType.build(index_type),
131                "identifier": exp.DataType.build(index_type),
132                "version": exp.DataType.build(index_type),
133                "snapshot": exp.DataType.build(blob_type),
134                "kind_name": exp.DataType.build("text"),
135                "updated_ts": exp.DataType.build("bigint"),
136                "unpaused_ts": exp.DataType.build("bigint"),
137                "ttl_ms": exp.DataType.build("bigint"),
138                "unrestorable": exp.DataType.build("boolean"),
139            },
140        )
# Key in a model's `python_env` whose entry holds the blueprint variables; that
# entry's "payload" is Python source which `eval`s to the variables mapping.
SQLMESH_BLUEPRINT_VARS = "__sqlmesh__blueprint__vars__"


# Make sure `SqlValue` is defined so it can be used by `eval` call in the migration:
# stored blueprint payloads may contain `SqlValue(...)` expressions. The
# dataclass-generated repr (`SqlValue(sql='...')`) is also what `repr()` writes
# back into the payload, so it must round-trip through `eval`.
@dataclass
class SqlValue:
    """A SQL string representing a generated SQLGlot AST."""

    sql: str  # The rendered SQL text.

A SQL string representing a generated SQLGlot AST.

SqlValue(sql: str)
sql: str
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Schema step of this migration; intentionally a no-op.

    Only the contents of existing snapshot rows are rewritten (see `migrate_rows`),
    so no table schema changes here.
    """
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Lowercase the blueprint variable names stored in every snapshot's `python_env`.

    Reads every row of the `_snapshots` state table (qualified with `schema` when
    one is given). For each snapshot whose `python_env` has a
    `SQLMESH_BLUEPRINT_VARS` entry, every variable name that is not already
    lowercase is renamed to its lowercase form. If that lowercase name is already
    present, a warning is logged and the variable is left as is.

    When at least one snapshot changed, the whole table is deleted and every row
    (changed or not) is re-inserted; otherwise the table is left untouched.

    Args:
        engine_adapter: Adapter used to read from, delete from, and append to the
            state table.
        schema: Optional schema containing the `_snapshots` table.
        **kwargs: Unused.
    """
    import pandas as pd

    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"
    columns = (
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    )

    migration_needed = False
    new_snapshots = []

    rows = engine_adapter.fetchall(
        exp.select(*columns).from_(snapshots_table),
        quote_identifiers=True,
    )
    for row in rows:
        # Row values come back in the same order as `columns`.
        record = dict(zip(columns, row))
        parsed_snapshot = json.loads(record["snapshot"])
        node = parsed_snapshot["node"]
        python_env = node.get("python_env") or {}

        blueprint_vars_executable = python_env.get(SQLMESH_BLUEPRINT_VARS)
        if blueprint_vars_executable:
            # NOTE(review): `eval` executes whatever text is stored in the state
            # database. The payload is expected to be the `repr` of the variables
            # dict (possibly containing `SqlValue(...)`), but nothing here enforces
            # that. Only run this migration against a trusted state database.
            blueprint_vars = eval(blueprint_vars_executable["payload"])
            renamed_any = False

            # Iterate over a copy because entries are renamed in place below.
            for var, value in dict(blueprint_vars).items():
                lowercase_var = var.lower()
                if var == lowercase_var:
                    continue
                if lowercase_var in blueprint_vars:
                    get_console().log_warning(
                        "SQLMesh is unable to fully migrate the state database, because the "
                        f"model '{node['name']}' contains two blueprint variables ('{var}' and "
                        f"'{lowercase_var}') that resolve to the same value ('{lowercase_var}'). "
                        "This may result in unexpected changes being reported by the next "
                        "`sqlmesh plan` command. If this happens, consider renaming either variable, "
                        "so that the lowercase version of their names are different."
                    )
                    continue
                # Delete-then-insert moves the renamed entry to the end of the dict.
                del blueprint_vars[var]
                blueprint_vars[lowercase_var] = value
                renamed_any = True

            if renamed_any:
                migration_needed = True
                # This dict lives inside `parsed_snapshot`, which is serialized below.
                blueprint_vars_executable["payload"] = repr(blueprint_vars)

        record["snapshot"] = json.dumps(parsed_snapshot)
        new_snapshots.append(record)

    # `migration_needed` can only be set after a row was read, so `new_snapshots`
    # is non-empty whenever we get past this check.
    if not migration_needed:
        return

    # NOTE(review): the table is emptied and then refilled; nothing in this
    # function makes the two steps atomic. Confirm the caller runs migrations
    # inside a transaction.
    engine_adapter.delete_from(snapshots_table, "TRUE")

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)

    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        target_columns_to_types={
            "name": exp.DataType.build(index_type),
            "identifier": exp.DataType.build(index_type),
            "version": exp.DataType.build(index_type),
            "snapshot": exp.DataType.build(blob_type),
            "kind_name": exp.DataType.build("text"),
            "updated_ts": exp.DataType.build("bigint"),
            "unpaused_ts": exp.DataType.build("bigint"),
            "ttl_ms": exp.DataType.build("bigint"),
            "unrestorable": exp.DataType.build("boolean"),
        },
    )