Create a dedicated table to store snapshot intervals.
"""Create a dedicated table to store snapshot intervals."""

from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Create the ``_intervals`` state table together with its two indexes.

    The table name is prefixed with ``state_sync.schema`` when that attribute
    is truthy. Extra keyword arguments are accepted but not used here.
    """
    adapter = state_sync.engine_adapter

    base_name = "_intervals"
    table_name = f"{state_sync.schema}.{base_name}" if state_sync.schema else base_name

    # Text type returned by index_text_type for the adapter's dialect; it is
    # used for every text column, including the ones that get indexed below.
    text_type = index_text_type(adapter.dialect)

    # (column name, type name) pairs in table order. Each column gets its own
    # freshly built DataType instance.
    column_specs = (
        ("id", text_type),
        ("created_ts", "bigint"),
        ("name", text_type),
        ("identifier", text_type),
        ("version", text_type),
        ("start_ts", "bigint"),
        ("end_ts", "bigint"),
        ("is_dev", "boolean"),
        ("is_removed", "boolean"),
        ("is_compacted", "boolean"),
    )
    columns_to_types = {
        column: exp.DataType.build(type_name) for column, type_name in column_specs
    }

    adapter.create_state_table(table_name, columns_to_types, primary_key=("id",))

    # (index name, indexed columns) pairs, created in this order.
    index_specs = (
        ("name_version_idx", ("name", "version", "created_ts")),
        ("name_identifier_idx", ("name", "identifier", "created_ts")),
    )
    for index_name, index_columns in index_specs:
        adapter.create_index(table_name, index_name, index_columns)
def migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Create the ``_intervals`` state table together with its two indexes.

    The table name is prefixed with ``state_sync.schema`` when that attribute
    is truthy. Extra keyword arguments are accepted but not used here.
    """
    adapter = state_sync.engine_adapter

    base_name = "_intervals"
    table_name = f"{state_sync.schema}.{base_name}" if state_sync.schema else base_name

    # Text type returned by index_text_type for the adapter's dialect; it is
    # used for every text column, including the ones that get indexed below.
    text_type = index_text_type(adapter.dialect)

    # (column name, type name) pairs in table order. Each column gets its own
    # freshly built DataType instance.
    column_specs = (
        ("id", text_type),
        ("created_ts", "bigint"),
        ("name", text_type),
        ("identifier", text_type),
        ("version", text_type),
        ("start_ts", "bigint"),
        ("end_ts", "bigint"),
        ("is_dev", "boolean"),
        ("is_removed", "boolean"),
        ("is_compacted", "boolean"),
    )
    columns_to_types = {
        column: exp.DataType.build(type_name) for column, type_name in column_specs
    }

    adapter.create_state_table(table_name, columns_to_types, primary_key=("id",))

    # (index name, indexed columns) pairs, created in this order.
    index_specs = (
        ("name_version_idx", ("name", "version", "created_ts")),
        ("name_identifier_idx", ("name", "identifier", "created_ts")),
    )
    for index_name, index_columns in index_specs:
        adapter.create_index(table_name, index_name, index_columns)