Edit on GitHub

Add dev version to the intervals table.

  1"""Add dev version to the intervals table."""
  2
  3import typing as t
  4import json
  5import zlib
  6
  7from sqlglot import exp
  8from sqlmesh.utils.migration import index_text_type, blob_text_type
  9
 10
 11def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
 12    intervals_table = "_intervals"
 13    if schema:
 14        intervals_table = f"{schema}.{intervals_table}"
 15
 16    index_type = index_text_type(engine_adapter.dialect)
 17    alter_table_exp = exp.Alter(
 18        this=exp.to_table(intervals_table),
 19        kind="TABLE",
 20        actions=[
 21            exp.ColumnDef(
 22                this=exp.to_column("dev_version"),
 23                kind=exp.DataType.build(index_type),
 24            )
 25        ],
 26    )
 27    engine_adapter.execute(alter_table_exp)
 28
 29
 30def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
 31    intervals_table = "_intervals"
 32    snapshots_table = "_snapshots"
 33    if schema:
 34        intervals_table = f"{schema}.{intervals_table}"
 35        snapshots_table = f"{schema}.{snapshots_table}"
 36
 37    used_dev_versions: t.Set[t.Tuple[str, str]] = set()
 38    used_versions: t.Set[t.Tuple[str, str]] = set()
 39    used_snapshot_ids: t.Set[t.Tuple[str, str]] = set()
 40    snapshot_ids_to_dev_versions: t.Dict[t.Tuple[str, str], str] = {}
 41
 42    _migrate_snapshots(
 43        engine_adapter,
 44        snapshots_table,
 45        used_dev_versions,
 46        used_versions,
 47        used_snapshot_ids,
 48        snapshot_ids_to_dev_versions,
 49    )
 50    _migrate_intervals(
 51        engine_adapter,
 52        intervals_table,
 53        used_dev_versions,
 54        used_versions,
 55        used_snapshot_ids,
 56        snapshot_ids_to_dev_versions,
 57    )
 58
 59
 60def _migrate_intervals(
 61    engine_adapter: t.Any,
 62    intervals_table: str,
 63    used_dev_versions: t.Set[t.Tuple[str, str]],
 64    used_versions: t.Set[t.Tuple[str, str]],
 65    used_snapshot_ids: t.Set[t.Tuple[str, str]],
 66    snapshot_ids_to_dev_versions: t.Dict[t.Tuple[str, str], str],
 67) -> None:
 68    import pandas as pd
 69
 70    index_type = index_text_type(engine_adapter.dialect)
 71    intervals_columns_to_types = {
 72        "id": exp.DataType.build(index_type),
 73        "created_ts": exp.DataType.build("bigint"),
 74        "name": exp.DataType.build(index_type),
 75        "identifier": exp.DataType.build("text"),
 76        "version": exp.DataType.build(index_type),
 77        "dev_version": exp.DataType.build(index_type),
 78        "start_ts": exp.DataType.build("bigint"),
 79        "end_ts": exp.DataType.build("bigint"),
 80        "is_dev": exp.DataType.build("boolean"),
 81        "is_removed": exp.DataType.build("boolean"),
 82        "is_compacted": exp.DataType.build("boolean"),
 83        "is_pending_restatement": exp.DataType.build("boolean"),
 84    }
 85
 86    new_intervals = []
 87    for (
 88        interval_id,
 89        created_ts,
 90        name,
 91        identifier,
 92        version,
 93        _,
 94        start_ts,
 95        end_ts,
 96        is_dev,
 97        is_removed,
 98        is_compacted,
 99        is_pending_restatement,
100    ) in engine_adapter.fetchall(
101        exp.select(*intervals_columns_to_types).from_(intervals_table),
102        quote_identifiers=True,
103    ):
104        if (name, version) not in used_versions:
105            # If the interval's version is no longer used, we can safely delete it
106            continue
107
108        dev_version = snapshot_ids_to_dev_versions.get((name, identifier))
109        if dev_version not in used_dev_versions and is_dev:
110            # If the interval's dev version is no longer used and this is a dev interval, we can safely delete it
111            continue
112
113        if (name, identifier) not in used_snapshot_ids:
114            # If the snapshot associated with this interval no longer exists, we can nullify the interval's identifier
115            # to improve compaction
116            is_compacted = False
117            identifier = None
118            if not is_dev:
119                # If the interval is not dev, we can safely nullify the dev version as well
120                dev_version = None
121
122        new_intervals.append(
123            {
124                "id": interval_id,
125                "created_ts": created_ts,
126                "name": name,
127                "identifier": identifier,
128                "version": version,
129                "dev_version": dev_version,
130                "start_ts": start_ts,
131                "end_ts": end_ts,
132                "is_dev": is_dev,
133                "is_removed": is_removed,
134                "is_compacted": is_compacted,
135                "is_pending_restatement": is_pending_restatement,
136            }
137        )
138
139    if new_intervals:
140        engine_adapter.delete_from(intervals_table, "TRUE")
141        engine_adapter.insert_append(
142            intervals_table,
143            pd.DataFrame(new_intervals),
144            target_columns_to_types=intervals_columns_to_types,
145        )
146
147
def _migrate_snapshots(
    engine_adapter: t.Any,
    snapshots_table: str,
    used_dev_versions: t.Set[t.Tuple[str, str]],
    used_versions: t.Set[t.Tuple[str, str]],
    used_snapshot_ids: t.Set[t.Tuple[str, str]],
    snapshot_ids_to_dev_versions: t.Dict[t.Tuple[str, str], str],
) -> None:
    """Store ``dev_version`` in every snapshot payload and collect lookup data.

    Each snapshot row is read, its JSON payload is updated with the resolved
    ``version`` and ``dev_version``, and the table is rewritten with the
    updated payloads. While iterating, the collections passed in are filled:

    * ``used_dev_versions`` with ``(name, dev_version)``;
    * ``used_versions`` with ``(name, version)``;
    * ``used_snapshot_ids`` with ``(name, identifier)``;
    * ``snapshot_ids_to_dev_versions`` with an entry for the snapshot itself
      and one for each element of its ``previous_versions`` list.

    Nothing is written when no rows are read.
    """
    import pandas as pd

    text_index_type = index_text_type(engine_adapter.dialect)
    snapshot_blob_type = blob_text_type(engine_adapter.dialect)
    column_specs = (
        ("name", text_index_type),
        ("identifier", text_index_type),
        ("version", text_index_type),
        ("snapshot", snapshot_blob_type),
        ("kind_name", text_index_type),
        ("updated_ts", "bigint"),
        ("unpaused_ts", "bigint"),
        ("ttl_ms", "bigint"),
        ("unrestorable", "boolean"),
    )
    snapshots_columns_to_types = {
        column: exp.DataType.build(type_name) for column, type_name in column_specs
    }

    query = exp.select(*snapshots_columns_to_types).from_(snapshots_table)

    rewritten_rows = []
    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        (
            name,
            identifier,
            stored_version,
            raw_snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
        ) = row

        payload = json.loads(raw_snapshot)
        # The payload's own version wins; the column value is only a fallback.
        version = payload.get("version") or stored_version
        dev_version = get_dev_version(payload)
        payload["dev_version"] = dev_version
        payload["version"] = version

        snapshot_key = (name, identifier)
        used_dev_versions.add((name, dev_version))
        used_versions.add((name, version))
        used_snapshot_ids.add(snapshot_key)
        snapshot_ids_to_dev_versions[snapshot_key] = dev_version

        # Also record the dev version of every previous version listed in the payload.
        for previous in payload.get("previous_versions", []):
            previous_key = (name, get_identifier(previous))
            previous_dev_version = get_dev_version(previous)
            snapshot_ids_to_dev_versions[previous_key] = previous_dev_version

        rewritten_rows.append(
            dict(
                name=name,
                identifier=identifier,
                version=version,
                snapshot=json.dumps(payload),
                kind_name=kind_name,
                updated_ts=updated_ts,
                unpaused_ts=unpaused_ts,
                ttl_ms=ttl_ms,
                unrestorable=unrestorable,
            )
        )

    if not rewritten_rows:
        return

    engine_adapter.delete_from(snapshots_table, "TRUE")
    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(rewritten_rows),
        target_columns_to_types=snapshots_columns_to_types,
    )
