Edit on GitHub

Move batch_size from the model and into the kind.

 1"""Move batch_size from the model and into the kind."""
 2
 3import json
 4
 5from sqlglot import exp
 6
 7
 8def migrate(state_sync, **kwargs):  # type: ignore
 9    snapshots_table = "_snapshots"
10    if state_sync.schema:
11        snapshots_table = f"{state_sync.schema}.{snapshots_table}"
12
13    for row in state_sync.engine_adapter.fetchall(
14        exp.select("*").from_(snapshots_table), quote_identifiers=True
15    ):
16        name, identifier, _, snapshot = row
17        snapshot = json.loads(snapshot)
18        model = snapshot["model"]
19        if "batch_size" in model:
20            batch_size = model.pop("batch_size")
21            kind = model.get("kind")
22
23            if kind:
24                if kind["name"] in ("INCREMENTAL_BY_TIME_RANGE", "INCREMENTAL_BY_UNIQUE_KEY"):
25                    kind["batch_size"] = batch_size
26
27                    # this is not efficient, i'm doing this because i'm lazy and no one has snapshots at the time of writing this migration
28                    # do not copy this code in future migrations
29
30                    state_sync.engine_adapter.update_table(
31                        snapshots_table,
32                        {"snapshot": json.dumps(snapshot)},
33                        where=f"name = '{name}' and identifier = '{identifier}'",
34                    )
def migrate(state_sync, **kwargs):
 9def migrate(state_sync, **kwargs):  # type: ignore
10    snapshots_table = "_snapshots"
11    if state_sync.schema:
12        snapshots_table = f"{state_sync.schema}.{snapshots_table}"
13
14    for row in state_sync.engine_adapter.fetchall(
15        exp.select("*").from_(snapshots_table), quote_identifiers=True
16    ):
17        name, identifier, _, snapshot = row
18        snapshot = json.loads(snapshot)
19        model = snapshot["model"]
20        if "batch_size" in model:
21            batch_size = model.pop("batch_size")
22            kind = model.get("kind")
23
24            if kind:
25                if kind["name"] in ("INCREMENTAL_BY_TIME_RANGE", "INCREMENTAL_BY_UNIQUE_KEY"):
26                    kind["batch_size"] = batch_size
27
28                    # this is not efficient, i'm doing this because i'm lazy and no one has snapshots at the time of writing this migration
29                    # do not copy this code in future migrations
30
31                    state_sync.engine_adapter.update_table(
32                        snapshots_table,
33                        {"snapshot": json.dumps(snapshot)},
34                        where=f"name = '{name}' and identifier = '{identifier}'",
35                    )