Add the auto restatements table.
"""Add the auto restatements table."""

from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Create the auto restatements state table and extend the intervals table.

    Args:
        engine_adapter: Adapter used to create the state table and to execute
            the ALTER statement.
        schema: Optional schema name; when truthy it is prepended to both
            table names as ``<schema>.<table>``.
        **kwargs: Accepted and ignored.
    """
    # An empty prefix leaves the bare table names untouched.
    prefix = f"{schema}." if schema else ""
    restatements_name = f"{prefix}_auto_restatements"
    intervals_name = f"{prefix}_intervals"

    # The text type used for the key columns depends on the adapter's dialect.
    text_type = index_text_type(engine_adapter.dialect)

    # Each key column gets its own freshly built DataType expression.
    columns = {
        key_column: exp.DataType.build(text_type)
        for key_column in ("snapshot_name", "snapshot_version")
    }
    columns["next_auto_restatement_ts"] = exp.DataType.build("bigint")

    engine_adapter.create_state_table(
        restatements_name,
        columns,
        primary_key=("snapshot_name", "snapshot_version"),
    )

    # Add the boolean "is_pending_restatement" column to the intervals table.
    target_table = exp.to_table(intervals_name)
    pending_column = exp.ColumnDef(
        this=exp.to_column("is_pending_restatement"),
        kind=exp.DataType.build("boolean"),
    )
    engine_adapter.execute(
        exp.Alter(this=target_table, kind="TABLE", actions=[pending_column])
    )


def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Set ``is_pending_restatement`` to False in the intervals table.

    The update is issued with an always-true WHERE expression.

    Args:
        engine_adapter: Adapter used to run the table update.
        schema: Optional schema name; when truthy it is prepended to the
            table name as ``<schema>.<table>``.
        **kwargs: Accepted and ignored.
    """
    target = f"{schema}._intervals" if schema else "_intervals"
    new_values = {"is_pending_restatement": False}
    engine_adapter.update_table(target, new_values, where=exp.true())
def
migrate_schemas(engine_adapter, schema, **kwargs):
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """Create the auto restatements state table and extend the intervals table.

    Args:
        engine_adapter: Adapter used to create the state table and to execute
            the ALTER statement.
        schema: Optional schema name; when truthy it is prepended to both
            table names as ``<schema>.<table>``.
        **kwargs: Accepted and ignored.
    """
    # An empty prefix leaves the bare table names untouched.
    prefix = f"{schema}." if schema else ""
    restatements_name = f"{prefix}_auto_restatements"
    intervals_name = f"{prefix}_intervals"

    # The text type used for the key columns depends on the adapter's dialect.
    text_type = index_text_type(engine_adapter.dialect)

    # Each key column gets its own freshly built DataType expression.
    columns = {
        key_column: exp.DataType.build(text_type)
        for key_column in ("snapshot_name", "snapshot_version")
    }
    columns["next_auto_restatement_ts"] = exp.DataType.build("bigint")

    engine_adapter.create_state_table(
        restatements_name,
        columns,
        primary_key=("snapshot_name", "snapshot_version"),
    )

    # Add the boolean "is_pending_restatement" column to the intervals table.
    target_table = exp.to_table(intervals_name)
    pending_column = exp.ColumnDef(
        this=exp.to_column("is_pending_restatement"),
        kind=exp.DataType.build("boolean"),
    )
    engine_adapter.execute(
        exp.Alter(this=target_table, kind="TABLE", actions=[pending_column])
    )
def
migrate_rows(engine_adapter, schema, **kwargs):