Update the dev table suffix to be 'dev'. Rename temp_version to dev_version.
1"""Update the dev table suffix to be 'dev'. Rename temp_version to dev_version.""" 2 3import json 4 5from sqlglot import exp 6 7from sqlmesh.utils.migration import index_text_type, blob_text_type 8 9 10def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore 11 pass 12 13 14def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore 15 import pandas as pd 16 17 snapshots_table = "_snapshots" 18 environments_table = "_environments" 19 if schema: 20 snapshots_table = f"{schema}.{snapshots_table}" 21 environments_table = f"{schema}.{environments_table}" 22 23 index_type = index_text_type(engine_adapter.dialect) 24 blob_type = blob_text_type(engine_adapter.dialect) 25 snapshots_columns_to_types = { 26 "name": exp.DataType.build(index_type), 27 "identifier": exp.DataType.build(index_type), 28 "version": exp.DataType.build(index_type), 29 "snapshot": exp.DataType.build(blob_type), 30 "kind_name": exp.DataType.build(index_type), 31 "updated_ts": exp.DataType.build("bigint"), 32 "unpaused_ts": exp.DataType.build("bigint"), 33 "ttl_ms": exp.DataType.build("bigint"), 34 "unrestorable": exp.DataType.build("boolean"), 35 } 36 environments_columns_to_types = { 37 "name": exp.DataType.build(index_type), 38 "snapshots": exp.DataType.build(blob_type), 39 "start_at": exp.DataType.build("text"), 40 "end_at": exp.DataType.build("text"), 41 "plan_id": exp.DataType.build("text"), 42 "previous_plan_id": exp.DataType.build("text"), 43 "expiration_ts": exp.DataType.build("bigint"), 44 "finalized_ts": exp.DataType.build("bigint"), 45 "promoted_snapshot_ids": exp.DataType.build(blob_type), 46 "suffix_target": exp.DataType.build("text"), 47 "catalog_name_override": exp.DataType.build("text"), 48 "previous_finalized_snapshots": exp.DataType.build(blob_type), 49 "normalize_name": exp.DataType.build("boolean"), 50 "requirements": exp.DataType.build(blob_type), 51 } 52 53 new_snapshots = [] 54 for ( 55 name, 56 identifier, 57 version, 58 snapshot, 59 kind_name, 60 updated_ts, 61 unpaused_ts, 62 ttl_ms, 63 unrestorable, 64 ) in engine_adapter.fetchall( 65 exp.select(*snapshots_columns_to_types).from_(snapshots_table), 66 quote_identifiers=True, 67 ): 68 parsed_snapshot = json.loads(snapshot) 69 parsed_snapshot = _update_snapshot(parsed_snapshot) 70 71 new_snapshots.append( 72 { 73 "name": name, 74 "identifier": identifier, 75 "version": version, 76 "snapshot": json.dumps(parsed_snapshot), 77 "kind_name": kind_name, 78 "updated_ts": updated_ts, 79 "unpaused_ts": unpaused_ts, 80 "ttl_ms": ttl_ms, 81 "unrestorable": unrestorable, 82 } 83 ) 84 85 if new_snapshots: 86 engine_adapter.delete_from(snapshots_table, "TRUE") 87 engine_adapter.insert_append( 88 snapshots_table, 89 pd.DataFrame(new_snapshots), 90 target_columns_to_types=snapshots_columns_to_types, 91 ) 92 93 new_environments = [] 94 for ( 95 name, 96 snapshots, 97 start_at, 98 end_at, 99 plan_id, 100 previous_plan_id, 101 expiration_ts, 102 finalized_ts, 103 promoted_snapshot_ids, 104 suffix_target, 105 catalog_name_override, 106 previous_finalized_snapshots, 107 normalize_name, 108 requirements, 109 ) in engine_adapter.fetchall( 110 exp.select(*environments_columns_to_types).from_(environments_table), 111 quote_identifiers=True, 112 ): 113 if snapshots: 114 parsed_snapshots = json.loads(snapshots) 115 for s in parsed_snapshots: 116 _update_snapshot(s) 117 118 if previous_finalized_snapshots: 119 parsed_previous_finalized_snapshots = json.loads(previous_finalized_snapshots) 120 for s in parsed_previous_finalized_snapshots: 121 _update_snapshot(s) 122 123 new_environments.append( 124 { 125 "name": name, 126 "snapshots": json.dumps(parsed_snapshots) if snapshots else None, 127 "start_at": start_at, 128 "end_at": end_at, 129 "plan_id": plan_id, 130 "previous_plan_id": previous_plan_id, 131 "expiration_ts": expiration_ts, 132 "finalized_ts": finalized_ts, 133 "promoted_snapshot_ids": promoted_snapshot_ids, 134 "suffix_target": suffix_target, 135 "catalog_name_override": catalog_name_override, 136 "previous_finalized_snapshots": json.dumps(parsed_previous_finalized_snapshots) 137 if previous_finalized_snapshots 138 else None, 139 "normalize_name": normalize_name, 140 "requirements": requirements, 141 } 142 ) 143 144 if new_environments: 145 engine_adapter.delete_from(environments_table, "TRUE") 146 engine_adapter.insert_append( 147 environments_table, 148 pd.DataFrame(new_environments), 149 target_columns_to_types=environments_columns_to_types, 150 ) 151 152 153def _update_snapshot(snapshot: dict) -> dict: 154 snapshot = _update_fields(snapshot) 155 156 if "previous_versions" in snapshot: 157 for previous_version in snapshot["previous_versions"]: 158 _update_fields(previous_version) 159 160 return snapshot 161 162 163def _update_fields(target: dict) -> dict: 164 # Setting the old suffix to match the names of existing tables. 165 target["dev_table_suffix"] = "temp" 166 if "temp_version" in target: 167 target["dev_version"] = target.pop("temp_version") 168 return target
def
migrate_schemas(engine_adapter, schema, **kwargs):
def
migrate_rows(engine_adapter, schema, **kwargs):
15def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore 16 import pandas as pd 17 18 snapshots_table = "_snapshots" 19 environments_table = "_environments" 20 if schema: 21 snapshots_table = f"{schema}.{snapshots_table}" 22 environments_table = f"{schema}.{environments_table}" 23 24 index_type = index_text_type(engine_adapter.dialect) 25 blob_type = blob_text_type(engine_adapter.dialect) 26 snapshots_columns_to_types = { 27 "name": exp.DataType.build(index_type), 28 "identifier": exp.DataType.build(index_type), 29 "version": exp.DataType.build(index_type), 30 "snapshot": exp.DataType.build(blob_type), 31 "kind_name": exp.DataType.build(index_type), 32 "updated_ts": exp.DataType.build("bigint"), 33 "unpaused_ts": exp.DataType.build("bigint"), 34 "ttl_ms": exp.DataType.build("bigint"), 35 "unrestorable": exp.DataType.build("boolean"), 36 } 37 environments_columns_to_types = { 38 "name": exp.DataType.build(index_type), 39 "snapshots": exp.DataType.build(blob_type), 40 "start_at": exp.DataType.build("text"), 41 "end_at": exp.DataType.build("text"), 42 "plan_id": exp.DataType.build("text"), 43 "previous_plan_id": exp.DataType.build("text"), 44 "expiration_ts": exp.DataType.build("bigint"), 45 "finalized_ts": exp.DataType.build("bigint"), 46 "promoted_snapshot_ids": exp.DataType.build(blob_type), 47 "suffix_target": exp.DataType.build("text"), 48 "catalog_name_override": exp.DataType.build("text"), 49 "previous_finalized_snapshots": exp.DataType.build(blob_type), 50 "normalize_name": exp.DataType.build("boolean"), 51 "requirements": exp.DataType.build(blob_type), 52 } 53 54 new_snapshots = [] 55 for ( 56 name, 57 identifier, 58 version, 59 snapshot, 60 kind_name, 61 updated_ts, 62 unpaused_ts, 63 ttl_ms, 64 unrestorable, 65 ) in engine_adapter.fetchall( 66 exp.select(*snapshots_columns_to_types).from_(snapshots_table), 67 quote_identifiers=True, 68 ): 69 parsed_snapshot = json.loads(snapshot) 70 parsed_snapshot = _update_snapshot(parsed_snapshot) 71 72 new_snapshots.append( 73 { 74 "name": name, 75 "identifier": identifier, 76 "version": version, 77 "snapshot": json.dumps(parsed_snapshot), 78 "kind_name": kind_name, 79 "updated_ts": updated_ts, 80 "unpaused_ts": unpaused_ts, 81 "ttl_ms": ttl_ms, 82 "unrestorable": unrestorable, 83 } 84 ) 85 86 if new_snapshots: 87 engine_adapter.delete_from(snapshots_table, "TRUE") 88 engine_adapter.insert_append( 89 snapshots_table, 90 pd.DataFrame(new_snapshots), 91 target_columns_to_types=snapshots_columns_to_types, 92 ) 93 94 new_environments = [] 95 for ( 96 name, 97 snapshots, 98 start_at, 99 end_at, 100 plan_id, 101 previous_plan_id, 102 expiration_ts, 103 finalized_ts, 104 promoted_snapshot_ids, 105 suffix_target, 106 catalog_name_override, 107 previous_finalized_snapshots, 108 normalize_name, 109 requirements, 110 ) in engine_adapter.fetchall( 111 exp.select(*environments_columns_to_types).from_(environments_table), 112 quote_identifiers=True, 113 ): 114 if snapshots: 115 parsed_snapshots = json.loads(snapshots) 116 for s in parsed_snapshots: 117 _update_snapshot(s) 118 119 if previous_finalized_snapshots: 120 parsed_previous_finalized_snapshots = json.loads(previous_finalized_snapshots) 121 for s in parsed_previous_finalized_snapshots: 122 _update_snapshot(s) 123 124 new_environments.append( 125 { 126 "name": name, 127 "snapshots": json.dumps(parsed_snapshots) if snapshots else None, 128 "start_at": start_at, 129 "end_at": end_at, 130 "plan_id": plan_id, 131 "previous_plan_id": previous_plan_id, 132 "expiration_ts": expiration_ts, 133 "finalized_ts": finalized_ts, 134 "promoted_snapshot_ids": promoted_snapshot_ids, 135 "suffix_target": suffix_target, 136 "catalog_name_override": catalog_name_override, 137 "previous_finalized_snapshots": json.dumps(parsed_previous_finalized_snapshots) 138 if previous_finalized_snapshots 139 else None, 140 "normalize_name": normalize_name, 141 "requirements": requirements, 142 } 143 ) 144 145 if new_environments: 146 engine_adapter.delete_from(environments_table, "TRUE") 147 engine_adapter.insert_append( 148 environments_table, 149 pd.DataFrame(new_environments), 150 target_columns_to_types=environments_columns_to_types, 151 )