Normalize intervals and fix missing change category.

  1"""Normalize intervals and fix missing change category."""
  2
  3import json
  4import zlib
  5
  6import pandas as pd
  7from sqlglot import exp
  8
  9from sqlmesh.utils import random_id
 10from sqlmesh.utils.date import now_timestamp
 11from sqlmesh.utils.migration import index_text_type
 12
 13
 14def migrate(state_sync, **kwargs):  # type: ignore
 15    engine_adapter = state_sync.engine_adapter
 16    schema = state_sync.schema
 17    snapshots_table = "_snapshots"
 18    intervals_table = "_intervals"
 19    if schema:
 20        snapshots_table = f"{schema}.{snapshots_table}"
 21        intervals_table = f"{schema}.{intervals_table}"
 22
 23    migration_required = False
 24    new_snapshots = []
 25    new_intervals = []
 26
 27    for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
 28        exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
 29        quote_identifiers=True,
 30    ):
 31        parsed_snapshot = json.loads(snapshot)
 32
 33        if not parsed_snapshot.get("change_category"):
 34            fingerprint = parsed_snapshot.get("fingerprint")
 35            version = _hash(
 36                [
 37                    fingerprint["data_hash"],
 38                    fingerprint["parent_data_hash"],
 39                ]
 40            )
 41            parsed_snapshot["change_category"] = (
 42                4 if version == parsed_snapshot.get("version") else 5
 43            )
 44            migration_required = True
 45
 46        def _add_interval(start_ts: int, end_ts: int, is_dev: bool) -> None:
 47            new_intervals.append(
 48                {
 49                    "id": random_id(),
 50                    "created_ts": now_timestamp(),
 51                    "name": name,
 52                    "identifier": identifier,
 53                    "version": version,
 54                    "start_ts": start_ts,
 55                    "end_ts": end_ts,
 56                    "is_dev": is_dev,
 57                    "is_removed": False,
 58                    "is_compacted": True,
 59                }
 60            )
 61
 62        for interval in parsed_snapshot.pop("intervals", []):
 63            _add_interval(interval[0], interval[1], False)
 64            migration_required = True
 65
 66        for interval in parsed_snapshot.pop("dev_intervals", []):
 67            _add_interval(interval[0], interval[1], True)
 68            migration_required = True
 69
 70        new_snapshots.append(
 71            {
 72                "name": name,
 73                "identifier": identifier,
 74                "version": version,
 75                "snapshot": json.dumps(parsed_snapshot),
 76                "kind_name": kind_name,
 77            }
 78        )
 79
 80    if migration_required:
 81        index_type = index_text_type(engine_adapter.dialect)
 82
 83        engine_adapter.delete_from(snapshots_table, "TRUE")
 84        engine_adapter.insert_append(
 85            snapshots_table,
 86            pd.DataFrame(new_snapshots),
 87            columns_to_types={
 88                "name": exp.DataType.build(index_type),
 89                "identifier": exp.DataType.build(index_type),
 90                "version": exp.DataType.build(index_type),
 91                "snapshot": exp.DataType.build("text"),
 92                "kind_name": exp.DataType.build(index_type),
 93            },
 94        )
 95
 96        if new_intervals:
 97            engine_adapter.insert_append(
 98                intervals_table,
 99                pd.DataFrame(new_intervals),
100                columns_to_types={
101                    "id": exp.DataType.build(index_type),
102                    "created_ts": exp.DataType.build("bigint"),
103                    "name": exp.DataType.build(index_type),
104                    "identifier": exp.DataType.build(index_type),
105                    "version": exp.DataType.build(index_type),
106                    "start_ts": exp.DataType.build("bigint"),
107                    "end_ts": exp.DataType.build("bigint"),
108                    "is_dev": exp.DataType.build("boolean"),
109                    "is_removed": exp.DataType.build("boolean"),
110                    "is_compacted": exp.DataType.build("boolean"),
111                },
112            )
113
114
115def _hash(data):  # type: ignore
116    return str(zlib.crc32(";".join("" if d is None else d for d in data).encode("utf-8")))
def migrate(state_sync, **kwargs):  # type: ignore
    """Backfill missing change categories and relocate interval data.

    Reads every row of the snapshots table, derives a ``change_category``
    for payloads that lack one, pops ``intervals``/``dev_intervals`` out of
    each JSON payload into rows for the intervals table, and — when any row
    changed — rewrites the snapshots table and appends the interval rows.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    snapshots_table = f"{schema}._snapshots" if schema else "_snapshots"
    intervals_table = f"{schema}._intervals" if schema else "_intervals"

    needs_migration = False
    rewritten_snapshots = []
    interval_rows = []

    query = exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(
        snapshots_table
    )
    for name, identifier, version, raw_snapshot, kind_name in adapter.fetchall(
        query, quote_identifiers=True
    ):
        payload = json.loads(raw_snapshot)

        if not payload.get("change_category"):
            fingerprint = payload.get("fingerprint")
            version = _hash(
                [
                    fingerprint["data_hash"],
                    fingerprint["parent_data_hash"],
                ]
            )
            payload["change_category"] = 4 if version == payload.get("version") else 5
            needs_migration = True

        # Drain both interval lists from the payload into table rows;
        # non-dev intervals are popped first, matching the original order.
        for payload_key, is_dev in (("intervals", False), ("dev_intervals", True)):
            for item in payload.pop(payload_key, []):
                interval_rows.append(
                    {
                        "id": random_id(),
                        "created_ts": now_timestamp(),
                        "name": name,
                        "identifier": identifier,
                        "version": version,
                        "start_ts": item[0],
                        "end_ts": item[1],
                        "is_dev": is_dev,
                        "is_removed": False,
                        "is_compacted": True,
                    }
                )
                needs_migration = True

        rewritten_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
            }
        )

    if needs_migration:
        text_type = index_text_type(adapter.dialect)

        # Replace the snapshots table contents with the updated payloads.
        adapter.delete_from(snapshots_table, "TRUE")
        adapter.insert_append(
            snapshots_table,
            pd.DataFrame(rewritten_snapshots),
            columns_to_types={
                "name": exp.DataType.build(text_type),
                "identifier": exp.DataType.build(text_type),
                "version": exp.DataType.build(text_type),
                "snapshot": exp.DataType.build("text"),
                "kind_name": exp.DataType.build(text_type),
            },
        )

        if interval_rows:
            adapter.insert_append(
                intervals_table,
                pd.DataFrame(interval_rows),
                columns_to_types={
                    "id": exp.DataType.build(text_type),
                    "created_ts": exp.DataType.build("bigint"),
                    "name": exp.DataType.build(text_type),
                    "identifier": exp.DataType.build(text_type),
                    "version": exp.DataType.build(text_type),
                    "start_ts": exp.DataType.build("bigint"),
                    "end_ts": exp.DataType.build("bigint"),
                    "is_dev": exp.DataType.build("boolean"),
                    "is_removed": exp.DataType.build("boolean"),
                    "is_compacted": exp.DataType.build("boolean"),
                },
            )