Move batch_size from the model and into the kind.
1"""Move batch_size from the model and into the kind.""" 2 3import json 4 5from sqlglot import exp 6 7 8def migrate(state_sync, **kwargs): # type: ignore 9 snapshots_table = "_snapshots" 10 if state_sync.schema: 11 snapshots_table = f"{state_sync.schema}.{snapshots_table}" 12 13 for row in state_sync.engine_adapter.fetchall( 14 exp.select("*").from_(snapshots_table), quote_identifiers=True 15 ): 16 name, identifier, _, snapshot = row 17 snapshot = json.loads(snapshot) 18 model = snapshot["model"] 19 if "batch_size" in model: 20 batch_size = model.pop("batch_size") 21 kind = model.get("kind") 22 23 if kind: 24 if kind["name"] in ("INCREMENTAL_BY_TIME_RANGE", "INCREMENTAL_BY_UNIQUE_KEY"): 25 kind["batch_size"] = batch_size 26 27 # this is not efficient, i'm doing this because i'm lazy and no one has snapshots at the time of writing this migration 28 # do not copy this code in future migrations 29 30 state_sync.engine_adapter.update_table( 31 snapshots_table, 32 {"snapshot": json.dumps(snapshot)}, 33 where=f"name = '{name}' and identifier = '{identifier}'", 34 )
def
migrate(state_sync, **kwargs):
9def migrate(state_sync, **kwargs): # type: ignore 10 snapshots_table = "_snapshots" 11 if state_sync.schema: 12 snapshots_table = f"{state_sync.schema}.{snapshots_table}" 13 14 for row in state_sync.engine_adapter.fetchall( 15 exp.select("*").from_(snapshots_table), quote_identifiers=True 16 ): 17 name, identifier, _, snapshot = row 18 snapshot = json.loads(snapshot) 19 model = snapshot["model"] 20 if "batch_size" in model: 21 batch_size = model.pop("batch_size") 22 kind = model.get("kind") 23 24 if kind: 25 if kind["name"] in ("INCREMENTAL_BY_TIME_RANGE", "INCREMENTAL_BY_UNIQUE_KEY"): 26 kind["batch_size"] = batch_size 27 28 # this is not efficient, i'm doing this because i'm lazy and no one has snapshots at the time of writing this migration 29 # do not copy this code in future migrations 30 31 state_sync.engine_adapter.update_table( 32 snapshots_table, 33 {"snapshot": json.dumps(snapshot)}, 34 where=f"name = '{name}' and identifier = '{identifier}'", 35 )