Add dev version to the intervals table.
"""Add dev version to the intervals table."""

import typing as t
import json
import zlib

from sqlglot import exp
from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Add a ``dev_version`` column to the ``_intervals`` table.

    Args:
        engine_adapter: Adapter used to execute the ALTER statement; its
            ``dialect`` selects the text type of the new column.
        schema: Optional schema name used to qualify the table name.
        **kwargs: Accepted and ignored.
    """
    intervals_table = "_intervals"
    if schema:
        intervals_table = f"{schema}.{intervals_table}"

    index_type = index_text_type(engine_adapter.dialect)
    alter_table_exp = exp.Alter(
        this=exp.to_table(intervals_table),
        kind="TABLE",
        actions=[
            exp.ColumnDef(
                this=exp.to_column("dev_version"),
                kind=exp.DataType.build(index_type),
            )
        ],
    )
    engine_adapter.execute(alter_table_exp)


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Rewrite the snapshots table, then the intervals table.

    Snapshots are processed first because that pass fills the lookup
    collections that the intervals pass reads.

    Args:
        engine_adapter: Adapter used to read and rewrite both tables.
        schema: Optional schema name used to qualify both table names.
        **kwargs: Accepted and ignored.
    """
    intervals_table = "_intervals"
    snapshots_table = "_snapshots"
    if schema:
        intervals_table = f"{schema}.{intervals_table}"
        snapshots_table = f"{schema}.{snapshots_table}"

    # Filled by _migrate_snapshots and consumed by _migrate_intervals.
    used_dev_versions: t.Set[t.Tuple[str, str]] = set()
    used_versions: t.Set[t.Tuple[str, str]] = set()
    used_snapshot_ids: t.Set[t.Tuple[str, str]] = set()
    snapshot_ids_to_dev_versions: t.Dict[t.Tuple[str, str], str] = {}

    _migrate_snapshots(
        engine_adapter,
        snapshots_table,
        used_dev_versions,
        used_versions,
        used_snapshot_ids,
        snapshot_ids_to_dev_versions,
    )
    _migrate_intervals(
        engine_adapter,
        intervals_table,
        used_dev_versions,
        used_versions,
        used_snapshot_ids,
        snapshot_ids_to_dev_versions,
    )


def _migrate_intervals(
    engine_adapter: t.Any,
    intervals_table: str,
    used_dev_versions: t.Set[t.Tuple[str, str]],
    used_versions: t.Set[t.Tuple[str, str]],
    used_snapshot_ids: t.Set[t.Tuple[str, str]],
    snapshot_ids_to_dev_versions: t.Dict[t.Tuple[str, str], str],
) -> None:
    """Populate ``dev_version`` on interval rows and drop unreferenced ones.

    Every row of ``intervals_table`` is read and then either skipped or
    re-emitted with its ``dev_version`` looked up by ``(name, identifier)``.
    If at least one row survives, the table is emptied and the surviving
    rows are inserted back.

    Args:
        engine_adapter: Adapter used to read and rewrite the table.
        intervals_table: Name of the intervals table, already qualified.
        used_dev_versions: ``(name, dev_version)`` pairs of existing snapshots.
        used_versions: ``(name, version)`` pairs of existing snapshots.
        used_snapshot_ids: ``(name, identifier)`` pairs of existing snapshots.
        snapshot_ids_to_dev_versions: Maps ``(name, identifier)`` to a dev
            version.
    """
    import pandas as pd

    index_type = index_text_type(engine_adapter.dialect)
    intervals_columns_to_types = {
        "id": exp.DataType.build(index_type),
        "created_ts": exp.DataType.build("bigint"),
        "name": exp.DataType.build(index_type),
        "identifier": exp.DataType.build("text"),
        "version": exp.DataType.build(index_type),
        "dev_version": exp.DataType.build(index_type),
        "start_ts": exp.DataType.build("bigint"),
        "end_ts": exp.DataType.build("bigint"),
        "is_dev": exp.DataType.build("boolean"),
        "is_removed": exp.DataType.build("boolean"),
        "is_compacted": exp.DataType.build("boolean"),
        "is_pending_restatement": exp.DataType.build("boolean"),
    }

    new_intervals = []
    for (
        interval_id,
        created_ts,
        name,
        identifier,
        version,
        _,  # stored dev_version is ignored; it is recomputed below
        start_ts,
        end_ts,
        is_dev,
        is_removed,
        is_compacted,
        is_pending_restatement,
    ) in engine_adapter.fetchall(
        exp.select(*intervals_columns_to_types).from_(intervals_table),
        quote_identifiers=True,
    ):
        if (name, version) not in used_versions:
            # If the interval's version is no longer used, we can safely delete it
            continue

        dev_version = snapshot_ids_to_dev_versions.get((name, identifier))
        # used_dev_versions holds (name, dev_version) pairs, so membership must
        # be tested with the pair. A bare dev_version string never matches a
        # tuple, which would discard every dev interval.
        if (name, dev_version) not in used_dev_versions and is_dev:
            # If the interval's dev version is no longer used and this is a dev interval, we can safely delete it
            continue

        if (name, identifier) not in used_snapshot_ids:
            # If the snapshot associated with this interval no longer exists, we can nullify the interval's identifier
            # to improve compaction
            is_compacted = False
            identifier = None
            if not is_dev:
                # If the interval is not dev, we can safely nullify the dev version as well
                dev_version = None

        new_intervals.append(
            {
                "id": interval_id,
                "created_ts": created_ts,
                "name": name,
                "identifier": identifier,
                "version": version,
                "dev_version": dev_version,
                "start_ts": start_ts,
                "end_ts": end_ts,
                "is_dev": is_dev,
                "is_removed": is_removed,
                "is_compacted": is_compacted,
                "is_pending_restatement": is_pending_restatement,
            }
        )

    # The table is only rewritten when at least one interval survives.
    if new_intervals:
        engine_adapter.delete_from(intervals_table, "TRUE")
        engine_adapter.insert_append(
            intervals_table,
            pd.DataFrame(new_intervals),
            target_columns_to_types=intervals_columns_to_types,
        )