224
225
def get_identifier(snapshot: t.Dict[str, t.Any]) -> str:
    """Return the identifier derived from a snapshot's fingerprint.

    The identifier is the ``crc32`` of the fingerprint's data hash, metadata
    hash, parent data hash and parent metadata hash, in that order.

    Raises:
        KeyError: If ``fingerprint`` or any of the four hash keys is missing.
    """
    fingerprint = snapshot["fingerprint"]
    hash_fields = (
        "data_hash",
        "metadata_hash",
        "parent_data_hash",
        "parent_metadata_hash",
    )
    return crc32([fingerprint[field] for field in hash_fields])
236
237
def get_dev_version(snapshot: t.Dict[str, t.Any]) -> str:
    """Return the dev version for a snapshot dictionary.

    A truthy ``dev_version`` entry is returned as-is. Otherwise the value is
    the ``crc32`` of the fingerprint's data hash and parent data hash.

    Raises:
        KeyError: If the fallback is needed and ``fingerprint`` or one of its
            hash keys is missing.
    """
    explicit_dev_version = snapshot.get("dev_version")
    if not explicit_dev_version:
        fingerprint = snapshot["fingerprint"]
        return crc32([fingerprint["data_hash"], fingerprint["parent_data_hash"]])
    return explicit_dev_version
