Edit on GitHub

Update the dev table suffix to be 'dev'. Rename temp_version to dev_version.

  1"""Update the dev table suffix to be 'dev'. Rename temp_version to dev_version."""
  2
  3import json
  4
  5from sqlglot import exp
  6
  7from sqlmesh.utils.migration import index_text_type, blob_text_type
  8
  9
 10def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
 11    pass
 12
 13
 14def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
 15    import pandas as pd
 16
 17    snapshots_table = "_snapshots"
 18    environments_table = "_environments"
 19    if schema:
 20        snapshots_table = f"{schema}.{snapshots_table}"
 21        environments_table = f"{schema}.{environments_table}"
 22
 23    index_type = index_text_type(engine_adapter.dialect)
 24    blob_type = blob_text_type(engine_adapter.dialect)
 25    snapshots_columns_to_types = {
 26        "name": exp.DataType.build(index_type),
 27        "identifier": exp.DataType.build(index_type),
 28        "version": exp.DataType.build(index_type),
 29        "snapshot": exp.DataType.build(blob_type),
 30        "kind_name": exp.DataType.build(index_type),
 31        "updated_ts": exp.DataType.build("bigint"),
 32        "unpaused_ts": exp.DataType.build("bigint"),
 33        "ttl_ms": exp.DataType.build("bigint"),
 34        "unrestorable": exp.DataType.build("boolean"),
 35    }
 36    environments_columns_to_types = {
 37        "name": exp.DataType.build(index_type),
 38        "snapshots": exp.DataType.build(blob_type),
 39        "start_at": exp.DataType.build("text"),
 40        "end_at": exp.DataType.build("text"),
 41        "plan_id": exp.DataType.build("text"),
 42        "previous_plan_id": exp.DataType.build("text"),
 43        "expiration_ts": exp.DataType.build("bigint"),
 44        "finalized_ts": exp.DataType.build("bigint"),
 45        "promoted_snapshot_ids": exp.DataType.build(blob_type),
 46        "suffix_target": exp.DataType.build("text"),
 47        "catalog_name_override": exp.DataType.build("text"),
 48        "previous_finalized_snapshots": exp.DataType.build(blob_type),
 49        "normalize_name": exp.DataType.build("boolean"),
 50        "requirements": exp.DataType.build(blob_type),
 51    }
 52
 53    new_snapshots = []
 54    for (
 55        name,
 56        identifier,
 57        version,
 58        snapshot,
 59        kind_name,
 60        updated_ts,
 61        unpaused_ts,
 62        ttl_ms,
 63        unrestorable,
 64    ) in engine_adapter.fetchall(
 65        exp.select(*snapshots_columns_to_types).from_(snapshots_table),
 66        quote_identifiers=True,
 67    ):
 68        parsed_snapshot = json.loads(snapshot)
 69        parsed_snapshot = _update_snapshot(parsed_snapshot)
 70
 71        new_snapshots.append(
 72            {
 73                "name": name,
 74                "identifier": identifier,
 75                "version": version,
 76                "snapshot": json.dumps(parsed_snapshot),
 77                "kind_name": kind_name,
 78                "updated_ts": updated_ts,
 79                "unpaused_ts": unpaused_ts,
 80                "ttl_ms": ttl_ms,
 81                "unrestorable": unrestorable,
 82            }
 83        )
 84
 85    if new_snapshots:
 86        engine_adapter.delete_from(snapshots_table, "TRUE")
 87        engine_adapter.insert_append(
 88            snapshots_table,
 89            pd.DataFrame(new_snapshots),
 90            target_columns_to_types=snapshots_columns_to_types,
 91        )
 92
 93    new_environments = []
 94    for (
 95        name,
 96        snapshots,
 97        start_at,
 98        end_at,
 99        plan_id,
100        previous_plan_id,
101        expiration_ts,
102        finalized_ts,
103        promoted_snapshot_ids,
104        suffix_target,
105        catalog_name_override,
106        previous_finalized_snapshots,
107        normalize_name,
108        requirements,
109    ) in engine_adapter.fetchall(
110        exp.select(*environments_columns_to_types).from_(environments_table),
111        quote_identifiers=True,
112    ):
113        if snapshots:
114            parsed_snapshots = json.loads(snapshots)
115            for s in parsed_snapshots:
116                _update_snapshot(s)
117
118        if previous_finalized_snapshots:
119            parsed_previous_finalized_snapshots = json.loads(previous_finalized_snapshots)
120            for s in parsed_previous_finalized_snapshots:
121                _update_snapshot(s)
122
123        new_environments.append(
124            {
125                "name": name,
126                "snapshots": json.dumps(parsed_snapshots) if snapshots else None,
127                "start_at": start_at,
128                "end_at": end_at,
129                "plan_id": plan_id,
130                "previous_plan_id": previous_plan_id,
131                "expiration_ts": expiration_ts,
132                "finalized_ts": finalized_ts,
133                "promoted_snapshot_ids": promoted_snapshot_ids,
134                "suffix_target": suffix_target,
135                "catalog_name_override": catalog_name_override,
136                "previous_finalized_snapshots": json.dumps(parsed_previous_finalized_snapshots)
137                if previous_finalized_snapshots
138                else None,
139                "normalize_name": normalize_name,
140                "requirements": requirements,
141            }
142        )
143
144    if new_environments:
145        engine_adapter.delete_from(environments_table, "TRUE")
146        engine_adapter.insert_append(
147            environments_table,
148            pd.DataFrame(new_environments),
149            target_columns_to_types=environments_columns_to_types,
150        )
151
152
153def _update_snapshot(snapshot: dict) -> dict:
154    snapshot = _update_fields(snapshot)
155
156    if "previous_versions" in snapshot:
157        for previous_version in snapshot["previous_versions"]:
158            _update_fields(previous_version)
159
160    return snapshot
161
162
163def _update_fields(target: dict) -> dict:
164    # Setting the old suffix to match the names of existing tables.
165    target["dev_table_suffix"] = "temp"
166    if "temp_version" in target:
167        target["dev_version"] = target.pop("temp_version")
168    return target
def migrate_schemas(engine_adapter, schema, **kwargs):
11def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
12    pass
def migrate_rows(engine_adapter, schema, **kwargs):
 15def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
 16    import pandas as pd
 17
 18    snapshots_table = "_snapshots"
 19    environments_table = "_environments"
 20    if schema:
 21        snapshots_table = f"{schema}.{snapshots_table}"
 22        environments_table = f"{schema}.{environments_table}"
 23
 24    index_type = index_text_type(engine_adapter.dialect)
 25    blob_type = blob_text_type(engine_adapter.dialect)
 26    snapshots_columns_to_types = {
 27        "name": exp.DataType.build(index_type),
 28        "identifier": exp.DataType.build(index_type),
 29        "version": exp.DataType.build(index_type),
 30        "snapshot": exp.DataType.build(blob_type),
 31        "kind_name": exp.DataType.build(index_type),
 32        "updated_ts": exp.DataType.build("bigint"),
 33        "unpaused_ts": exp.DataType.build("bigint"),
 34        "ttl_ms": exp.DataType.build("bigint"),
 35        "unrestorable": exp.DataType.build("boolean"),
 36    }
 37    environments_columns_to_types = {
 38        "name": exp.DataType.build(index_type),
 39        "snapshots": exp.DataType.build(blob_type),
 40        "start_at": exp.DataType.build("text"),
 41        "end_at": exp.DataType.build("text"),
 42        "plan_id": exp.DataType.build("text"),
 43        "previous_plan_id": exp.DataType.build("text"),
 44        "expiration_ts": exp.DataType.build("bigint"),
 45        "finalized_ts": exp.DataType.build("bigint"),
 46        "promoted_snapshot_ids": exp.DataType.build(blob_type),
 47        "suffix_target": exp.DataType.build("text"),
 48        "catalog_name_override": exp.DataType.build("text"),
 49        "previous_finalized_snapshots": exp.DataType.build(blob_type),
 50        "normalize_name": exp.DataType.build("boolean"),
 51        "requirements": exp.DataType.build(blob_type),
 52    }
 53
 54    new_snapshots = []
 55    for (
 56        name,
 57        identifier,
 58        version,
 59        snapshot,
 60        kind_name,
 61        updated_ts,
 62        unpaused_ts,
 63        ttl_ms,
 64        unrestorable,
 65    ) in engine_adapter.fetchall(
 66        exp.select(*snapshots_columns_to_types).from_(snapshots_table),
 67        quote_identifiers=True,
 68    ):
 69        parsed_snapshot = json.loads(snapshot)
 70        parsed_snapshot = _update_snapshot(parsed_snapshot)
 71
 72        new_snapshots.append(
 73            {
 74                "name": name,
 75                "identifier": identifier,
 76                "version": version,
 77                "snapshot": json.dumps(parsed_snapshot),
 78                "kind_name": kind_name,
 79                "updated_ts": updated_ts,
 80                "unpaused_ts": unpaused_ts,
 81                "ttl_ms": ttl_ms,
 82                "unrestorable": unrestorable,
 83            }
 84        )
 85
 86    if new_snapshots:
 87        engine_adapter.delete_from(snapshots_table, "TRUE")
 88        engine_adapter.insert_append(
 89            snapshots_table,
 90            pd.DataFrame(new_snapshots),
 91            target_columns_to_types=snapshots_columns_to_types,
 92        )
 93
 94    new_environments = []
 95    for (
 96        name,
 97        snapshots,
 98        start_at,
 99        end_at,
100        plan_id,
101        previous_plan_id,
102        expiration_ts,
103        finalized_ts,
104        promoted_snapshot_ids,
105        suffix_target,
106        catalog_name_override,
107        previous_finalized_snapshots,
108        normalize_name,
109        requirements,
110    ) in engine_adapter.fetchall(
111        exp.select(*environments_columns_to_types).from_(environments_table),
112        quote_identifiers=True,
113    ):
114        if snapshots:
115            parsed_snapshots = json.loads(snapshots)
116            for s in parsed_snapshots:
117                _update_snapshot(s)
118
119        if previous_finalized_snapshots:
120            parsed_previous_finalized_snapshots = json.loads(previous_finalized_snapshots)
121            for s in parsed_previous_finalized_snapshots:
122                _update_snapshot(s)
123
124        new_environments.append(
125            {
126                "name": name,
127                "snapshots": json.dumps(parsed_snapshots) if snapshots else None,
128                "start_at": start_at,
129                "end_at": end_at,
130                "plan_id": plan_id,
131                "previous_plan_id": previous_plan_id,
132                "expiration_ts": expiration_ts,
133                "finalized_ts": finalized_ts,
134                "promoted_snapshot_ids": promoted_snapshot_ids,
135                "suffix_target": suffix_target,
136                "catalog_name_override": catalog_name_override,
137                "previous_finalized_snapshots": json.dumps(parsed_previous_finalized_snapshots)
138                if previous_finalized_snapshots
139                else None,
140                "normalize_name": normalize_name,
141                "requirements": requirements,
142            }
143        )
144
145    if new_environments:
146        engine_adapter.delete_from(environments_table, "TRUE")
147        engine_adapter.insert_append(
148            environments_table,
149            pd.DataFrame(new_environments),
150            target_columns_to_types=environments_columns_to_types,
151        )