Remove pre- / post- hooks from existing snapshots.
1"""Remove pre- / post- hooks from existing snapshots.""" 2 3import json 4 5import pandas as pd 6from sqlglot import exp 7 8from sqlmesh.utils.migration import index_text_type 9 10 11def migrate(state_sync, **kwargs): # type: ignore 12 engine_adapter = state_sync.engine_adapter 13 schema = state_sync.schema 14 snapshots_table = "_snapshots" 15 if schema: 16 snapshots_table = f"{schema}.{snapshots_table}" 17 18 new_snapshots = [] 19 20 for name, identifier, version, snapshopt in engine_adapter.fetchall( 21 exp.select("name", "identifier", "version", "snapshot").from_(snapshots_table), 22 quote_identifiers=True, 23 ): 24 snapshot = json.loads(snapshopt) 25 pre_hooks = snapshot["model"].pop("pre", []) 26 post_hooks = snapshot["model"].pop("post", []) 27 28 expressions = snapshot["model"].pop("expressions", None) 29 if expressions and snapshot["model"]["source_type"] == "sql": 30 snapshot["model"]["pre_statements"] = expressions 31 32 if pre_hooks or post_hooks: 33 print( 34 "WARNING: Hooks are no longer supported by SQLMesh, use pre and post SQL statements instead. " 35 f"Removing 'pre' and 'post' attributes from snapshot name='{name}', identifier='{identifier}'" 36 ) 37 38 new_snapshots.append( 39 { 40 "name": name, 41 "identifier": identifier, 42 "version": version, 43 "snapshot": json.dumps(snapshot), 44 } 45 ) 46 47 if new_snapshots: 48 engine_adapter.delete_from(snapshots_table, "TRUE") 49 50 index_type = index_text_type(engine_adapter.dialect) 51 52 engine_adapter.insert_append( 53 snapshots_table, 54 pd.DataFrame(new_snapshots), 55 columns_to_types={ 56 "name": exp.DataType.build(index_type), 57 "identifier": exp.DataType.build(index_type), 58 "version": exp.DataType.build(index_type), 59 "snapshot": exp.DataType.build("text"), 60 }, 61 )
def
migrate(state_sync, **kwargs):
12def migrate(state_sync, **kwargs): # type: ignore 13 engine_adapter = state_sync.engine_adapter 14 schema = state_sync.schema 15 snapshots_table = "_snapshots" 16 if schema: 17 snapshots_table = f"{schema}.{snapshots_table}" 18 19 new_snapshots = [] 20 21 for name, identifier, version, snapshopt in engine_adapter.fetchall( 22 exp.select("name", "identifier", "version", "snapshot").from_(snapshots_table), 23 quote_identifiers=True, 24 ): 25 snapshot = json.loads(snapshopt) 26 pre_hooks = snapshot["model"].pop("pre", []) 27 post_hooks = snapshot["model"].pop("post", []) 28 29 expressions = snapshot["model"].pop("expressions", None) 30 if expressions and snapshot["model"]["source_type"] == "sql": 31 snapshot["model"]["pre_statements"] = expressions 32 33 if pre_hooks or post_hooks: 34 print( 35 "WARNING: Hooks are no longer supported by SQLMesh, use pre and post SQL statements instead. " 36 f"Removing 'pre' and 'post' attributes from snapshot name='{name}', identifier='{identifier}'" 37 ) 38 39 new_snapshots.append( 40 { 41 "name": name, 42 "identifier": identifier, 43 "version": version, 44 "snapshot": json.dumps(snapshot), 45 } 46 ) 47 48 if new_snapshots: 49 engine_adapter.delete_from(snapshots_table, "TRUE") 50 51 index_type = index_text_type(engine_adapter.dialect) 52 53 engine_adapter.insert_append( 54 snapshots_table, 55 pd.DataFrame(new_snapshots), 56 columns_to_types={ 57 "name": exp.DataType.build(index_type), 58 "identifier": exp.DataType.build(index_type), 59 "version": exp.DataType.build(index_type), 60 "snapshot": exp.DataType.build("text"), 61 }, 62 )