Edit on GitHub

The baseline migration script that sets up the initial state tables.

 1"""The baseline migration script that sets up the initial state tables."""
 2
 3from sqlglot import exp
 4from sqlmesh.utils.migration import blob_text_type, index_text_type
 5
 6
 7def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
 8    intervals_table = "_intervals"
 9    snapshots_table = "_snapshots"
10    environments_table = "_environments"
11    versions_table = "_versions"
12    if schema:
13        engine_adapter.create_schema(schema)
14        intervals_table = f"{schema}.{intervals_table}"
15        snapshots_table = f"{schema}.{snapshots_table}"
16        environments_table = f"{schema}.{environments_table}"
17        versions_table = f"{schema}.{versions_table}"
18
19    index_type = index_text_type(engine_adapter.dialect)
20    blob_type = blob_text_type(engine_adapter.dialect)
21
22    snapshots_columns_to_types = {
23        "name": exp.DataType.build(index_type),
24        "identifier": exp.DataType.build(index_type),
25        "version": exp.DataType.build(index_type),
26        "snapshot": exp.DataType.build(blob_type),
27        "kind_name": exp.DataType.build(index_type),
28        "updated_ts": exp.DataType.build("bigint"),
29        "unpaused_ts": exp.DataType.build("bigint"),
30        "ttl_ms": exp.DataType.build("bigint"),
31        "unrestorable": exp.DataType.build("boolean"),
32    }
33
34    environments_columns_to_types = {
35        "name": exp.DataType.build(index_type),
36        "snapshots": exp.DataType.build(blob_type),
37        "start_at": exp.DataType.build("text"),
38        "end_at": exp.DataType.build("text"),
39        "plan_id": exp.DataType.build("text"),
40        "previous_plan_id": exp.DataType.build("text"),
41        "expiration_ts": exp.DataType.build("bigint"),
42        "finalized_ts": exp.DataType.build("bigint"),
43        "promoted_snapshot_ids": exp.DataType.build(blob_type),
44        "suffix_target": exp.DataType.build("text"),
45        "catalog_name_override": exp.DataType.build("text"),
46        "previous_finalized_snapshots": exp.DataType.build(blob_type),
47        "normalize_name": exp.DataType.build("boolean"),
48        "requirements": exp.DataType.build(blob_type),
49    }
50
51    intervals_columns_to_types = {
52        "id": exp.DataType.build(index_type),
53        "created_ts": exp.DataType.build("bigint"),
54        "name": exp.DataType.build(index_type),
55        "identifier": exp.DataType.build(index_type),
56        "version": exp.DataType.build(index_type),
57        "start_ts": exp.DataType.build("bigint"),
58        "end_ts": exp.DataType.build("bigint"),
59        "is_dev": exp.DataType.build("boolean"),
60        "is_removed": exp.DataType.build("boolean"),
61        "is_compacted": exp.DataType.build("boolean"),
62    }
63
64    versions_columns_to_types = {
65        "schema_version": exp.DataType.build("int"),
66        "sqlglot_version": exp.DataType.build(index_type),
67        "sqlmesh_version": exp.DataType.build(index_type),
68    }
69
70    # Create the versions table.
71    engine_adapter.create_state_table(versions_table, versions_columns_to_types)
72
73    # Create the snapshots table and its indexes.
74    engine_adapter.create_state_table(
75        snapshots_table, snapshots_columns_to_types, primary_key=("name", "identifier")
76    )
77    engine_adapter.create_index(snapshots_table, "_snapshots_name_version_idx", ("name", "version"))
78
79    # Create the environments table and its indexes.
80    engine_adapter.create_state_table(
81        environments_table, environments_columns_to_types, primary_key=("name",)
82    )
83
84    # Create the intervals table and its indexes.
85    engine_adapter.create_state_table(
86        intervals_table, intervals_columns_to_types, primary_key=("id",)
87    )
88    engine_adapter.create_index(
89        intervals_table, "_intervals_name_identifier_idx", ("name", "identifier")
90    )
91    engine_adapter.create_index(intervals_table, "_intervals_name_version_idx", ("name", "version"))
92
93
94def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
95    pass
def migrate_schemas(engine_adapter, schema, **kwargs):
 8def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
 9    intervals_table = "_intervals"
