All migrations should be named _XXXX.py, they will be executed sequentially.
If a migration alters the payload of any pydantic models, you should not actually use them because the running model may not be able to load them. Make sure that these migration files are standalone.
1"""All migrations should be named _XXXX.py, they will be executed sequentially. 2 3If a migration alters the payload of any pydantic models, you should not actually use them because 4the running model may not be able to load them. Make sure that these migration files are standalone. 5""" 6 7from sqlglot import exp 8 9from sqlmesh.utils.migration import index_text_type 10 11 12def migrate(state_sync, **kwargs): # type: ignore 13 engine_adapter = state_sync.engine_adapter 14 schema = state_sync.schema 15 snapshots_table = "_snapshots" 16 environments_table = "_environments" 17 versions_table = "_versions" 18 19 if schema: 20 engine_adapter.create_schema(schema) 21 snapshots_table = f"{schema}.{snapshots_table}" 22 environments_table = f"{schema}.{environments_table}" 23 versions_table = f"{schema}.{versions_table}" 24 25 index_type = index_text_type(engine_adapter.dialect) 26 27 engine_adapter.create_state_table( 28 snapshots_table, 29 { 30 "name": exp.DataType.build(index_type), 31 "identifier": exp.DataType.build(index_type), 32 "version": exp.DataType.build(index_type), 33 "snapshot": exp.DataType.build("text"), 34 }, 35 primary_key=("name", "identifier"), 36 ) 37 38 engine_adapter.create_index(snapshots_table, "name_version_idx", ("name", "version")) 39 40 engine_adapter.create_state_table( 41 environments_table, 42 { 43 "name": exp.DataType.build(index_type), 44 "snapshots": exp.DataType.build("text"), 45 "start_at": exp.DataType.build("text"), 46 "end_at": exp.DataType.build("text"), 47 "plan_id": exp.DataType.build("text"), 48 "previous_plan_id": exp.DataType.build("text"), 49 "expiration_ts": exp.DataType.build("bigint"), 50 }, 51 primary_key=("name",), 52 ) 53 54 engine_adapter.create_state_table( 55 versions_table, 56 { 57 "schema_version": exp.DataType.build("int"), 58 "sqlglot_version": exp.DataType.build("text"), 59 }, 60 )
def
migrate(state_sync, **kwargs):
13def migrate(state_sync, **kwargs): # type: ignore 14 engine_adapter = state_sync.engine_adapter 15 schema = state_sync.schema 16 snapshots_table = "_snapshots" 17 environments_table = "_environments" 18 versions_table = "_versions" 19 20 if schema: 21 engine_adapter.create_schema(schema) 22 snapshots_table = f"{schema}.{snapshots_table}" 23 environments_table = f"{schema}.{environments_table}" 24 versions_table = f"{schema}.{versions_table}" 25 26 index_type = index_text_type(engine_adapter.dialect) 27 28 engine_adapter.create_state_table( 29 snapshots_table, 30 { 31 "name": exp.DataType.build(index_type), 32 "identifier": exp.DataType.build(index_type), 33 "version": exp.DataType.build(index_type), 34 "snapshot": exp.DataType.build("text"), 35 }, 36 primary_key=("name", "identifier"), 37 ) 38 39 engine_adapter.create_index(snapshots_table, "name_version_idx", ("name", "version")) 40 41 engine_adapter.create_state_table( 42 environments_table, 43 { 44 "name": exp.DataType.build(index_type), 45 "snapshots": exp.DataType.build("text"), 46 "start_at": exp.DataType.build("text"), 47 "end_at": exp.DataType.build("text"), 48 "plan_id": exp.DataType.build("text"), 49 "previous_plan_id": exp.DataType.build("text"), 50 "expiration_ts": exp.DataType.build("bigint"), 51 }, 52 primary_key=("name",), 53 ) 54 55 engine_adapter.create_state_table( 56 versions_table, 57 { 58 "schema_version": exp.DataType.build("int"), 59 "sqlglot_version": exp.DataType.build("text"), 60 }, 61 )