Edit on GitHub

Replace 'dbt_name' with 'dbt_node_info' in the snapshot definition

  1"""Replace 'dbt_name' with 'dbt_node_info' in the snapshot definition"""
  2
  3import json
  4from sqlglot import exp
  5from sqlmesh.utils.migration import index_text_type, blob_text_type
  6
  7
  8def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
  9    pass
 10
 11
 12def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
 13    import pandas as pd
 14
 15    snapshots_table = "_snapshots"
 16    if schema:
 17        snapshots_table = f"{schema}.{snapshots_table}"
 18
 19    index_type = index_text_type(engine_adapter.dialect)
 20    blob_type = blob_text_type(engine_adapter.dialect)
 21
 22    new_snapshots = []
 23    migration_needed = False
 24
 25    for (
 26        name,
 27        identifier,
 28        version,
 29        snapshot,
 30        kind_name,
 31        updated_ts,
 32        unpaused_ts,
 33        ttl_ms,
 34        unrestorable,
 35        forward_only,
 36        dev_version,
 37        fingerprint,
 38    ) in engine_adapter.fetchall(
 39        exp.select(
 40            "name",
 41            "identifier",
 42            "version",
 43            "snapshot",
 44            "kind_name",
 45            "updated_ts",
 46            "unpaused_ts",
 47            "ttl_ms",
 48            "unrestorable",
 49            "forward_only",
 50            "dev_version",
 51            "fingerprint",
 52        ).from_(snapshots_table),
 53        quote_identifiers=True,
 54    ):
 55        parsed_snapshot = json.loads(snapshot)
 56        if dbt_name := parsed_snapshot["node"].get("dbt_name"):
 57            parsed_snapshot["node"].pop("dbt_name")
 58            parsed_snapshot["node"]["dbt_node_info"] = {
 59                "unique_id": dbt_name,
 60                # these will get populated as metadata-only changes on the next plan
 61                "name": "",
 62                "fqn": "",
 63            }
 64            migration_needed = True
 65
 66        new_snapshots.append(
 67            {
 68                "name": name,
 69                "identifier": identifier,
 70                "version": version,
 71                "snapshot": json.dumps(parsed_snapshot),
 72                "kind_name": kind_name,
 73                "updated_ts": updated_ts,
 74                "unpaused_ts": unpaused_ts,
 75                "ttl_ms": ttl_ms,
 76                "unrestorable": unrestorable,
 77                "forward_only": forward_only,
 78                "dev_version": dev_version,
 79                "fingerprint": fingerprint,
 80            }
 81        )
 82
 83    if migration_needed and new_snapshots:
 84        engine_adapter.delete_from(snapshots_table, "TRUE")
 85
 86        engine_adapter.insert_append(
 87            snapshots_table,
 88            pd.DataFrame(new_snapshots),
 89            target_columns_to_types={
 90                "name": exp.DataType.build(index_type),
 91                "identifier": exp.DataType.build(index_type),
 92                "version": exp.DataType.build(index_type),
 93                "snapshot": exp.DataType.build(blob_type),
 94                "kind_name": exp.DataType.build(index_type),
 95                "updated_ts": exp.DataType.build("bigint"),
 96                "unpaused_ts": exp.DataType.build("bigint"),
 97                "ttl_ms": exp.DataType.build("bigint"),
 98                "unrestorable": exp.DataType.build("boolean"),
 99                "forward_only": exp.DataType.build("boolean"),
100                "dev_version": exp.DataType.build(index_type),
101                "fingerprint": exp.DataType.build(blob_type),
102            },
103        )
def migrate_schemas(engine_adapter, schema, **kwargs):
 9def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
10    pass
def migrate_rows(engine_adapter, schema, **kwargs):
 13def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
 14    import pandas as pd
 15
 16    snapshots_table = "_snapshots"
 17    if schema:
 18        snapshots_table = f"{schema}.{snapshots_table}"
 19
 20    index_type = index_text_type(engine_adapter.dialect)
 21    blob_type = blob_text_type(engine_adapter.dialect)
 22
 23    new_snapshots = []
 24    migration_needed = False
 25
 26    for (
 27        name,
 28        identifier,
 29        version,
 30        snapshot,
 31        kind_name,
 32        updated_ts,
 33        unpaused_ts,
 34        ttl_ms,
 35        unrestorable,
 36        forward_only,
 37        dev_version,
 38        fingerprint,
 39    ) in engine_adapter.fetchall(
 40        exp.select(
 41            "name",
 42            "identifier",
 43            "version",
 44            "snapshot",
 45            "kind_name",
 46            "updated_ts",
 47            "unpaused_ts",
 48            "ttl_ms",
 49            "unrestorable",
 50            "forward_only",
 51            "dev_version",
 52            "fingerprint",
 53        ).from_(snapshots_table),
 54        quote_identifiers=True,
 55    ):
 56        parsed_snapshot = json.loads(snapshot)
 57        if dbt_name := parsed_snapshot["node"].get("dbt_name"):
 58            parsed_snapshot["node"].pop("dbt_name")
 59            parsed_snapshot["node"]["dbt_node_info"] = {
 60                "unique_id": dbt_name,
 61                # these will get populated as metadata-only changes on the next plan
 62                "name": "",
 63                "fqn": "",
 64            }
 65            migration_needed = True
 66
 67        new_snapshots.append(
 68            {
 69                "name": name,
 70                "identifier": identifier,
 71                "version": version,
 72                "snapshot": json.dumps(parsed_snapshot),
 73                "kind_name": kind_name,
 74                "updated_ts": updated_ts,
 75                "unpaused_ts": unpaused_ts,
 76                "ttl_ms": ttl_ms,
 77                "unrestorable": unrestorable,
 78                "forward_only": forward_only,
 79                "dev_version": dev_version,
 80                "fingerprint": fingerprint,
 81            }
 82        )
 83
 84    if migration_needed and new_snapshots:
 85        engine_adapter.delete_from(snapshots_table, "TRUE")
 86
 87        engine_adapter.insert_append(
 88            snapshots_table,
 89            pd.DataFrame(new_snapshots),
 90            target_columns_to_types={
 91                "name": exp.DataType.build(index_type),
 92                "identifier": exp.DataType.build(index_type),
 93                "version": exp.DataType.build(index_type),
 94                "snapshot": exp.DataType.build(blob_type),
 95                "kind_name": exp.DataType.build(index_type),
 96                "updated_ts": exp.DataType.build("bigint"),
 97                "unpaused_ts": exp.DataType.build("bigint"),
 98                "ttl_ms": exp.DataType.build("bigint"),
 99                "unrestorable": exp.DataType.build("boolean"),
100                "forward_only": exp.DataType.build("boolean"),
101                "dev_version": exp.DataType.build(index_type),
102                "fingerprint": exp.DataType.build(blob_type),
103            },
104        )