def _migrate_snapshots(
    engine_adapter: t.Any,
    snapshots_table: str,
    used_dev_versions: t.Set[t.Tuple[str, str]],
    used_versions: t.Set[t.Tuple[str, str]],
    used_snapshot_ids: t.Set[t.Tuple[str, str]],
    snapshot_ids_to_dev_versions: t.Dict[t.Tuple[str, str], str],
) -> None:
    """Store ``version`` and ``dev_version`` in every serialized snapshot.

    Each row's JSON payload is parsed, updated and re-serialized. While
    iterating, the four lookup collections passed in are filled in place.
    If any rows were read, the table is emptied and the updated rows are
    inserted back.

    Args:
        engine_adapter: Adapter used to read and rewrite the table.
        snapshots_table: Name of the snapshots table, already qualified.
        used_dev_versions: Receives ``(name, dev_version)`` for each row.
        used_versions: Receives ``(name, version)`` for each row.
        used_snapshot_ids: Receives ``(name, identifier)`` for each row.
        snapshot_ids_to_dev_versions: Receives the dev version of each row
            and of each entry in its ``previous_versions``, keyed by
            ``(name, identifier)``.
    """
    import pandas as pd

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)
    snapshots_columns_to_types = {
        "name": exp.DataType.build(index_type),
        "identifier": exp.DataType.build(index_type),
        "version": exp.DataType.build(index_type),
        "snapshot": exp.DataType.build(blob_type),
        "kind_name": exp.DataType.build(index_type),
        "updated_ts": exp.DataType.build("bigint"),
        "unpaused_ts": exp.DataType.build("bigint"),
        "ttl_ms": exp.DataType.build("bigint"),
        "unrestorable": exp.DataType.build("boolean"),
    }

    new_snapshots = []
    for (
        name,
        identifier,
        version,
        snapshot,
        kind_name,
        updated_ts,
        unpaused_ts,
        ttl_ms,
        unrestorable,
    ) in engine_adapter.fetchall(
        exp.select(*snapshots_columns_to_types).from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        # Prefer the version stored in the payload; fall back to the column.
        version = parsed_snapshot.get("version") or version
        dev_version = get_dev_version(parsed_snapshot)
        parsed_snapshot["dev_version"] = dev_version
        parsed_snapshot["version"] = version

        used_dev_versions.add((name, dev_version))
        used_versions.add((name, version))
        used_snapshot_ids.add((name, identifier))
        snapshot_ids_to_dev_versions[(name, identifier)] = dev_version

        # Previous versions are recorded in the mapping only, not in the
        # "used" sets.
        for previous_version in parsed_snapshot.get("previous_versions", []):
            previous_identifier = get_identifier(previous_version)
            previous_dev_version = get_dev_version(previous_version)
            snapshot_ids_to_dev_versions[(name, previous_identifier)] = previous_dev_version

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    if new_snapshots:
        engine_adapter.delete_from(snapshots_table, "TRUE")
        engine_adapter.insert_append(
            snapshots_table,
            pd.DataFrame(new_snapshots),
            target_columns_to_types=snapshots_columns_to_types,
        )


def get_identifier(snapshot: t.Dict[str, t.Any]) -> str:
    """Return the CRC32 of the four hashes in ``snapshot["fingerprint"]``.

    The hashes are combined in the order data, metadata, parent data,
    parent metadata.
    """
    fingerprint = snapshot["fingerprint"]
    return crc32(
        [
            fingerprint["data_hash"],
            fingerprint["metadata_hash"],
            fingerprint["parent_data_hash"],
            fingerprint["parent_metadata_hash"],
        ]
    )


def get_dev_version(snapshot: t.Dict[str, t.Any]) -> str:
    """Return the snapshot's dev version, computing it when it is absent.

    A truthy ``dev_version`` entry is returned unchanged. Otherwise the
    result is the CRC32 of the fingerprint's ``data_hash`` and
    ``parent_data_hash``.
    """
    dev_version = snapshot.get("dev_version")
    if dev_version:
        return dev_version
    fingerprint = snapshot["fingerprint"]
    return crc32([fingerprint["data_hash"], fingerprint["parent_data_hash"]])


def crc32(data: t.Iterable[t.Optional[str]]) -> str:
    """Return the CRC32 checksum of the joined ``data`` as a decimal string."""
    return str(zlib.crc32(safe_concat(data)))


def safe_concat(data: t.Iterable[t.Optional[str]]) -> bytes:
    """Join ``data`` with ``;`` and encode as UTF-8, treating None as empty."""
    return ";".join("" if d is None else d for d in data).encode("utf-8")
def
migrate_schemas(engine_adapter, schema, **kwargs):
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Add the ``dev_version`` column to the ``_intervals`` table.

    The table name is qualified with ``schema`` when one is given. The
    column's type is the text type that ``index_text_type`` returns for the
    adapter's dialect.
    """
    base_name = "_intervals"
    qualified_name = f"{schema}.{base_name}" if schema else base_name

    text_type = index_text_type(engine_adapter.dialect)
    target_table = exp.to_table(qualified_name)
    dev_version_column = exp.ColumnDef(
        this=exp.to_column("dev_version"),
        kind=exp.DataType.build(text_type),
    )

    engine_adapter.execute(
        exp.Alter(this=target_table, kind="TABLE", actions=[dev_version_column])
    )
def
migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Run the snapshot pass followed by the interval pass.

    Both passes receive the same four lookup collections, created empty
    here. Table names are qualified with ``schema`` when one is given.
    """

    def qualify(table: str) -> str:
        # Prefix the table with the schema only when a schema is provided.
        return f"{schema}.{table}" if schema else table

    dev_versions_seen: t.Set[t.Tuple[str, str]] = set()
    versions_seen: t.Set[t.Tuple[str, str]] = set()
    snapshot_ids_seen: t.Set[t.Tuple[str, str]] = set()
    dev_version_by_snapshot_id: t.Dict[t.Tuple[str, str], str] = {}

    # Positional order matches the helpers' signatures.
    lookups = (
        dev_versions_seen,
        versions_seen,
        snapshot_ids_seen,
        dev_version_by_snapshot_id,
    )

    _migrate_snapshots(engine_adapter, qualify("_snapshots"), *lookups)
    _migrate_intervals(engine_adapter, qualify("_intervals"), *lookups)
def
get_identifier(snapshot: Dict[str, Any]) -> str:
def
get_dev_version(snapshot: Dict[str, Any]) -> str:
def
crc32(data: Iterable[Optional[str]]) -> str:
def
safe_concat(data: Iterable[Optional[str]]) -> bytes: