Edit on GitHub

Remove pre- / post- hooks from existing snapshots.

 1"""Remove pre- / post- hooks from existing snapshots."""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp
 7
 8from sqlmesh.utils.migration import index_text_type
 9
10
def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite all stored snapshots without the legacy hook attributes.

    Every row of the ``_snapshots`` table (qualified with
    ``state_sync.schema`` when one is set) is read and its JSON payload is
    adjusted as follows:

    * ``model.pre`` and ``model.post`` are dropped; a warning is printed when
      either of them was non-empty.
    * ``model.expressions`` is dropped; when it was non-empty and the model's
      ``source_type`` is ``"sql"`` its value is kept as
      ``model.pre_statements``.

    If at least one row was read, the table is emptied and the rewritten rows
    are inserted back. Nothing is written when the table has no rows.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    table = f"{schema}._snapshots" if schema else "_snapshots"

    query = exp.select("name", "identifier", "version", "snapshot").from_(table)
    migrated_rows = []

    for name, identifier, version, raw_payload in adapter.fetchall(
        query, quote_identifiers=True
    ):
        payload = json.loads(raw_payload)
        model = payload["model"]

        removed_pre = model.pop("pre", [])
        removed_post = model.pop("post", [])

        # Only SQL models carry their former "expressions" over; for any other
        # source type the popped value is simply discarded.
        legacy_expressions = model.pop("expressions", None)
        if legacy_expressions and model["source_type"] == "sql":
            model["pre_statements"] = legacy_expressions

        if removed_pre or removed_post:
            print(
                "WARNING: Hooks are no longer supported by SQLMesh, use pre and post SQL statements instead. "
                f"Removing 'pre' and 'post' attributes from snapshot name='{name}', identifier='{identifier}'"
            )

        migrated_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
            }
        )

    if not migrated_rows:
        return

    # Replace the table contents wholesale: clear it, then append the
    # rewritten rows.
    adapter.delete_from(table, "TRUE")

    text_index_type = index_text_type(adapter.dialect)
    column_types = {
        column: exp.DataType.build(text_index_type)
        for column in ("name", "identifier", "version")
    }
    column_types["snapshot"] = exp.DataType.build("text")

    adapter.insert_append(
        table,
        pd.DataFrame(migrated_rows),
        columns_to_types=column_types,
    )
def migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite all stored snapshots without the legacy hook attributes.

    Every row of the ``_snapshots`` table (qualified with
    ``state_sync.schema`` when one is set) is read and its JSON payload is
    adjusted as follows:

    * ``model.pre`` and ``model.post`` are dropped; a warning is printed when
      either of them was non-empty.
    * ``model.expressions`` is dropped; when it was non-empty and the model's
      ``source_type`` is ``"sql"`` its value is kept as
      ``model.pre_statements``.

    If at least one row was read, the table is emptied and the rewritten rows
    are inserted back. Nothing is written when the table has no rows.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    table = f"{schema}._snapshots" if schema else "_snapshots"

    query = exp.select("name", "identifier", "version", "snapshot").from_(table)
    migrated_rows = []

    for name, identifier, version, raw_payload in adapter.fetchall(
        query, quote_identifiers=True
    ):
        payload = json.loads(raw_payload)
        model = payload["model"]

        removed_pre = model.pop("pre", [])
        removed_post = model.pop("post", [])

        # Only SQL models carry their former "expressions" over; for any other
        # source type the popped value is simply discarded.
        legacy_expressions = model.pop("expressions", None)
        if legacy_expressions and model["source_type"] == "sql":
            model["pre_statements"] = legacy_expressions

        if removed_pre or removed_post:
            print(
                "WARNING: Hooks are no longer supported by SQLMesh, use pre and post SQL statements instead. "
                f"Removing 'pre' and 'post' attributes from snapshot name='{name}', identifier='{identifier}'"
            )

        migrated_rows.append(
            {
                "name": name,
                "identifier": identifier,
                "version": version,
                "snapshot": json.dumps(payload),
            }
        )

    if not migrated_rows:
        return

    # Replace the table contents wholesale: clear it, then append the
    # rewritten rows.
    adapter.delete_from(table, "TRUE")

    text_index_type = index_text_type(adapter.dialect)
    column_types = {
        column: exp.DataType.build(text_index_type)
        for column in ("name", "identifier", "version")
    }
    column_types["snapshot"] = exp.DataType.build("text")

    adapter.insert_append(
        table,
        pd.DataFrame(migrated_rows),
        columns_to_types=column_types,
    )