Edit on GitHub

Generate mapping schema data types using the corresponding model's dialect.

 1"""Generate mapping schema data types using the corresponding model's dialect."""
 2
 3import json
 4
 5import pandas as pd
 6from sqlglot import exp, parse_one
 7
 8from sqlmesh.utils.migration import index_text_type
 9
10
def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite stored snapshots so mapping schema types are rendered in each node's dialect.

    Every row of the ``_snapshots`` table (qualified with ``state_sync.schema``
    when one is set) is read, its ``snapshot`` JSON is decoded, and a non-empty
    ``node["mapping_schema"]`` is passed through ``_convert_schema_types``
    together with ``node["dialect"]``. If at least one row was read, the table
    is emptied and all rows are appended back with the re-serialized JSON.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    table = f"{schema}._snapshots" if schema else "_snapshots"

    column_names = ("name", "identifier", "version", "snapshot", "kind_name")
    query = exp.select(*column_names).from_(table)

    rows = []
    for name, identifier, version, raw_snapshot, kind_name in adapter.fetchall(
        query,
        quote_identifiers=True,
    ):
        payload = json.loads(raw_snapshot)
        node = payload["node"]

        current_mapping = node.get("mapping_schema")
        if current_mapping:
            node["mapping_schema"] = _convert_schema_types(current_mapping, node["dialect"])

        rows.append(
            dict(
                name=name,
                identifier=identifier,
                version=version,
                snapshot=json.dumps(payload),
                kind_name=kind_name,
            )
        )

    # Nothing was read, so leave the table untouched.
    if not rows:
        return

    # Full rewrite: clear the table first, then append every row back.
    adapter.delete_from(table, "TRUE")

    index_type = index_text_type(adapter.dialect)

    frame = pd.DataFrame(rows)
    # "snapshot" holds the JSON payload as text; every other column uses the index text type.
    column_types = {
        column: exp.DataType.build("text" if column == "snapshot" else index_type)
        for column in column_names
    }
    adapter.insert_append(table, frame, columns_to_types=column_types)
56
57
58def _convert_schema_types(schema, dialect):  # type: ignore
59    if not schema:
60        return schema
61
62    for k, v in schema.items():
63        if isinstance(v, dict):
64            _convert_schema_types(v, dialect)
65        else:
66            schema[k] = parse_one(v).sql(dialect=dialect)
67
68    return schema
def migrate(state_sync, **kwargs):
def migrate(state_sync, **kwargs):  # type: ignore
    """Rewrite stored snapshots so mapping schema types are rendered in each node's dialect.

    Every row of the ``_snapshots`` table (qualified with ``state_sync.schema``
    when one is set) is read, its ``snapshot`` JSON is decoded, and a non-empty
    ``node["mapping_schema"]`` is passed through ``_convert_schema_types``
    together with ``node["dialect"]``. If at least one row was read, the table
    is emptied and all rows are appended back with the re-serialized JSON.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this migration.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema
    table = f"{schema}._snapshots" if schema else "_snapshots"

    column_names = ("name", "identifier", "version", "snapshot", "kind_name")
    query = exp.select(*column_names).from_(table)

    rows = []
    for name, identifier, version, raw_snapshot, kind_name in adapter.fetchall(
        query,
        quote_identifiers=True,
    ):
        payload = json.loads(raw_snapshot)
        node = payload["node"]

        current_mapping = node.get("mapping_schema")
        if current_mapping:
            node["mapping_schema"] = _convert_schema_types(current_mapping, node["dialect"])

        rows.append(
            dict(
                name=name,
                identifier=identifier,
                version=version,
                snapshot=json.dumps(payload),
                kind_name=kind_name,
            )
        )

    # Nothing was read, so leave the table untouched.
    if not rows:
        return

    # Full rewrite: clear the table first, then append every row back.
    adapter.delete_from(table, "TRUE")

    index_type = index_text_type(adapter.dialect)

    frame = pd.DataFrame(rows)
    # "snapshot" holds the JSON payload as text; every other column uses the index text type.
    column_types = {
        column: exp.DataType.build("text" if column == "snapshot" else index_type)
        for column in column_names
    }
    adapter.insert_append(table, frame, columns_to_types=column_types)