Edit on GitHub

Use version instead of identifier in the seeds table.

 1"""Use version instead of identifier in the seeds table."""
 2
 3from sqlglot import exp
 4
 5from sqlmesh.utils.migration import index_text_type
 6
 7
 8def migrate(state_sync, **kwargs):  # type: ignore
 9    engine_adapter = state_sync.engine_adapter
10
11    snapshots_table = "_snapshots"
12    seeds_table = "_seeds"
13    new_seeds_table = f"{seeds_table}_v49"
14
15    if state_sync.schema:
16        snapshots_table = f"{state_sync.schema}.{snapshots_table}"
17        seeds_table = f"{state_sync.schema}.{seeds_table}"
18        new_seeds_table = f"{state_sync.schema}.{new_seeds_table}"
19
20    index_type = index_text_type(engine_adapter.dialect)
21
22    engine_adapter.drop_table(new_seeds_table)
23    engine_adapter.create_state_table(
24        new_seeds_table,
25        {
26            "name": exp.DataType.build(index_type),
27            "version": exp.DataType.build(index_type),
28            "content": exp.DataType.build("text"),
29        },
30        primary_key=("name", "version"),
31    )
32
33    name_col = exp.column("name", table="seeds")
34    version_col = exp.column("version", table="snapshots")
35    query = (
36        exp.select(
37            name_col,
38            version_col,
39            exp.func("MAX", exp.column("content", table="seeds")).as_("content"),
40        )
41        .from_(exp.to_table(seeds_table).as_("seeds"))
42        .join(
43            exp.to_table(snapshots_table).as_("snapshots"),
44            on=exp.and_(
45                exp.column("name", table="seeds").eq(exp.column("name", table="snapshots")),
46                exp.column("identifier", table="seeds").eq(
47                    exp.column("identifier", table="snapshots")
48                ),
49            ),
50        )
51        .where(exp.column("version", table="snapshots").is_(exp.null()).not_())
52        .group_by(name_col, version_col)
53    )
54
55    engine_adapter.insert_append(new_seeds_table, query)
56    engine_adapter.drop_table(seeds_table)
57    engine_adapter.rename_table(new_seeds_table, seeds_table)
def migrate(state_sync, **kwargs):
 9def migrate(state_sync, **kwargs):  # type: ignore
10    engine_adapter = state_sync.engine_adapter
11
12    snapshots_table = "_snapshots"
13    seeds_table = "_seeds"
14    new_seeds_table = f"{seeds_table}_v49"
15
16    if state_sync.schema:
17        snapshots_table = f"{state_sync.schema}.{snapshots_table}"
18        seeds_table = f"{state_sync.schema}.{seeds_table}"
19        new_seeds_table = f"{state_sync.schema}.{new_seeds_table}"
20
21    index_type = index_text_type(engine_adapter.dialect)
22
23    engine_adapter.drop_table(new_seeds_table)
24    engine_adapter.create_state_table(
25        new_seeds_table,
26        {
27            "name": exp.DataType.build(index_type),
28            "version": exp.DataType.build(index_type),
29            "content": exp.DataType.build("text"),
30        },
31        primary_key=("name", "version"),
32    )
33
34    name_col = exp.column("name", table="seeds")
35    version_col = exp.column("version", table="snapshots")
36    query = (
37        exp.select(
38            name_col,
39            version_col,
40            exp.func("MAX", exp.column("content", table="seeds")).as_("content"),
41        )
42        .from_(exp.to_table(seeds_table).as_("seeds"))
43        .join(
44            exp.to_table(snapshots_table).as_("snapshots"),
45            on=exp.and_(
46                exp.column("name", table="seeds").eq(exp.column("name", table="snapshots")),
47                exp.column("identifier", table="seeds").eq(
48                    exp.column("identifier", table="snapshots")
49                ),
50            ),
51        )
52        .where(exp.column("version", table="snapshots").is_(exp.null()).not_())
53        .group_by(name_col, version_col)
54    )
55
56    engine_adapter.insert_append(new_seeds_table, query)
57    engine_adapter.drop_table(seeds_table)
58    engine_adapter.rename_table(new_seeds_table, seeds_table)