The baseline migration script that sets up the initial state tables.
1"""The baseline migration script that sets up the initial state tables.""" 2 3from sqlglot import exp 4from sqlmesh.utils.migration import blob_text_type, index_text_type 5 6 7def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore 8 intervals_table = "_intervals" 9 snapshots_table = "_snapshots" 10 environments_table = "_environments" 11 versions_table = "_versions" 12 if schema: 13 engine_adapter.create_schema(schema) 14 intervals_table = f"{schema}.{intervals_table}" 15 snapshots_table = f"{schema}.{snapshots_table}" 16 environments_table = f"{schema}.{environments_table}" 17 versions_table = f"{schema}.{versions_table}" 18 19 index_type = index_text_type(engine_adapter.dialect) 20 blob_type = blob_text_type(engine_adapter.dialect) 21 22 snapshots_columns_to_types = { 23 "name": exp.DataType.build(index_type), 24 "identifier": exp.DataType.build(index_type), 25 "version": exp.DataType.build(index_type), 26 "snapshot": exp.DataType.build(blob_type), 27 "kind_name": exp.DataType.build(index_type), 28 "updated_ts": exp.DataType.build("bigint"), 29 "unpaused_ts": exp.DataType.build("bigint"), 30 "ttl_ms": exp.DataType.build("bigint"), 31 "unrestorable": exp.DataType.build("boolean"), 32 } 33 34 environments_columns_to_types = { 35 "name": exp.DataType.build(index_type), 36 "snapshots": exp.DataType.build(blob_type), 37 "start_at": exp.DataType.build("text"), 38 "end_at": exp.DataType.build("text"), 39 "plan_id": exp.DataType.build("text"), 40 "previous_plan_id": exp.DataType.build("text"), 41 "expiration_ts": exp.DataType.build("bigint"), 42 "finalized_ts": exp.DataType.build("bigint"), 43 "promoted_snapshot_ids": exp.DataType.build(blob_type), 44 "suffix_target": exp.DataType.build("text"), 45 "catalog_name_override": exp.DataType.build("text"), 46 "previous_finalized_snapshots": exp.DataType.build(blob_type), 47 "normalize_name": exp.DataType.build("boolean"), 48 "requirements": exp.DataType.build(blob_type), 49 } 50 51 intervals_columns_to_types = { 52 "id": exp.DataType.build(index_type), 53 "created_ts": exp.DataType.build("bigint"), 54 "name": exp.DataType.build(index_type), 55 "identifier": exp.DataType.build(index_type), 56 "version": exp.DataType.build(index_type), 57 "start_ts": exp.DataType.build("bigint"), 58 "end_ts": exp.DataType.build("bigint"), 59 "is_dev": exp.DataType.build("boolean"), 60 "is_removed": exp.DataType.build("boolean"), 61 "is_compacted": exp.DataType.build("boolean"), 62 } 63 64 versions_columns_to_types = { 65 "schema_version": exp.DataType.build("int"), 66 "sqlglot_version": exp.DataType.build(index_type), 67 "sqlmesh_version": exp.DataType.build(index_type), 68 } 69 70 # Create the versions table. 71 engine_adapter.create_state_table(versions_table, versions_columns_to_types) 72 73 # Create the snapshots table and its indexes. 74 engine_adapter.create_state_table( 75 snapshots_table, snapshots_columns_to_types, primary_key=("name", "identifier") 76 ) 77 engine_adapter.create_index(snapshots_table, "_snapshots_name_version_idx", ("name", "version")) 78 79 # Create the environments table and its indexes. 80 engine_adapter.create_state_table( 81 environments_table, environments_columns_to_types, primary_key=("name",) 82 ) 83 84 # Create the intervals table and its indexes. 85 engine_adapter.create_state_table( 86 intervals_table, intervals_columns_to_types, primary_key=("id",) 87 ) 88 engine_adapter.create_index( 89 intervals_table, "_intervals_name_identifier_idx", ("name", "identifier") 90 ) 91 engine_adapter.create_index(intervals_table, "_intervals_name_version_idx", ("name", "version")) 92 93 94def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore 95 pass
def
migrate_schemas(engine_adapter, schema, **kwargs):
8def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore 9 intervals_table = "_intervals" 10 snapshots_table = "_snapshots" 11 environments_table = "_environments" 12 versions_table = "_versions" 13 if schema: 14 engine_adapter.create_schema(schema) 15 intervals_table = f"{schema}.{intervals_table}" 16 snapshots_table = f"{schema}.{snapshots_table}" 17 environments_table = f"{schema}.{environments_table}" 18 versions_table = f"{schema}.{versions_table}" 19 20 index_type = index_text_type(engine_adapter.dialect) 21 blob_type = blob_text_type(engine_adapter.dialect) 22 23 snapshots_columns_to_types = { 24 "name": exp.DataType.build(index_type), 25 "identifier": exp.DataType.build(index_type), 26 "version": exp.DataType.build(index_type), 27 "snapshot": exp.DataType.build(blob_type), 28 "kind_name": exp.DataType.build(index_type), 29 "updated_ts": exp.DataType.build("bigint"), 30 "unpaused_ts": exp.DataType.build("bigint"), 31 "ttl_ms": exp.DataType.build("bigint"), 32 "unrestorable": exp.DataType.build("boolean"), 33 } 34 35 environments_columns_to_types = { 36 "name": exp.DataType.build(index_type), 37 "snapshots": exp.DataType.build(blob_type), 38 "start_at": exp.DataType.build("text"), 39 "end_at": exp.DataType.build("text"), 40 "plan_id": exp.DataType.build("text"), 41 "previous_plan_id": exp.DataType.build("text"), 42 "expiration_ts": exp.DataType.build("bigint"), 43 "finalized_ts": exp.DataType.build("bigint"), 44 "promoted_snapshot_ids": exp.DataType.build(blob_type), 45 "suffix_target": exp.DataType.build("text"), 46 "catalog_name_override": exp.DataType.build("text"), 47 "previous_finalized_snapshots": exp.DataType.build(blob_type), 48 "normalize_name": exp.DataType.build("boolean"), 49 "requirements": exp.DataType.build(blob_type), 50 } 51 52 intervals_columns_to_types = { 53 "id": exp.DataType.build(index_type), 54 "created_ts": exp.DataType.build("bigint"), 55 "name": exp.DataType.build(index_type), 56 "identifier": exp.DataType.build(index_type), 57 "version": exp.DataType.build(index_type), 58 "start_ts": exp.DataType.build("bigint"), 59 "end_ts": exp.DataType.build("bigint"), 60 "is_dev": exp.DataType.build("boolean"), 61 "is_removed": exp.DataType.build("boolean"), 62 "is_compacted": exp.DataType.build("boolean"), 63 } 64 65 versions_columns_to_types = { 66 "schema_version": exp.DataType.build("int"), 67 "sqlglot_version": exp.DataType.build(index_type), 68 "sqlmesh_version": exp.DataType.build(index_type), 69 } 70 71 # Create the versions table. 72 engine_adapter.create_state_table(versions_table, versions_columns_to_types) 73 74 # Create the snapshots table and its indexes. 75 engine_adapter.create_state_table( 76 snapshots_table, snapshots_columns_to_types, primary_key=("name", "identifier") 77 ) 78 engine_adapter.create_index(snapshots_table, "_snapshots_name_version_idx", ("name", "version")) 79 80 # Create the environments table and its indexes. 81 engine_adapter.create_state_table( 82 environments_table, environments_columns_to_types, primary_key=("name",) 83 ) 84 85 # Create the intervals table and its indexes. 86 engine_adapter.create_state_table( 87 intervals_table, intervals_columns_to_types, primary_key=("id",) 88 ) 89 engine_adapter.create_index( 90 intervals_table, "_intervals_name_identifier_idx", ("name", "identifier") 91 ) 92 engine_adapter.create_index(intervals_table, "_intervals_name_version_idx", ("name", "version"))
def
migrate_rows(engine_adapter, schema, **kwargs):