244
245
def crc32(data: t.Iterable[t.Optional[str]]) -> str:
    """Return the CRC32 checksum of the concatenated items as a decimal string.

    The items are combined into bytes by ``safe_concat`` before hashing.
    """
    payload = safe_concat(data)
    checksum = zlib.crc32(payload)
    return str(checksum)
248
249
def safe_concat(data: t.Iterable[t.Optional[str]]) -> bytes:
    """Join the items with ``;`` and encode the result as UTF-8.

    ``None`` items contribute an empty string, so their position is preserved.
    """
    parts = [item if item is not None else "" for item in data]
    joined = ";".join(parts)
    return joined.encode("utf-8")
def migrate_schemas(engine_adapter, schema, **kwargs):
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Add a ``dev_version`` column to the intervals table.

    A single ``ALTER TABLE`` expression is built for ``_intervals`` (prefixed
    with ``schema`` when one is given) and executed through ``engine_adapter``.
    The column type is whatever ``index_text_type`` returns for the adapter's
    dialect.
    """
    target_table = f"{schema}._intervals" if schema else "_intervals"

    column_type = exp.DataType.build(index_text_type(engine_adapter.dialect))
    dev_version_column = exp.ColumnDef(
        this=exp.to_column("dev_version"),
        kind=column_type,
    )

    engine_adapter.execute(
        exp.Alter(
            this=exp.to_table(target_table),
            kind="TABLE",
            actions=[dev_version_column],
        )
    )
def migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Backfill ``dev_version`` data in the snapshots and intervals tables.

    Snapshots are migrated first: ``_migrate_snapshots`` fills the lookup
    collections created here, and ``_migrate_intervals`` then receives those
    same collections to decide what happens to each interval row.
    """
    qualifier = f"{schema}." if schema else ""
    snapshots_table = f"{qualifier}_snapshots"
    intervals_table = f"{qualifier}_intervals"

    # Shared state: written by the snapshot pass, read by the interval pass.
    used_dev_versions: t.Set[t.Tuple[str, str]] = set()
    used_versions: t.Set[t.Tuple[str, str]] = set()
    used_snapshot_ids: t.Set[t.Tuple[str, str]] = set()
    snapshot_ids_to_dev_versions: t.Dict[t.Tuple[str, str], str] = {}
    shared_state = (
        used_dev_versions,
        used_versions,
        used_snapshot_ids,
        snapshot_ids_to_dev_versions,
    )

    _migrate_snapshots(engine_adapter, snapshots_table, *shared_state)
    _migrate_intervals(engine_adapter, intervals_table, *shared_state)
def get_identifier(snapshot: Dict[str, Any]) -> str:
def get_identifier(snapshot: t.Dict[str, t.Any]) -> str:
    """Return the identifier derived from a snapshot's fingerprint.

    The identifier is the ``crc32`` of the fingerprint's data hash, metadata
    hash, parent data hash and parent metadata hash, in that order.

    Raises:
        KeyError: If ``fingerprint`` or any of the four hash keys is missing.
    """
    fingerprint = snapshot["fingerprint"]
    hash_fields = (
        "data_hash",
        "metadata_hash",
        "parent_data_hash",
        "parent_metadata_hash",
    )
    return crc32([fingerprint[field] for field in hash_fields])
def get_dev_version(snapshot: Dict[str, Any]) -> str:
def get_dev_version(snapshot: t.Dict[str, t.Any]) -> str:
    """Return the dev version for a snapshot dictionary.

    A truthy ``dev_version`` entry is returned as-is. Otherwise the value is
    the ``crc32`` of the fingerprint's data hash and parent data hash.

    Raises:
        KeyError: If the fallback is needed and ``fingerprint`` or one of its
            hash keys is missing.
    """
    explicit_dev_version = snapshot.get("dev_version")
    if not explicit_dev_version:
        fingerprint = snapshot["fingerprint"]
        return crc32([fingerprint["data_hash"], fingerprint["parent_data_hash"]])
    return explicit_dev_version
def crc32(data: Iterable[Optional[str]]) -> str:
def crc32(data: t.Iterable[t.Optional[str]]) -> str:
    """Return the CRC32 checksum of the concatenated items as a decimal string.

    The items are combined into bytes by ``safe_concat`` before hashing.
    """
    payload = safe_concat(data)
    checksum = zlib.crc32(payload)
    return str(checksum)
def safe_concat(data: Iterable[Optional[str]]) -> bytes:
def safe_concat(data: t.Iterable[t.Optional[str]]) -> bytes:
    """Join the items with ``;`` and encode the result as UTF-8.

    ``None`` items contribute an empty string, so their position is preserved.
    """
    parts = [item if item is not None else "" for item in data]
    joined = ";".join(parts)
    return joined.encode("utf-8")