Join list of WHEN [NOT] MATCHED strings into a single string.
"""Join list of `WHEN [NOT] MATCHED` strings into a single string."""

import json

from sqlglot import exp

from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing: this migration makes no schema changes."""


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Rewrite stored snapshots so a list-valued `when_matched` becomes one string.

    Every row of the `_snapshots` table (qualified with `schema` when one is
    given) is read and its `snapshot` JSON parsed. If the node's kind carries
    a `when_matched` list, the items are joined with single spaces. When at
    least one row was read, the table is emptied and all rows are written
    back, changed or not.

    Args:
        engine_adapter: Adapter used to read, delete and insert rows.
        schema: Optional schema name used to qualify the snapshots table.
        **kwargs: Accepted but unused.
    """
    import pandas as pd

    table_name = f"{schema}._snapshots" if schema else "_snapshots"
    index_type = index_text_type(engine_adapter.dialect)

    column_names = (
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    )
    query = exp.select(*column_names).from_(table_name)

    rewritten_rows = []
    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        (
            name,
            identifier,
            version,
            snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
        ) = row

        payload = json.loads(snapshot)
        kind = payload["node"].get("kind")
        if kind:
            clauses = kind.get("when_matched")
            if isinstance(clauses, list):
                # Collapse the list of clauses into one space-separated string.
                kind["when_matched"] = " ".join(clauses)

        rewritten_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    if not rewritten_rows:
        return

    # Full rewrite: clear the table, then append every (possibly updated) row.
    engine_adapter.delete_from(table_name, "TRUE")
    blob_type = blob_text_type(engine_adapter.dialect)

    column_type_names = {
        "name": index_type,
        "identifier": index_type,
        "version": index_type,
        "snapshot": blob_type,
        "kind_name": index_type,
        "updated_ts": "bigint",
        "unpaused_ts": "bigint",
        "ttl_ms": "bigint",
        "unrestorable": "boolean",
    }

    engine_adapter.insert_append(
        table_name,
        pd.DataFrame(rewritten_rows),
        target_columns_to_types={
            column: exp.DataType.build(type_name)
            for column, type_name in column_type_names.items()
        },
    )
def
migrate_schemas(engine_adapter, schema, **kwargs):
def
migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Rewrite stored snapshots so a list-valued `when_matched` becomes one string.

    Every row of the `_snapshots` table (qualified with `schema` when one is
    given) is read and its `snapshot` JSON parsed. If the node's kind carries
    a `when_matched` list, the items are joined with single spaces. When at
    least one row was read, the table is emptied and all rows are written
    back, changed or not.

    Args:
        engine_adapter: Adapter used to read, delete and insert rows.
        schema: Optional schema name used to qualify the snapshots table.
        **kwargs: Accepted but unused.
    """
    import pandas as pd

    table_name = f"{schema}._snapshots" if schema else "_snapshots"
    index_type = index_text_type(engine_adapter.dialect)

    column_names = (
        "name",
        "identifier",
        "version",
        "snapshot",
        "kind_name",
        "updated_ts",
        "unpaused_ts",
        "ttl_ms",
        "unrestorable",
    )
    query = exp.select(*column_names).from_(table_name)

    rewritten_rows = []
    for row in engine_adapter.fetchall(query, quote_identifiers=True):
        (
            name,
            identifier,
            version,
            snapshot,
            kind_name,
            updated_ts,
            unpaused_ts,
            ttl_ms,
            unrestorable,
        ) = row

        payload = json.loads(snapshot)
        kind = payload["node"].get("kind")
        if kind:
            clauses = kind.get("when_matched")
            if isinstance(clauses, list):
                # Collapse the list of clauses into one space-separated string.
                kind["when_matched"] = " ".join(clauses)

        rewritten_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    if not rewritten_rows:
        return

    # Full rewrite: clear the table, then append every (possibly updated) row.
    engine_adapter.delete_from(table_name, "TRUE")
    blob_type = blob_text_type(engine_adapter.dialect)

    column_type_names = {
        "name": index_type,
        "identifier": index_type,
        "version": index_type,
        "snapshot": blob_type,
        "kind_name": index_type,
        "updated_ts": "bigint",
        "unpaused_ts": "bigint",
        "ttl_ms": "bigint",
        "unrestorable": "boolean",
    }

    engine_adapter.insert_append(
        table_name,
        pd.DataFrame(rewritten_rows),
        target_columns_to_types={
            column: exp.DataType.build(type_name)
            for column, type_name in column_type_names.items()
        },
    )