10    snapshots_table = "_snapshots"
11    environments_table = "_environments"
12    versions_table = "_versions"
13    if schema:
14        engine_adapter.create_schema(schema)
15        intervals_table = f"{schema}.{intervals_table}"
16        snapshots_table = f"{schema}.{snapshots_table}"
17        environments_table = f"{schema}.{environments_table}"
18        versions_table = f"{schema}.{versions_table}"
19
20    index_type = index_text_type(engine_adapter.dialect)
21    blob_type = blob_text_type(engine_adapter.dialect)
22
23    snapshots_columns_to_types = {
24        "name": exp.DataType.build(index_type),
25        "identifier": exp.DataType.build(index_type),
26        "version": exp.DataType.build(index_type),
27        "snapshot": exp.DataType.build(blob_type),
28        "kind_name": exp.DataType.build(index_type),
29        "updated_ts": exp.DataType.build("bigint"),
30        "unpaused_ts": exp.DataType.build("bigint"),
31        "ttl_ms": exp.DataType.build("bigint"),
32        "unrestorable": exp.DataType.build("boolean"),
33    }
34
35    environments_columns_to_types = {
36        "name": exp.DataType.build(index_type),
37        "snapshots": exp.DataType.build(blob_type),
38        "start_at": exp.DataType.build("text"),
39        "end_at": exp.DataType.build("text"),
40        "plan_id": exp.DataType.build("text"),
41        "previous_plan_id": exp.DataType.build("text"),
42        "expiration_ts": exp.DataType.build("bigint"),
43        "finalized_ts": exp.DataType.build("bigint"),
44        "promoted_snapshot_ids": exp.DataType.build(blob_type),
45        "suffix_target": exp.DataType.build("text"),
46        "catalog_name_override": exp.DataType.build("text"),
47        "previous_finalized_snapshots": exp.DataType.build(blob_type),
48        "normalize_name": exp.DataType.build("boolean"),
49        "requirements": exp.DataType.build(blob_type),
50    }
51
52    intervals_columns_to_types = {
53        "id": exp.DataType.build(index_type),
54        "created_ts": exp.DataType.build("bigint"),
55        "name": exp.DataType.build(index_type),
56        "identifier": exp.DataType.build(index_type),
57        "version": exp.DataType.build(index_type),
58        "start_ts": exp.DataType.build("bigint"),
59        "end_ts": exp.DataType.build("bigint"),
60        "is_dev": exp.DataType.build("boolean"),
61        "is_removed": exp.DataType.build("boolean"),
62        "is_compacted": exp.DataType.build("boolean"),
63    }
64
65    versions_columns_to_types = {
66        "schema_version": exp.DataType.build("int"),
67        "sqlglot_version": exp.DataType.build(index_type),
68        "sqlmesh_version": exp.DataType.build(index_type),
69    }
70
71    # Create the versions table.
72    engine_adapter.create_state_table(versions_table, versions_columns_to_types)
73
74    # Create the snapshots table and its indexes.
75    engine_adapter.create_state_table(
76        snapshots_table, snapshots_columns_to_types, primary_key=("name", "identifier")
77    )
78    engine_adapter.create_index(snapshots_table, "_snapshots_name_version_idx", ("name", "version"))
79
80    # Create the environments table and its indexes.
81    engine_adapter.create_state_table(
82        environments_table, environments_columns_to_types, primary_key=("name",)
83    )
84
85    # Create the intervals table and its indexes.
86    engine_adapter.create_state_table(
87        intervals_table, intervals_columns_to_types, primary_key=("id",)
88    )
89    engine_adapter.create_index(
90        intervals_table, "_intervals_name_identifier_idx", ("name", "identifier")
91    )
92    engine_adapter.create_index(intervals_table, "_intervals_name_version_idx", ("name", "version"))
def migrate_rows(engine_adapter, schema, **kwargs):
95def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
96    pass