Remove disable restatement from external and embedded models.
"""Remove disable restatement from external and embedded models."""

import json

from sqlglot import exp
from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Do nothing; this migration performs no work in the schema step."""
    pass


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Strip ``disable_restatement`` from EMBEDDED and EXTERNAL snapshot kinds.

    Every row of the ``_snapshots`` table (prefixed with ``schema`` when one
    is given) is read, and the ``disable_restatement`` key is removed from the
    serialized ``node.kind`` of rows whose ``kind_name`` is ``EMBEDDED`` or
    ``EXTERNAL``. The table is deleted and re-populated only when at least one
    row was actually modified; otherwise it is left untouched.

    Args:
        engine_adapter: Adapter used to fetch, delete and insert rows. Its
            ``dialect`` selects the text column types.
        schema: Optional schema name that qualifies the snapshots table.
        **kwargs: Accepted for interface compatibility; unused here.
    """
    import pandas as pd

    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)
    snapshots_columns_to_types = {
        "name": exp.DataType.build(index_type),
        "identifier": exp.DataType.build(index_type),
        "version": exp.DataType.build(index_type),
        "snapshot": exp.DataType.build(blob_type),
        "kind_name": exp.DataType.build(index_type),
        "updated_ts": exp.DataType.build("bigint"),
        "unpaused_ts": exp.DataType.build("bigint"),
        "ttl_ms": exp.DataType.build("bigint"),
        "unrestorable": exp.DataType.build("boolean"),
    }

    new_snapshots = []
    # Set once any row really loses the key; guards the destructive rewrite.
    migration_needed = False
    for (
        name,
        identifier,
        version,
        snapshot,
        kind_name,
        updated_ts,
        unpaused_ts,
        ttl_ms,
        unrestorable,
    ) in engine_adapter.fetchall(
        exp.select(*snapshots_columns_to_types).from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        kind = parsed_snapshot["node"].get("kind")

        if (
            kind
            and kind_name in ("EMBEDDED", "EXTERNAL")
            and "disable_restatement" in kind
        ):
            del kind["disable_restatement"]
            migration_needed = True

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    # Nothing changed (or the table is empty): avoid wiping and re-inserting
    # the whole table for no effect.
    if not migration_needed:
        return

    engine_adapter.delete_from(snapshots_table, "TRUE")
    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        target_columns_to_types=snapshots_columns_to_types,
    )
def
migrate_schemas(engine_adapter, schema, **kwargs):
def
migrate_rows(engine_adapter, schema, **kwargs):
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Strip ``disable_restatement`` from EMBEDDED and EXTERNAL snapshot kinds.

    Every row of the ``_snapshots`` table (prefixed with ``schema`` when one
    is given) is read, and the ``disable_restatement`` key is removed from the
    serialized ``node.kind`` of rows whose ``kind_name`` is ``EMBEDDED`` or
    ``EXTERNAL``. The table is deleted and re-populated only when at least one
    row was actually modified; otherwise it is left untouched.

    Args:
        engine_adapter: Adapter used to fetch, delete and insert rows. Its
            ``dialect`` selects the text column types.
        schema: Optional schema name that qualifies the snapshots table.
        **kwargs: Accepted for interface compatibility; unused here.
    """
    import pandas as pd

    snapshots_table = "_snapshots"
    if schema:
        snapshots_table = f"{schema}.{snapshots_table}"

    index_type = index_text_type(engine_adapter.dialect)
    blob_type = blob_text_type(engine_adapter.dialect)
    snapshots_columns_to_types = {
        "name": exp.DataType.build(index_type),
        "identifier": exp.DataType.build(index_type),
        "version": exp.DataType.build(index_type),
        "snapshot": exp.DataType.build(blob_type),
        "kind_name": exp.DataType.build(index_type),
        "updated_ts": exp.DataType.build("bigint"),
        "unpaused_ts": exp.DataType.build("bigint"),
        "ttl_ms": exp.DataType.build("bigint"),
        "unrestorable": exp.DataType.build("boolean"),
    }

    new_snapshots = []
    # Set once any row really loses the key; guards the destructive rewrite.
    migration_needed = False
    for (
        name,
        identifier,
        version,
        snapshot,
        kind_name,
        updated_ts,
        unpaused_ts,
        ttl_ms,
        unrestorable,
    ) in engine_adapter.fetchall(
        exp.select(*snapshots_columns_to_types).from_(snapshots_table),
        quote_identifiers=True,
    ):
        parsed_snapshot = json.loads(snapshot)
        kind = parsed_snapshot["node"].get("kind")

        if (
            kind
            and kind_name in ("EMBEDDED", "EXTERNAL")
            and "disable_restatement" in kind
        ):
            del kind["disable_restatement"]
            migration_needed = True

        new_snapshots.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(parsed_snapshot),
                "kind_name": kind_name,
                "updated_ts": updated_ts,
                "unpaused_ts": unpaused_ts,
                "ttl_ms": ttl_ms,
                "unrestorable": unrestorable,
            }
        )

    # Nothing changed (or the table is empty): avoid wiping and re-inserting
    # the whole table for no effect.
    if not migration_needed:
        return

    engine_adapter.delete_from(snapshots_table, "TRUE")
    engine_adapter.insert_append(
        snapshots_table,
        pd.DataFrame(new_snapshots),
        target_columns_to_types=snapshots_columns_to_types,
    )