Use version instead of identifier in the seeds table.
"""Use version instead of identifier in the seeds table."""

from sqlglot import exp

from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs):  # type: ignore
    """Rebuild the seeds state table so rows are keyed by (name, version).

    A replacement table (``_seeds_v49``) is created with ``name``, ``version``
    and ``content`` columns and a ``(name, version)`` primary key. It is filled
    by joining the existing seeds table to the snapshots table on ``name`` and
    ``identifier``, keeping only rows whose snapshot version is not NULL and
    collapsing each (name, version) group to ``MAX(content)``. The old seeds
    table is then dropped and the replacement is renamed into its place.

    Args:
        state_sync: Object exposing ``engine_adapter`` (used for all DDL/DML)
            and ``schema`` (optional prefix for the state table names).
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter

    # Table names are schema-qualified only when a state schema is configured.
    schema = state_sync.schema
    prefix = f"{schema}." if schema else ""
    snapshots_table = f"{prefix}_snapshots"
    seeds_table = f"{prefix}_seeds"
    new_seeds_table = f"{prefix}_seeds_v49"

    text_index_type = index_text_type(adapter.dialect)

    # Clear any replacement table left from an earlier run before recreating it.
    adapter.drop_table(new_seeds_table)
    columns_to_types = {
        "name": exp.DataType.build(text_index_type),
        "version": exp.DataType.build(text_index_type),
        "content": exp.DataType.build("text"),
    }
    adapter.create_state_table(
        new_seeds_table,
        columns_to_types,
        primary_key=("name", "version"),
    )

    # Projection / grouping columns.
    seed_name = exp.column("name", table="seeds")
    snapshot_version = exp.column("version", table="snapshots")
    # Presumably several identifiers can share one version; MAX picks a single
    # content value per (name, version) so the primary key holds.
    max_content = exp.func("MAX", exp.column("content", table="seeds")).as_("content")

    # Join and filter predicates.
    same_name = exp.column("name", table="seeds").eq(exp.column("name", table="snapshots"))
    same_identifier = exp.column("identifier", table="seeds").eq(
        exp.column("identifier", table="snapshots")
    )
    version_is_set = exp.column("version", table="snapshots").is_(exp.null()).not_()

    query = exp.select(seed_name, snapshot_version, max_content)
    query = query.from_(exp.to_table(seeds_table).as_("seeds"))
    query = query.join(
        exp.to_table(snapshots_table).as_("snapshots"),
        on=exp.and_(same_name, same_identifier),
    )
    query = query.where(version_is_set)
    query = query.group_by(seed_name, snapshot_version)

    # Populate the replacement, then swap it in for the original seeds table.
    adapter.insert_append(new_seeds_table, query)
    adapter.drop_table(seeds_table)
    adapter.rename_table(new_seeds_table, seeds_table)
def
migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Rebuild the seeds state table so rows are keyed by (name, version).

    A replacement table (``_seeds_v49``) is created with ``name``, ``version``
    and ``content`` columns and a ``(name, version)`` primary key. It is filled
    by joining the existing seeds table to the snapshots table on ``name`` and
    ``identifier``, keeping only rows whose snapshot version is not NULL and
    collapsing each (name, version) group to ``MAX(content)``. The old seeds
    table is then dropped and the replacement is renamed into its place.

    Args:
        state_sync: Object exposing ``engine_adapter`` (used for all DDL/DML)
            and ``schema`` (optional prefix for the state table names).
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter

    # Table names are schema-qualified only when a state schema is configured.
    schema = state_sync.schema
    prefix = f"{schema}." if schema else ""
    snapshots_table = f"{prefix}_snapshots"
    seeds_table = f"{prefix}_seeds"
    new_seeds_table = f"{prefix}_seeds_v49"

    text_index_type = index_text_type(adapter.dialect)

    # Clear any replacement table left from an earlier run before recreating it.
    adapter.drop_table(new_seeds_table)
    columns_to_types = {
        "name": exp.DataType.build(text_index_type),
        "version": exp.DataType.build(text_index_type),
        "content": exp.DataType.build("text"),
    }
    adapter.create_state_table(
        new_seeds_table,
        columns_to_types,
        primary_key=("name", "version"),
    )

    # Projection / grouping columns.
    seed_name = exp.column("name", table="seeds")
    snapshot_version = exp.column("version", table="snapshots")
    # Presumably several identifiers can share one version; MAX picks a single
    # content value per (name, version) so the primary key holds.
    max_content = exp.func("MAX", exp.column("content", table="seeds")).as_("content")

    # Join and filter predicates.
    same_name = exp.column("name", table="seeds").eq(exp.column("name", table="snapshots"))
    same_identifier = exp.column("identifier", table="seeds").eq(
        exp.column("identifier", table="snapshots")
    )
    version_is_set = exp.column("version", table="snapshots").is_(exp.null()).not_()

    query = exp.select(seed_name, snapshot_version, max_content)
    query = query.from_(exp.to_table(seeds_table).as_("seeds"))
    query = query.join(
        exp.to_table(snapshots_table).as_("snapshots"),
        on=exp.and_(same_name, same_identifier),
    )
    query = query.where(version_is_set)
    query = query.group_by(seed_name, snapshot_version)

    # Populate the replacement, then swap it in for the original seeds table.
    adapter.insert_append(new_seeds_table, query)
    adapter.drop_table(seeds_table)
    adapter.rename_table(new_seeds_table, seeds_table)